Compare commits

...

1 Commits

Author SHA1 Message Date
dgtlmoon
2366e5d1b9 Cleanup empty queue messages Re #3376 2025-08-19 15:30:58 +02:00

View File

@@ -1,13 +1,14 @@
import heapq
import threading
from typing import Dict, List, Any, Optional
from blinker import signal from blinker import signal
from loguru import logger from loguru import logger
from typing import Dict, List, Any, Optional
import heapq
import queue
import threading
try: try:
import janus import janus
except ImportError: except ImportError:
logger.critical("CRITICAL: janus library is required. Install with: pip install janus") logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
raise raise
@@ -51,7 +52,7 @@ class RecheckPriorityQueue:
logger.debug("RecheckPriorityQueue initialized successfully") logger.debug("RecheckPriorityQueue initialized successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}") logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
raise raise
# SYNC INTERFACE (for ticker thread) # SYNC INTERFACE (for ticker thread)
@@ -72,7 +73,7 @@ class RecheckPriorityQueue:
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}") logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed # Remove from priority storage if janus put failed
try: try:
with self._lock: with self._lock:
@@ -80,7 +81,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item) self._priority_items.remove(item)
heapq.heapify(self._priority_items) heapq.heapify(self._priority_items)
except Exception as cleanup_e: except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}") logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
return False return False
def get(self, block: bool = True, timeout: Optional[float] = None): def get(self, block: bool = True, timeout: Optional[float] = None):
@@ -92,7 +93,7 @@ class RecheckPriorityQueue:
# Get highest priority item # Get highest priority item
with self._lock: with self._lock:
if not self._priority_items: if not self._priority_items:
logger.critical("CRITICAL: Queue notification received but no priority items available") logger.critical(f"CRITICAL: Queue notification received but no priority items available")
raise Exception("Priority queue inconsistency") raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items) item = heapq.heappop(self._priority_items)
@@ -103,7 +104,7 @@ class RecheckPriorityQueue:
return item return item
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get item from queue: {e}") logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
raise raise
# ASYNC INTERFACE (for workers) # ASYNC INTERFACE (for workers)
@@ -124,7 +125,7 @@ class RecheckPriorityQueue:
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}") logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed # Remove from priority storage if janus put failed
try: try:
with self._lock: with self._lock:
@@ -132,7 +133,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item) self._priority_items.remove(item)
heapq.heapify(self._priority_items) heapq.heapify(self._priority_items)
except Exception as cleanup_e: except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}") logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
return False return False
async def async_get(self): async def async_get(self):
@@ -144,7 +145,7 @@ class RecheckPriorityQueue:
# Get highest priority item # Get highest priority item
with self._lock: with self._lock:
if not self._priority_items: if not self._priority_items:
logger.critical("CRITICAL: Async queue notification received but no priority items available") logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency") raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items) item = heapq.heappop(self._priority_items)
@@ -155,7 +156,7 @@ class RecheckPriorityQueue:
return item return item
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}") logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
raise raise
# UTILITY METHODS # UTILITY METHODS
@@ -165,7 +166,7 @@ class RecheckPriorityQueue:
with self._lock: with self._lock:
return len(self._priority_items) return len(self._priority_items)
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue size: {e}") logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
return 0 return 0
def empty(self) -> bool: def empty(self) -> bool:
@@ -178,7 +179,7 @@ class RecheckPriorityQueue:
self._janus_queue.close() self._janus_queue.close()
logger.debug("RecheckPriorityQueue closed successfully") logger.debug("RecheckPriorityQueue closed successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}") logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
# COMPATIBILITY METHODS (from original implementation) # COMPATIBILITY METHODS (from original implementation)
@property @property
@@ -188,7 +189,7 @@ class RecheckPriorityQueue:
with self._lock: with self._lock:
return list(self._priority_items) return list(self._priority_items)
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue list: {e}") logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
return [] return []
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]: def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
@@ -218,7 +219,7 @@ class RecheckPriorityQueue:
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False} return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}") logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
return {'position': None, 'total_items': 0, 'priority': None, 'found': False} return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]: def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
@@ -253,7 +254,7 @@ class RecheckPriorityQueue:
} }
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}") logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
def get_queue_summary(self) -> Dict[str, Any]: def get_queue_summary(self) -> Dict[str, Any]:
@@ -294,7 +295,7 @@ class RecheckPriorityQueue:
} }
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue summary: {e}") logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0, return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
'clone_items': 0, 'scheduled_items': 0} 'clone_items': 0, 'scheduled_items': 0}
@@ -322,7 +323,7 @@ class RecheckPriorityQueue:
self.queue_length_signal.send(length=self.qsize()) self.queue_length_signal.send(length=self.qsize())
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit put signals: {e}") logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
def _emit_get_signals(self): def _emit_get_signals(self):
"""Emit signals when item is removed""" """Emit signals when item is removed"""
@@ -330,7 +331,7 @@ class RecheckPriorityQueue:
if self.queue_length_signal: if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize()) self.queue_length_signal.send(length=self.qsize())
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit get signals: {e}") logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
class NotificationQueue: class NotificationQueue:
@@ -356,7 +357,7 @@ class NotificationQueue:
self.notification_event_signal = signal('notification_event') self.notification_event_signal = signal('notification_event')
logger.debug("NotificationQueue initialized successfully") logger.debug("NotificationQueue initialized successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}") logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
raise raise
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None): def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
@@ -367,7 +368,7 @@ class NotificationQueue:
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}") logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}") logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False return False
async def async_put(self, item: Dict[str, Any]): async def async_put(self, item: Dict[str, Any]):
@@ -378,31 +379,35 @@ class NotificationQueue:
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}") logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}") logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False return False
def get(self, block: bool = True, timeout: Optional[float] = None): def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get""" """Thread-safe sync get"""
try: try:
return self.sync_q.get(block=block, timeout=timeout) return self.sync_q.get(block=block, timeout=timeout)
except queue.Empty as e:
raise e
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {e}") logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
raise raise e
async def async_get(self): async def async_get(self):
"""Pure async get""" """Pure async get"""
try: try:
return await self.async_q.get() return await self.async_q.get()
except queue.Empty as e:
raise e
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async get notification: {e}") logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
raise raise e
def qsize(self) -> int: def qsize(self) -> int:
"""Get current queue size""" """Get current queue size"""
try: try:
return self.sync_q.qsize() return self.sync_q.qsize()
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}") logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
return 0 return 0
def empty(self) -> bool: def empty(self) -> bool:
@@ -415,7 +420,7 @@ class NotificationQueue:
self._janus_queue.close() self._janus_queue.close()
logger.debug("NotificationQueue closed successfully") logger.debug("NotificationQueue closed successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}") logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
def _emit_notification_signal(self, item: Dict[str, Any]): def _emit_notification_signal(self, item: Dict[str, Any]):
"""Emit notification signal""" """Emit notification signal"""
@@ -427,4 +432,4 @@ class NotificationQueue:
else: else:
self.notification_event_signal.send() self.notification_event_signal.send()
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}") logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")