Compare commits

..

2 Commits

Author SHA1 Message Date
dgtlmoon
9a3dba3d5c Hardening 2026-01-02 17:13:21 +01:00
dgtlmoon
2e447b47f1 UI - Handling redirects on login to the correct page 2026-01-02 16:58:42 +01:00
4 changed files with 37 additions and 68 deletions

View File

@@ -65,22 +65,20 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
from changedetectionio import worker_handler
from changedetectionio.queuedWatchMetaData import PrioritizedItem
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
# Sleep to avoid tight loop and give the other worker time to finish
await asyncio.sleep(10.0)
# Re-queue with lower priority so it gets checked again after current processing finishes
if worker_handler.is_watch_running(uuid):
logger.trace(f"Worker {worker_id} skipping UUID {uuid} - already being processed, re-queuing for later")
# Re-queue with MUCH lower priority (higher number = processed later)
# This prevents tight loop where high-priority item keeps getting picked immediately
deferred_priority = max(1000, queued_item_data.priority * 10)
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
await asyncio.sleep(0.1) # Brief pause to avoid tight loop
continue
fetch_start_time = round(time.time())
# Mark this UUID as being processed by this worker
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
# Mark this UUID as being processed
worker_handler.set_uuid_processing(uuid, processing=True)
try:
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
@@ -423,8 +421,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
# Always cleanup - this runs whether there was an exception or not
if uuid:
try:
# Mark UUID as no longer being processed by this worker
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
# Mark UUID as no longer being processed
worker_handler.set_uuid_processing(uuid, processing=False)
# Send completion signal
if watch:

View File

@@ -204,7 +204,7 @@ class fetcher(Fetcher):
import re
self.delete_browser_steps_screenshots()
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 12)) + self.render_extract_delay
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
extra_wait = min(n, 15)
logger.debug(f"Extra wait set to {extra_wait}s, requested was {n}s.")
@@ -288,27 +288,28 @@ class fetcher(Fetcher):
# Enable Network domain to detect when first bytes arrive
await self.page._client.send('Network.enable')
# Now set up the frame navigation handlers
async def handle_frame_navigation(event=None):
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
logger.debug(f"Frame navigated: {event}")
w = extra_wait - 2 if extra_wait > 4 else 2
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
await asyncio.sleep(w)
logger.debug("Issuing stopLoading command...")
await self.page._client.send('Page.stopLoading')
logger.debug("stopLoading command sent!")
async def setup_frame_handlers_on_first_response(event):
# Only trigger for the main document response
if event.get('type') == 'Document':
logger.debug("First response received, setting up frame handlers for forced page stop load.")
# De-register this listener - we only need it once
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
# Now set up the frame navigation handlers
async def handle_frame_navigation(event):
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
logger.debug(f"Frame navigated: {event}")
w = extra_wait - 2 if extra_wait > 4 else 2
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
await asyncio.sleep(w)
logger.debug("Issuing stopLoading command...")
await self.page._client.send('Page.stopLoading')
logger.debug("stopLoading command sent!")
self.page._client.on('Page.frameStartedNavigating', lambda e: asyncio.create_task(handle_frame_navigation(e)))
self.page._client.on('Page.frameStartedLoading', lambda e: asyncio.create_task(handle_frame_navigation(e)))
self.page._client.on('Page.frameStoppedLoading', lambda e: logger.debug(f"Frame stopped loading: {e}"))
logger.debug("First response received, setting up frame handlers for forced page stop load DONE SETUP")
# De-register this listener - we only need it once
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
# Listen for first response to trigger frame handler setup
self.page._client.on('Network.responseReceived', setup_frame_handlers_on_first_response)
@@ -317,11 +318,8 @@ class fetcher(Fetcher):
attempt=0
while not response:
logger.debug(f"Attempting page fetch {url} attempt {attempt}")
asyncio.create_task(handle_frame_navigation())
response = await self.page.goto(url, timeout=0)
await asyncio.sleep(1 + extra_wait)
await self.page._client.send('Page.stopLoading')
if response:
break
if not response:

View File

@@ -240,6 +240,7 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, datastore_path):
delete_all_watches(client)
@@ -298,26 +299,7 @@ def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, d
assert b'has-unread-changes' not in res.data
# Re #2600 - Switch the mode to normal type and back, and see if the values stick..
###################################################################################
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"restock_settings-follow_price_changes": "y",
"restock_settings-price_change_threshold_percent": 5.05,
"processor": "text_json_diff",
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# And back again
live_server.app.config['DATASTORE'].data['watching'][uuid]['processor'] = 'restock_diff'
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
assert b'type="text" value="5.05"' in res.data
delete_all_watches(client)
@@ -461,4 +443,3 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
res = client.get(url_for("watchlist.index"))
assert b'ception' not in res.data
assert b'155.55' in res.data

View File

@@ -16,8 +16,8 @@ running_async_tasks = []
async_loop = None
async_loop_thread = None
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
currently_processing_uuids = {}
# Track currently processing UUIDs for async workers
currently_processing_uuids = set()
# Configuration - async workers only
USE_ASYNC_WORKERS = True
@@ -168,31 +168,23 @@ def get_worker_count():
def get_running_uuids():
"""Get list of UUIDs currently being processed by async workers"""
return list(currently_processing_uuids.keys())
return list(currently_processing_uuids)
def set_uuid_processing(uuid, worker_id=None, processing=True):
"""Mark a UUID as being processed or completed by a specific worker"""
def set_uuid_processing(uuid, processing=True):
"""Mark a UUID as being processed or completed"""
global currently_processing_uuids
if processing:
currently_processing_uuids[uuid] = worker_id
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
currently_processing_uuids.add(uuid)
logger.debug(f"Started processing UUID: {uuid}")
else:
currently_processing_uuids.pop(uuid, None)
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
currently_processing_uuids.discard(uuid)
logger.debug(f"Finished processing UUID: {uuid}")
def is_watch_running(watch_uuid):
"""Check if a specific watch is currently being processed by any worker"""
return watch_uuid in currently_processing_uuids
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
"""Check if a specific watch is currently being processed by a different worker"""
if watch_uuid not in currently_processing_uuids:
return False
processing_worker_id = currently_processing_uuids[watch_uuid]
return processing_worker_id != current_worker_id
"""Check if a specific watch is currently being processed"""
return watch_uuid in get_running_uuids()
def queue_item_async_safe(update_q, item, silent=False):