mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-12-31 20:30:57 +00:00
Compare commits
5 Commits
3376-clean
...
janus-queu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
213a18c061 | ||
|
|
1633b94511 | ||
|
|
928b97e6e5 | ||
|
|
ea09168650 | ||
|
|
f0588a9dd1 |
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Checkout repository
|
||||
uses: actions/checkout@v5
|
||||
uses: actions/checkout@v4
|
||||
|
||||
# Initializes the CodeQL tools for scanning.
|
||||
- name: Initialize CodeQL
|
||||
|
||||
2
.github/workflows/containers.yml
vendored
2
.github/workflows/containers.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
||||
# Or if we are in a tagged release scenario.
|
||||
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
|
||||
2
.github/workflows/pypi-release.yml
vendored
2
.github/workflows/pypi-release.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
|
||||
2
.github/workflows/test-container-build.yml
vendored
2
.github/workflows/test-container-build.yml
vendored
@@ -46,7 +46,7 @@ jobs:
|
||||
- platform: linux/arm64
|
||||
dockerfile: ./.github/test/Dockerfile-alpine
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v4
|
||||
- name: Set up Python 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
|
||||
2
.github/workflows/test-only.yml
vendored
2
.github/workflows/test-only.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
||||
lint-code:
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v4
|
||||
- name: Lint with Ruff
|
||||
run: |
|
||||
pip install ruff
|
||||
|
||||
@@ -20,7 +20,7 @@ jobs:
|
||||
env:
|
||||
PYTHON_VERSION: ${{ inputs.python-version }}
|
||||
steps:
|
||||
- uses: actions/checkout@v5
|
||||
- uses: actions/checkout@v4
|
||||
|
||||
# Mainly just for link/flake8
|
||||
- name: Set up Python ${{ env.PYTHON_VERSION }}
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.9'
|
||||
__version__ = '0.50.8'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -1,14 +1,13 @@
|
||||
import heapq
|
||||
import threading
|
||||
from typing import Dict, List, Any, Optional
|
||||
from blinker import signal
|
||||
from loguru import logger
|
||||
from typing import Dict, List, Any, Optional
|
||||
import heapq
|
||||
import queue
|
||||
import threading
|
||||
|
||||
try:
|
||||
import janus
|
||||
except ImportError:
|
||||
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
|
||||
logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
|
||||
raise
|
||||
|
||||
|
||||
@@ -52,7 +51,7 @@ class RecheckPriorityQueue:
|
||||
|
||||
logger.debug("RecheckPriorityQueue initialized successfully")
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
|
||||
raise
|
||||
|
||||
# SYNC INTERFACE (for ticker thread)
|
||||
@@ -73,7 +72,7 @@ class RecheckPriorityQueue:
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
|
||||
# Remove from priority storage if janus put failed
|
||||
try:
|
||||
with self._lock:
|
||||
@@ -81,7 +80,7 @@ class RecheckPriorityQueue:
|
||||
self._priority_items.remove(item)
|
||||
heapq.heapify(self._priority_items)
|
||||
except Exception as cleanup_e:
|
||||
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
|
||||
return False
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||
@@ -93,7 +92,7 @@ class RecheckPriorityQueue:
|
||||
# Get highest priority item
|
||||
with self._lock:
|
||||
if not self._priority_items:
|
||||
logger.critical(f"CRITICAL: Queue notification received but no priority items available")
|
||||
logger.critical("CRITICAL: Queue notification received but no priority items available")
|
||||
raise Exception("Priority queue inconsistency")
|
||||
item = heapq.heappop(self._priority_items)
|
||||
|
||||
@@ -104,7 +103,7 @@ class RecheckPriorityQueue:
|
||||
return item
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
|
||||
raise
|
||||
|
||||
# ASYNC INTERFACE (for workers)
|
||||
@@ -125,7 +124,7 @@ class RecheckPriorityQueue:
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
|
||||
# Remove from priority storage if janus put failed
|
||||
try:
|
||||
with self._lock:
|
||||
@@ -133,7 +132,7 @@ class RecheckPriorityQueue:
|
||||
self._priority_items.remove(item)
|
||||
heapq.heapify(self._priority_items)
|
||||
except Exception as cleanup_e:
|
||||
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
|
||||
return False
|
||||
|
||||
async def async_get(self):
|
||||
@@ -145,7 +144,7 @@ class RecheckPriorityQueue:
|
||||
# Get highest priority item
|
||||
with self._lock:
|
||||
if not self._priority_items:
|
||||
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
|
||||
logger.critical("CRITICAL: Async queue notification received but no priority items available")
|
||||
raise Exception("Priority queue inconsistency")
|
||||
item = heapq.heappop(self._priority_items)
|
||||
|
||||
@@ -156,7 +155,7 @@ class RecheckPriorityQueue:
|
||||
return item
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
|
||||
raise
|
||||
|
||||
# UTILITY METHODS
|
||||
@@ -166,7 +165,7 @@ class RecheckPriorityQueue:
|
||||
with self._lock:
|
||||
return len(self._priority_items)
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get queue size: {e}")
|
||||
return 0
|
||||
|
||||
def empty(self) -> bool:
|
||||
@@ -179,7 +178,7 @@ class RecheckPriorityQueue:
|
||||
self._janus_queue.close()
|
||||
logger.debug("RecheckPriorityQueue closed successfully")
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
|
||||
|
||||
# COMPATIBILITY METHODS (from original implementation)
|
||||
@property
|
||||
@@ -189,7 +188,7 @@ class RecheckPriorityQueue:
|
||||
with self._lock:
|
||||
return list(self._priority_items)
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get queue list: {e}")
|
||||
return []
|
||||
|
||||
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
|
||||
@@ -219,7 +218,7 @@ class RecheckPriorityQueue:
|
||||
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
|
||||
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
||||
|
||||
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
|
||||
@@ -254,7 +253,7 @@ class RecheckPriorityQueue:
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
|
||||
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
||||
|
||||
def get_queue_summary(self) -> Dict[str, Any]:
|
||||
@@ -295,7 +294,7 @@ class RecheckPriorityQueue:
|
||||
}
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
|
||||
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
|
||||
'clone_items': 0, 'scheduled_items': 0}
|
||||
|
||||
@@ -323,7 +322,7 @@ class RecheckPriorityQueue:
|
||||
self.queue_length_signal.send(length=self.qsize())
|
||||
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
|
||||
|
||||
def _emit_get_signals(self):
|
||||
"""Emit signals when item is removed"""
|
||||
@@ -331,7 +330,7 @@ class RecheckPriorityQueue:
|
||||
if self.queue_length_signal:
|
||||
self.queue_length_signal.send(length=self.qsize())
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
|
||||
|
||||
|
||||
class NotificationQueue:
|
||||
@@ -357,7 +356,7 @@ class NotificationQueue:
|
||||
self.notification_event_signal = signal('notification_event')
|
||||
logger.debug("NotificationQueue initialized successfully")
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
|
||||
raise
|
||||
|
||||
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
|
||||
@@ -368,7 +367,7 @@ class NotificationQueue:
|
||||
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
|
||||
return False
|
||||
|
||||
async def async_put(self, item: Dict[str, Any]):
|
||||
@@ -379,35 +378,31 @@ class NotificationQueue:
|
||||
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
|
||||
return False
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||
"""Thread-safe sync get"""
|
||||
try:
|
||||
return self.sync_q.get(block=block, timeout=timeout)
|
||||
except queue.Empty as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
|
||||
raise e
|
||||
logger.critical(f"CRITICAL: Failed to get notification: {e}")
|
||||
raise
|
||||
|
||||
async def async_get(self):
|
||||
"""Pure async get"""
|
||||
try:
|
||||
return await self.async_q.get()
|
||||
except queue.Empty as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
|
||||
raise e
|
||||
logger.critical(f"CRITICAL: Failed to async get notification: {e}")
|
||||
raise
|
||||
|
||||
def qsize(self) -> int:
|
||||
"""Get current queue size"""
|
||||
try:
|
||||
return self.sync_q.qsize()
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
|
||||
return 0
|
||||
|
||||
def empty(self) -> bool:
|
||||
@@ -420,7 +415,7 @@ class NotificationQueue:
|
||||
self._janus_queue.close()
|
||||
logger.debug("NotificationQueue closed successfully")
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
|
||||
|
||||
def _emit_notification_signal(self, item: Dict[str, Any]):
|
||||
"""Emit notification signal"""
|
||||
@@ -432,4 +427,4 @@ class NotificationQueue:
|
||||
else:
|
||||
self.notification_event_signal.send()
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")
|
||||
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")
|
||||
@@ -46,7 +46,7 @@ apprise==1.9.3
|
||||
paho-mqtt!=2.0.*
|
||||
|
||||
# Requires extra wheel for rPi
|
||||
#cryptography~=42.0.8
|
||||
cryptography~=42.0.8
|
||||
|
||||
# Used for CSS filtering
|
||||
beautifulsoup4>=4.0.0
|
||||
|
||||
Reference in New Issue
Block a user