Compare commits


1 Commit

Author    SHA1        Message                                     Date
dgtlmoon  fe45bfc27a  more resilient same UUID being processed   2026-01-02 17:25:58 +01:00
2 changed files with 30 additions and 20 deletions

changedetectionio/async_update_worker.py

@@ -65,20 +65,22 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
             # RACE CONDITION FIX: Check if this UUID is already being processed by another worker
             from changedetectionio import worker_handler
             from changedetectionio.queuedWatchMetaData import PrioritizedItem
-            if worker_handler.is_watch_running(uuid):
-                logger.trace(f"Worker {worker_id} skipping UUID {uuid} - already being processed, re-queuing for later")
-                # Re-queue with MUCH lower priority (higher number = processed later)
-                # This prevents tight loop where high-priority item keeps getting picked immediately
+            if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
+                logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
+                # Sleep to avoid tight loop and give the other worker time to finish
+                await asyncio.sleep(10.0)
+                # Re-queue with lower priority so it gets checked again after current processing finishes
                 deferred_priority = max(1000, queued_item_data.priority * 10)
                 deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
                 worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
-                await asyncio.sleep(0.1)  # Brief pause to avoid tight loop
                 logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
                 continue

             fetch_start_time = round(time.time())

-            # Mark this UUID as being processed
-            worker_handler.set_uuid_processing(uuid, processing=True)
+            # Mark this UUID as being processed by this worker
+            worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)

             try:
                 if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
@@ -421,8 +423,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
             # Always cleanup - this runs whether there was an exception or not
             if uuid:
                 try:
-                    # Mark UUID as no longer being processed
-                    worker_handler.set_uuid_processing(uuid, processing=False)
+                    # Mark UUID as no longer being processed by this worker
+                    worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)

                     # Send completion signal
                     if watch:
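
The gist of the change above: instead of skipping a UUID whenever anyone is processing it, a worker now defers only when a different worker owns it, backs off, and re-queues the item at a much lower priority. Below is a minimal, self-contained sketch of that defer-and-requeue pattern; the PrioritizedItem dataclass, the in-memory ownership dict, and the tiny sleeps are simplified stand-ins for the project's actual classes and its 10-second back-off.

import asyncio
from dataclasses import dataclass, field

@dataclass(order=True)
class PrioritizedItem:
    # Lower number = processed sooner; the payload is excluded from ordering
    priority: int
    item: dict = field(compare=False)

currently_processing = {}  # {uuid: worker_id}, mirrors the ownership map in the patch

async def worker(worker_id: int, q: asyncio.PriorityQueue):
    while True:
        queued = await q.get()
        uuid = queued.item["uuid"]
        owner = currently_processing.get(uuid)
        if owner is not None and owner != worker_id:
            # Another worker owns this UUID: back off, then re-queue at lower priority
            await asyncio.sleep(0.01)  # the patch sleeps 10.0s here
            q.put_nowait(PrioritizedItem(priority=max(1000, queued.priority * 10), item=queued.item))
            q.task_done()
            continue
        currently_processing[uuid] = worker_id  # claim ownership
        try:
            await asyncio.sleep(0.01)  # stand-in for the real fetch/compare work
            print(f"worker {worker_id} processed {uuid}")
        finally:
            currently_processing.pop(uuid, None)  # always release, even on error
            q.task_done()

async def main():
    q = asyncio.PriorityQueue()
    # Two queue entries for the same watch reproduce the race the patch targets
    for priority, uuid in [(1, "watch-a"), (1, "watch-a"), (2, "watch-b")]:
        q.put_nowait(PrioritizedItem(priority=priority, item={"uuid": uuid}))
    tasks = [asyncio.create_task(worker(i, q)) for i in range(2)]
    await q.join()
    for t in tasks:
        t.cancel()

asyncio.run(main())

Because claiming ownership happens without an intervening await after the queue read, the check-then-claim sequence is atomic within a single asyncio event loop, which is what makes this simple dict safe without a lock.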

changedetectionio/worker_handler.py

@@ -16,8 +16,8 @@ running_async_tasks = []
 async_loop = None
 async_loop_thread = None

-# Track currently processing UUIDs for async workers
-currently_processing_uuids = set()
+# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
+currently_processing_uuids = {}

 # Configuration - async workers only
 USE_ASYNC_WORKERS = True
@@ -168,23 +168,31 @@ def get_worker_count():
 def get_running_uuids():
     """Get list of UUIDs currently being processed by async workers"""
-    return list(currently_processing_uuids)
+    return list(currently_processing_uuids.keys())

-def set_uuid_processing(uuid, processing=True):
-    """Mark a UUID as being processed or completed"""
+def set_uuid_processing(uuid, worker_id=None, processing=True):
+    """Mark a UUID as being processed or completed by a specific worker"""
     global currently_processing_uuids
     if processing:
-        currently_processing_uuids.add(uuid)
-        logger.debug(f"Started processing UUID: {uuid}")
+        currently_processing_uuids[uuid] = worker_id
+        logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
     else:
-        currently_processing_uuids.discard(uuid)
-        logger.debug(f"Finished processing UUID: {uuid}")
+        currently_processing_uuids.pop(uuid, None)
+        logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")

 def is_watch_running(watch_uuid):
-    """Check if a specific watch is currently being processed"""
-    return watch_uuid in get_running_uuids()
+    """Check if a specific watch is currently being processed by any worker"""
+    return watch_uuid in currently_processing_uuids
+
+def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
+    """Check if a specific watch is currently being processed by a different worker"""
+    if watch_uuid not in currently_processing_uuids:
+        return False
+    processing_worker_id = currently_processing_uuids[watch_uuid]
+    return processing_worker_id != current_worker_id

 def queue_item_async_safe(update_q, item, silent=False):
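
To make the new ownership semantics concrete, here is a condensed, standalone mirror of the helpers above with a few assertions; the single-expression dict.get form of is_watch_running_by_another_worker is an equivalent shorthand for the patch's explicit missing-key check, not the code as committed.

currently_processing_uuids = {}  # {uuid: worker_id}, as in the patch

def set_uuid_processing(uuid, worker_id=None, processing=True):
    if processing:
        currently_processing_uuids[uuid] = worker_id
    else:
        currently_processing_uuids.pop(uuid, None)

def is_watch_running(watch_uuid):
    return watch_uuid in currently_processing_uuids

def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
    # Missing key -> default to current_worker_id, so the comparison yields False
    return currently_processing_uuids.get(watch_uuid, current_worker_id) != current_worker_id

set_uuid_processing("u1", worker_id=3)
assert is_watch_running("u1")
assert not is_watch_running_by_another_worker("u1", current_worker_id=3)  # own claim doesn't count
assert is_watch_running_by_another_worker("u1", current_worker_id=7)      # someone else's claim does
set_uuid_processing("u1", worker_id=3, processing=False)
assert not is_watch_running("u1")

The move from a set to a {uuid: worker_id} dict is what enables this distinction: a worker can now tell "this watch is being processed" apart from "a different worker is processing it", and defers only in the second case.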