Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2026-01-23 15:40:19 +00:00)

Compare commits: api-diff-e ... resilient- (1 commit)

Commit fe45bfc27a
```diff
@@ -65,20 +65,22 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
         # RACE CONDITION FIX: Check if this UUID is already being processed by another worker
         from changedetectionio import worker_handler
         from changedetectionio.queuedWatchMetaData import PrioritizedItem
-        if worker_handler.is_watch_running(uuid):
-            logger.trace(f"Worker {worker_id} skipping UUID {uuid} - already being processed, re-queuing for later")
-            # Re-queue with MUCH lower priority (higher number = processed later)
-            # This prevents tight loop where high-priority item keeps getting picked immediately
+        if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
+            logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
+            # Sleep to avoid tight loop and give the other worker time to finish
+            await asyncio.sleep(10.0)
+
+            # Re-queue with lower priority so it gets checked again after current processing finishes
             deferred_priority = max(1000, queued_item_data.priority * 10)
             deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
             worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
-            await asyncio.sleep(0.1)  # Brief pause to avoid tight loop
+            logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
            continue

         fetch_start_time = round(time.time())

-        # Mark this UUID as being processed
-        worker_handler.set_uuid_processing(uuid, processing=True)
+        # Mark this UUID as being processed by this worker
+        worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)

         try:
             if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
```
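The deferral branch above is the core of the race-condition fix: a worker that sees another worker already holding the UUID backs off for 10 seconds, then re-queues the item far down the priority order instead of spinning on it. A minimal sketch of that pattern, assuming `PrioritizedItem` compares by priority alone (lower number = picked up sooner); the dataclass shape and queue type here are illustrative assumptions, not the project's actual classes:

```python
import asyncio
from dataclasses import dataclass, field

@dataclass(order=True)
class PrioritizedItem:
    # Assumed shape for illustration: ordered by priority alone,
    # lower number = picked up sooner
    priority: int
    item: dict = field(compare=False)

async def defer_item(q: asyncio.PriorityQueue,
                     queued: PrioritizedItem) -> None:
    # Back off so the worker that owns the UUID has time to finish
    await asyncio.sleep(10.0)
    # Re-queue far down the order: at least 1000, or 10x the original priority
    deferred = PrioritizedItem(priority=max(1000, queued.priority * 10),
                               item=queued.item)
    await q.put(deferred)
```

Sleeping before re-queuing (rather than 0.1s after, as before) means a high-priority item can no longer bounce between workers in a tight loop while its UUID is held.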
```diff
@@ -421,8 +423,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
         # Always cleanup - this runs whether there was an exception or not
         if uuid:
             try:
-                # Mark UUID as no longer being processed
-                worker_handler.set_uuid_processing(uuid, processing=False)
+                # Mark UUID as no longer being processed by this worker
+                worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)

                 # Send completion signal
                 if watch:
```
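Because the release happens in the worker's always-run cleanup path, the ownership record is removed even when the check raises. A compact sketch of the claim/release discipline, with a hypothetical `process_watch` standing in for the real fetch-and-diff work:

```python
async def process_one(worker_id, uuid):
    # Claim the UUID for this worker before any work starts
    worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
    try:
        await process_watch(uuid)  # hypothetical placeholder for the actual check
    finally:
        # Runs on success, exception, and task cancellation alike -
        # without it the UUID would look "running" forever
        worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
```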
The remaining hunks are in the `worker_handler` module imported above:

```diff
@@ -16,8 +16,8 @@ running_async_tasks = []
 async_loop = None
 async_loop_thread = None

-# Track currently processing UUIDs for async workers
-currently_processing_uuids = set()
+# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
+currently_processing_uuids = {}

 # Configuration - async workers only
 USE_ASYNC_WORKERS = True
```
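Switching the tracker from a `set` to a `dict` is what makes the per-worker check possible: bare membership cannot tell a worker whether it is the one holding the entry. For example:

```python
# With a set, a worker re-checking a UUID it already claimed sees a conflict:
running = {"uuid-1"}
"uuid-1" in running              # True - but held by me or by someone else?

# With a dict mapping uuid -> worker_id, ownership is explicit:
running = {"uuid-1": 3}
running.get("uuid-1") == 3       # True: worker 3 holds it, no conflict for worker 3
running.get("uuid-1") != 7       # True: from worker 7's view, another worker holds it
```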
```diff
@@ -168,23 +168,31 @@ def get_worker_count():

 def get_running_uuids():
     """Get list of UUIDs currently being processed by async workers"""
-    return list(currently_processing_uuids)
+    return list(currently_processing_uuids.keys())


-def set_uuid_processing(uuid, processing=True):
-    """Mark a UUID as being processed or completed"""
+def set_uuid_processing(uuid, worker_id=None, processing=True):
+    """Mark a UUID as being processed or completed by a specific worker"""
     global currently_processing_uuids
     if processing:
-        currently_processing_uuids.add(uuid)
-        logger.debug(f"Started processing UUID: {uuid}")
+        currently_processing_uuids[uuid] = worker_id
+        logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
     else:
-        currently_processing_uuids.discard(uuid)
-        logger.debug(f"Finished processing UUID: {uuid}")
+        currently_processing_uuids.pop(uuid, None)
+        logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")


 def is_watch_running(watch_uuid):
-    """Check if a specific watch is currently being processed"""
-    return watch_uuid in get_running_uuids()
+    """Check if a specific watch is currently being processed by any worker"""
+    return watch_uuid in currently_processing_uuids


+def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
+    """Check if a specific watch is currently being processed by a different worker"""
+    if watch_uuid not in currently_processing_uuids:
+        return False
+    processing_worker_id = currently_processing_uuids[watch_uuid]
+    return processing_worker_id != current_worker_id
+
+
 def queue_item_async_safe(update_q, item, silent=False):
```
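Taken together, the helpers now distinguish "running at all" from "running elsewhere". Illustrative usage, following the definitions above:

```python
worker_handler.set_uuid_processing("uuid-1", worker_id=1, processing=True)

worker_handler.is_watch_running("uuid-1")                       # True
worker_handler.is_watch_running_by_another_worker("uuid-1", 1)  # False - this worker holds it
worker_handler.is_watch_running_by_another_worker("uuid-1", 2)  # True - worker 2 should defer

worker_handler.set_uuid_processing("uuid-1", worker_id=1, processing=False)
worker_handler.is_watch_running("uuid-1")                       # False
```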