mirror of https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-02 12:26:03 +00:00

Compare commits: event-loop...watch-coun

1 commit

| Author | SHA1 | Date |
|---|---|---|
|  | 71eb1daf70 |  |
@@ -102,8 +102,8 @@ def sigshutdown_handler(_signo, _stack_frame):
     # Shutdown workers and queues immediately
     try:
-        from changedetectionio import worker_pool
-        worker_pool.shutdown_workers()
+        from changedetectionio import worker_handler
+        worker_handler.shutdown_workers()
     except Exception as e:
         logger.error(f"Error shutting down workers: {str(e)}")

@@ -415,12 +415,12 @@ def main():
     # This must happen AFTER app initialization so update_q is available
     if batch_mode and added_watch_uuids:
         from changedetectionio.flask_app import update_q
-        from changedetectionio import queuedWatchMetaData, worker_pool
+        from changedetectionio import queuedWatchMetaData, worker_handler

         logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
         for watch_uuid in added_watch_uuids:
             try:
-                worker_pool.queue_item_async_safe(
+                worker_handler.queue_item_async_safe(
                     update_q,
                     queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                 )

@@ -432,7 +432,7 @@ def main():
     # This must happen AFTER app initialization so update_q is available
     if recheck_watches is not None:
         from changedetectionio.flask_app import update_q
-        from changedetectionio import queuedWatchMetaData, worker_pool
+        from changedetectionio import queuedWatchMetaData, worker_handler

         watches_to_queue = []
         if recheck_watches == 'all':

@@ -454,7 +454,7 @@ def main():
         for watch_uuid in watches_to_queue:
             if watch_uuid in datastore.data['watching']:
                 try:
-                    worker_pool.queue_item_async_safe(
+                    worker_handler.queue_item_async_safe(
                         update_q,
                         queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                     )

@@ -516,7 +516,7 @@ def main():
         for watch_uuid in watches_to_queue:
             if watch_uuid in datastore.data['watching']:
                 try:
-                    worker_pool.queue_item_async_safe(
+                    worker_handler.queue_item_async_safe(
                         update_q,
                         queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                     )

@@ -549,7 +549,7 @@ def main():
             logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")

             # Use the shared wait_for_all_checks function
-            completed = worker_pool.wait_for_all_checks(update_q, timeout=300)
+            completed = worker_handler.wait_for_all_checks(update_q, timeout=300)

             if not completed:
                 logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")
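The `wait_for_all_checks` call above blocks batch mode until a recheck iteration drains. Its real implementation lives in `worker_handler` and may differ; this is a minimal sketch of the drain-or-timeout idea, with the running-UUID lookup passed in as a callable so the snippet is self-contained:

```python
import time

def wait_for_all_checks(update_q, get_running_uuids, timeout=300, poll_interval=0.5):
    # Hypothetical sketch: poll until the queue is empty AND no worker
    # still holds a watch, or give up after `timeout` seconds.
    deadline = time.time() + timeout
    while time.time() < deadline:
        if update_q.qsize() == 0 and not get_running_uuids():
            return True   # fully drained - the iteration is complete
        time.sleep(poll_interval)
    return False          # caller logs the "timed out after 300 seconds" warning
```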
@@ -1,5 +1,5 @@
 from changedetectionio import queuedWatchMetaData
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler
 from flask_expects_json import expects_json
 from flask_restful import abort, Resource
 from loguru import logger

@@ -42,7 +42,7 @@ class Tag(Resource):
         # If less than 20 watches, queue synchronously for immediate feedback
         if len(watches_to_queue) < 20:
             for watch_uuid in watches_to_queue:
-                worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
             return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200
         else:
             # 20+ watches - queue in background thread to avoid blocking API response

@@ -50,7 +50,7 @@ class Tag(Resource):
                 """Background thread to queue watches - discarded after completion."""
                 try:
                     for watch_uuid in watches_to_queue:
-                        worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                        worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
                     logger.info(f"Background queueing complete for tag {tag['uuid']}: {len(watches_to_queue)} watches queued")
                 except Exception as e:
                     logger.error(f"Error in background queueing for tag {tag['uuid']}: {e}")
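The 20-watch threshold above splits queueing into two paths: small batches are queued inline for immediate feedback, large ones are handed to a throwaway daemon thread so the API response returns at once. A sketch of that fire-and-forget pattern (the wrapper function name and the 202 response here are illustrative, not the API's actual contract):

```python
import threading

def queue_in_background(update_q, watches_to_queue, queuedWatchMetaData, worker_handler):
    def queue_watches_background():
        # Daemon thread: nothing joins it, it is discarded after completion.
        for watch_uuid in watches_to_queue:
            worker_handler.queue_item_async_safe(
                update_q,
                queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))

    threading.Thread(target=queue_watches_background, daemon=True).start()
    return {'status': f'OK, queuing {len(watches_to_queue)} watches in background'}, 202
```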
@@ -6,7 +6,7 @@ from changedetectionio.favicon_utils import get_favicon_mime_type

 from . import auth
 from changedetectionio import queuedWatchMetaData, strtobool
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler
 from flask import request, make_response, send_from_directory
 from flask_expects_json import expects_json
 from flask_restful import abort, Resource

@@ -85,7 +85,7 @@ class Watch(Resource):
             abort(404, message='No watch exists with the UUID of {}'.format(uuid))

         if request.args.get('recheck'):
-            worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             return "OK", 200
         if request.args.get('paused', '') == 'paused':
             self.datastore.data['watching'].get(uuid).pause()

@@ -477,9 +477,19 @@ class CreateWatch(Resource):
         new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
         if new_uuid:
             # Dont queue because the scheduler will check that it hasnt been checked before anyway
-            # worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
+            # worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
             return {'uuid': new_uuid}, 201
         else:
+            # Check if it was a limit issue
+            page_watch_limit = os.getenv('PAGE_WATCH_LIMIT')
+            if page_watch_limit:
+                try:
+                    page_watch_limit = int(page_watch_limit)
+                    current_watch_count = len(self.datastore.data['watching'])
+                    if current_watch_count >= page_watch_limit:
+                        return f"Watch limit reached ({current_watch_count}/{page_watch_limit} watches). Cannot add more watches.", 429
+                except ValueError:
+                    pass
             return "Invalid or unsupported URL", 400

     @auth.check_token

@@ -514,7 +524,7 @@ class CreateWatch(Resource):
         if len(watches_to_queue) < 20:
             # Get already queued/running UUIDs once (efficient)
             queued_uuids = set(self.update_q.get_queued_uuids())
-            running_uuids = set(worker_pool.get_running_uuids())
+            running_uuids = set(worker_handler.get_running_uuids())

             # Filter out watches that are already queued or running
             watches_to_queue_filtered = [

@@ -524,7 +534,7 @@ class CreateWatch(Resource):

             # Queue only the filtered watches
             for uuid in watches_to_queue_filtered:
-                worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))

             # Provide feedback about skipped watches
             skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)

@@ -536,7 +546,7 @@ class CreateWatch(Resource):
             # 20+ watches - queue in background thread to avoid blocking API response
             # Capture queued/running state before background thread
             queued_uuids = set(self.update_q.get_queued_uuids())
-            running_uuids = set(worker_pool.get_running_uuids())
+            running_uuids = set(worker_handler.get_running_uuids())

             def queue_all_watches_background():
                 """Background thread to queue all watches - discarded after completion."""

@@ -546,7 +556,7 @@ class CreateWatch(Resource):
                 for uuid in watches_to_queue:
                     # Check if already queued or running (state captured at start)
                     if uuid not in queued_uuids and uuid not in running_uuids:
-                        worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                        worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
                         queued_count += 1
                     else:
                         skipped_count += 1
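The snapshot-then-filter step above is what keeps bulk recheck idempotent: both sets are read once (one lock acquisition each), after which every membership test is O(1). Extracted into a hypothetical helper for clarity:

```python
def filter_not_queued_or_running(watches_to_queue, update_q, worker_handler):
    # Snapshot the current state once, rather than re-querying per watch.
    queued_uuids = set(update_q.get_queued_uuids())
    running_uuids = set(worker_handler.get_running_uuids())
    return [uuid for uuid in watches_to_queue
            if uuid not in queued_uuids and uuid not in running_uuids]
```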
@@ -53,11 +53,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
         watch = None

         try:
-            # Use async interface with custom executor to avoid thread pool exhaustion
-            # With 30+ workers, we need executor sized to match (see worker_pool.py)
+            # Use sync interface via run_in_executor since each worker has its own event loop
+            loop = asyncio.get_event_loop()
             queued_item_data = await asyncio.wait_for(
-                q.async_get(executor=executor),
-                timeout=1.0
+                loop.run_in_executor(executor, q.get, True, 1.0),  # block=True, timeout=1.0
+                timeout=1.5
             )

         except asyncio.TimeoutError:
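The replacement code wraps the blocking `q.get()` in `run_in_executor` so each worker's private event loop never stalls; the outer `asyncio.wait_for` at 1.5s is a safety net slightly longer than the inner 1.0s queue timeout. The same pattern in isolation — a sketch, assuming `q` is a thread-safe queue whose `get(block, timeout)` raises `queue.Empty`:

```python
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

async def get_next_item(q, executor: ThreadPoolExecutor):
    loop = asyncio.get_event_loop()
    try:
        # q.get(True, 1.0) blocks an executor thread, not this event loop.
        return await asyncio.wait_for(
            loop.run_in_executor(executor, q.get, True, 1.0),
            timeout=1.5)
    except (asyncio.TimeoutError, queue.Empty):
        return None  # idle pass - the worker loops and tries again
```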
@@ -67,17 +67,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                     logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
                     return "restart"
             continue
-        except RuntimeError as e:
-            # Handle executor shutdown gracefully - this is expected during shutdown
-            if "cannot schedule new futures after shutdown" in str(e):
-                # Executor shut down - exit gracefully without logging in pytest
-                if not IN_PYTEST:
-                    logger.debug(f"Worker {worker_id} detected executor shutdown, exiting")
-                break
-            # Other RuntimeError - log and continue
-            logger.error(f"Worker {worker_id} runtime error: {e}")
-            await asyncio.sleep(0.1)
-            continue
         except Exception as e:
             # Handle expected Empty exception from queue timeout
             import queue
@@ -102,11 +91,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
             uuid = queued_item_data.item.get('uuid')

             # RACE CONDITION FIX: Atomically claim this UUID for processing
-            from changedetectionio import worker_pool
+            from changedetectionio import worker_handler
             from changedetectionio.queuedWatchMetaData import PrioritizedItem

             # Try to claim the UUID atomically - prevents duplicate processing
-            if not worker_pool.claim_uuid_for_processing(uuid, worker_id):
+            if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
                 # Already being processed by another worker
                 logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
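`claim_uuid_for_processing` only needs to be an atomic check-and-set; any worker that loses the race defers and re-queues. A sketch of the idea with a plain lock and dict (the bookkeeping inside `worker_handler` may differ):

```python
import threading

_processing_lock = threading.Lock()
_currently_processing = {}  # uuid -> worker_id of the owner

def claim_uuid_for_processing(uuid, worker_id):
    with _processing_lock:
        if uuid in _currently_processing:
            return False  # lost the race - another worker owns this UUID
        _currently_processing[uuid] = worker_id
        return True

def release_uuid_from_processing(uuid, worker_id=None):
    with _processing_lock:
        # Release only if unowned-release is allowed or the caller is the owner.
        if worker_id is None or _currently_processing.get(uuid) == worker_id:
            _currently_processing.pop(uuid, None)
```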
@@ -116,7 +105,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                 # Re-queue with lower priority so it gets checked again after current processing finishes
                 deferred_priority = max(1000, queued_item_data.priority * 10)
                 deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
-                worker_pool.queue_item_async_safe(q, deferred_item, silent=True)
+                worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
                 logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
                 continue
@@ -501,7 +490,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                     logger.error(f"Exception while cleaning/quit after calling browser: {e}")
             try:
                 # Release UUID from processing (thread-safe)
-                worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id)
+                worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)

                 # Send completion signal
                 if watch:
@@ -14,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
     from changedetectionio import forms
     #
     if request.method == 'POST':
-        # from changedetectionio import worker_pool
+        # from changedetectionio import worker_handler

         from changedetectionio.blueprint.imports.importer import (
             import_url_list,

@@ -31,7 +31,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs")
             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))

             if len(importer_handler.remaining_data) == 0:
                 return redirect(url_for('watchlist.index'))

@@ -45,7 +45,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))


         # XLSX importer

@@ -70,7 +70,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe

             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))


         # Could be some remaining, or we could be on GET
@@ -4,7 +4,7 @@ from flask import Blueprint, flash, redirect, url_for
 from flask_login import login_required
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio import queuedWatchMetaData
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler
 from queue import PriorityQueue

 PRICE_DATA_TRACK_ACCEPT = 'accepted'

@@ -20,7 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
         datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
         datastore.data['watching'][uuid]['processor'] = 'restock_diff'
         datastore.data['watching'][uuid].clear_watch()
-        worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+        worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
         return redirect(url_for("watchlist.index"))

     @login_required
@@ -37,8 +37,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):

         rss_content_format = datastore.data['settings']['application'].get('rss_content_format')

-        if uuid == 'first':
-            uuid = list(datastore.data['watching'].keys()).pop()
         # Get the watch by UUID
         watch = datastore.data['watching'].get(uuid)
         if not watch:
@@ -83,7 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):

             # Adjust worker count if it changed
             if new_worker_count != old_worker_count:
-                from changedetectionio import worker_pool
+                from changedetectionio import worker_handler
                 from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds

                 # Check CPU core availability and warn if worker count is high

@@ -92,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
                     flash(gettext("Warning: Worker count ({}) is close to or exceeds available CPU cores ({})").format(
                         new_worker_count, cpu_count), 'warning')

-                result = worker_pool.adjust_async_worker_count(
+                result = worker_handler.adjust_async_worker_count(
                     new_count=new_worker_count,
                     update_q=update_q,
                     notification_q=notification_q,
@@ -10,7 +10,7 @@ from changedetectionio.blueprint.ui.notification import construct_blueprint as c
 from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint
 from changedetectionio.blueprint.ui import diff, preview

-def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
+def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
     from flask import request, flash

     if op == 'delete':

@@ -63,7 +63,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
        for uuid in uuids:
            if datastore.data['watching'].get(uuid):
                # Recheck and require a full reprocessing
-               worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+               worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
        if emit_flash:
            flash(gettext("{} watches queued for rechecking").format(len(uuids)))

@@ -114,7 +114,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
     for uuid in uuids:
         watch_check_update.send(watch_uuid=uuid)

-def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
+def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update):
     ui_blueprint = Blueprint('ui', __name__, template_folder="templates")

     # Register the edit blueprint
@@ -222,14 +222,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
     @login_optionally_required
     def form_delete():
         uuid = request.args.get('uuid')
+        # More for testing, possible to return the first/only
+        if uuid == 'first':
+            uuid = list(datastore.data['watching'].keys()).pop()

         if uuid != 'all' and not uuid in datastore.data['watching'].keys():
             flash(gettext('The watch by UUID {} does not exist.').format(uuid), 'error')
             return redirect(url_for('watchlist.index'))

-        # More for testing, possible to return the first/only
-        if uuid == 'first':
-            uuid = list(datastore.data['watching'].keys()).pop()
         datastore.delete(uuid)
         flash(gettext('Deleted.'))

@@ -239,14 +239,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
     @login_optionally_required
     def form_clone():
         uuid = request.args.get('uuid')

         # More for testing, possible to return the first/only
         if uuid == 'first':
             uuid = list(datastore.data['watching'].keys()).pop()

         new_uuid = datastore.clone(uuid)

         if not datastore.data['watching'].get(uuid).get('paused'):
-            worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
+            worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))

         flash(gettext('Cloned, you are editing the new watch.'))
@@ -262,10 +262,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,

         if uuid:
             # Single watch - check if already queued or running
-            if worker_pool.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
+            if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
                 flash(gettext("Watch is already queued or being checked."))
             else:
-                worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
                 flash(gettext("Queued 1 watch for rechecking."))
         else:
             # Multiple watches - first count how many need to be queued

@@ -284,7 +284,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
             if len(watches_to_queue) < 20:
                 # Get already queued/running UUIDs once (efficient)
                 queued_uuids = set(update_q.get_queued_uuids())
-                running_uuids = set(worker_pool.get_running_uuids())
+                running_uuids = set(worker_handler.get_running_uuids())

                 # Filter out watches that are already queued or running
                 watches_to_queue_filtered = []

@@ -294,7 +294,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,

                 # Queue only the filtered watches
                 for watch_uuid in watches_to_queue_filtered:
-                    worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                    worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))

                 # Provide feedback about skipped watches
                 skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -310,7 +310,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
                 # 20+ watches - queue in background thread to avoid blocking HTTP response
                 # Capture queued/running state before background thread
                 queued_uuids = set(update_q.get_queued_uuids())
-                running_uuids = set(worker_pool.get_running_uuids())
+                running_uuids = set(worker_handler.get_running_uuids())

                 def queue_watches_background():
                     """Background thread to queue watches - discarded after completion."""

@@ -320,7 +320,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
                     for watch_uuid in watches_to_queue:
                         # Check if already queued or running (state captured at start)
                         if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
-                            worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                            worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
                             queued_count += 1
                         else:
                             skipped_count += 1

@@ -349,7 +349,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
             extra_data=extra_data,
             queuedWatchMetaData=queuedWatchMetaData,
             uuids=uuids,
-            worker_pool=worker_pool,
+            worker_handler=worker_handler,
             update_q=update_q,
             watch_check_update=watch_check_update,
             op=op,
@@ -367,6 +367,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
         import json
         from copy import deepcopy

+        # more for testing
+        if uuid == 'first':
+            uuid = list(datastore.data['watching'].keys()).pop()

         # copy it to memory as trim off what we dont need (history)
         watch = deepcopy(datastore.data['watching'].get(uuid))
@@ -83,6 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     If a processor doesn't have a difference module, falls back to text_json_diff.
     """

+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()

@@ -143,10 +144,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     Each processor implements processors/{type}/extract.py::render_form()
     If a processor doesn't have an extract module, falls back to text_json_diff.
     """
-
+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()

     try:
         watch = datastore.data['watching'][uuid]
     except KeyError:

@@ -199,7 +200,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     Each processor implements processors/{type}/extract.py::process_extraction()
     If a processor doesn't have an extract module, falls back to text_json_diff.
     """
-
+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()

@@ -266,7 +267,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     - /diff/{uuid}/processor-asset/after
     - /diff/{uuid}/processor-asset/rendered_diff
     """
-
+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()
@@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio.auth_decorator import login_optionally_required
 from changedetectionio.time_handler import is_within_schedule
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler

 def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
     edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")

@@ -30,13 +30,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
         from changedetectionio import processors
         import importlib

-        if uuid == 'first':
-            uuid = list(datastore.data['watching'].keys()).pop()
         # More for testing, possible to return the first/only
         if not datastore.data['watching'].keys():
             flash(gettext("No watches to edit"), "error")
             return redirect(url_for('watchlist.index'))

+        if uuid == 'first':
+            uuid = list(datastore.data['watching'].keys()).pop()
+
         if not uuid in datastore.data['watching']:
             flash(gettext("No watch with the UUID {} found.").format(uuid), "error")
             return redirect(url_for('watchlist.index'))

@@ -282,7 +283,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             #############################
             if not datastore.data['watching'][uuid].get('paused') and is_in_schedule:
                 # Queue the watch for immediate recheck, with a higher priority
-                worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))

             # Diff page [edit] link should go back to diff page
             if request.args.get("next") and request.args.get("next") == 'diff':

@@ -313,7 +314,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),

         c = [f"processor-{watch.get('processor')}"]
-        if worker_pool.is_watch_running(uuid):
+        if worker_handler.is_watch_running(uuid):
             c.append('checking-now')

         template_args = {

@@ -370,8 +371,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
         from flask import send_file
         import brotli

-        if uuid == 'first':
-            uuid = list(datastore.data['watching'].keys()).pop()
         watch = datastore.data['watching'].get(uuid)
         if watch and watch.history.keys() and os.path.isdir(watch.watch_data_dir):
             latest_filename = list(watch.history.keys())[-1]

@@ -396,9 +395,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
     def watch_get_preview_rendered(uuid):
         '''For when viewing the "preview" of the rendered text from inside of Edit'''
         from flask import jsonify

-        if uuid == 'first':
-            uuid = list(datastore.data['watching'].keys()).pop()
-
         from changedetectionio.processors.text_json_diff import prepare_filter_prevew
         result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore)
         return jsonify(result)
@@ -26,9 +26,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     Each processor implements processors/{type}/preview.py::render()
     If a processor doesn't have a preview module, falls back to default text preview.
     """

+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()

     try:
         watch = datastore.data['watching'][uuid]
     except KeyError:

@@ -149,8 +150,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     """
     from flask import make_response

+    # More for testing, possible to return the first/only
     if uuid == 'first':
         uuid = list(datastore.data['watching'].keys()).pop()
+
     try:
         watch = datastore.data['watching'][uuid]
     except KeyError:
@@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, url_for, flash
 from flask_babel import gettext
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio.auth_decorator import login_optionally_required
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler


 def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, watch_check_update):

@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             return redirect(url_for('ui.ui_edit.edit_page', uuid=new_uuid, unpause_on_save=1, tag=request.args.get('tag')))
         else:
             # Straight into the queue.
-            worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
+            worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
             flash(gettext("Watch added."))

         return redirect(url_for('watchlist.index', tag=request.args.get('tag','')))
@@ -14,7 +14,7 @@ from pathlib import Path
 from changedetectionio.strtobool import strtobool
 from threading import Event
 from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
-from changedetectionio import worker_pool
+from changedetectionio import worker_handler

 from flask import (
     Flask,

@@ -195,7 +195,7 @@ def _jinja2_filter_format_number_locale(value: float) -> str:

 @app.template_global('is_checking_now')
 def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"):
-    return worker_pool.is_watch_running(watch_obj['uuid'])
+    return worker_handler.is_watch_running(watch_obj['uuid'])

 @app.template_global('get_watch_queue_position')
 def _get_watch_queue_position(watch_obj):

@@ -206,13 +206,13 @@ def _get_watch_queue_position(watch_obj):
 @app.template_global('get_current_worker_count')
 def _get_current_worker_count():
     """Get the current number of operational workers"""
-    return worker_pool.get_worker_count()
+    return worker_handler.get_worker_count()

 @app.template_global('get_worker_status_info')
 def _get_worker_status_info():
     """Get detailed worker status information for display"""
-    status = worker_pool.get_worker_status()
-    running_uuids = worker_pool.get_running_uuids()
+    status = worker_handler.get_worker_status()
+    running_uuids = worker_handler.get_running_uuids()

     return {
         'count': status['worker_count'],
@@ -801,7 +801,7 @@ def changedetection_app(config=None, datastore_o=None):

     # watchlist UI buttons etc
     import changedetectionio.blueprint.ui as ui
-    app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
+    app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update))

     import changedetectionio.blueprint.watchlist as watchlist
     app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')

@@ -838,10 +838,10 @@ def changedetection_app(config=None, datastore_o=None):
         expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))

         # Get basic status
-        status = worker_pool.get_worker_status()
+        status = worker_handler.get_worker_status()

         # Perform health check
-        health_result = worker_pool.check_worker_health(
+        health_result = worker_handler.check_worker_health(
             expected_count=expected_workers,
             update_q=update_q,
             notification_q=notification_q,
@@ -905,24 +905,14 @@ def changedetection_app(config=None, datastore_o=None):
     # Can be overridden by ENV or use the default settings
     n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
     logger.info(f"Starting {n_workers} workers during app initialization")
-    worker_pool.start_workers(n_workers, update_q, notification_q, app, datastore)
+    worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)

     # Skip background threads in batch mode (just process queue and exit)
     batch_mode = app.config.get('batch_mode', False)
     if not batch_mode:
         # @todo handle ctrl break
         ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()

-        # Start configurable number of notification workers (default 1)
-        notification_workers = int(os.getenv("NOTIFICATION_WORKERS", "1"))
-        for i in range(notification_workers):
-            threading.Thread(
-                target=notification_runner,
-                args=(i,),
-                daemon=True,
-                name=f"NotificationRunner-{i}"
-            ).start()
-        logger.info(f"Started {notification_workers} notification worker(s)")
+        threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()

     in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
     # Check for new release version, but not when running in test/build or pytest
@@ -964,14 +954,14 @@ def check_for_new_version():
         app.config.exit.wait(86400)


-def notification_runner(worker_id=0):
+def notification_runner():
     global notification_debug_log
     from datetime import datetime
     import json
     with app.app_context():
         while not app.config.exit.is_set():
             try:
-                # Multiple workers can run concurrently (configurable via NOTIFICATION_WORKERS)
+                # At the moment only one thread runs (single runner)
                 n_object = notification_q.get(block=False)
             except queue.Empty:
                 app.config.exit.wait(1)
@@ -997,7 +987,7 @@ def notification_runner(worker_id=0):
                     sent_obj = process_notification(n_object, datastore)

                 except Exception as e:
-                    logger.error(f"Notification worker {worker_id} - Watch URL: {n_object['watch_url']} Error {str(e)}")
+                    logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}")

                 # UUID wont be present when we submit a 'test' from the global settings
                 if 'uuid' in n_object:
@@ -1038,7 +1028,7 @@ def ticker_thread_check_time_launch_checks():
         now = time.time()
         if now - last_health_check > 60:
             expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
-            health_result = worker_pool.check_worker_health(
+            health_result = worker_handler.check_worker_health(
                 expected_count=expected_workers,
                 update_q=update_q,
                 notification_q=notification_q,

@@ -1057,7 +1047,7 @@ def ticker_thread_check_time_launch_checks():
             continue

         # Get a list of watches by UUID that are currently fetching data
-        running_uuids = worker_pool.get_running_uuids()
+        running_uuids = worker_handler.get_running_uuids()

         # Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch
        queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}

@@ -1163,7 +1153,7 @@ def ticker_thread_check_time_launch_checks():
                     priority = int(time.time())

                     # Into the queue with you
-                    queued_successfully = worker_pool.queue_item_async_safe(update_q,
+                    queued_successfully = worker_handler.queue_item_async_safe(update_q,
                                                                             queuedWatchMetaData.PrioritizedItem(priority=priority,
                                                                                                                 item={'uuid': uuid})
                                                                             )
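The O(1) comment in the ticker hunk is worth spelling out: the ticker visits every watch on every pass, so scanning `update_q.queue` once per watch would be quadratic overall. Building the set once makes each membership test constant-time. Schematically (the scheduling details are elided):

```python
# One O(n) pass over the queue...
queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}

for uuid, watch in datastore.data['watching'].items():
    # ...then O(1) tests per watch instead of rescanning the queue.
    if uuid in running_uuids or uuid in queued_uuids:
        continue
    # decide whether this watch is due for a recheck...
```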
@@ -5,52 +5,51 @@ import heapq
 import queue
 import threading

-# Janus is no longer required - we use pure threading.Queue for multi-loop support
-# try:
-#     import janus
-# except ImportError:
-#     pass  # Not needed anymore
+try:
+    import janus
+except ImportError:
+    logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
+    raise


 class RecheckPriorityQueue:
     """
-    Thread-safe priority queue supporting multiple async event loops.
-
-    ARCHITECTURE:
-    - Multiple async workers, each with its own event loop in its own thread
-    - One shared queue accessed safely via threading primitives
-    - Workers use run_in_executor to access queue without blocking their event loop
-
-    IMPLEMENTATION:
-    - Pure threading.Queue for notifications (no event loop binding)
-    - Heapq-based priority storage (thread-safe with RLock)
-    - Async methods wrap sync methods via run_in_executor
-    - Supports both sync and async access patterns
-
-    WHY NOT JANUS:
-    - Janus binds to ONE event loop at creation time
-    - Our architecture has 15+ workers, each with separate event loops
-    - Workers in different threads/loops cannot share janus async interface
-    - Pure threading approach works across all event loops
+    Ultra-reliable priority queue using janus for async/sync bridging.
+
+    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
+    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
+    - async_q: Used by async workers (the actual fetchers/processors) and coroutines
+
+    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
+    - Synchronous code (Flask, threads) cannot use async methods without blocking
+    - Async code cannot use sync methods without blocking the event loop
+    - janus provides the only safe bridge between these two worlds
+
+    Attempting to unify to async-only would require:
+    - Converting all Flask routes to async (major breaking change)
+    - Using asyncio.run() in sync contexts (causes deadlocks)
+    - Thread-pool wrapping (adds complexity and overhead)
+
+    Minimal implementation focused on reliability:
+    - Pure janus for sync/async bridge
+    - Thread-safe priority ordering
+    - Bulletproof error handling with critical logging
     """

     def __init__(self, maxsize: int = 0):
         try:
-            # Use pure threading.Queue for notification - no janus needed
-            # This avoids event loop binding issues with multiple worker threads
-            import asyncio
-            self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
+            self._janus_queue = janus.Queue(maxsize=maxsize)
+            # BOTH interfaces required - see class docstring for why
+            self.sync_q = self._janus_queue.sync_q    # Flask routes, ticker thread
+            self.async_q = self._janus_queue.async_q  # Async workers

             # Priority storage - thread-safe
             self._priority_items = []
             self._lock = threading.RLock()

-            # Condition variable for async wait support
-            self._condition = threading.Condition(self._lock)

             # Signals for UI updates
             self.queue_length_signal = signal('queue_length')

             logger.debug("RecheckPriorityQueue initialized successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
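The restored docstring's central claim — one queue, two views, each safe in its own execution context — is easiest to see in a standalone example, independent of changedetection.io (assuming a current janus release):

```python
import asyncio
import threading
import janus

async def main():
    q = janus.Queue()  # must be created while an event loop is running

    def producer():
        # Sync side: what a Flask route or the ticker thread would use.
        q.sync_q.put("work-item")

    threading.Thread(target=producer).start()

    # Async side: what a worker coroutine would use; never blocks the loop.
    item = await q.async_q.get()
    print(item)  # -> "work-item"

    q.close()
    await q.wait_closed()

asyncio.run(main())
```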
@@ -60,23 +59,24 @@ class RecheckPriorityQueue:
     def put(self, item, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync put with priority ordering"""
         try:
-            # Add to priority storage and notify waiters
-            with self._condition:
+            # Add to priority storage
+            with self._lock:
                 heapq.heappush(self._priority_items, item)
-                self._notification_queue.put(True, block=False)  # Notification only
-                self._condition.notify_all()  # Wake up any async waiters
+
+            # Notify via janus sync queue
+            self.sync_q.put(True, block=block, timeout=timeout)

             # Emit signals
             self._emit_put_signals(item)

             logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
             return True

         except Exception as e:
             logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
-            # Remove from priority storage if put failed
+            # Remove from priority storage if janus put failed
             try:
-                with self._condition:
+                with self._lock:
                     if item in self._priority_items:
                         self._priority_items.remove(item)
                         heapq.heapify(self._priority_items)
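Note the token scheme in `put()`: the janus queue never carries the watch items themselves, only a `True` token per item, while ordering lives entirely in the lock-protected heap. A stripped-down sketch of that split (assuming items are mutually comparable, as `PrioritizedItem` is):

```python
import heapq
import threading

_lock = threading.RLock()
_priority_items = []

def put(item, sync_q):
    with _lock:
        heapq.heappush(_priority_items, item)  # ordering lives here
    sync_q.put(True)                           # token wakes exactly one consumer

def get(sync_q, timeout=None):
    sync_q.get(timeout=timeout)                # block until a token arrives
    with _lock:
        return heapq.heappop(_priority_items)  # highest-priority item
```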
@@ -86,10 +86,10 @@ class RecheckPriorityQueue:

     def get(self, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync get with priority ordering"""
-        import queue as queue_module
+        import queue
         try:
-            # Wait for notification (this doesn't return the actual item, just signals availability)
-            self._notification_queue.get(block=block, timeout=timeout)
+            # Wait for notification
+            self.sync_q.get(block=block, timeout=timeout)

             # Get highest priority item
             with self._lock:
@@ -104,53 +104,57 @@ class RecheckPriorityQueue:
             logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
             return item

-        except queue_module.Empty:
+        except queue.Empty:
             # Queue is empty with timeout - expected behavior, re-raise without logging
-            raise  # noqa
+            raise
         except Exception as e:
             # Re-raise without logging - caller (worker) will handle and log appropriately
             raise

     # ASYNC INTERFACE (for workers)
-    async def async_put(self, item, executor=None):
-        """Async put with priority ordering - uses thread pool to avoid blocking
-
-        Args:
-            item: Item to add to queue
-            executor: Optional ThreadPoolExecutor. If None, uses default pool.
-        """
-        import asyncio
+    async def async_put(self, item):
+        """Pure async put with priority ordering"""
         try:
-            # Use run_in_executor to call sync put without blocking event loop
-            loop = asyncio.get_event_loop()
-            result = await loop.run_in_executor(
-                executor,  # Use provided executor or default
-                lambda: self.put(item, block=True, timeout=5.0)
-            )
+            # Add to priority storage
+            with self._lock:
+                heapq.heappush(self._priority_items, item)
+
+            # Notify via janus async queue
+            await self.async_q.put(True)

             # Emit signals
             self._emit_put_signals(item)

             logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
-            return result
+            return True

         except Exception as e:
             logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
             # Remove from priority storage if janus put failed
             try:
                 with self._lock:
                     if item in self._priority_items:
                         self._priority_items.remove(item)
                         heapq.heapify(self._priority_items)
             except Exception as cleanup_e:
                 logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
             return False

-    async def async_get(self, executor=None):
-        """Async get with priority ordering - uses thread pool to avoid blocking
-
-        Args:
-            executor: Optional ThreadPoolExecutor. If None, uses default pool.
-                      With many workers (30+), pass custom executor scaled to worker count.
-        """
-        import asyncio
+    async def async_get(self):
+        """Pure async get with priority ordering"""
         try:
-            # Use run_in_executor to call sync get without blocking event loop
-            # This works across multiple event loops since it uses threading
-            loop = asyncio.get_event_loop()
-            item = await loop.run_in_executor(
-                executor,  # Use provided executor (scales with FETCH_WORKERS) or default
-                lambda: self.get(block=True, timeout=1.0)
-            )
+            # Wait for notification
+            await self.async_q.get()
+
+            # Get highest priority item
+            with self._lock:
+                if not self._priority_items:
+                    logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
+                    raise Exception("Priority queue inconsistency")
+                item = heapq.heappop(self._priority_items)

             # Emit signals
             self._emit_get_signals()

             logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
             return item
@@ -183,9 +187,9 @@ class RecheckPriorityQueue:
             return []

     def close(self):
-        """Close the queue"""
+        """Close the janus queue"""
         try:
-            # Nothing to close for threading.Queue
+            self._janus_queue.close()
             logger.debug("RecheckPriorityQueue closed successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
@@ -359,11 +363,12 @@ class NotificationQueue:

     def __init__(self, maxsize: int = 0, datastore=None):
         try:
-            # Use pure threading.Queue to avoid event loop binding issues
-            self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
+            self._janus_queue = janus.Queue(maxsize=maxsize)
+            # BOTH interfaces required - see class docstring for why
+            self.sync_q = self._janus_queue.sync_q    # Flask routes, threads
+            self.async_q = self._janus_queue.async_q  # Async workers
             self.notification_event_signal = signal('notification_event')
             self.datastore = datastore  # For checking all_muted setting
             self._lock = threading.RLock()
             logger.debug("NotificationQueue initialized successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
@@ -381,8 +386,7 @@ class NotificationQueue:
                 logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
                 return False

-            with self._lock:
-                self._notification_queue.put(item, block=block, timeout=timeout)
+            self.sync_q.put(item, block=block, timeout=timeout)
             self._emit_notification_signal(item)
             logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
             return True
@@ -390,49 +394,36 @@ class NotificationQueue:
             logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
             return False

-    async def async_put(self, item: Dict[str, Any], executor=None):
-        """Async put with signal emission - uses thread pool
-
-        Args:
-            item: Notification item to queue
-            executor: Optional ThreadPoolExecutor
-        """
-        import asyncio
+    async def async_put(self, item: Dict[str, Any]):
+        """Pure async put with signal emission"""
         try:
             # Check if all notifications are muted
             if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
                 logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
                 return False

-            loop = asyncio.get_event_loop()
-            await loop.run_in_executor(executor, lambda: self.put(item, block=True, timeout=5.0))
+            await self.async_q.put(item)
             self._emit_notification_signal(item)
             logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
             return True
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
             return False


     def get(self, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync get"""
         try:
-            with self._lock:
-                return self._notification_queue.get(block=block, timeout=timeout)
+            return self.sync_q.get(block=block, timeout=timeout)
         except queue.Empty as e:
             raise e
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
             raise e

-    async def async_get(self, executor=None):
-        """Async get - uses thread pool
-
-        Args:
-            executor: Optional ThreadPoolExecutor
-        """
-        import asyncio
-
+    async def async_get(self):
+        """Pure async get"""
         try:
-            loop = asyncio.get_event_loop()
-            return await loop.run_in_executor(executor, lambda: self.get(block=True, timeout=1.0))
+            return await self.async_q.get()
         except queue.Empty as e:
             raise e
         except Exception as e:
@@ -442,20 +433,19 @@ class NotificationQueue:
     def qsize(self) -> int:
         """Get current queue size"""
         try:
-            with self._lock:
-                return self._notification_queue.qsize()
+            return self.sync_q.qsize()
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
             return 0

     def empty(self) -> bool:
         """Check if queue is empty"""
         return self.qsize() == 0

     def close(self):
-        """Close the queue"""
+        """Close the janus queue"""
         try:
-            # Nothing to close for threading.Queue
+            self._janus_queue.close()
             logger.debug("NotificationQueue closed successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
@@ -37,9 +37,9 @@ def register_watch_operation_handlers(socketio, datastore):
             # Import here to avoid circular imports
             from changedetectionio.flask_app import update_q
             from changedetectionio import queuedWatchMetaData
-            from changedetectionio import worker_pool
+            from changedetectionio import worker_handler

-            worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             logger.info(f"Socket.IO: Queued recheck for watch {uuid}")
         else:
             emit('operation_result', {'success': False, 'error': f'Unknown operation: {op}'})
@@ -145,10 +145,10 @@ def handle_watch_update(socketio, **kwargs):
         # Emit the watch update to all connected clients
         from changedetectionio.flask_app import update_q
         from changedetectionio.flask_app import _jinja2_filter_datetime
-        from changedetectionio import worker_pool
+        from changedetectionio import worker_handler

         # Get list of watches that are currently running
-        running_uuids = worker_pool.get_running_uuids()
+        running_uuids = worker_handler.get_running_uuids()

         # Get list of watches in the queue (efficient single-lock method)
         queue_list = update_q.get_queued_uuids()

@@ -252,7 +252,7 @@ def init_socketio(app, datastore):
         def event_checkbox_operations(data):
             from changedetectionio.blueprint.ui import _handle_operations
             from changedetectionio import queuedWatchMetaData
-            from changedetectionio import worker_pool
+            from changedetectionio import worker_handler
             from changedetectionio.flask_app import update_q, watch_check_update
             import threading
@@ -268,7 +268,7 @@ def init_socketio(app, datastore):
                 uuids=data.get('uuids'),
                 datastore=datastore,
                 extra_data=data.get('extra_data'),
-                worker_pool=worker_pool,
+                worker_handler=worker_handler,
                 update_q=update_q,
                 queuedWatchMetaData=queuedWatchMetaData,
                 watch_check_update=watch_check_update,
@@ -66,7 +66,7 @@ echo "-------------------- Running rest of tests in parallel -------------------
 # REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
 REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
 pytest tests/test_*.py \
-  -n 16 \
+  -n 30 \
   --dist=load \
   -vvv \
   -s \
@@ -166,9 +166,6 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
         """
         logger.info(f"Datastore path is '{datastore_path}'")

-        # CRITICAL: Update datastore_path (was using old path from __init__)
-        self.datastore_path = datastore_path
-
         # Initialize data structure
         self.__data = App.model()
         self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
@@ -607,6 +604,19 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):

                 return None

+        # Check PAGE_WATCH_LIMIT if set
+        page_watch_limit = os.getenv('PAGE_WATCH_LIMIT')
+        if page_watch_limit:
+            try:
+                page_watch_limit = int(page_watch_limit)
+                current_watch_count = len(self.__data['watching'])
+                if current_watch_count >= page_watch_limit:
+                    logger.error(f"Watch limit reached: {current_watch_count}/{page_watch_limit} watches. Cannot add {url}")
+                    flash(gettext("Watch limit reached ({}/{} watches). Cannot add more watches.").format(current_watch_count, page_watch_limit), 'error')
+                    return None
+            except ValueError:
+                logger.warning(f"Invalid PAGE_WATCH_LIMIT value: {page_watch_limit}, ignoring limit check")
+
         if tag and type(tag) == str:
             # Then it's probably a string of the actual tag by name, split and add it
             for t in tag.split(','):
@@ -165,57 +165,6 @@ def prepare_test_function(live_server, datastore_path):
         except:
             break

-    # Add test helper methods to the app for worker management
-    def set_workers(count):
-        """Set the number of workers for testing - brutal shutdown, no delays"""
-        from changedetectionio import worker_pool
-        from changedetectionio.flask_app import update_q, notification_q
-
-        current_count = worker_pool.get_worker_count()
-
-        # Special case: Setting to 0 means shutdown all workers brutally
-        if count == 0:
-            logger.debug(f"Brutally shutting down all {current_count} workers")
-            worker_pool.shutdown_workers()
-            return {
-                'status': 'success',
-                'message': f'Shutdown all {current_count} workers',
-                'previous_count': current_count,
-                'current_count': 0
-            }
-
-        # Adjust worker count (no delays, no verification)
-        result = worker_pool.adjust_async_worker_count(
-            count,
-            update_q=update_q,
-            notification_q=notification_q,
-            app=live_server.app,
-            datastore=datastore
-        )
-
-        return result
-
-    def check_all_workers_alive(expected_count):
-        """Check that all expected workers are alive"""
-        from changedetectionio import worker_pool
-        from changedetectionio.flask_app import update_q, notification_q
-        result = worker_pool.check_worker_health(
-            expected_count,
-            update_q=update_q,
-            notification_q=notification_q,
-            app=live_server.app,
-            datastore=datastore
-        )
-        assert result['status'] == 'healthy', f"Workers not healthy: {result['message']}"
-        return result
-
-    # Attach helper methods to app for easy test access
-    live_server.app.set_workers = set_workers
-    live_server.app.check_all_workers_alive = check_all_workers_alive
-
     # Prevent background thread from writing during cleanup/reload
     datastore.needs_write = False
     datastore.needs_write_urgent = False

@@ -313,8 +262,8 @@ def app(request, datastore_path):

     # Shutdown workers gracefully before loguru cleanup
     try:
-        from changedetectionio import worker_pool
-        worker_pool.shutdown_workers()
+        from changedetectionio import worker_handler
+        worker_handler.shutdown_workers()
     except Exception:
         pass
@@ -2,7 +2,7 @@

import time
from flask import url_for
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks, delete_all_watches
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
import os

@@ -116,7 +116,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# And not this because it's not the ld-json
assert b"So let's see what happens" not in res.data

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

##########################################################################################
# And we shouldn't see the offer
@@ -131,7 +131,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
assert b'ldjson-price-track-offer' not in res.data

##########################################################################################
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
@@ -147,7 +147,7 @@ def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_

##########################################################################################
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_bad_ldjson_is_correctly_ignored(client, live_server, measure_memory_usage, datastore_path):

@@ -414,4 +414,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
assert b'Abonnementen bijwerken' in res.data
assert b'<foobar' not in res.data

res = delete_all_watches(client)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

@@ -6,7 +6,7 @@ from .util import (
set_original_response,
set_modified_response,
live_server_setup,
wait_for_all_checks, delete_all_watches
wait_for_all_checks
)
from loguru import logger

@@ -104,7 +104,7 @@ def run_socketio_watch_update_test(client, live_server, password_mode="", datast
assert watch.has_unviewed, "The watch was not marked as unviewed after content change"

# Clean up
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_everything(live_server, client, measure_memory_usage, datastore_path):

@@ -166,8 +166,7 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage, datastore_path
delete_all_watches(client)

def test_group_tag_notification(client, live_server, measure_memory_usage, datastore_path):
delete_all_watches(client)

set_original_response(datastore_path=datastore_path)

test_url = url_for('test_endpoint', _external=True)

@@ -82,7 +82,7 @@ def test_import_distillio(client, live_server, measure_memory_usage, datastore_p

# Give the endpoint time to spin up
time.sleep(1)
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
res = client.post(
url_for("imports.import_page"),
data={

@@ -224,7 +224,6 @@ def check_json_filter(json_filter, client, live_server, datastore_path):
set_original_response(datastore_path=datastore_path)

delete_all_watches(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})

@@ -1,7 +1,7 @@
import os
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
import logging

def test_check_notification_error_handling(client, live_server, measure_memory_usage, datastore_path):
@@ -81,4 +81,4 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
os.unlink(os.path.join(datastore_path, "notification.txt"))
assert 'xxxxx' in notification_submission

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

@@ -1,52 +0,0 @@
import os
import time
from flask import url_for
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
from ..notification import valid_notification_formats
from loguru import logger

def test_queue_system(client, live_server, measure_memory_usage, datastore_path):
"""Test that multiple workers can process queue concurrently without blocking each other"""
# (pytest) Werkzeug's threaded server uses a ThreadPoolExecutor with a default limit of min(32, os.cpu_count() + 4) threads.
items = os.cpu_count() + 3
delay = 10
# Auto-queue is off here.
live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True

test_urls = [
f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}"
for i in range(0, items)
]

# Import the generated URLs so they land in the queue
res = client.post(
url_for("imports.import_page"),
data={"urls": "\r\n".join(test_urls)},
follow_redirects=True
)
assert f"{items} Imported".encode('utf-8') in res.data

client.application.set_workers(items)

start = time.time()
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(delay/2)

# Verify that some workers are busy (UUIDs are still being processed)
from changedetectionio import worker_pool
running_uuids = worker_pool.get_running_uuids()
logger.debug(f"Should be at least some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}")
assert len(running_uuids) != 0, f"Should be at least some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}"

wait_for_all_checks(client)

# All workers should be done within roughly one delay period plus overhead (they take time to 'see' something is in the queue too)
total_time = (time.time() - start)
logger.debug(f"All workers finished {items} items in less than {delay} seconds per job. {total_time}s total")
# If the queue handler had a bug and did not run checks in parallel, this would blow out to items*delay seconds
assert total_time < delay + 10, f"All workers finished {items} items in less than {delay} seconds per job, total time {total_time}s"

# Verify all workers are idle (no UUIDs being processed)
from changedetectionio import worker_pool
running_uuids = worker_pool.get_running_uuids()
assert len(running_uuids) == 0, f"Expected all workers to be idle, but {len(running_uuids)} UUIDs still being processed: {running_uuids}"
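
The timing bound is the heart of this (now removed) test: parallel workers finish in roughly one delay period plus queue-pickup overhead, while a serial queue would need items * delay seconds. The two bounds, spelled out as plain arithmetic:

import os

items = (os.cpu_count() or 1) + 3
delay = 10
serial_worst_case = items * delay   # one worker draining the queue alone
parallel_bound = delay + 10         # the ceiling the assertion above allows
# e.g. on an 8-core machine: 110s serial vs a 20s ceiling in parallel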
@@ -107,7 +107,7 @@ def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path
assert b"Access denied, bad token" not in res.data
assert b"Random content" in res.data

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage, datastore_path):

@@ -366,7 +366,7 @@ def test_check_with_prefix_include_filters(client, live_server, measure_memory_u
assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" not in res.data # not in selector

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_various_rules(client, live_server, measure_memory_usage, datastore_path):
@@ -423,7 +423,7 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):

test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "//*[contains(@class, 'sametext')]|//*[contains(@class, 'changetext')]",
"url": test_url,
"tags": "",
@@ -437,14 +437,14 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
wait_for_all_checks(client)

res = client.get(
url_for("ui.ui_preview.preview_page", uuid=uuid),
url_for("ui.ui_preview.preview_page", uuid="first"),
follow_redirects=True
)

assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" in res.data # in selector

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_xpath_20_function_count(client, live_server, measure_memory_usage, datastore_path):
@@ -477,7 +477,7 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage, data

assert b"246913579975308642" in res.data # in selector

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_xpath_20_function_count2(client, live_server, measure_memory_usage, datastore_path):
@@ -501,8 +501,6 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
)

assert b"Updated watch." in res.data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)

wait_for_all_checks(client)

res = client.get(
@@ -512,7 +510,7 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat

assert b"246913579975308642" in res.data # in selector

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def test_xpath_20_function_string_join_matches(client, live_server, measure_memory_usage, datastore_path):
@@ -546,7 +544,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo

assert b"Some text thats the samespecialconjunctionSome text that will change" in res.data # in selector

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
@@ -584,7 +582,7 @@ def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
assert b"Lets go discount" in res.data, f"When testing for Lets go discount called with content type '{content_type}'"
assert b"Events and Announcements" not in res.data, f"When testing for Lets go discount called with content type '{content_type}'" # It should not be here because that's not our selector target

delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server, measure_memory_usage, datastore_path):
@@ -135,25 +135,18 @@ def delete_all_watches(client=None):
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid)
from changedetectionio.flask_app import update_q

# Clear the queue to prevent leakage into the next test
while not update_q.empty():
try:
update_q.get_nowait()
except:
break

def wait_for_all_checks(client=None):
"""
Waits until the queue is empty and workers are idle.
Delegates to worker_pool.wait_for_all_checks for shared logic.
Delegates to worker_handler.wait_for_all_checks for shared logic.
"""
from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_pool
time.sleep(0.25)
# Use the shared wait logic from worker_pool
return worker_pool.wait_for_all_checks(global_update_q, timeout=150)
from changedetectionio import worker_handler
time.sleep(0.05)
# Use the shared wait logic from worker_handler
return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
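
The shared wait_for_all_checks implementation itself is not shown in this diff. A plausible shape for it, assuming it polls the queue together with the get_running_uuids() call seen earlier — an illustration, not the project's actual code:

import time

def wait_for_all_checks_sketch(update_q, timeout=150):
    from changedetectionio import worker_handler
    deadline = time.time() + timeout
    while time.time() < deadline:
        # Done only when nothing is queued AND no worker holds a UUID in flight
        if update_q.empty() and not worker_handler.get_running_uuids():
            return True
        time.sleep(0.1)
    return False  # timed out; matches the boolean checked in batch mode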

def wait_for_watch_history(client, min_history_count=2, timeout=10):
"""

@@ -91,8 +91,7 @@ class WorkerThread:
self.thread.start()

def stop(self):
"""Stop the worker thread brutally - no waiting"""
# Try to stop the event loop if it exists
"""Stop the worker thread"""
if self.loop and self.running:
try:
# Signal the loop to stop
@@ -100,7 +99,8 @@ class WorkerThread:
except RuntimeError:
pass

# Don't wait - thread is daemon and will die when needed
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
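
The reworked stop() swaps "abandon the daemon thread" for a bounded join. The underlying pattern — an asyncio loop owned by a worker thread, stopped safely from outside — looks roughly like this (a generic sketch, not the project's exact WorkerThread class):

import asyncio
import threading

class LoopThread:  # generic illustration
    def __init__(self):
        self.loop = asyncio.new_event_loop()
        self.thread = threading.Thread(target=self._run, daemon=True)
        self.thread.start()

    def _run(self):
        asyncio.set_event_loop(self.loop)
        self.loop.run_forever()

    def stop(self):
        # call_soon_threadsafe is the only safe way to poke a loop owned by
        # another thread; calling loop.stop() directly from here would race.
        self.loop.call_soon_threadsafe(self.loop.stop)
        # Bounded join rather than abandoning the daemon thread,
        # mirroring the 2-second timeout added in this diff.
        self.thread.join(timeout=2.0)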

def start_async_workers(n_workers, update_q, notification_q, app, datastore):
@@ -125,7 +125,7 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):

async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
"""Start a single async worker with auto-restart capability"""
from changedetectionio.worker import async_update_worker
from changedetectionio.async_update_worker import async_update_worker

# Check if we're in pytest environment - if so, be more gentle with logging
import os
@@ -337,36 +337,24 @@ def queue_item_async_safe(update_q, item, silent=False):

def shutdown_workers():
"""Shutdown all async workers brutally - no delays, no waiting"""
global worker_threads, queue_executor
"""Shutdown all async workers fast and aggressively"""
global worker_threads

# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ

if not in_pytest:
logger.info("Brutal shutdown of async workers initiated...")
logger.info("Fast shutdown of async workers initiated...")

# Stop all worker event loops
# Stop all worker threads
for worker in worker_threads:
worker.stop()

# Clear immediately - threads are daemon and will die
worker_threads.clear()

# Shutdown the queue executor to prevent "cannot schedule new futures after shutdown" errors
# This must happen AFTER workers are stopped to avoid race conditions
if queue_executor:
try:
queue_executor.shutdown(wait=False)
if not in_pytest:
logger.debug("Queue executor shut down")
except Exception as e:
if not in_pytest:
logger.warning(f"Error shutting down queue executor: {e}")

if not in_pytest:
logger.info("Async workers brutal shutdown complete")
logger.info("Async workers fast shutdown complete")

@@ -8,7 +8,7 @@ flask-paginate
flask_expects_json~=1.7
flask_restful
flask_cors # For the Chrome extension to operate
# janus # No longer needed - using pure threading.Queue for multi-loop support
janus # Thread-safe async/sync queue bridge
flask_wtf~=1.2
flask~=3.1
flask-socketio~=5.6.0
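
Re-adding janus fits the worker model above: one queue object exposes a blocking sync side (for Flask request threads) and an awaitable async side (for the worker event loops). Minimal usage of its documented API:

import asyncio
import janus

async def main():
    q = janus.Queue()  # must be created while an event loop is running

    # Producer: thread-safe blocking side (e.g. a Flask request thread)
    await asyncio.to_thread(q.sync_q.put, {'uuid': 'example-watch'})

    # Consumer: awaitable side (e.g. an async update worker)
    item = await q.async_q.get()
    q.async_q.task_done()
    print(item)

    q.close()
    await q.wait_closed()

asyncio.run(main())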