mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-10-30 14:17:40 +00:00
Refactoring queue handling (#3363)
This commit is contained in:
@@ -188,15 +188,54 @@ def is_watch_running(watch_uuid):
|
||||
|
||||
|
||||
def queue_item_async_safe(update_q, item):
|
||||
"""Queue an item for async queue processing"""
|
||||
if async_loop and not async_loop.is_closed():
|
||||
"""Bulletproof queue operation with comprehensive error handling"""
|
||||
item_uuid = 'unknown'
|
||||
|
||||
try:
|
||||
# Safely extract UUID for logging
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict):
|
||||
item_uuid = item.item.get('uuid', 'unknown')
|
||||
except Exception as uuid_e:
|
||||
logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
|
||||
|
||||
# Validate inputs
|
||||
if not update_q:
|
||||
logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
|
||||
return False
|
||||
|
||||
if not item:
|
||||
logger.critical(f"CRITICAL: Item is None/invalid")
|
||||
return False
|
||||
|
||||
# Attempt queue operation with multiple fallbacks
|
||||
try:
|
||||
# Primary: Use sync interface (thread-safe)
|
||||
success = update_q.put(item, block=True, timeout=5.0)
|
||||
if success is False: # Explicit False return means failure
|
||||
logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
|
||||
return False
|
||||
|
||||
logger.debug(f"Successfully queued item: {item_uuid}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
|
||||
|
||||
# Secondary: Attempt queue health check
|
||||
try:
|
||||
# For async queue, schedule the put operation
|
||||
asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
|
||||
except RuntimeError as e:
|
||||
logger.error(f"Failed to queue item: {e}")
|
||||
else:
|
||||
logger.error("Async loop not available or closed for queueing item")
|
||||
queue_size = update_q.qsize()
|
||||
is_empty = update_q.empty()
|
||||
logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
|
||||
except Exception as health_e:
|
||||
logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
|
||||
|
||||
# Log queue type for debugging
|
||||
try:
|
||||
logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
|
||||
except Exception:
|
||||
logger.critical(f"CRITICAL: Cannot determine queue type")
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def shutdown_workers():
|
||||
|
||||
Reference in New Issue
Block a user