mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-13 02:30:40 +00:00
Compare commits
2 Commits
2600-mode-
...
redirect
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a3dba3d5c | ||
|
|
2e447b47f1 |
@@ -65,22 +65,20 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
|
||||
from changedetectionio import worker_handler
|
||||
from changedetectionio.queuedWatchMetaData import PrioritizedItem
|
||||
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
|
||||
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
|
||||
# Sleep to avoid tight loop and give the other worker time to finish
|
||||
await asyncio.sleep(10.0)
|
||||
|
||||
# Re-queue with lower priority so it gets checked again after current processing finishes
|
||||
if worker_handler.is_watch_running(uuid):
|
||||
logger.trace(f"Worker {worker_id} skipping UUID {uuid} - already being processed, re-queuing for later")
|
||||
# Re-queue with MUCH lower priority (higher number = processed later)
|
||||
# This prevents tight loop where high-priority item keeps getting picked immediately
|
||||
deferred_priority = max(1000, queued_item_data.priority * 10)
|
||||
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
|
||||
worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
|
||||
logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
|
||||
await asyncio.sleep(0.1) # Brief pause to avoid tight loop
|
||||
continue
|
||||
|
||||
fetch_start_time = round(time.time())
|
||||
|
||||
# Mark this UUID as being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
|
||||
# Mark this UUID as being processed
|
||||
worker_handler.set_uuid_processing(uuid, processing=True)
|
||||
|
||||
try:
|
||||
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
|
||||
@@ -423,8 +421,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
# Always cleanup - this runs whether there was an exception or not
|
||||
if uuid:
|
||||
try:
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
# Mark UUID as no longer being processed
|
||||
worker_handler.set_uuid_processing(uuid, processing=False)
|
||||
|
||||
# Send completion signal
|
||||
if watch:
|
||||
|
||||
@@ -204,7 +204,7 @@ class fetcher(Fetcher):
|
||||
import re
|
||||
self.delete_browser_steps_screenshots()
|
||||
|
||||
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 12)) + self.render_extract_delay
|
||||
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
|
||||
extra_wait = min(n, 15)
|
||||
|
||||
logger.debug(f"Extra wait set to {extra_wait}s, requested was {n}s.")
|
||||
@@ -288,27 +288,28 @@ class fetcher(Fetcher):
|
||||
# Enable Network domain to detect when first bytes arrive
|
||||
await self.page._client.send('Network.enable')
|
||||
|
||||
# Now set up the frame navigation handlers
|
||||
async def handle_frame_navigation(event=None):
|
||||
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
|
||||
logger.debug(f"Frame navigated: {event}")
|
||||
w = extra_wait - 2 if extra_wait > 4 else 2
|
||||
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
|
||||
await asyncio.sleep(w)
|
||||
logger.debug("Issuing stopLoading command...")
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
logger.debug("stopLoading command sent!")
|
||||
|
||||
async def setup_frame_handlers_on_first_response(event):
|
||||
# Only trigger for the main document response
|
||||
if event.get('type') == 'Document':
|
||||
logger.debug("First response received, setting up frame handlers for forced page stop load.")
|
||||
|
||||
# De-register this listener - we only need it once
|
||||
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
|
||||
# Now set up the frame navigation handlers
|
||||
async def handle_frame_navigation(event):
|
||||
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
|
||||
logger.debug(f"Frame navigated: {event}")
|
||||
w = extra_wait - 2 if extra_wait > 4 else 2
|
||||
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
|
||||
await asyncio.sleep(w)
|
||||
logger.debug("Issuing stopLoading command...")
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
logger.debug("stopLoading command sent!")
|
||||
|
||||
self.page._client.on('Page.frameStartedNavigating', lambda e: asyncio.create_task(handle_frame_navigation(e)))
|
||||
self.page._client.on('Page.frameStartedLoading', lambda e: asyncio.create_task(handle_frame_navigation(e)))
|
||||
self.page._client.on('Page.frameStoppedLoading', lambda e: logger.debug(f"Frame stopped loading: {e}"))
|
||||
logger.debug("First response received, setting up frame handlers for forced page stop load DONE SETUP")
|
||||
# De-register this listener - we only need it once
|
||||
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
|
||||
# Listen for first response to trigger frame handler setup
|
||||
self.page._client.on('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
@@ -317,11 +318,8 @@ class fetcher(Fetcher):
|
||||
attempt=0
|
||||
while not response:
|
||||
logger.debug(f"Attempting page fetch {url} attempt {attempt}")
|
||||
asyncio.create_task(handle_frame_navigation())
|
||||
response = await self.page.goto(url, timeout=0)
|
||||
await asyncio.sleep(1 + extra_wait)
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
|
||||
if response:
|
||||
break
|
||||
if not response:
|
||||
|
||||
@@ -240,6 +240,7 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
|
||||
|
||||
|
||||
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -298,26 +299,7 @@ def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, d
|
||||
assert b'has-unread-changes' not in res.data
|
||||
|
||||
|
||||
# Re #2600 - Switch the mode to normal type and back, and see if the values stick..
|
||||
###################################################################################
|
||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={"restock_settings-follow_price_changes": "y",
|
||||
"restock_settings-price_change_threshold_percent": 5.05,
|
||||
"processor": "text_json_diff",
|
||||
"url": test_url,
|
||||
'fetch_backend': "html_requests",
|
||||
"time_between_check_use_default": "y"
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Updated watch." in res.data
|
||||
# And back again
|
||||
live_server.app.config['DATASTORE'].data['watching'][uuid]['processor'] = 'restock_diff'
|
||||
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
|
||||
assert b'type="text" value="5.05"' in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -461,4 +443,3 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'ception' not in res.data
|
||||
assert b'155.55' in res.data
|
||||
|
||||
|
||||
@@ -16,8 +16,8 @@ running_async_tasks = []
|
||||
async_loop = None
|
||||
async_loop_thread = None
|
||||
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
# Track currently processing UUIDs for async workers
|
||||
currently_processing_uuids = set()
|
||||
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
@@ -168,31 +168,23 @@ def get_worker_count():
|
||||
|
||||
def get_running_uuids():
|
||||
"""Get list of UUIDs currently being processed by async workers"""
|
||||
return list(currently_processing_uuids.keys())
|
||||
return list(currently_processing_uuids)
|
||||
|
||||
|
||||
def set_uuid_processing(uuid, worker_id=None, processing=True):
|
||||
"""Mark a UUID as being processed or completed by a specific worker"""
|
||||
def set_uuid_processing(uuid, processing=True):
|
||||
"""Mark a UUID as being processed or completed"""
|
||||
global currently_processing_uuids
|
||||
if processing:
|
||||
currently_processing_uuids[uuid] = worker_id
|
||||
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
||||
currently_processing_uuids.add(uuid)
|
||||
logger.debug(f"Started processing UUID: {uuid}")
|
||||
else:
|
||||
currently_processing_uuids.pop(uuid, None)
|
||||
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
|
||||
currently_processing_uuids.discard(uuid)
|
||||
logger.debug(f"Finished processing UUID: {uuid}")
|
||||
|
||||
|
||||
def is_watch_running(watch_uuid):
|
||||
"""Check if a specific watch is currently being processed by any worker"""
|
||||
return watch_uuid in currently_processing_uuids
|
||||
|
||||
|
||||
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
|
||||
"""Check if a specific watch is currently being processed by a different worker"""
|
||||
if watch_uuid not in currently_processing_uuids:
|
||||
return False
|
||||
processing_worker_id = currently_processing_uuids[watch_uuid]
|
||||
return processing_worker_id != current_worker_id
|
||||
"""Check if a specific watch is currently being processed"""
|
||||
return watch_uuid in get_running_uuids()
|
||||
|
||||
|
||||
def queue_item_async_safe(update_q, item, silent=False):
|
||||
|
||||
Reference in New Issue
Block a user