Cleanup empty queue messages Re #3376 (#3377)

This commit is contained in:
dgtlmoon
2025-08-19 16:25:32 +02:00
committed by GitHub
parent b7da6f0ca7
commit d7f00679a0

View File

@@ -1,13 +1,14 @@
import heapq
import threading
from typing import Dict, List, Any, Optional
from blinker import signal
from loguru import logger
from typing import Dict, List, Any, Optional
import heapq
import queue
import threading
try:
import janus
except ImportError:
logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
raise
@@ -51,7 +52,7 @@ class RecheckPriorityQueue:
logger.debug("RecheckPriorityQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
raise
# SYNC INTERFACE (for ticker thread)
@@ -72,7 +73,7 @@ class RecheckPriorityQueue:
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed
try:
with self._lock:
@@ -80,7 +81,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
@@ -92,7 +93,7 @@ class RecheckPriorityQueue:
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical("CRITICAL: Queue notification received but no priority items available")
logger.critical(f"CRITICAL: Queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
@@ -103,7 +104,7 @@ class RecheckPriorityQueue:
return item
except Exception as e:
logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
raise
# ASYNC INTERFACE (for workers)
@@ -124,7 +125,7 @@ class RecheckPriorityQueue:
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed
try:
with self._lock:
@@ -132,7 +133,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
return False
async def async_get(self):
@@ -144,7 +145,7 @@ class RecheckPriorityQueue:
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical("CRITICAL: Async queue notification received but no priority items available")
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
@@ -155,7 +156,7 @@ class RecheckPriorityQueue:
return item
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
raise
# UTILITY METHODS
@@ -165,7 +166,7 @@ class RecheckPriorityQueue:
with self._lock:
return len(self._priority_items)
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue size: {e}")
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
return 0
def empty(self) -> bool:
@@ -178,7 +179,7 @@ class RecheckPriorityQueue:
self._janus_queue.close()
logger.debug("RecheckPriorityQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
# COMPATIBILITY METHODS (from original implementation)
@property
@@ -188,7 +189,7 @@ class RecheckPriorityQueue:
with self._lock:
return list(self._priority_items)
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue list: {e}")
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
return []
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
@@ -218,7 +219,7 @@ class RecheckPriorityQueue:
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
@@ -253,7 +254,7 @@ class RecheckPriorityQueue:
}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
def get_queue_summary(self) -> Dict[str, Any]:
@@ -294,7 +295,7 @@ class RecheckPriorityQueue:
}
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
'clone_items': 0, 'scheduled_items': 0}
@@ -322,7 +323,7 @@ class RecheckPriorityQueue:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
def _emit_get_signals(self):
"""Emit signals when item is removed"""
@@ -330,7 +331,7 @@ class RecheckPriorityQueue:
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
class NotificationQueue:
@@ -356,7 +357,7 @@ class NotificationQueue:
self.notification_event_signal = signal('notification_event')
logger.debug("NotificationQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
raise
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
@@ -367,7 +368,7 @@ class NotificationQueue:
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
async def async_put(self, item: Dict[str, Any]):
@@ -378,31 +379,35 @@ class NotificationQueue:
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get"""
try:
return self.sync_q.get(block=block, timeout=timeout)
except queue.Empty as e:
raise e
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {e}")
raise
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
raise e
async def async_get(self):
"""Pure async get"""
try:
return await self.async_q.get()
except queue.Empty as e:
raise e
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get notification: {e}")
raise
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
raise e
def qsize(self) -> int:
"""Get current queue size"""
try:
return self.sync_q.qsize()
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
return 0
def empty(self) -> bool:
@@ -415,7 +420,7 @@ class NotificationQueue:
self._janus_queue.close()
logger.debug("NotificationQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
def _emit_notification_signal(self, item: Dict[str, Any]):
"""Emit notification signal"""
@@ -427,4 +432,4 @@ class NotificationQueue:
else:
self.notification_event_signal.send()
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")