mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-12 02:00:20 +00:00
Compare commits
1 Commits
layout-css
...
resilient-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe45bfc27a |
@@ -65,20 +65,22 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
|
||||
from changedetectionio import worker_handler
|
||||
from changedetectionio.queuedWatchMetaData import PrioritizedItem
|
||||
if worker_handler.is_watch_running(uuid):
|
||||
logger.trace(f"Worker {worker_id} skipping UUID {uuid} - already being processed, re-queuing for later")
|
||||
# Re-queue with MUCH lower priority (higher number = processed later)
|
||||
# This prevents tight loop where high-priority item keeps getting picked immediately
|
||||
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
|
||||
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
|
||||
# Sleep to avoid tight loop and give the other worker time to finish
|
||||
await asyncio.sleep(10.0)
|
||||
|
||||
# Re-queue with lower priority so it gets checked again after current processing finishes
|
||||
deferred_priority = max(1000, queued_item_data.priority * 10)
|
||||
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
|
||||
worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
|
||||
await asyncio.sleep(0.1) # Brief pause to avoid tight loop
|
||||
logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
|
||||
continue
|
||||
|
||||
fetch_start_time = round(time.time())
|
||||
|
||||
# Mark this UUID as being processed
|
||||
worker_handler.set_uuid_processing(uuid, processing=True)
|
||||
# Mark this UUID as being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
|
||||
|
||||
try:
|
||||
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
|
||||
@@ -421,8 +423,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
# Always cleanup - this runs whether there was an exception or not
|
||||
if uuid:
|
||||
try:
|
||||
# Mark UUID as no longer being processed
|
||||
worker_handler.set_uuid_processing(uuid, processing=False)
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
|
||||
# Send completion signal
|
||||
if watch:
|
||||
|
||||
@@ -16,8 +16,8 @@ running_async_tasks = []
|
||||
async_loop = None
|
||||
async_loop_thread = None
|
||||
|
||||
# Track currently processing UUIDs for async workers
|
||||
currently_processing_uuids = set()
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
@@ -168,23 +168,31 @@ def get_worker_count():
|
||||
|
||||
def get_running_uuids():
|
||||
"""Get list of UUIDs currently being processed by async workers"""
|
||||
return list(currently_processing_uuids)
|
||||
return list(currently_processing_uuids.keys())
|
||||
|
||||
|
||||
def set_uuid_processing(uuid, processing=True):
|
||||
"""Mark a UUID as being processed or completed"""
|
||||
def set_uuid_processing(uuid, worker_id=None, processing=True):
|
||||
"""Mark a UUID as being processed or completed by a specific worker"""
|
||||
global currently_processing_uuids
|
||||
if processing:
|
||||
currently_processing_uuids.add(uuid)
|
||||
logger.debug(f"Started processing UUID: {uuid}")
|
||||
currently_processing_uuids[uuid] = worker_id
|
||||
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
||||
else:
|
||||
currently_processing_uuids.discard(uuid)
|
||||
logger.debug(f"Finished processing UUID: {uuid}")
|
||||
currently_processing_uuids.pop(uuid, None)
|
||||
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
|
||||
|
||||
|
||||
def is_watch_running(watch_uuid):
|
||||
"""Check if a specific watch is currently being processed"""
|
||||
return watch_uuid in get_running_uuids()
|
||||
"""Check if a specific watch is currently being processed by any worker"""
|
||||
return watch_uuid in currently_processing_uuids
|
||||
|
||||
|
||||
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
|
||||
"""Check if a specific watch is currently being processed by a different worker"""
|
||||
if watch_uuid not in currently_processing_uuids:
|
||||
return False
|
||||
processing_worker_id = currently_processing_uuids[watch_uuid]
|
||||
return processing_worker_id != current_worker_id
|
||||
|
||||
|
||||
def queue_item_async_safe(update_q, item, silent=False):
|
||||
|
||||
Reference in New Issue
Block a user