Compare commits

1 Commit

Author: dgtlmoon
SHA1: 71eb1daf70
Message: Ability to limit total number of watches with env var PAGE_WATCH_LIMIT
Date: 2026-02-02 11:12:25 +01:00

33 changed files with 257 additions and 393 deletions

View File

@@ -102,8 +102,8 @@ def sigshutdown_handler(_signo, _stack_frame):
# Shutdown workers and queues immediately
try:
from changedetectionio import worker_pool
worker_pool.shutdown_workers()
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
except Exception as e:
logger.error(f"Error shutting down workers: {str(e)}")
@@ -415,12 +415,12 @@ def main():
# This must happen AFTER app initialization so update_q is available
if batch_mode and added_watch_uuids:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_pool
from changedetectionio import queuedWatchMetaData, worker_handler
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
for watch_uuid in added_watch_uuids:
try:
worker_pool.queue_item_async_safe(
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -432,7 +432,7 @@ def main():
# This must happen AFTER app initialization so update_q is available
if recheck_watches is not None:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_pool
from changedetectionio import queuedWatchMetaData, worker_handler
watches_to_queue = []
if recheck_watches == 'all':
@@ -454,7 +454,7 @@ def main():
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_pool.queue_item_async_safe(
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -516,7 +516,7 @@ def main():
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_pool.queue_item_async_safe(
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -549,7 +549,7 @@ def main():
logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
# Use the shared wait_for_all_checks function
completed = worker_pool.wait_for_all_checks(update_q, timeout=300)
completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
if not completed:
logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")

View File

@@ -1,5 +1,5 @@
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from flask_expects_json import expects_json
from flask_restful import abort, Resource
from loguru import logger
@@ -42,7 +42,7 @@ class Tag(Resource):
# If less than 20 watches, queue synchronously for immediate feedback
if len(watches_to_queue) < 20:
for watch_uuid in watches_to_queue:
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200
else:
# 20+ watches - queue in background thread to avoid blocking API response
@@ -50,7 +50,7 @@ class Tag(Resource):
"""Background thread to queue watches - discarded after completion."""
try:
for watch_uuid in watches_to_queue:
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
logger.info(f"Background queueing complete for tag {tag['uuid']}: {len(watches_to_queue)} watches queued")
except Exception as e:
logger.error(f"Error in background queueing for tag {tag['uuid']}: {e}")

View File

@@ -6,7 +6,7 @@ from changedetectionio.favicon_utils import get_favicon_mime_type
from . import auth
from changedetectionio import queuedWatchMetaData, strtobool
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from flask import request, make_response, send_from_directory
from flask_expects_json import expects_json
from flask_restful import abort, Resource
@@ -85,7 +85,7 @@ class Watch(Resource):
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if request.args.get('recheck'):
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
return "OK", 200
if request.args.get('paused', '') == 'paused':
self.datastore.data['watching'].get(uuid).pause()
@@ -477,9 +477,19 @@ class CreateWatch(Resource):
new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
if new_uuid:
# Dont queue because the scheduler will check that it hasnt been checked before anyway
# worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
# worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
return {'uuid': new_uuid}, 201
else:
# Check if it was a limit issue
page_watch_limit = os.getenv('PAGE_WATCH_LIMIT')
if page_watch_limit:
try:
page_watch_limit = int(page_watch_limit)
current_watch_count = len(self.datastore.data['watching'])
if current_watch_count >= page_watch_limit:
return f"Watch limit reached ({current_watch_count}/{page_watch_limit} watches). Cannot add more watches.", 429
except ValueError:
pass
return "Invalid or unsupported URL", 400
@auth.check_token
@@ -514,7 +524,7 @@ class CreateWatch(Resource):
if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_pool.get_running_uuids())
running_uuids = set(worker_handler.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = [
@@ -524,7 +534,7 @@ class CreateWatch(Resource):
# Queue only the filtered watches
for uuid in watches_to_queue_filtered:
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -536,7 +546,7 @@ class CreateWatch(Resource):
# 20+ watches - queue in background thread to avoid blocking API response
# Capture queued/running state before background thread
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_pool.get_running_uuids())
running_uuids = set(worker_handler.get_running_uuids())
def queue_all_watches_background():
"""Background thread to queue all watches - discarded after completion."""
@@ -546,7 +556,7 @@ class CreateWatch(Resource):
for uuid in watches_to_queue:
# Check if already queued or running (state captured at start)
if uuid not in queued_uuids and uuid not in running_uuids:
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
queued_count += 1
else:
skipped_count += 1
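
Once the limit is reached, the create-watch endpoint above answers with HTTP 429 instead of a generic 400. A hypothetical client-side check (endpoint path and x-api-key header follow the changedetection.io API convention; verify against your version):

import requests

resp = requests.post(
    "http://localhost:5000/api/v1/watch",
    headers={"x-api-key": "YOUR_API_KEY"},
    json={"url": "https://example.com"},
)
if resp.status_code == 429:
    # New in this change: PAGE_WATCH_LIMIT was reached
    print("Watch limit reached:", resp.text)
elif resp.status_code == 201:
    print("Created watch", resp.json()["uuid"])
else:
    print("Rejected:", resp.status_code, resp.text)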

View File

@@ -53,11 +53,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
watch = None
try:
# Use async interface with custom executor to avoid thread pool exhaustion
# With 30+ workers, we need executor sized to match (see worker_pool.py)
# Use sync interface via run_in_executor since each worker has its own event loop
loop = asyncio.get_event_loop()
queued_item_data = await asyncio.wait_for(
q.async_get(executor=executor),
timeout=1.0
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
timeout=1.5
)
except asyncio.TimeoutError:
@@ -67,17 +67,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
return "restart"
continue
except RuntimeError as e:
# Handle executor shutdown gracefully - this is expected during shutdown
if "cannot schedule new futures after shutdown" in str(e):
# Executor shut down - exit gracefully without logging in pytest
if not IN_PYTEST:
logger.debug(f"Worker {worker_id} detected executor shutdown, exiting")
break
# Other RuntimeError - log and continue
logger.error(f"Worker {worker_id} runtime error: {e}")
await asyncio.sleep(0.1)
continue
except Exception as e:
# Handle expected Empty exception from queue timeout
import queue
@@ -102,11 +91,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
uuid = queued_item_data.item.get('uuid')
# RACE CONDITION FIX: Atomically claim this UUID for processing
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from changedetectionio.queuedWatchMetaData import PrioritizedItem
# Try to claim the UUID atomically - prevents duplicate processing
if not worker_pool.claim_uuid_for_processing(uuid, worker_id):
if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
# Already being processed by another worker
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
@@ -116,7 +105,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
# Re-queue with lower priority so it gets checked again after current processing finishes
deferred_priority = max(1000, queued_item_data.priority * 10)
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
worker_pool.queue_item_async_safe(q, deferred_item, silent=True)
worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
continue
@@ -501,7 +490,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
try:
# Release UUID from processing (thread-safe)
worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id)
worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
# Send completion signal
if watch:

View File

@@ -14,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from changedetectionio import forms
#
if request.method == 'POST':
# from changedetectionio import worker_pool
# from changedetectionio import worker_handler
from changedetectionio.blueprint.imports.importer import (
import_url_list,
@@ -31,7 +31,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs")
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if len(importer_handler.remaining_data) == 0:
return redirect(url_for('watchlist.index'))
@@ -45,7 +45,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# XLSX importer
@@ -70,7 +70,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Could be some remaining, or we could be on GET

View File

@@ -4,7 +4,7 @@ from flask import Blueprint, flash, redirect, url_for
from flask_login import login_required
from changedetectionio.store import ChangeDetectionStore
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from queue import PriorityQueue
PRICE_DATA_TRACK_ACCEPT = 'accepted'
@@ -20,7 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
datastore.data['watching'][uuid]['processor'] = 'restock_diff'
datastore.data['watching'][uuid].clear_watch()
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
return redirect(url_for("watchlist.index"))
@login_required

View File

@@ -37,8 +37,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:

View File

@@ -83,7 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Adjust worker count if it changed
if new_worker_count != old_worker_count:
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds
# Check CPU core availability and warn if worker count is high
@@ -92,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(gettext("Warning: Worker count ({}) is close to or exceeds available CPU cores ({})").format(
new_worker_count, cpu_count), 'warning')
result = worker_pool.adjust_async_worker_count(
result = worker_handler.adjust_async_worker_count(
new_count=new_worker_count,
update_q=update_q,
notification_q=notification_q,

View File

@@ -10,7 +10,7 @@ from changedetectionio.blueprint.ui.notification import construct_blueprint as c
from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint
from changedetectionio.blueprint.ui import diff, preview
def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
from flask import request, flash
if op == 'delete':
@@ -63,7 +63,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
for uuid in uuids:
if datastore.data['watching'].get(uuid):
# Recheck and require a full reprocessing
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if emit_flash:
flash(gettext("{} watches queued for rechecking").format(len(uuids)))
@@ -114,7 +114,7 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
for uuid in uuids:
watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint
@@ -222,14 +222,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
@login_optionally_required
def form_delete():
uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if uuid != 'all' and not uuid in datastore.data['watching'].keys():
flash(gettext('The watch by UUID {} does not exist.').format(uuid), 'error')
return redirect(url_for('watchlist.index'))
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
datastore.delete(uuid)
flash(gettext('Deleted.'))
@@ -239,14 +239,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
@login_optionally_required
def form_clone():
uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid)
if not datastore.data['watching'].get(uuid).get('paused'):
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
flash(gettext('Cloned, you are editing the new watch.'))
@@ -262,10 +262,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
if uuid:
# Single watch - check if already queued or running
if worker_pool.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
flash(gettext("Watch is already queued or being checked."))
else:
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
flash(gettext("Queued 1 watch for rechecking."))
else:
# Multiple watches - first count how many need to be queued
@@ -284,7 +284,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_pool.get_running_uuids())
running_uuids = set(worker_handler.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = []
@@ -294,7 +294,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
# Queue only the filtered watches
for watch_uuid in watches_to_queue_filtered:
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -310,7 +310,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
# 20+ watches - queue in background thread to avoid blocking HTTP response
# Capture queued/running state before background thread
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_pool.get_running_uuids())
running_uuids = set(worker_handler.get_running_uuids())
def queue_watches_background():
"""Background thread to queue watches - discarded after completion."""
@@ -320,7 +320,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
for watch_uuid in watches_to_queue:
# Check if already queued or running (state captured at start)
if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
queued_count += 1
else:
skipped_count += 1
@@ -349,7 +349,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
extra_data=extra_data,
queuedWatchMetaData=queuedWatchMetaData,
uuids=uuids,
worker_pool=worker_pool,
worker_handler=worker_handler,
update_q=update_q,
watch_check_update=watch_check_update,
op=op,
@@ -367,6 +367,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool,
import json
from copy import deepcopy
# more for testing
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# copy it to memory as trim off what we dont need (history)
watch = deepcopy(datastore.data['watching'].get(uuid))
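
Both queueing branches above snapshot the queued and running UUIDs once, then filter against them, so a watch is never queued twice. The same filter as a standalone sketch (hypothetical helper name):

def filter_not_in_flight(candidates, queued_uuids, running_uuids):
    # Set lookups keep the check O(1) per watch, as the comments above note.
    queued = set(queued_uuids)
    running = set(running_uuids)
    return [uuid for uuid in candidates if uuid not in queued and uuid not in running]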

View File

@@ -83,6 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
If a processor doesn't have a difference module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
@@ -143,10 +144,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/extract.py::render_form()
If a processor doesn't have an extract module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
@@ -199,7 +200,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/extract.py::process_extraction()
If a processor doesn't have an extract module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
@@ -266,7 +267,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
- /diff/{uuid}/processor-asset/after
- /diff/{uuid}/processor-asset/rendered_diff
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()

View File

@@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio.time_handler import is_within_schedule
from changedetectionio import worker_pool
from changedetectionio import worker_handler
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
@@ -30,13 +30,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from changedetectionio import processors
import importlib
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# More for testing, possible to return the first/only
if not datastore.data['watching'].keys():
flash(gettext("No watches to edit"), "error")
return redirect(url_for('watchlist.index'))
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if not uuid in datastore.data['watching']:
flash(gettext("No watch with the UUID {} found.").format(uuid), "error")
return redirect(url_for('watchlist.index'))
@@ -282,7 +283,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
#############################
if not datastore.data['watching'][uuid].get('paused') and is_in_schedule:
# Queue the watch for immediate recheck, with a higher priority
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Diff page [edit] link should go back to diff page
if request.args.get("next") and request.args.get("next") == 'diff':
@@ -313,7 +314,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
c = [f"processor-{watch.get('processor')}"]
if worker_pool.is_watch_running(uuid):
if worker_handler.is_watch_running(uuid):
c.append('checking-now')
template_args = {
@@ -370,8 +371,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from flask import send_file
import brotli
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
watch = datastore.data['watching'].get(uuid)
if watch and watch.history.keys() and os.path.isdir(watch.watch_data_dir):
latest_filename = list(watch.history.keys())[-1]
@@ -396,9 +395,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
def watch_get_preview_rendered(uuid):
'''For when viewing the "preview" of the rendered text from inside of Edit'''
from flask import jsonify
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
from changedetectionio.processors.text_json_diff import prepare_filter_prevew
result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore)
return jsonify(result)

View File

@@ -26,9 +26,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/preview.py::render()
If a processor doesn't have a preview module, falls back to default text preview.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
@@ -149,8 +150,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
"""
from flask import make_response
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
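
The uuid == 'first' branch being added back across these blueprints lets tests address "the only watch" without knowing its UUID. As a hypothetical standalone helper:

def resolve_uuid(uuid, watching):
    # 'first' resolves to the last-listed key, exactly as the diffs do;
    # in the single-watch test case that is also the only key.
    if uuid == 'first':
        uuid = list(watching.keys()).pop()
    return uuid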

View File

@@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, url_for, flash
from flask_babel import gettext
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio import worker_pool
from changedetectionio import worker_handler
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, watch_check_update):
@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
return redirect(url_for('ui.ui_edit.edit_page', uuid=new_uuid, unpause_on_save=1, tag=request.args.get('tag')))
else:
# Straight into the queue.
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
flash(gettext("Watch added."))
return redirect(url_for('watchlist.index', tag=request.args.get('tag','')))

View File

@@ -14,7 +14,7 @@ from pathlib import Path
from changedetectionio.strtobool import strtobool
from threading import Event
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from flask import (
Flask,
@@ -195,7 +195,7 @@ def _jinja2_filter_format_number_locale(value: float) -> str:
@app.template_global('is_checking_now')
def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"):
return worker_pool.is_watch_running(watch_obj['uuid'])
return worker_handler.is_watch_running(watch_obj['uuid'])
@app.template_global('get_watch_queue_position')
def _get_watch_queue_position(watch_obj):
@@ -206,13 +206,13 @@ def _get_watch_queue_position(watch_obj):
@app.template_global('get_current_worker_count')
def _get_current_worker_count():
"""Get the current number of operational workers"""
return worker_pool.get_worker_count()
return worker_handler.get_worker_count()
@app.template_global('get_worker_status_info')
def _get_worker_status_info():
"""Get detailed worker status information for display"""
status = worker_pool.get_worker_status()
running_uuids = worker_pool.get_running_uuids()
status = worker_handler.get_worker_status()
running_uuids = worker_handler.get_running_uuids()
return {
'count': status['worker_count'],
@@ -801,7 +801,7 @@ def changedetection_app(config=None, datastore_o=None):
# watchlist UI buttons etc
import changedetectionio.blueprint.ui as ui
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update))
import changedetectionio.blueprint.watchlist as watchlist
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
@@ -838,10 +838,10 @@ def changedetection_app(config=None, datastore_o=None):
expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
# Get basic status
status = worker_pool.get_worker_status()
status = worker_handler.get_worker_status()
# Perform health check
health_result = worker_pool.check_worker_health(
health_result = worker_handler.check_worker_health(
expected_count=expected_workers,
update_q=update_q,
notification_q=notification_q,
@@ -905,24 +905,14 @@ def changedetection_app(config=None, datastore_o=None):
# Can be overridden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
logger.info(f"Starting {n_workers} workers during app initialization")
worker_pool.start_workers(n_workers, update_q, notification_q, app, datastore)
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
# Skip background threads in batch mode (just process queue and exit)
batch_mode = app.config.get('batch_mode', False)
if not batch_mode:
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
# Start configurable number of notification workers (default 1)
notification_workers = int(os.getenv("NOTIFICATION_WORKERS", "1"))
for i in range(notification_workers):
threading.Thread(
target=notification_runner,
args=(i,),
daemon=True,
name=f"NotificationRunner-{i}"
).start()
logger.info(f"Started {notification_workers} notification worker(s)")
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
@@ -964,14 +954,14 @@ def check_for_new_version():
app.config.exit.wait(86400)
def notification_runner(worker_id=0):
def notification_runner():
global notification_debug_log
from datetime import datetime
import json
with app.app_context():
while not app.config.exit.is_set():
try:
# Multiple workers can run concurrently (configurable via NOTIFICATION_WORKERS)
# At the moment only one thread runs (single runner)
n_object = notification_q.get(block=False)
except queue.Empty:
app.config.exit.wait(1)
@@ -997,7 +987,7 @@ def notification_runner(worker_id=0):
sent_obj = process_notification(n_object, datastore)
except Exception as e:
logger.error(f"Notification worker {worker_id} - Watch URL: {n_object['watch_url']} Error {str(e)}")
logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}")
# UUID wont be present when we submit a 'test' from the global settings
if 'uuid' in n_object:
@@ -1038,7 +1028,7 @@ def ticker_thread_check_time_launch_checks():
now = time.time()
if now - last_health_check > 60:
expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
health_result = worker_pool.check_worker_health(
health_result = worker_handler.check_worker_health(
expected_count=expected_workers,
update_q=update_q,
notification_q=notification_q,
@@ -1057,7 +1047,7 @@ def ticker_thread_check_time_launch_checks():
continue
# Get a list of watches by UUID that are currently fetching data
running_uuids = worker_pool.get_running_uuids()
running_uuids = worker_handler.get_running_uuids()
# Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch
queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}
@@ -1163,7 +1153,7 @@ def ticker_thread_check_time_launch_checks():
priority = int(time.time())
# Into the queue with you
queued_successfully = worker_pool.queue_item_async_safe(update_q,
queued_successfully = worker_handler.queue_item_async_safe(update_q,
queuedWatchMetaData.PrioritizedItem(priority=priority,
item={'uuid': uuid})
)

View File

@@ -5,52 +5,51 @@ import heapq
import queue
import threading
# Janus is no longer required - we use pure threading.Queue for multi-loop support
# try:
# import janus
# except ImportError:
# pass # Not needed anymore
try:
import janus
except ImportError:
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
raise
class RecheckPriorityQueue:
"""
Thread-safe priority queue supporting multiple async event loops.
ARCHITECTURE:
- Multiple async workers, each with its own event loop in its own thread
- One shared queue accessed safely via threading primitives
- Workers use run_in_executor to access queue without blocking their event loop
IMPLEMENTATION:
- Pure threading.Queue for notifications (no event loop binding)
- Heapq-based priority storage (thread-safe with RLock)
- Async methods wrap sync methods via run_in_executor
- Supports both sync and async access patterns
WHY NOT JANUS:
- Janus binds to ONE event loop at creation time
- Our architecture has 15+ workers, each with separate event loops
- Workers in different threads/loops cannot share janus async interface
- Pure threading approach works across all event loops
Ultra-reliable priority queue using janus for async/sync bridging.
CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
- sync_q: Used by Flask routes, ticker threads, and other synchronous code
- async_q: Used by async workers (the actual fetchers/processors) and coroutines
DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
- Synchronous code (Flask, threads) cannot use async methods without blocking
- Async code cannot use sync methods without blocking the event loop
- janus provides the only safe bridge between these two worlds
Attempting to unify to async-only would require:
- Converting all Flask routes to async (major breaking change)
- Using asyncio.run() in sync contexts (causes deadlocks)
- Thread-pool wrapping (adds complexity and overhead)
Minimal implementation focused on reliability:
- Pure janus for sync/async bridge
- Thread-safe priority ordering
- Bulletproof error handling with critical logging
"""
def __init__(self, maxsize: int = 0):
try:
# Use pure threading.Queue for notification - no janus needed
# This avoids event loop binding issues with multiple worker threads
import asyncio
self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
self._janus_queue = janus.Queue(maxsize=maxsize)
# BOTH interfaces required - see class docstring for why
self.sync_q = self._janus_queue.sync_q # Flask routes, ticker thread
self.async_q = self._janus_queue.async_q # Async workers
# Priority storage - thread-safe
self._priority_items = []
self._lock = threading.RLock()
# Condition variable for async wait support
self._condition = threading.Condition(self._lock)
# Signals for UI updates
self.queue_length_signal = signal('queue_length')
logger.debug("RecheckPriorityQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
@@ -60,23 +59,24 @@ class RecheckPriorityQueue:
def put(self, item, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync put with priority ordering"""
try:
# Add to priority storage and notify waiters
with self._condition:
# Add to priority storage
with self._lock:
heapq.heappush(self._priority_items, item)
self._notification_queue.put(True, block=False) # Notification only
self._condition.notify_all() # Wake up any async waiters
# Notify via janus sync queue
self.sync_q.put(True, block=block, timeout=timeout)
# Emit signals
self._emit_put_signals(item)
logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if put failed
# Remove from priority storage if janus put failed
try:
with self._condition:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
@@ -86,10 +86,10 @@ class RecheckPriorityQueue:
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get with priority ordering"""
import queue as queue_module
import queue
try:
# Wait for notification (this doesn't return the actual item, just signals availability)
self._notification_queue.get(block=block, timeout=timeout)
# Wait for notification
self.sync_q.get(block=block, timeout=timeout)
# Get highest priority item
with self._lock:
@@ -104,53 +104,57 @@ class RecheckPriorityQueue:
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
return item
except queue_module.Empty:
except queue.Empty:
# Queue is empty with timeout - expected behavior, re-raise without logging
raise # noqa
raise
except Exception as e:
# Re-raise without logging - caller (worker) will handle and log appropriately
raise
# ASYNC INTERFACE (for workers)
async def async_put(self, item, executor=None):
"""Async put with priority ordering - uses thread pool to avoid blocking
Args:
item: Item to add to queue
executor: Optional ThreadPoolExecutor. If None, uses default pool.
"""
import asyncio
async def async_put(self, item):
"""Pure async put with priority ordering"""
try:
# Use run_in_executor to call sync put without blocking event loop
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, # Use provided executor or default
lambda: self.put(item, block=True, timeout=5.0)
)
# Add to priority storage
with self._lock:
heapq.heappush(self._priority_items, item)
# Notify via janus async queue
await self.async_q.put(True)
# Emit signals
self._emit_put_signals(item)
logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
return result
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed
try:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
return False
async def async_get(self, executor=None):
"""Async get with priority ordering - uses thread pool to avoid blocking
Args:
executor: Optional ThreadPoolExecutor. If None, uses default pool.
With many workers (30+), pass custom executor scaled to worker count.
"""
import asyncio
async def async_get(self):
"""Pure async get with priority ordering"""
try:
# Use run_in_executor to call sync get without blocking event loop
# This works across multiple event loops since it uses threading
loop = asyncio.get_event_loop()
item = await loop.run_in_executor(
executor, # Use provided executor (scales with FETCH_WORKERS) or default
lambda: self.get(block=True, timeout=1.0)
)
# Wait for notification
await self.async_q.get()
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
# Emit signals
self._emit_get_signals()
logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
return item
@@ -183,9 +187,9 @@ class RecheckPriorityQueue:
return []
def close(self):
"""Close the queue"""
"""Close the janus queue"""
try:
# Nothing to close for threading.Queue
self._janus_queue.close()
logger.debug("RecheckPriorityQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
@@ -359,11 +363,12 @@ class NotificationQueue:
def __init__(self, maxsize: int = 0, datastore=None):
try:
# Use pure threading.Queue to avoid event loop binding issues
self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
self._janus_queue = janus.Queue(maxsize=maxsize)
# BOTH interfaces required - see class docstring for why
self.sync_q = self._janus_queue.sync_q # Flask routes, threads
self.async_q = self._janus_queue.async_q # Async workers
self.notification_event_signal = signal('notification_event')
self.datastore = datastore # For checking all_muted setting
self._lock = threading.RLock()
logger.debug("NotificationQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
@@ -381,8 +386,7 @@ class NotificationQueue:
logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
return False
with self._lock:
self._notification_queue.put(item, block=block, timeout=timeout)
self.sync_q.put(item, block=block, timeout=timeout)
self._emit_notification_signal(item)
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
return True
@@ -390,49 +394,36 @@ class NotificationQueue:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
async def async_put(self, item: Dict[str, Any], executor=None):
"""Async put with signal emission - uses thread pool
Args:
item: Notification item to queue
executor: Optional ThreadPoolExecutor
"""
import asyncio
async def async_put(self, item: Dict[str, Any]):
"""Pure async put with signal emission"""
try:
# Check if all notifications are muted
if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
return False
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, lambda: self.put(item, block=True, timeout=5.0))
await self.async_q.put(item)
self._emit_notification_signal(item)
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get"""
try:
with self._lock:
return self._notification_queue.get(block=block, timeout=timeout)
return self.sync_q.get(block=block, timeout=timeout)
except queue.Empty as e:
raise e
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
raise e
async def async_get(self, executor=None):
"""Async get - uses thread pool
Args:
executor: Optional ThreadPoolExecutor
"""
import asyncio
async def async_get(self):
"""Pure async get"""
try:
loop = asyncio.get_event_loop()
return await loop.run_in_executor(executor, lambda: self.get(block=True, timeout=1.0))
return await self.async_q.get()
except queue.Empty as e:
raise e
except Exception as e:
@@ -442,20 +433,19 @@ class NotificationQueue:
def qsize(self) -> int:
"""Get current queue size"""
try:
with self._lock:
return self._notification_queue.qsize()
return self.sync_q.qsize()
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
return 0
def empty(self) -> bool:
"""Check if queue is empty"""
return self.qsize() == 0
def close(self):
"""Close the queue"""
"""Close the janus queue"""
try:
# Nothing to close for threading.Queue
self._janus_queue.close()
logger.debug("NotificationQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")

View File

@@ -37,9 +37,9 @@ def register_watch_operation_handlers(socketio, datastore):
# Import here to avoid circular imports
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_pool
from changedetectionio import worker_handler
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
logger.info(f"Socket.IO: Queued recheck for watch {uuid}")
else:
emit('operation_result', {'success': False, 'error': f'Unknown operation: {op}'})

View File

@@ -145,10 +145,10 @@ def handle_watch_update(socketio, **kwargs):
# Emit the watch update to all connected clients
from changedetectionio.flask_app import update_q
from changedetectionio.flask_app import _jinja2_filter_datetime
from changedetectionio import worker_pool
from changedetectionio import worker_handler
# Get list of watches that are currently running
running_uuids = worker_pool.get_running_uuids()
running_uuids = worker_handler.get_running_uuids()
# Get list of watches in the queue (efficient single-lock method)
queue_list = update_q.get_queued_uuids()
@@ -252,7 +252,7 @@ def init_socketio(app, datastore):
def event_checkbox_operations(data):
from changedetectionio.blueprint.ui import _handle_operations
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_pool
from changedetectionio import worker_handler
from changedetectionio.flask_app import update_q, watch_check_update
import threading
@@ -268,7 +268,7 @@ def init_socketio(app, datastore):
uuids=data.get('uuids'),
datastore=datastore,
extra_data=data.get('extra_data'),
worker_pool=worker_pool,
worker_handler=worker_handler,
update_q=update_q,
queuedWatchMetaData=queuedWatchMetaData,
watch_check_update=watch_check_update,

View File

@@ -66,7 +66,7 @@ echo "-------------------- Running rest of tests in parallel -------------------
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
pytest tests/test_*.py \
-n 16 \
-n 30 \
--dist=load \
-vvv \
-s \

View File

@@ -166,9 +166,6 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
"""
logger.info(f"Datastore path is '{datastore_path}'")
# CRITICAL: Update datastore_path (was using old path from __init__)
self.datastore_path = datastore_path
# Initialize data structure
self.__data = App.model()
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
@@ -607,6 +604,19 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
return None
# Check PAGE_WATCH_LIMIT if set
page_watch_limit = os.getenv('PAGE_WATCH_LIMIT')
if page_watch_limit:
try:
page_watch_limit = int(page_watch_limit)
current_watch_count = len(self.__data['watching'])
if current_watch_count >= page_watch_limit:
logger.error(f"Watch limit reached: {current_watch_count}/{page_watch_limit} watches. Cannot add {url}")
flash(gettext("Watch limit reached ({}/{} watches). Cannot add more watches.").format(current_watch_count, page_watch_limit), 'error')
return None
except ValueError:
logger.warning(f"Invalid PAGE_WATCH_LIMIT value: {page_watch_limit}, ignoring limit check")
if tag and type(tag) == str:
# Then it's probably a string of the actual tag by name, split and add it
for t in tag.split(','):
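
The guard added here is the core of the commit: PAGE_WATCH_LIMIT is read from the environment on every add, and an unparseable value disables the check rather than blocking adds. The same logic as a hypothetical standalone function:

import os

def watch_limit_reached(current_watch_count):
    # True only when PAGE_WATCH_LIMIT is set, numeric, and already reached.
    raw = os.getenv('PAGE_WATCH_LIMIT')
    if not raw:
        return False
    try:
        limit = int(raw)
    except ValueError:
        return False  # invalid values are ignored, matching the diff above
    return current_watch_count >= limit

Launching with, say, PAGE_WATCH_LIMIT=100 would then cap the instance at 100 watches.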

View File

@@ -165,57 +165,6 @@ def prepare_test_function(live_server, datastore_path):
except:
break
# Add test helper methods to the app for worker management
def set_workers(count):
"""Set the number of workers for testing - brutal shutdown, no delays"""
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, notification_q
current_count = worker_pool.get_worker_count()
# Special case: Setting to 0 means shutdown all workers brutally
if count == 0:
logger.debug(f"Brutally shutting down all {current_count} workers")
worker_pool.shutdown_workers()
return {
'status': 'success',
'message': f'Shutdown all {current_count} workers',
'previous_count': current_count,
'current_count': 0
}
# Adjust worker count (no delays, no verification)
result = worker_pool.adjust_async_worker_count(
count,
update_q=update_q,
notification_q=notification_q,
app=live_server.app,
datastore=datastore
)
return result
def check_all_workers_alive(expected_count):
"""Check that all expected workers are alive"""
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, notification_q
result = worker_pool.check_worker_health(
expected_count,
update_q=update_q,
notification_q=notification_q,
app=live_server.app,
datastore=datastore
)
assert result['status'] == 'healthy', f"Workers not healthy: {result['message']}"
return result
# Attach helper methods to app for easy test access
live_server.app.set_workers = set_workers
live_server.app.check_all_workers_alive = check_all_workers_alive
# Prevent background thread from writing during cleanup/reload
datastore.needs_write = False
datastore.needs_write_urgent = False
@@ -313,8 +262,8 @@ def app(request, datastore_path):
# Shutdown workers gracefully before loguru cleanup
try:
from changedetectionio import worker_pool
worker_pool.shutdown_workers()
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
except Exception:
pass

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks, delete_all_watches
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
import os
@@ -116,7 +116,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# And not this cause its not the ld-json
assert b"So let's see what happens" not in res.data
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
##########################################################################################
# And we shouldnt see the offer
@@ -131,7 +131,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
assert b'ldjson-price-track-offer' not in res.data
##########################################################################################
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
@@ -147,7 +147,7 @@ def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_
##########################################################################################
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_bad_ldjson_is_correctly_ignored(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -414,4 +414,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
assert b'Abonnementen bijwerken' in res.data
assert b'&lt;foobar' not in res.data
res = delete_all_watches(client)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -6,7 +6,7 @@ from .util import (
set_original_response,
set_modified_response,
live_server_setup,
wait_for_all_checks, delete_all_watches
wait_for_all_checks
)
from loguru import logger
@@ -104,7 +104,7 @@ def run_socketio_watch_update_test(client, live_server, password_mode="", datast
assert watch.has_unviewed, "The watch was not marked as unviewed after content change"
# Clean up
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_everything(live_server, client, measure_memory_usage, datastore_path):

View File

@@ -166,8 +166,7 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage, datastore_path
delete_all_watches(client)
def test_group_tag_notification(client, live_server, measure_memory_usage, datastore_path):
delete_all_watches(client)
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)

View File

@@ -82,7 +82,7 @@ def test_import_distillio(client, live_server, measure_memory_usage, datastore_p
# Give the endpoint time to spin up
time.sleep(1)
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
res = client.post(
url_for("imports.import_page"),
data={

View File

@@ -224,7 +224,6 @@ def check_json_filter(json_filter, client, live_server, datastore_path):
set_original_response(datastore_path=datastore_path)
delete_all_watches(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})

View File

@@ -1,7 +1,7 @@
import os
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
import logging
def test_check_notification_error_handling(client, live_server, measure_memory_usage, datastore_path):
@@ -81,4 +81,4 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
os.unlink(os.path.join(datastore_path, "notification.txt"))
assert 'xxxxx' in notification_submission
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -1,52 +0,0 @@
import os
import time
from flask import url_for
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
from ..notification import valid_notification_formats
from loguru import logger
def test_queue_system(client, live_server, measure_memory_usage, datastore_path):
"""Test that multiple workers can process queue concurrently without blocking each other"""
# (pytest) Werkzeug's threaded server uses ThreadPoolExecutor with a default limit of around 40 threads (or min(32, os.cpu_count() + 4)).
items = os.cpu_count() +3
delay = 10
# Auto-queue is off here.
live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True
test_urls = [
f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}"
for i in range(0, items)
]
# Import 30 URLs to queue
res = client.post(
url_for("imports.import_page"),
data={"urls": "\r\n".join(test_urls)},
follow_redirects=True
)
assert f"{items} Imported".encode('utf-8') in res.data
client.application.set_workers(items)
start = time.time()
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(delay/2)
# Verify all workers are idle (no UUIDs being processed)
from changedetectionio import worker_pool
running_uuids = worker_pool.get_running_uuids()
logger.debug( f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}")
assert len(running_uuids) != 0, f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}"
wait_for_all_checks(client)
# all workers should be done in less than say 10 seconds (they take time to 'see' something is in the queue too)
total_time = (time.time() - start)
logger.debug(f"All workers finished {items} items in less than {delay} seconds per job. {total_time}s total")
# if there was a bug in queue handler not running parallel, this would blow out to items*delay seconds
assert total_time < delay + 10, f"All workers finished {items} items in less than {delay} seconds per job, total time {total_time}s"
# Verify all workers are idle (no UUIDs being processed)
from changedetectionio import worker_pool
running_uuids = worker_pool.get_running_uuids()
assert len(running_uuids) == 0, f"Expected all workers to be idle, but {len(running_uuids)} UUIDs still being processed: {running_uuids}"

View File

@@ -107,7 +107,7 @@ def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path
assert b"Access denied, bad token" not in res.data
assert b"Random content" in res.data
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -366,7 +366,7 @@ def test_check_with_prefix_include_filters(client, live_server, measure_memory_u
assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" not in res.data # not in selector
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_various_rules(client, live_server, measure_memory_usage, datastore_path):
@@ -423,7 +423,7 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": "//*[contains(@class, 'sametext')]|//*[contains(@class, 'changetext')]",
"url": test_url,
"tags": "",
@@ -437,14 +437,14 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_preview.preview_page", uuid=uuid),
url_for("ui.ui_preview.preview_page", uuid="first"),
follow_redirects=True
)
assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" in res.data # in selector
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_xpath_20_function_count(client, live_server, measure_memory_usage, datastore_path):
@@ -477,7 +477,7 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage, data
assert b"246913579975308642" in res.data # in selector
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_xpath_20_function_count2(client, live_server, measure_memory_usage, datastore_path):
@@ -501,8 +501,6 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
)
assert b"Updated watch." in res.data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
@@ -512,7 +510,7 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
assert b"246913579975308642" in res.data # in selector
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_xpath_20_function_string_join_matches(client, live_server, measure_memory_usage, datastore_path):
@@ -546,7 +544,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
assert b"Some text thats the samespecialconjunctionSome text that will change" in res.data # in selector
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
@@ -584,7 +582,7 @@ def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
assert b"Lets go discount" in res.data, f"When testing for Lets go discount called with content type '{content_type}'"
assert b"Events and Announcements" not in res.data, f"When testing for Lets go discount called with content type '{content_type}'" # It should not be here because thats not our selector target
delete_all_watches(client)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -135,25 +135,18 @@ def delete_all_watches(client=None):
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid)
from changedetectionio.flask_app import update_q
# Drain the queue so queued items don't leak into the next test
while not update_q.empty():
try:
update_q.get_nowait()
except Exception:
break
def wait_for_all_checks(client=None):
"""
Waits until the queue is empty and workers are idle.
Delegates to worker_pool.wait_for_all_checks for shared logic.
Delegates to worker_handler.wait_for_all_checks for shared logic.
"""
from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_pool
time.sleep(0.25)
# Use the shared wait logic from worker_pool
return worker_pool.wait_for_all_checks(global_update_q, timeout=150)
from changedetectionio import worker_handler
time.sleep(0.05)
# Use the shared wait logic from worker_handler
return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
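
For reference, worker_handler.wait_for_all_checks itself is not shown in this diff. A plausible sketch of the polling logic the tests rely on, assuming a queue.Queue-style update_q and a callable that reports in-flight watch UUIDs (passed in here to keep the example self-contained; the real signature may differ):

import time

def wait_for_all_checks(update_q, get_running_uuids, timeout=150):
    """Poll until update_q is drained and no watch UUID is mid-check.

    Sketch only: the real worker_handler.wait_for_all_checks is not shown
    in this diff, and get_running_uuids is a parameter here purely so the
    example runs standalone.
    """
    deadline = time.time() + timeout
    while time.time() < deadline:
        if update_q.empty() and not get_running_uuids():
            return True   # everything quiesced within the timeout
        time.sleep(0.1)
    return False          # timed out while work was still in flight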
def wait_for_watch_history(client, min_history_count=2, timeout=10):
"""

View File

@@ -91,8 +91,7 @@ class WorkerThread:
self.thread.start()
def stop(self):
"""Stop the worker thread brutally - no waiting"""
# Try to stop the event loop if it exists
"""Stop the worker thread"""
if self.loop and self.running:
try:
# Signal the loop to stop
@@ -100,7 +99,8 @@ class WorkerThread:
except RuntimeError:
pass
# Don't wait - thread is daemon and will die when needed
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
@@ -125,7 +125,7 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
"""Start a single async worker with auto-restart capability"""
from changedetectionio.worker import async_update_worker
from changedetectionio.async_update_worker import async_update_worker
# Check if we're in pytest environment - if so, be more gentle with logging
import os
@@ -337,36 +337,24 @@ def queue_item_async_safe(update_q, item, silent=False):
def shutdown_workers():
"""Shutdown all async workers brutally - no delays, no waiting"""
global worker_threads, queue_executor
"""Shutdown all async workers fast and aggressively"""
global worker_threads
# Check if we're in pytest environment - if so, be more gentle with logging
import os
import sys
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Brutal shutdown of async workers initiated...")
logger.info("Fast shutdown of async workers initiated...")
# Stop all worker event loops
# Stop all worker threads
for worker in worker_threads:
worker.stop()
# Clear immediately - threads are daemon and will die
worker_threads.clear()
# Shutdown the queue executor to prevent "cannot schedule new futures after shutdown" errors
# This must happen AFTER workers are stopped to avoid race conditions
if queue_executor:
try:
queue_executor.shutdown(wait=False)
if not in_pytest:
logger.debug("Queue executor shut down")
except Exception as e:
if not in_pytest:
logger.warning(f"Error shutting down queue executor: {e}")
if not in_pytest:
logger.info("Async workers brutal shutdown complete")
logger.info("Async workers fast shutdown complete")

View File

@@ -8,7 +8,7 @@ flask-paginate
flask_expects_json~=1.7
flask_restful
flask_cors # For the Chrome extension to operate
# janus # No longer needed - using pure threading.Queue for multi-loop support
janus # Thread-safe async/sync queue bridge
flask_wtf~=1.2
flask~=3.1
flask-socketio~=5.6.0
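
With janus back in the dependency list, the pattern it enables is worth a short illustration: one janus.Queue exposes a blocking sync_q for plain threads and an awaitable async_q for coroutines, bridging the two worlds without manual call_soon_threadsafe plumbing. A minimal standalone example (not code from this repository):

import asyncio
import threading
import janus

def producer(sync_q):
    # Plain thread: blocking puts on the sync face of the queue.
    for i in range(5):
        sync_q.put(i)
    sync_q.put(None)  # sentinel so the consumer knows to stop

async def main():
    q = janus.Queue()  # must be created while an event loop is running
    threading.Thread(target=producer, args=(q.sync_q,), daemon=True).start()
    while True:
        item = await q.async_q.get()  # awaits without blocking the loop
        q.async_q.task_done()
        if item is None:
            break
        print("consumed", item)
    q.close()
    await q.wait_closed()

asyncio.run(main())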