mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-03-10 22:14:38 +00:00
Compare commits
5 Commits
3376-clean
...
janus-queu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
213a18c061 | ||
|
|
1633b94511 | ||
|
|
928b97e6e5 | ||
|
|
ea09168650 | ||
|
|
f0588a9dd1 |
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
|||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout repository
|
- name: Checkout repository
|
||||||
uses: actions/checkout@v5
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
# Initializes the CodeQL tools for scanning.
|
# Initializes the CodeQL tools for scanning.
|
||||||
- name: Initialize CodeQL
|
- name: Initialize CodeQL
|
||||||
|
|||||||
2
.github/workflows/containers.yml
vendored
2
.github/workflows/containers.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
|||||||
# Or if we are in a tagged release scenario.
|
# Or if we are in a tagged release scenario.
|
||||||
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
|
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python 3.11
|
- name: Set up Python 3.11
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/pypi-release.yml
vendored
2
.github/workflows/pypi-release.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/test-container-build.yml
vendored
2
.github/workflows/test-container-build.yml
vendored
@@ -46,7 +46,7 @@ jobs:
|
|||||||
- platform: linux/arm64
|
- platform: linux/arm64
|
||||||
dockerfile: ./.github/test/Dockerfile-alpine
|
dockerfile: ./.github/test/Dockerfile-alpine
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python 3.11
|
- name: Set up Python 3.11
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/test-only.yml
vendored
2
.github/workflows/test-only.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
|||||||
lint-code:
|
lint-code:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Lint with Ruff
|
- name: Lint with Ruff
|
||||||
run: |
|
run: |
|
||||||
pip install ruff
|
pip install ruff
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
PYTHON_VERSION: ${{ inputs.python-version }}
|
PYTHON_VERSION: ${{ inputs.python-version }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
# Mainly just for link/flake8
|
# Mainly just for link/flake8
|
||||||
- name: Set up Python ${{ env.PYTHON_VERSION }}
|
- name: Set up Python ${{ env.PYTHON_VERSION }}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||||
|
|
||||||
__version__ = '0.50.9'
|
__version__ = '0.50.8'
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
|
|||||||
@@ -1,14 +1,13 @@
|
|||||||
|
import heapq
|
||||||
|
import threading
|
||||||
|
from typing import Dict, List, Any, Optional
|
||||||
from blinker import signal
|
from blinker import signal
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
from typing import Dict, List, Any, Optional
|
|
||||||
import heapq
|
|
||||||
import queue
|
|
||||||
import threading
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
import janus
|
import janus
|
||||||
except ImportError:
|
except ImportError:
|
||||||
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
|
logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
|
|
||||||
@@ -52,7 +51,7 @@ class RecheckPriorityQueue:
|
|||||||
|
|
||||||
logger.debug("RecheckPriorityQueue initialized successfully")
|
logger.debug("RecheckPriorityQueue initialized successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# SYNC INTERFACE (for ticker thread)
|
# SYNC INTERFACE (for ticker thread)
|
||||||
@@ -73,7 +72,7 @@ class RecheckPriorityQueue:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
|
||||||
# Remove from priority storage if janus put failed
|
# Remove from priority storage if janus put failed
|
||||||
try:
|
try:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
@@ -81,7 +80,7 @@ class RecheckPriorityQueue:
|
|||||||
self._priority_items.remove(item)
|
self._priority_items.remove(item)
|
||||||
heapq.heapify(self._priority_items)
|
heapq.heapify(self._priority_items)
|
||||||
except Exception as cleanup_e:
|
except Exception as cleanup_e:
|
||||||
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||||
@@ -93,7 +92,7 @@ class RecheckPriorityQueue:
|
|||||||
# Get highest priority item
|
# Get highest priority item
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if not self._priority_items:
|
if not self._priority_items:
|
||||||
logger.critical(f"CRITICAL: Queue notification received but no priority items available")
|
logger.critical("CRITICAL: Queue notification received but no priority items available")
|
||||||
raise Exception("Priority queue inconsistency")
|
raise Exception("Priority queue inconsistency")
|
||||||
item = heapq.heappop(self._priority_items)
|
item = heapq.heappop(self._priority_items)
|
||||||
|
|
||||||
@@ -104,7 +103,7 @@ class RecheckPriorityQueue:
|
|||||||
return item
|
return item
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# ASYNC INTERFACE (for workers)
|
# ASYNC INTERFACE (for workers)
|
||||||
@@ -125,7 +124,7 @@ class RecheckPriorityQueue:
|
|||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
|
||||||
# Remove from priority storage if janus put failed
|
# Remove from priority storage if janus put failed
|
||||||
try:
|
try:
|
||||||
with self._lock:
|
with self._lock:
|
||||||
@@ -133,7 +132,7 @@ class RecheckPriorityQueue:
|
|||||||
self._priority_items.remove(item)
|
self._priority_items.remove(item)
|
||||||
heapq.heapify(self._priority_items)
|
heapq.heapify(self._priority_items)
|
||||||
except Exception as cleanup_e:
|
except Exception as cleanup_e:
|
||||||
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def async_get(self):
|
async def async_get(self):
|
||||||
@@ -145,7 +144,7 @@ class RecheckPriorityQueue:
|
|||||||
# Get highest priority item
|
# Get highest priority item
|
||||||
with self._lock:
|
with self._lock:
|
||||||
if not self._priority_items:
|
if not self._priority_items:
|
||||||
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
|
logger.critical("CRITICAL: Async queue notification received but no priority items available")
|
||||||
raise Exception("Priority queue inconsistency")
|
raise Exception("Priority queue inconsistency")
|
||||||
item = heapq.heappop(self._priority_items)
|
item = heapq.heappop(self._priority_items)
|
||||||
|
|
||||||
@@ -156,7 +155,7 @@ class RecheckPriorityQueue:
|
|||||||
return item
|
return item
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# UTILITY METHODS
|
# UTILITY METHODS
|
||||||
@@ -166,7 +165,7 @@ class RecheckPriorityQueue:
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
return len(self._priority_items)
|
return len(self._priority_items)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get queue size: {e}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def empty(self) -> bool:
|
def empty(self) -> bool:
|
||||||
@@ -179,7 +178,7 @@ class RecheckPriorityQueue:
|
|||||||
self._janus_queue.close()
|
self._janus_queue.close()
|
||||||
logger.debug("RecheckPriorityQueue closed successfully")
|
logger.debug("RecheckPriorityQueue closed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
|
||||||
|
|
||||||
# COMPATIBILITY METHODS (from original implementation)
|
# COMPATIBILITY METHODS (from original implementation)
|
||||||
@property
|
@property
|
||||||
@@ -189,7 +188,7 @@ class RecheckPriorityQueue:
|
|||||||
with self._lock:
|
with self._lock:
|
||||||
return list(self._priority_items)
|
return list(self._priority_items)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get queue list: {e}")
|
||||||
return []
|
return []
|
||||||
|
|
||||||
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
|
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
|
||||||
@@ -219,7 +218,7 @@ class RecheckPriorityQueue:
|
|||||||
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
|
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
|
||||||
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
||||||
|
|
||||||
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
|
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
|
||||||
@@ -254,7 +253,7 @@ class RecheckPriorityQueue:
|
|||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
|
||||||
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
||||||
|
|
||||||
def get_queue_summary(self) -> Dict[str, Any]:
|
def get_queue_summary(self) -> Dict[str, Any]:
|
||||||
@@ -295,7 +294,7 @@ class RecheckPriorityQueue:
|
|||||||
}
|
}
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
|
||||||
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
|
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
|
||||||
'clone_items': 0, 'scheduled_items': 0}
|
'clone_items': 0, 'scheduled_items': 0}
|
||||||
|
|
||||||
@@ -323,7 +322,7 @@ class RecheckPriorityQueue:
|
|||||||
self.queue_length_signal.send(length=self.qsize())
|
self.queue_length_signal.send(length=self.qsize())
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
|
||||||
|
|
||||||
def _emit_get_signals(self):
|
def _emit_get_signals(self):
|
||||||
"""Emit signals when item is removed"""
|
"""Emit signals when item is removed"""
|
||||||
@@ -331,7 +330,7 @@ class RecheckPriorityQueue:
|
|||||||
if self.queue_length_signal:
|
if self.queue_length_signal:
|
||||||
self.queue_length_signal.send(length=self.qsize())
|
self.queue_length_signal.send(length=self.qsize())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
|
||||||
|
|
||||||
|
|
||||||
class NotificationQueue:
|
class NotificationQueue:
|
||||||
@@ -357,7 +356,7 @@ class NotificationQueue:
|
|||||||
self.notification_event_signal = signal('notification_event')
|
self.notification_event_signal = signal('notification_event')
|
||||||
logger.debug("NotificationQueue initialized successfully")
|
logger.debug("NotificationQueue initialized successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
|
||||||
raise
|
raise
|
||||||
|
|
||||||
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
|
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
|
||||||
@@ -368,7 +367,7 @@ class NotificationQueue:
|
|||||||
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
|
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
async def async_put(self, item: Dict[str, Any]):
|
async def async_put(self, item: Dict[str, Any]):
|
||||||
@@ -379,35 +378,31 @@ class NotificationQueue:
|
|||||||
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
|
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
|
||||||
return True
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||||
"""Thread-safe sync get"""
|
"""Thread-safe sync get"""
|
||||||
try:
|
try:
|
||||||
return self.sync_q.get(block=block, timeout=timeout)
|
return self.sync_q.get(block=block, timeout=timeout)
|
||||||
except queue.Empty as e:
|
|
||||||
raise e
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get notification: {e}")
|
||||||
raise e
|
raise
|
||||||
|
|
||||||
async def async_get(self):
|
async def async_get(self):
|
||||||
"""Pure async get"""
|
"""Pure async get"""
|
||||||
try:
|
try:
|
||||||
return await self.async_q.get()
|
return await self.async_q.get()
|
||||||
except queue.Empty as e:
|
|
||||||
raise e
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to async get notification: {e}")
|
||||||
raise e
|
raise
|
||||||
|
|
||||||
def qsize(self) -> int:
|
def qsize(self) -> int:
|
||||||
"""Get current queue size"""
|
"""Get current queue size"""
|
||||||
try:
|
try:
|
||||||
return self.sync_q.qsize()
|
return self.sync_q.qsize()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
|
||||||
return 0
|
return 0
|
||||||
|
|
||||||
def empty(self) -> bool:
|
def empty(self) -> bool:
|
||||||
@@ -420,7 +415,7 @@ class NotificationQueue:
|
|||||||
self._janus_queue.close()
|
self._janus_queue.close()
|
||||||
logger.debug("NotificationQueue closed successfully")
|
logger.debug("NotificationQueue closed successfully")
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
|
||||||
|
|
||||||
def _emit_notification_signal(self, item: Dict[str, Any]):
|
def _emit_notification_signal(self, item: Dict[str, Any]):
|
||||||
"""Emit notification signal"""
|
"""Emit notification signal"""
|
||||||
@@ -432,4 +427,4 @@ class NotificationQueue:
|
|||||||
else:
|
else:
|
||||||
self.notification_event_signal.send()
|
self.notification_event_signal.send()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")
|
logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")
|
||||||
@@ -46,7 +46,7 @@ apprise==1.9.3
|
|||||||
paho-mqtt!=2.0.*
|
paho-mqtt!=2.0.*
|
||||||
|
|
||||||
# Requires extra wheel for rPi
|
# Requires extra wheel for rPi
|
||||||
#cryptography~=42.0.8
|
cryptography~=42.0.8
|
||||||
|
|
||||||
# Used for CSS filtering
|
# Used for CSS filtering
|
||||||
beautifulsoup4>=4.0.0
|
beautifulsoup4>=4.0.0
|
||||||
|
|||||||
Reference in New Issue
Block a user