mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-22 17:36:09 +00:00
Compare commits
1 Commits
0.50.10
...
cryptograp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
767168a9bf |
2
.github/workflows/codeql-analysis.yml
vendored
2
.github/workflows/codeql-analysis.yml
vendored
@@ -30,7 +30,7 @@ jobs:
|
|||||||
|
|
||||||
steps:
|
steps:
|
||||||
- name: Checkout repository
|
- name: Checkout repository
|
||||||
uses: actions/checkout@v5
|
uses: actions/checkout@v4
|
||||||
|
|
||||||
# Initializes the CodeQL tools for scanning.
|
# Initializes the CodeQL tools for scanning.
|
||||||
- name: Initialize CodeQL
|
- name: Initialize CodeQL
|
||||||
|
|||||||
2
.github/workflows/containers.yml
vendored
2
.github/workflows/containers.yml
vendored
@@ -39,7 +39,7 @@ jobs:
|
|||||||
# Or if we are in a tagged release scenario.
|
# Or if we are in a tagged release scenario.
|
||||||
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
|
if: ${{ github.event.workflow_run.conclusion == 'success' }} || ${{ github.event.release.tag_name }} != ''
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python 3.11
|
- name: Set up Python 3.11
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/pypi-release.yml
vendored
2
.github/workflows/pypi-release.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
|||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python
|
- name: Set up Python
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/test-container-build.yml
vendored
2
.github/workflows/test-container-build.yml
vendored
@@ -46,7 +46,7 @@ jobs:
|
|||||||
- platform: linux/arm64
|
- platform: linux/arm64
|
||||||
dockerfile: ./.github/test/Dockerfile-alpine
|
dockerfile: ./.github/test/Dockerfile-alpine
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Set up Python 3.11
|
- name: Set up Python 3.11
|
||||||
uses: actions/setup-python@v5
|
uses: actions/setup-python@v5
|
||||||
with:
|
with:
|
||||||
|
|||||||
2
.github/workflows/test-only.yml
vendored
2
.github/workflows/test-only.yml
vendored
@@ -7,7 +7,7 @@ jobs:
|
|||||||
lint-code:
|
lint-code:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
- name: Lint with Ruff
|
- name: Lint with Ruff
|
||||||
run: |
|
run: |
|
||||||
pip install ruff
|
pip install ruff
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ jobs:
|
|||||||
env:
|
env:
|
||||||
PYTHON_VERSION: ${{ inputs.python-version }}
|
PYTHON_VERSION: ${{ inputs.python-version }}
|
||||||
steps:
|
steps:
|
||||||
- uses: actions/checkout@v5
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
# Mainly just for link/flake8
|
# Mainly just for link/flake8
|
||||||
- name: Set up Python ${{ env.PYTHON_VERSION }}
|
- name: Set up Python ${{ env.PYTHON_VERSION }}
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||||
|
|
||||||
__version__ = '0.50.10'
|
__version__ = '0.50.8'
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
@@ -35,22 +35,13 @@ def sigshutdown_handler(_signo, _stack_frame):
|
|||||||
app.config.exit.set()
|
app.config.exit.set()
|
||||||
datastore.stop_thread = True
|
datastore.stop_thread = True
|
||||||
|
|
||||||
# Shutdown workers and queues immediately
|
# Shutdown workers immediately
|
||||||
try:
|
try:
|
||||||
from changedetectionio import worker_handler
|
from changedetectionio import worker_handler
|
||||||
worker_handler.shutdown_workers()
|
worker_handler.shutdown_workers()
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error shutting down workers: {str(e)}")
|
logger.error(f"Error shutting down workers: {str(e)}")
|
||||||
|
|
||||||
# Close janus queues properly
|
|
||||||
try:
|
|
||||||
from changedetectionio.flask_app import update_q, notification_q
|
|
||||||
update_q.close()
|
|
||||||
notification_q.close()
|
|
||||||
logger.debug("Janus queues closed successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to close janus queues: {e}")
|
|
||||||
|
|
||||||
# Shutdown socketio server fast
|
# Shutdown socketio server fast
|
||||||
from changedetectionio.flask_app import socketio_server
|
from changedetectionio.flask_app import socketio_server
|
||||||
if socketio_server and hasattr(socketio_server, 'shutdown'):
|
if socketio_server and hasattr(socketio_server, 'shutdown'):
|
||||||
|
|||||||
@@ -1,8 +1,5 @@
|
|||||||
from changedetectionio import queuedWatchMetaData
|
|
||||||
from changedetectionio import worker_handler
|
|
||||||
from flask_expects_json import expects_json
|
from flask_expects_json import expects_json
|
||||||
from flask_restful import abort, Resource
|
from flask_restful import abort, Resource
|
||||||
|
|
||||||
from flask import request
|
from flask import request
|
||||||
from . import auth
|
from . import auth
|
||||||
|
|
||||||
@@ -14,24 +11,21 @@ class Tag(Resource):
|
|||||||
def __init__(self, **kwargs):
|
def __init__(self, **kwargs):
|
||||||
# datastore is a black box dependency
|
# datastore is a black box dependency
|
||||||
self.datastore = kwargs['datastore']
|
self.datastore = kwargs['datastore']
|
||||||
self.update_q = kwargs['update_q']
|
|
||||||
|
|
||||||
# Get information about a single tag
|
# Get information about a single tag
|
||||||
# curl http://localhost:5000/api/v1/tag/<string:uuid>
|
# curl http://localhost:5000/api/v1/tag/<string:uuid>
|
||||||
@auth.check_token
|
@auth.check_token
|
||||||
def get(self, uuid):
|
def get(self, uuid):
|
||||||
"""
|
"""
|
||||||
@api {get} /api/v1/tag/:uuid Single tag - Get data, toggle notification muting, recheck all.
|
@api {get} /api/v1/tag/:uuid Single tag - get data or toggle notification muting.
|
||||||
@apiDescription Retrieve tag information, set notification_muted status, recheck all in tag.
|
@apiDescription Retrieve tag information and set notification_muted status
|
||||||
@apiExample {curl} Example usage:
|
@apiExample {curl} Example usage:
|
||||||
curl http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091 -H"x-api-key:813031b16330fe25e3780cf0325daa45"
|
curl http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091 -H"x-api-key:813031b16330fe25e3780cf0325daa45"
|
||||||
curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?muted=muted" -H"x-api-key:813031b16330fe25e3780cf0325daa45"
|
curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?muted=muted" -H"x-api-key:813031b16330fe25e3780cf0325daa45"
|
||||||
curl "http://localhost:5000/api/v1/tag/cc0cfffa-f449-477b-83ea-0caafd1dc091?recheck=true" -H"x-api-key:813031b16330fe25e3780cf0325daa45"
|
|
||||||
@apiName Tag
|
@apiName Tag
|
||||||
@apiGroup Tag
|
@apiGroup Tag
|
||||||
@apiParam {uuid} uuid Tag unique ID.
|
@apiParam {uuid} uuid Tag unique ID.
|
||||||
@apiQuery {String} [muted] =`muted` or =`unmuted` , Sets the MUTE NOTIFICATIONS state
|
@apiQuery {String} [muted] =`muted` or =`unmuted` , Sets the MUTE NOTIFICATIONS state
|
||||||
@apiQuery {String} [recheck] = True, Queue all watches with this tag for recheck
|
|
||||||
@apiSuccess (200) {String} OK When muted operation OR full JSON object of the tag
|
@apiSuccess (200) {String} OK When muted operation OR full JSON object of the tag
|
||||||
@apiSuccess (200) {JSON} TagJSON JSON Full JSON object of the tag
|
@apiSuccess (200) {JSON} TagJSON JSON Full JSON object of the tag
|
||||||
"""
|
"""
|
||||||
@@ -40,20 +34,6 @@ class Tag(Resource):
|
|||||||
if not tag:
|
if not tag:
|
||||||
abort(404, message=f'No tag exists with the UUID of {uuid}')
|
abort(404, message=f'No tag exists with the UUID of {uuid}')
|
||||||
|
|
||||||
if request.args.get('recheck'):
|
|
||||||
# Recheck all, including muted
|
|
||||||
# Get most overdue first
|
|
||||||
i=0
|
|
||||||
for k in sorted(self.datastore.data['watching'].items(), key=lambda item: item[1].get('last_checked', 0)):
|
|
||||||
watch_uuid = k[0]
|
|
||||||
watch = k[1]
|
|
||||||
if not watch['paused'] and tag['uuid'] not in watch['tags']:
|
|
||||||
continue
|
|
||||||
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
|
|
||||||
i+=1
|
|
||||||
|
|
||||||
return f"OK, {i} watches queued", 200
|
|
||||||
|
|
||||||
if request.args.get('muted', '') == 'muted':
|
if request.args.get('muted', '') == 'muted':
|
||||||
self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = True
|
self.datastore.data['settings']['application']['tags'][uuid]['notification_muted'] = True
|
||||||
return "OK", 200
|
return "OK", 200
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ from changedetectionio.flask_app import watch_check_update
|
|||||||
import asyncio
|
import asyncio
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
import queue
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
@@ -38,23 +37,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
watch = None
|
watch = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use native janus async interface - no threads needed!
|
# Use asyncio wait_for to make queue.get() cancellable
|
||||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
queued_item_data = await asyncio.wait_for(q.get(), timeout=1.0)
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
# No jobs available, continue loop
|
# No jobs available, continue loop
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
logger.error(f"Worker {worker_id} error getting queue item: {e}")
|
||||||
|
|
||||||
# Log queue health for debugging
|
|
||||||
try:
|
|
||||||
queue_size = q.qsize()
|
|
||||||
is_empty = q.empty()
|
|
||||||
logger.critical(f"CRITICAL: Worker {worker_id} queue health - size: {queue_size}, empty: {is_empty}")
|
|
||||||
except Exception as health_e:
|
|
||||||
logger.critical(f"CRITICAL: Worker {worker_id} queue health check failed: {health_e}")
|
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(0.1)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|||||||
@@ -12,7 +12,7 @@ from blinker import signal
|
|||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from threading import Event
|
from threading import Event
|
||||||
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
|
from changedetectionio.custom_queue import SignalPriorityQueue, AsyncSignalPriorityQueue, NotificationQueue
|
||||||
from changedetectionio import worker_handler
|
from changedetectionio import worker_handler
|
||||||
|
|
||||||
from flask import (
|
from flask import (
|
||||||
@@ -48,8 +48,8 @@ datastore = None
|
|||||||
ticker_thread = None
|
ticker_thread = None
|
||||||
extra_stylesheets = []
|
extra_stylesheets = []
|
||||||
|
|
||||||
# Use bulletproof janus-based queues for sync/async reliability
|
# Use async queue by default, keep sync for backward compatibility
|
||||||
update_q = RecheckPriorityQueue()
|
update_q = AsyncSignalPriorityQueue() if worker_handler.USE_ASYNC_WORKERS else SignalPriorityQueue()
|
||||||
notification_q = NotificationQueue()
|
notification_q = NotificationQueue()
|
||||||
MAX_QUEUE_SIZE = 2000
|
MAX_QUEUE_SIZE = 2000
|
||||||
|
|
||||||
@@ -329,7 +329,7 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
resource_class_kwargs={'datastore': datastore})
|
resource_class_kwargs={'datastore': datastore})
|
||||||
|
|
||||||
watch_api.add_resource(Tag, '/api/v1/tag', '/api/v1/tag/<string:uuid>',
|
watch_api.add_resource(Tag, '/api/v1/tag', '/api/v1/tag/<string:uuid>',
|
||||||
resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
|
resource_class_kwargs={'datastore': datastore})
|
||||||
|
|
||||||
watch_api.add_resource(Search, '/api/v1/search',
|
watch_api.add_resource(Search, '/api/v1/search',
|
||||||
resource_class_kwargs={'datastore': datastore})
|
resource_class_kwargs={'datastore': datastore})
|
||||||
@@ -844,21 +844,15 @@ def ticker_thread_check_time_launch_checks():
|
|||||||
|
|
||||||
# Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
|
# Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
|
||||||
priority = int(time.time())
|
priority = int(time.time())
|
||||||
|
|
||||||
# Into the queue with you
|
|
||||||
queued_successfully = worker_handler.queue_item_async_safe(update_q,
|
|
||||||
queuedWatchMetaData.PrioritizedItem(priority=priority,
|
|
||||||
item={'uuid': uuid})
|
|
||||||
)
|
|
||||||
if queued_successfully:
|
|
||||||
logger.debug(
|
logger.debug(
|
||||||
f"> Queued watch UUID {uuid} "
|
f"> Queued watch UUID {uuid} "
|
||||||
f"last checked at {watch['last_checked']} "
|
f"last checked at {watch['last_checked']} "
|
||||||
f"queued at {now:0.2f} priority {priority} "
|
f"queued at {now:0.2f} priority {priority} "
|
||||||
f"jitter {watch.jitter_seconds:0.2f}s, "
|
f"jitter {watch.jitter_seconds:0.2f}s, "
|
||||||
f"{now - watch['last_checked']:0.2f}s since last checked")
|
f"{now - watch['last_checked']:0.2f}s since last checked")
|
||||||
else:
|
|
||||||
logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!")
|
# Into the queue with you
|
||||||
|
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}))
|
||||||
|
|
||||||
# Reset for next time
|
# Reset for next time
|
||||||
watch.jitter_seconds = 0
|
watch.jitter_seconds = 0
|
||||||
|
|||||||
@@ -1,435 +0,0 @@
|
|||||||
from blinker import signal
|
|
||||||
from loguru import logger
|
|
||||||
from typing import Dict, List, Any, Optional
|
|
||||||
import heapq
|
|
||||||
import queue
|
|
||||||
import threading
|
|
||||||
|
|
||||||
try:
|
|
||||||
import janus
|
|
||||||
except ImportError:
|
|
||||||
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
|
|
||||||
raise
|
|
||||||
|
|
||||||
|
|
||||||
class RecheckPriorityQueue:
|
|
||||||
"""
|
|
||||||
Ultra-reliable priority queue using janus for async/sync bridging.
|
|
||||||
|
|
||||||
CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
|
|
||||||
- sync_q: Used by Flask routes, ticker threads, and other synchronous code
|
|
||||||
- async_q: Used by async workers (the actual fetchers/processors) and coroutines
|
|
||||||
|
|
||||||
DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
|
|
||||||
- Synchronous code (Flask, threads) cannot use async methods without blocking
|
|
||||||
- Async code cannot use sync methods without blocking the event loop
|
|
||||||
- janus provides the only safe bridge between these two worlds
|
|
||||||
|
|
||||||
Attempting to unify to async-only would require:
|
|
||||||
- Converting all Flask routes to async (major breaking change)
|
|
||||||
- Using asyncio.run() in sync contexts (causes deadlocks)
|
|
||||||
- Thread-pool wrapping (adds complexity and overhead)
|
|
||||||
|
|
||||||
Minimal implementation focused on reliability:
|
|
||||||
- Pure janus for sync/async bridge
|
|
||||||
- Thread-safe priority ordering
|
|
||||||
- Bulletproof error handling with critical logging
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, maxsize: int = 0):
|
|
||||||
try:
|
|
||||||
self._janus_queue = janus.Queue(maxsize=maxsize)
|
|
||||||
# BOTH interfaces required - see class docstring for why
|
|
||||||
self.sync_q = self._janus_queue.sync_q # Flask routes, ticker thread
|
|
||||||
self.async_q = self._janus_queue.async_q # Async workers
|
|
||||||
|
|
||||||
# Priority storage - thread-safe
|
|
||||||
self._priority_items = []
|
|
||||||
self._lock = threading.RLock()
|
|
||||||
|
|
||||||
# Signals for UI updates
|
|
||||||
self.queue_length_signal = signal('queue_length')
|
|
||||||
|
|
||||||
logger.debug("RecheckPriorityQueue initialized successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
# SYNC INTERFACE (for ticker thread)
|
|
||||||
def put(self, item, block: bool = True, timeout: Optional[float] = None):
|
|
||||||
"""Thread-safe sync put with priority ordering"""
|
|
||||||
try:
|
|
||||||
# Add to priority storage
|
|
||||||
with self._lock:
|
|
||||||
heapq.heappush(self._priority_items, item)
|
|
||||||
|
|
||||||
# Notify via janus sync queue
|
|
||||||
self.sync_q.put(True, block=block, timeout=timeout)
|
|
||||||
|
|
||||||
# Emit signals
|
|
||||||
self._emit_put_signals(item)
|
|
||||||
|
|
||||||
logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
|
|
||||||
# Remove from priority storage if janus put failed
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
if item in self._priority_items:
|
|
||||||
self._priority_items.remove(item)
|
|
||||||
heapq.heapify(self._priority_items)
|
|
||||||
except Exception as cleanup_e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
|
||||||
"""Thread-safe sync get with priority ordering"""
|
|
||||||
try:
|
|
||||||
# Wait for notification
|
|
||||||
self.sync_q.get(block=block, timeout=timeout)
|
|
||||||
|
|
||||||
# Get highest priority item
|
|
||||||
with self._lock:
|
|
||||||
if not self._priority_items:
|
|
||||||
logger.critical(f"CRITICAL: Queue notification received but no priority items available")
|
|
||||||
raise Exception("Priority queue inconsistency")
|
|
||||||
item = heapq.heappop(self._priority_items)
|
|
||||||
|
|
||||||
# Emit signals
|
|
||||||
self._emit_get_signals()
|
|
||||||
|
|
||||||
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
|
||||||
return item
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
# ASYNC INTERFACE (for workers)
|
|
||||||
async def async_put(self, item):
|
|
||||||
"""Pure async put with priority ordering"""
|
|
||||||
try:
|
|
||||||
# Add to priority storage
|
|
||||||
with self._lock:
|
|
||||||
heapq.heappush(self._priority_items, item)
|
|
||||||
|
|
||||||
# Notify via janus async queue
|
|
||||||
await self.async_q.put(True)
|
|
||||||
|
|
||||||
# Emit signals
|
|
||||||
self._emit_put_signals(item)
|
|
||||||
|
|
||||||
logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
|
|
||||||
# Remove from priority storage if janus put failed
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
if item in self._priority_items:
|
|
||||||
self._priority_items.remove(item)
|
|
||||||
heapq.heapify(self._priority_items)
|
|
||||||
except Exception as cleanup_e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def async_get(self):
|
|
||||||
"""Pure async get with priority ordering"""
|
|
||||||
try:
|
|
||||||
# Wait for notification
|
|
||||||
await self.async_q.get()
|
|
||||||
|
|
||||||
# Get highest priority item
|
|
||||||
with self._lock:
|
|
||||||
if not self._priority_items:
|
|
||||||
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
|
|
||||||
raise Exception("Priority queue inconsistency")
|
|
||||||
item = heapq.heappop(self._priority_items)
|
|
||||||
|
|
||||||
# Emit signals
|
|
||||||
self._emit_get_signals()
|
|
||||||
|
|
||||||
logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
|
|
||||||
return item
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
# UTILITY METHODS
|
|
||||||
def qsize(self) -> int:
|
|
||||||
"""Get current queue size"""
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
return len(self._priority_items)
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get queue size: {str(e)}")
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def empty(self) -> bool:
|
|
||||||
"""Check if queue is empty"""
|
|
||||||
return self.qsize() == 0
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""Close the janus queue"""
|
|
||||||
try:
|
|
||||||
self._janus_queue.close()
|
|
||||||
logger.debug("RecheckPriorityQueue closed successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
|
|
||||||
|
|
||||||
# COMPATIBILITY METHODS (from original implementation)
|
|
||||||
@property
|
|
||||||
def queue(self):
|
|
||||||
"""Provide compatibility with original queue access"""
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
return list(self._priority_items)
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get queue list: {str(e)}")
|
|
||||||
return []
|
|
||||||
|
|
||||||
def get_uuid_position(self, target_uuid: str) -> Dict[str, Any]:
|
|
||||||
"""Find position of UUID in queue"""
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
queue_list = list(self._priority_items)
|
|
||||||
total_items = len(queue_list)
|
|
||||||
|
|
||||||
if total_items == 0:
|
|
||||||
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
|
||||||
|
|
||||||
# Find target item
|
|
||||||
for item in queue_list:
|
|
||||||
if (hasattr(item, 'item') and isinstance(item.item, dict) and
|
|
||||||
item.item.get('uuid') == target_uuid):
|
|
||||||
|
|
||||||
# Count items with higher priority
|
|
||||||
position = sum(1 for other in queue_list if other.priority < item.priority)
|
|
||||||
return {
|
|
||||||
'position': position,
|
|
||||||
'total_items': total_items,
|
|
||||||
'priority': item.priority,
|
|
||||||
'found': True
|
|
||||||
}
|
|
||||||
|
|
||||||
return {'position': None, 'total_items': total_items, 'priority': None, 'found': False}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get UUID position for {target_uuid}: {str(e)}")
|
|
||||||
return {'position': None, 'total_items': 0, 'priority': None, 'found': False}
|
|
||||||
|
|
||||||
def get_all_queued_uuids(self, limit: Optional[int] = None, offset: int = 0) -> Dict[str, Any]:
|
|
||||||
"""Get all queued UUIDs with pagination"""
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
queue_list = sorted(self._priority_items) # Sort by priority
|
|
||||||
total_items = len(queue_list)
|
|
||||||
|
|
||||||
if total_items == 0:
|
|
||||||
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
|
||||||
|
|
||||||
# Apply pagination
|
|
||||||
end_idx = min(offset + limit, total_items) if limit else total_items
|
|
||||||
items_to_process = queue_list[offset:end_idx]
|
|
||||||
|
|
||||||
result = []
|
|
||||||
for position, item in enumerate(items_to_process, start=offset):
|
|
||||||
if (hasattr(item, 'item') and isinstance(item.item, dict) and
|
|
||||||
'uuid' in item.item):
|
|
||||||
result.append({
|
|
||||||
'uuid': item.item['uuid'],
|
|
||||||
'position': position,
|
|
||||||
'priority': item.priority
|
|
||||||
})
|
|
||||||
|
|
||||||
return {
|
|
||||||
'items': result,
|
|
||||||
'total_items': total_items,
|
|
||||||
'returned_items': len(result),
|
|
||||||
'has_more': (offset + len(result)) < total_items
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get all queued UUIDs: {str(e)}")
|
|
||||||
return {'items': [], 'total_items': 0, 'returned_items': 0, 'has_more': False}
|
|
||||||
|
|
||||||
def get_queue_summary(self) -> Dict[str, Any]:
|
|
||||||
"""Get queue summary statistics"""
|
|
||||||
try:
|
|
||||||
with self._lock:
|
|
||||||
queue_list = list(self._priority_items)
|
|
||||||
total_items = len(queue_list)
|
|
||||||
|
|
||||||
if total_items == 0:
|
|
||||||
return {
|
|
||||||
'total_items': 0, 'priority_breakdown': {},
|
|
||||||
'immediate_items': 0, 'clone_items': 0, 'scheduled_items': 0
|
|
||||||
}
|
|
||||||
|
|
||||||
immediate_items = clone_items = scheduled_items = 0
|
|
||||||
priority_counts = {}
|
|
||||||
|
|
||||||
for item in queue_list:
|
|
||||||
priority = item.priority
|
|
||||||
priority_counts[priority] = priority_counts.get(priority, 0) + 1
|
|
||||||
|
|
||||||
if priority == 1:
|
|
||||||
immediate_items += 1
|
|
||||||
elif priority == 5:
|
|
||||||
clone_items += 1
|
|
||||||
elif priority > 100:
|
|
||||||
scheduled_items += 1
|
|
||||||
|
|
||||||
return {
|
|
||||||
'total_items': total_items,
|
|
||||||
'priority_breakdown': priority_counts,
|
|
||||||
'immediate_items': immediate_items,
|
|
||||||
'clone_items': clone_items,
|
|
||||||
'scheduled_items': scheduled_items,
|
|
||||||
'min_priority': min(priority_counts.keys()) if priority_counts else None,
|
|
||||||
'max_priority': max(priority_counts.keys()) if priority_counts else None
|
|
||||||
}
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get queue summary: {str(e)}")
|
|
||||||
return {'total_items': 0, 'priority_breakdown': {}, 'immediate_items': 0,
|
|
||||||
'clone_items': 0, 'scheduled_items': 0}
|
|
||||||
|
|
||||||
# PRIVATE METHODS
|
|
||||||
def _get_item_uuid(self, item) -> str:
|
|
||||||
"""Safely extract UUID from item for logging"""
|
|
||||||
try:
|
|
||||||
if hasattr(item, 'item') and isinstance(item.item, dict):
|
|
||||||
return item.item.get('uuid', 'unknown')
|
|
||||||
except Exception:
|
|
||||||
pass
|
|
||||||
return 'unknown'
|
|
||||||
|
|
||||||
def _emit_put_signals(self, item):
|
|
||||||
"""Emit signals when item is added"""
|
|
||||||
try:
|
|
||||||
# Watch update signal
|
|
||||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
|
||||||
watch_check_update = signal('watch_check_update')
|
|
||||||
if watch_check_update:
|
|
||||||
watch_check_update.send(watch_uuid=item.item['uuid'])
|
|
||||||
|
|
||||||
# Queue length signal
|
|
||||||
if self.queue_length_signal:
|
|
||||||
self.queue_length_signal.send(length=self.qsize())
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
|
|
||||||
|
|
||||||
def _emit_get_signals(self):
|
|
||||||
"""Emit signals when item is removed"""
|
|
||||||
try:
|
|
||||||
if self.queue_length_signal:
|
|
||||||
self.queue_length_signal.send(length=self.qsize())
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to emit get signals: {str(e)}")
|
|
||||||
|
|
||||||
|
|
||||||
class NotificationQueue:
|
|
||||||
"""
|
|
||||||
Ultra-reliable notification queue using pure janus.
|
|
||||||
|
|
||||||
CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
|
|
||||||
- sync_q: Used by Flask routes, ticker threads, and other synchronous code
|
|
||||||
- async_q: Used by async workers and coroutines
|
|
||||||
|
|
||||||
DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts.
|
|
||||||
See RecheckPriorityQueue docstring above for detailed explanation.
|
|
||||||
|
|
||||||
Simple wrapper around janus with bulletproof error handling.
|
|
||||||
"""
|
|
||||||
|
|
||||||
def __init__(self, maxsize: int = 0):
|
|
||||||
try:
|
|
||||||
self._janus_queue = janus.Queue(maxsize=maxsize)
|
|
||||||
# BOTH interfaces required - see class docstring for why
|
|
||||||
self.sync_q = self._janus_queue.sync_q # Flask routes, threads
|
|
||||||
self.async_q = self._janus_queue.async_q # Async workers
|
|
||||||
self.notification_event_signal = signal('notification_event')
|
|
||||||
logger.debug("NotificationQueue initialized successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
|
|
||||||
raise
|
|
||||||
|
|
||||||
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
|
|
||||||
"""Thread-safe sync put with signal emission"""
|
|
||||||
try:
|
|
||||||
self.sync_q.put(item, block=block, timeout=timeout)
|
|
||||||
self._emit_notification_signal(item)
|
|
||||||
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
async def async_put(self, item: Dict[str, Any]):
|
|
||||||
"""Pure async put with signal emission"""
|
|
||||||
try:
|
|
||||||
await self.async_q.put(item)
|
|
||||||
self._emit_notification_signal(item)
|
|
||||||
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
|
|
||||||
return True
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
|
||||||
"""Thread-safe sync get"""
|
|
||||||
try:
|
|
||||||
return self.sync_q.get(block=block, timeout=timeout)
|
|
||||||
except queue.Empty as e:
|
|
||||||
raise e
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
async def async_get(self):
|
|
||||||
"""Pure async get"""
|
|
||||||
try:
|
|
||||||
return await self.async_q.get()
|
|
||||||
except queue.Empty as e:
|
|
||||||
raise e
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
|
|
||||||
raise e
|
|
||||||
|
|
||||||
def qsize(self) -> int:
|
|
||||||
"""Get current queue size"""
|
|
||||||
try:
|
|
||||||
return self.sync_q.qsize()
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
|
|
||||||
return 0
|
|
||||||
|
|
||||||
def empty(self) -> bool:
|
|
||||||
"""Check if queue is empty"""
|
|
||||||
return self.qsize() == 0
|
|
||||||
|
|
||||||
def close(self):
|
|
||||||
"""Close the janus queue"""
|
|
||||||
try:
|
|
||||||
self._janus_queue.close()
|
|
||||||
logger.debug("NotificationQueue closed successfully")
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
|
|
||||||
|
|
||||||
def _emit_notification_signal(self, item: Dict[str, Any]):
|
|
||||||
"""Emit notification signal"""
|
|
||||||
try:
|
|
||||||
if self.notification_event_signal and isinstance(item, dict):
|
|
||||||
watch_uuid = item.get('uuid')
|
|
||||||
if watch_uuid:
|
|
||||||
self.notification_event_signal.send(watch_uuid=watch_uuid)
|
|
||||||
else:
|
|
||||||
self.notification_event_signal.send()
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Failed to emit notification signal: {str(e)}")
|
|
||||||
@@ -1,18 +1,15 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, wait_for_all_checks, set_original_response
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
import json
|
import json
|
||||||
import time
|
|
||||||
|
|
||||||
def test_api_tags_listing(client, live_server, measure_memory_usage):
|
def test_api_tags_listing(client, live_server, measure_memory_usage):
|
||||||
# live_server_setup(live_server) # Setup on conftest per function
|
# live_server_setup(live_server) # Setup on conftest per function
|
||||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||||
tag_title = 'Test Tag'
|
tag_title = 'Test Tag'
|
||||||
|
|
||||||
|
# Get a listing
|
||||||
set_original_response()
|
|
||||||
|
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("tags"),
|
url_for("tags"),
|
||||||
headers={'x-api-key': api_key}
|
headers={'x-api-key': api_key}
|
||||||
@@ -107,8 +104,6 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
|
|||||||
assert res.status_code == 201
|
assert res.status_code == 201
|
||||||
watch_uuid = res.json.get('uuid')
|
watch_uuid = res.json.get('uuid')
|
||||||
|
|
||||||
|
|
||||||
wait_for_all_checks()
|
|
||||||
# Verify tag is associated with watch by name if need be
|
# Verify tag is associated with watch by name if need be
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("watch", uuid=watch_uuid),
|
url_for("watch", uuid=watch_uuid),
|
||||||
@@ -117,21 +112,6 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
|
|||||||
assert res.status_code == 200
|
assert res.status_code == 200
|
||||||
assert new_tag_uuid in res.json.get('tags', [])
|
assert new_tag_uuid in res.json.get('tags', [])
|
||||||
|
|
||||||
# Check recheck by tag
|
|
||||||
before_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked')
|
|
||||||
time.sleep(1)
|
|
||||||
res = client.get(
|
|
||||||
url_for("tag", uuid=new_tag_uuid) + "?recheck=true",
|
|
||||||
headers={'x-api-key': api_key}
|
|
||||||
)
|
|
||||||
wait_for_all_checks()
|
|
||||||
assert res.status_code == 200
|
|
||||||
assert b'OK, 1 watches' in res.data
|
|
||||||
|
|
||||||
after_check_time = live_server.app.config['DATASTORE'].data['watching'][watch_uuid].get('last_checked')
|
|
||||||
|
|
||||||
assert before_check_time != after_check_time
|
|
||||||
|
|
||||||
# Delete tag
|
# Delete tag
|
||||||
res = client.delete(
|
res = client.delete(
|
||||||
url_for("tag", uuid=new_tag_uuid),
|
url_for("tag", uuid=new_tag_uuid),
|
||||||
@@ -161,6 +141,3 @@ def test_api_tags_listing(client, live_server, measure_memory_usage):
|
|||||||
headers={'x-api-key': api_key},
|
headers={'x-api-key': api_key},
|
||||||
)
|
)
|
||||||
assert res.status_code == 204
|
assert res.status_code == 204
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -188,54 +188,15 @@ def is_watch_running(watch_uuid):
|
|||||||
|
|
||||||
|
|
||||||
def queue_item_async_safe(update_q, item):
|
def queue_item_async_safe(update_q, item):
|
||||||
"""Bulletproof queue operation with comprehensive error handling"""
|
"""Queue an item for async queue processing"""
|
||||||
item_uuid = 'unknown'
|
if async_loop and not async_loop.is_closed():
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Safely extract UUID for logging
|
# For async queue, schedule the put operation
|
||||||
if hasattr(item, 'item') and isinstance(item.item, dict):
|
asyncio.run_coroutine_threadsafe(update_q.put(item), async_loop)
|
||||||
item_uuid = item.item.get('uuid', 'unknown')
|
except RuntimeError as e:
|
||||||
except Exception as uuid_e:
|
logger.error(f"Failed to queue item: {e}")
|
||||||
logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
|
else:
|
||||||
|
logger.error("Async loop not available or closed for queueing item")
|
||||||
# Validate inputs
|
|
||||||
if not update_q:
|
|
||||||
logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
if not item:
|
|
||||||
logger.critical(f"CRITICAL: Item is None/invalid")
|
|
||||||
return False
|
|
||||||
|
|
||||||
# Attempt queue operation with multiple fallbacks
|
|
||||||
try:
|
|
||||||
# Primary: Use sync interface (thread-safe)
|
|
||||||
success = update_q.put(item, block=True, timeout=5.0)
|
|
||||||
if success is False: # Explicit False return means failure
|
|
||||||
logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
|
|
||||||
return False
|
|
||||||
|
|
||||||
logger.debug(f"Successfully queued item: {item_uuid}")
|
|
||||||
return True
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
|
|
||||||
|
|
||||||
# Secondary: Attempt queue health check
|
|
||||||
try:
|
|
||||||
queue_size = update_q.qsize()
|
|
||||||
is_empty = update_q.empty()
|
|
||||||
logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
|
|
||||||
except Exception as health_e:
|
|
||||||
logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
|
|
||||||
|
|
||||||
# Log queue type for debugging
|
|
||||||
try:
|
|
||||||
logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
|
|
||||||
except Exception:
|
|
||||||
logger.critical(f"CRITICAL: Cannot determine queue type")
|
|
||||||
|
|
||||||
return False
|
|
||||||
|
|
||||||
|
|
||||||
def shutdown_workers():
|
def shutdown_workers():
|
||||||
|
|||||||
@@ -7,7 +7,6 @@ flask-paginate
|
|||||||
flask_expects_json~=1.7
|
flask_expects_json~=1.7
|
||||||
flask_restful
|
flask_restful
|
||||||
flask_cors # For the Chrome extension to operate
|
flask_cors # For the Chrome extension to operate
|
||||||
janus # Thread-safe async/sync queue bridge
|
|
||||||
flask_wtf~=1.2
|
flask_wtf~=1.2
|
||||||
flask~=2.3
|
flask~=2.3
|
||||||
flask-socketio~=5.5.1
|
flask-socketio~=5.5.1
|
||||||
|
|||||||
Reference in New Issue
Block a user