|
|
|
@@ -1,14 +1,13 @@
|
|
|
|
|
import heapq
|
|
|
|
|
import threading
|
|
|
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
|
from blinker import signal
|
|
|
|
|
from loguru import logger
|
|
|
|
|
from typing import Dict, List, Any, Optional
|
|
|
|
|
import heapq
|
|
|
|
|
import queue
|
|
|
|
|
import threading
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import janus
|
|
|
|
|
except ImportError:
|
|
|
|
|
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
|
|
|
|
|
logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
|
|
|
|
@@ -52,7 +51,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
|
|
|
|
|
logger.debug("RecheckPriorityQueue initialized successfully")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# SYNC INTERFACE (for ticker thread)
|
|
|
|
@@ -73,7 +72,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
|
|
|
|
|
# Remove from priority storage if janus put failed
|
|
|
|
|
try:
|
|
|
|
|
with self._lock:
|
|
|
|
@@ -81,7 +80,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
self._priority_items.remove(item)
|
|
|
|
|
heapq.heapify(self._priority_items)
|
|
|
|
|
except Exception as cleanup_e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def get(self, block: bool = True, timeout: Optional[float] = None):
|
|
|
|
@@ -93,7 +92,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
# Get highest priority item
|
|
|
|
|
with self._lock:
|
|
|
|
|
if not self._priority_items:
|
|
|
|
|
logger.critical(f"CRITICAL: Queue notification received but no priority items available")
|
|
|
|
|
logger.critical("CRITICAL: Queue notification received but no priority items available")
|
|
|
|
|
raise Exception("Priority queue inconsistency")
|
|
|
|
|
item = heapq.heappop(self._priority_items)
|
|
|
|
|
|
|
|
|
@@ -104,7 +103,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
return item
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# ASYNC INTERFACE (for workers)
|
|
|
|
@@ -125,7 +124,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
return True
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
|
|
|
|
|
# Remove from priority storage if janus put failed
|
|
|
|
|
try:
|
|
|
|
|
with self._lock:
|
|
|
|
@@ -133,7 +132,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
self._priority_items.remove(item)
|
|
|
|
|
heapq.heapify(self._priority_items)
|
|
|
|
|
except Exception as cleanup_e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
async def async_get(self):
|
|
|
|
@@ -145,7 +144,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
# Get highest priority item
|
|
|
|
|
with self._lock:
|
|
|
|
|
if not self._priority_items:
|
|
|
|
|
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
|
|
|
|
|
logger.critical("CRITICAL: Async queue notification received but no priority items available")
|
|
|
|
|
raise Exception("Priority queue inconsistency")
|
|
|
|
|
item = heapq.heappop(self._priority_items)
|
|
|
|
|
|
|
|
|
@@ -156,7 +155,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
return item
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
# UTILITY METHODS
|
|
|
|
@@ -166,7 +165,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
with self._lock:
|
|
|
|
|
return len(self._priority_items)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue size: {e}")
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
def empty(self) -> bool:
|
|
|
|
@@ -179,7 +178,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
self._janus_queue.close()
|
|
|
|
|
logger.debug("RecheckPriorityQueue closed successfully")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
|
|
|
|
|
|
|
|
|
|
# COMPATIBILITY METHODS (from original implementation)
|
|
|
|
|
@property
|
|
|
|
@@ -189,7 +188,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
with self._lock:
|
|
|
|
|
return list(self._priority_items)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue list: {e}")
|
|
|
|
|
return []
|
|
|
|
|
|
|
|
|
|
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
|
|
|
|
@@ -219,7 +218,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
|
|
|
|
|
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
|
|
|
|
|
|
|
|
|
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
|
|
|
|
@@ -254,7 +253,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
|
|
|
|
|
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
|
|
|
|
|
|
|
|
|
def get_queue_summary(self) -> Dict[str, Any]:
|
|
|
|
@@ -295,7 +294,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
|
|
|
|
|
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
|
|
|
|
|
'clone_items': 0, 'scheduled_items': 0}
|
|
|
|
|
|
|
|
|
@@ -323,7 +322,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
self.queue_length_signal.send(length=self.qsize())
|
|
|
|
|
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
|
|
|
|
|
|
|
|
|
|
def _emit_get_signals(self):
|
|
|
|
|
"""Emit signals when item is removed"""
|
|
|
|
@@ -331,7 +330,7 @@ class RecheckPriorityQueue:
|
|
|
|
|
if self.queue_length_signal:
|
|
|
|
|
self.queue_length_signal.send(length=self.qsize())
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class NotificationQueue:
|
|
|
|
@@ -357,7 +356,7 @@ class NotificationQueue:
|
|
|
|
|
self.notification_event_signal = signal('notification_event')
|
|
|
|
|
logger.debug("NotificationQueue initialized successfully")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
|
|
|
|
@@ -368,7 +367,7 @@ class NotificationQueue:
|
|
|
|
|
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
|
|
|
|
|
return True
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
async def async_put(self, item: Dict[str, Any]):
|
|
|
|
@@ -379,35 +378,31 @@ class NotificationQueue:
|
|
|
|
|
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
|
|
|
|
|
return True
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
def get(self, block: bool = True, timeout: Optional[float] = None):
|
|
|
|
|
"""Thread-safe sync get"""
|
|
|
|
|
try:
|
|
|
|
|
return self.sync_q.get(block=block, timeout=timeout)
|
|
|
|
|
except queue.Empty as e:
|
|
|
|
|
raise e
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
|
|
|
|
|
raise e
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get notification: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
async def async_get(self):
|
|
|
|
|
"""Pure async get"""
|
|
|
|
|
try:
|
|
|
|
|
return await self.async_q.get()
|
|
|
|
|
except queue.Empty as e:
|
|
|
|
|
raise e
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
|
|
|
|
|
raise e
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to async get notification: {e}")
|
|
|
|
|
raise
|
|
|
|
|
|
|
|
|
|
def qsize(self) -> int:
|
|
|
|
|
"""Get current queue size"""
|
|
|
|
|
try:
|
|
|
|
|
return self.sync_q.qsize()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
|
|
|
|
|
return 0
|
|
|
|
|
|
|
|
|
|
def empty(self) -> bool:
|
|
|
|
@@ -420,7 +415,7 @@ class NotificationQueue:
|
|
|
|
|
self._janus_queue.close()
|
|
|
|
|
logger.debug("NotificationQueue closed successfully")
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
|
|
|
|
|
|
|
|
|
|
def _emit_notification_signal(self, item: Dict[str, Any]):
|
|
|
|
|
"""Emit notification signal"""
|
|
|
|
@@ -432,4 +427,4 @@ class NotificationQueue:
|
|
|
|
|
else:
|
|
|
|
|
self.notification_event_signal.send()
|
|
|
|
|
except Exception as e:
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")
|
|
|
|
|
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")
|