Compare commits

..

5 Commits

Author SHA1 Message Date
dgtlmoon
213a18c061 Update changedetectionio/queue_handlers.py
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-08-18 09:21:28 +02:00
dgtlmoon
1633b94511 Merge branch 'master' into janus-queue
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64/v8 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-08-14 00:14:51 +02:00
dgtlmoon
928b97e6e5 Improve docs
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-08-11 17:42:34 +02:00
dgtlmoon
ea09168650 Rename queue workers 2025-08-11 17:36:51 +02:00
dgtlmoon
f0588a9dd1 Slightly more reliable queue handlers 2025-08-10 18:21:56 +02:00
9 changed files with 39 additions and 44 deletions

View File

@@ -30,7 +30,7 @@ jobs:
steps: steps:
- name: Checkout repository - name: Checkout repository
uses: actions/checkout@v5 uses: actions/checkout@v4
# Initializes the CodeQL tools for scanning. # Initializes the CodeQL tools for scanning.
- name: Initialize CodeQL - name: Initialize CodeQL

View File

@@ -39,7 +39,7 @@ jobs:
# Or if we are in a tagged release scenario. # Or if we are in a tagged release scenario.
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != '' if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v4
- name: Set up Python 3.11 - name: Set up Python 3.11
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:

View File

@@ -7,7 +7,7 @@ jobs:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v4
- name: Set up Python - name: Set up Python
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:

View File

@@ -46,7 +46,7 @@ jobs:
- platform: linux/arm64 - platform: linux/arm64
dockerfile: ./.github/test/Dockerfile-alpine dockerfile: ./.github/test/Dockerfile-alpine
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v4
- name: Set up Python 3.11 - name: Set up Python 3.11
uses: actions/setup-python@v5 uses: actions/setup-python@v5
with: with:

View File

@@ -7,7 +7,7 @@ jobs:
lint-code: lint-code:
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v4
- name: Lint with Ruff - name: Lint with Ruff
run: | run: |
pip install ruff pip install ruff

View File

@@ -20,7 +20,7 @@ jobs:
env: env:
PYTHON_VERSION: ${{ inputs.python-version }} PYTHON_VERSION: ${{ inputs.python-version }}
steps: steps:
- uses: actions/checkout@v5 - uses: actions/checkout@v4
# Mainly just for link/flake8 # Mainly just for link/flake8
- name: Set up Python ${{ env.PYTHON_VERSION }} - name: Set up Python ${{ env.PYTHON_VERSION }}

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki # Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.9' __version__ = '0.50.8'
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError

View File

@@ -1,14 +1,13 @@
import heapq
import threading
from typing import Dict, List, Any, Optional
from blinker import signal from blinker import signal
from loguru import logger from loguru import logger
from typing import Dict, List, Any, Optional
import heapq
import queue
import threading
try: try:
import janus import janus
except ImportError: except ImportError:
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus") logger.critical("CRITICAL: janus library is required. Install with: pip install janus")
raise raise
@@ -52,7 +51,7 @@ class RecheckPriorityQueue:
logger.debug("RecheckPriorityQueue initialized successfully") logger.debug("RecheckPriorityQueue initialized successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}") logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {e}")
raise raise
# SYNC INTERFACE (for ticker thread) # SYNC INTERFACE (for ticker thread)
@@ -73,7 +72,7 @@ class RecheckPriorityQueue:
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}") logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {e}")
# Remove from priority storage if janus put failed # Remove from priority storage if janus put failed
try: try:
with self._lock: with self._lock:
@@ -81,7 +80,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item) self._priority_items.remove(item)
heapq.heapify(self._priority_items) heapq.heapify(self._priority_items)
except Exception as cleanup_e: except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}") logger.critical(f"CRITICAL: Failed to cleanup after put failure: {cleanup_e}")
return False return False
def get(self, block: bool = True, timeout: Optional[float] = None): def get(self, block: bool = True, timeout: Optional[float] = None):
@@ -93,7 +92,7 @@ class RecheckPriorityQueue:
# Get highest priority item # Get highest priority item
with self._lock: with self._lock:
if not self._priority_items: if not self._priority_items:
logger.critical(f"CRITICAL: Queue notification received but no priority items available") logger.critical("CRITICAL: Queue notification received but no priority items available")
raise Exception("Priority queue inconsistency") raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items) item = heapq.heappop(self._priority_items)
@@ -104,7 +103,7 @@ class RecheckPriorityQueue:
return item return item
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}") logger.critical(f"CRITICAL: Failed to get item from queue: {e}")
raise raise
# ASYNC INTERFACE (for workers) # ASYNC INTERFACE (for workers)
@@ -125,7 +124,7 @@ class RecheckPriorityQueue:
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}") logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {e}")
# Remove from priority storage if janus put failed # Remove from priority storage if janus put failed
try: try:
with self._lock: with self._lock:
@@ -133,7 +132,7 @@ class RecheckPriorityQueue:
self._priority_items.remove(item) self._priority_items.remove(item)
heapq.heapify(self._priority_items) heapq.heapify(self._priority_items)
except Exception as cleanup_e: except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}") logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {cleanup_e}")
return False return False
async def async_get(self): async def async_get(self):
@@ -145,7 +144,7 @@ class RecheckPriorityQueue:
# Get highest priority item # Get highest priority item
with self._lock: with self._lock:
if not self._priority_items: if not self._priority_items:
logger.critical(f"CRITICAL: Async queue notification received but no priority items available") logger.critical("CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency") raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items) item = heapq.heappop(self._priority_items)
@@ -156,7 +155,7 @@ class RecheckPriorityQueue:
return item return item
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}") logger.critical(f"CRITICAL: Failed to async get item from queue: {e}")
raise raise
# UTILITY METHODS # UTILITY METHODS
@@ -166,7 +165,7 @@ class RecheckPriorityQueue:
with self._lock: with self._lock:
return len(self._priority_items) return len(self._priority_items)
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}") logger.critical(f"CRITICAL: Failed to get queue size: {e}")
return 0 return 0
def empty(self) -> bool: def empty(self) -> bool:
@@ -179,7 +178,7 @@ class RecheckPriorityQueue:
self._janus_queue.close() self._janus_queue.close()
logger.debug("RecheckPriorityQueue closed successfully") logger.debug("RecheckPriorityQueue closed successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}") logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {e}")
# COMPATIBILITY METHODS (from original implementation) # COMPATIBILITY METHODS (from original implementation)
@property @property
@@ -189,7 +188,7 @@ class RecheckPriorityQueue:
with self._lock: with self._lock:
return list(self._priority_items) return list(self._priority_items)
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}") logger.critical(f"CRITICAL: Failed to get queue list: {e}")
return [] return []
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]: def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
@@ -219,7 +218,7 @@ class RecheckPriorityQueue:
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False} return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}") logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {e}")
return {'position': None, 'total_items': 0, 'priority': None, 'found': False} return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]: def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
@@ -254,7 +253,7 @@ class RecheckPriorityQueue:
} }
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}") logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {e}")
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False} return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
def get_queue_summary(self) -> Dict[str, Any]: def get_queue_summary(self) -> Dict[str, Any]:
@@ -295,7 +294,7 @@ class RecheckPriorityQueue:
} }
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}") logger.critical(f"CRITICAL: Failed to get queue summary: {e}")
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0, return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
'clone_items': 0, 'scheduled_items': 0} 'clone_items': 0, 'scheduled_items': 0}
@@ -323,7 +322,7 @@ class RecheckPriorityQueue:
self.queue_length_signal.send(length=self.qsize()) self.queue_length_signal.send(length=self.qsize())
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}") logger.critical(f"CRITICAL: Failed to emit put signals: {e}")
def _emit_get_signals(self): def _emit_get_signals(self):
"""Emit signals when item is removed""" """Emit signals when item is removed"""
@@ -331,7 +330,7 @@ class RecheckPriorityQueue:
if self.queue_length_signal: if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize()) self.queue_length_signal.send(length=self.qsize())
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}") logger.critical(f"CRITICAL: Failed to emit get signals: {e}")
class NotificationQueue: class NotificationQueue:
@@ -357,7 +356,7 @@ class NotificationQueue:
self.notification_event_signal = signal('notification_event') self.notification_event_signal = signal('notification_event')
logger.debug("NotificationQueue initialized successfully") logger.debug("NotificationQueue initialized successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}") logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {e}")
raise raise
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None): def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
@@ -368,7 +367,7 @@ class NotificationQueue:
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}") logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}") logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {e}")
return False return False
async def async_put(self, item: Dict[str, Any]): async def async_put(self, item: Dict[str, Any]):
@@ -379,35 +378,31 @@ class NotificationQueue:
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}") logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
return True return True
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}") logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {e}")
return False return False
def get(self, block: bool = True, timeout: Optional[float] = None): def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get""" """Thread-safe sync get"""
try: try:
return self.sync_q.get(block=block, timeout=timeout) return self.sync_q.get(block=block, timeout=timeout)
except queue.Empty as e:
raise e
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}") logger.critical(f"CRITICAL: Failed to get notification: {e}")
raise e raise
async def async_get(self): async def async_get(self):
"""Pure async get""" """Pure async get"""
try: try:
return await self.async_q.get() return await self.async_q.get()
except queue.Empty as e:
raise e
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}") logger.critical(f"CRITICAL: Failed to async get notification: {e}")
raise e raise
def qsize(self) -> int: def qsize(self) -> int:
"""Get current queue size""" """Get current queue size"""
try: try:
return self.sync_q.qsize() return self.sync_q.qsize()
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}") logger.critical(f"CRITICAL: Failed to get notification queue size: {e}")
return 0 return 0
def empty(self) -> bool: def empty(self) -> bool:
@@ -420,7 +415,7 @@ class NotificationQueue:
self._janus_queue.close() self._janus_queue.close()
logger.debug("NotificationQueue closed successfully") logger.debug("NotificationQueue closed successfully")
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}") logger.critical(f"CRITICAL: Failed to close NotificationQueue: {e}")
def _emit_notification_signal(self, item: Dict[str, Any]): def _emit_notification_signal(self, item: Dict[str, Any]):
"""Emit notification signal""" """Emit notification signal"""
@@ -432,4 +427,4 @@ class NotificationQueue:
else: else:
self.notification_event_signal.send() self.notification_event_signal.send()
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}") logger.critical(f"CRITICAL: Failed to emit notification signal: {e}")

View File

@@ -46,7 +46,7 @@ apprise==1.9.3
paho-mqtt!=2.0.* paho-mqtt!=2.0.*
# Requires extra wheel for rPi # Requires extra wheel for rPi
#cryptography~=42.0.8 cryptography~=42.0.8
# Used for CSS filtering # Used for CSS filtering
beautifulsoup4>=4.0.0 beautifulsoup4>=4.0.0