diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index 2be27421..ea45cd14 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -102,8 +102,8 @@ def sigshutdown_handler(_signo, _stack_frame): # Shutdown workers and queues immediately try: - from changedetectionio import worker_handler - worker_handler.shutdown_workers() + from changedetectionio import worker_pool + worker_pool.shutdown_workers() except Exception as e: logger.error(f"Error shutting down workers: {str(e)}") @@ -415,12 +415,12 @@ def main(): # This must happen AFTER app initialization so update_q is available if batch_mode and added_watch_uuids: from changedetectionio.flask_app import update_q - from changedetectionio import queuedWatchMetaData, worker_handler + from changedetectionio import queuedWatchMetaData, worker_pool logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches") for watch_uuid in added_watch_uuids: try: - worker_handler.queue_item_async_safe( + worker_pool.queue_item_async_safe( update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) ) @@ -432,7 +432,7 @@ def main(): # This must happen AFTER app initialization so update_q is available if recheck_watches is not None: from changedetectionio.flask_app import update_q - from changedetectionio import queuedWatchMetaData, worker_handler + from changedetectionio import queuedWatchMetaData, worker_pool watches_to_queue = [] if recheck_watches == 'all': @@ -454,7 +454,7 @@ def main(): for watch_uuid in watches_to_queue: if watch_uuid in datastore.data['watching']: try: - worker_handler.queue_item_async_safe( + worker_pool.queue_item_async_safe( update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) ) @@ -516,7 +516,7 @@ def main(): for watch_uuid in watches_to_queue: if watch_uuid in datastore.data['watching']: try: - worker_handler.queue_item_async_safe( + worker_pool.queue_item_async_safe( update_q, 
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) ) @@ -549,7 +549,7 @@ def main(): logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...") # Use the shared wait_for_all_checks function - completed = worker_handler.wait_for_all_checks(update_q, timeout=300) + completed = worker_pool.wait_for_all_checks(update_q, timeout=300) if not completed: logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds") diff --git a/changedetectionio/api/Tags.py b/changedetectionio/api/Tags.py index 91ff9510..e70d565a 100644 --- a/changedetectionio/api/Tags.py +++ b/changedetectionio/api/Tags.py @@ -1,5 +1,5 @@ from changedetectionio import queuedWatchMetaData -from changedetectionio import worker_handler +from changedetectionio import worker_pool from flask_expects_json import expects_json from flask_restful import abort, Resource from loguru import logger @@ -42,7 +42,7 @@ class Tag(Resource): # If less than 20 watches, queue synchronously for immediate feedback if len(watches_to_queue) < 20: for watch_uuid in watches_to_queue: - worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) + worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200 else: # 20+ watches - queue in background thread to avoid blocking API response @@ -50,7 +50,7 @@ class Tag(Resource): """Background thread to queue watches - discarded after completion.""" try: for watch_uuid in watches_to_queue: - worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) + worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) logger.info(f"Background queueing 
complete for tag {tag['uuid']}: {len(watches_to_queue)} watches queued") except Exception as e: logger.error(f"Error in background queueing for tag {tag['uuid']}: {e}") diff --git a/changedetectionio/api/Watch.py b/changedetectionio/api/Watch.py index 293a1c0d..5fdc0177 100644 --- a/changedetectionio/api/Watch.py +++ b/changedetectionio/api/Watch.py @@ -6,7 +6,7 @@ from changedetectionio.favicon_utils import get_favicon_mime_type from . import auth from changedetectionio import queuedWatchMetaData, strtobool -from changedetectionio import worker_handler +from changedetectionio import worker_pool from flask import request, make_response, send_from_directory from flask_expects_json import expects_json from flask_restful import abort, Resource @@ -85,7 +85,7 @@ class Watch(Resource): abort(404, message='No watch exists with the UUID of {}'.format(uuid)) if request.args.get('recheck'): - worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) return "OK", 200 if request.args.get('paused', '') == 'paused': self.datastore.data['watching'].get(uuid).pause() @@ -477,7 +477,7 @@ class CreateWatch(Resource): new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags) if new_uuid: # Dont queue because the scheduler will check that it hasnt been checked before anyway -# worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) +# worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) return {'uuid': new_uuid}, 201 else: return "Invalid or unsupported URL", 400 @@ -514,7 +514,7 @@ class CreateWatch(Resource): if len(watches_to_queue) < 20: # Get already queued/running UUIDs once (efficient) queued_uuids = set(self.update_q.get_queued_uuids()) - 
running_uuids = set(worker_handler.get_running_uuids()) + running_uuids = set(worker_pool.get_running_uuids()) # Filter out watches that are already queued or running watches_to_queue_filtered = [ @@ -524,7 +524,7 @@ class CreateWatch(Resource): # Queue only the filtered watches for uuid in watches_to_queue_filtered: - worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) # Provide feedback about skipped watches skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered) @@ -536,7 +536,7 @@ class CreateWatch(Resource): # 20+ watches - queue in background thread to avoid blocking API response # Capture queued/running state before background thread queued_uuids = set(self.update_q.get_queued_uuids()) - running_uuids = set(worker_handler.get_running_uuids()) + running_uuids = set(worker_pool.get_running_uuids()) def queue_all_watches_background(): """Background thread to queue all watches - discarded after completion.""" @@ -546,7 +546,7 @@ class CreateWatch(Resource): for uuid in watches_to_queue: # Check if already queued or running (state captured at start) if uuid not in queued_uuids and uuid not in running_uuids: - worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) queued_count += 1 else: skipped_count += 1 diff --git a/changedetectionio/blueprint/imports/__init__.py b/changedetectionio/blueprint/imports/__init__.py index fa6fe771..96e2360a 100644 --- a/changedetectionio/blueprint/imports/__init__.py +++ b/changedetectionio/blueprint/imports/__init__.py @@ -14,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe from changedetectionio 
import forms # if request.method == 'POST': -# from changedetectionio import worker_handler +# from changedetectionio import worker_pool from changedetectionio.blueprint.imports.importer import ( import_url_list, @@ -31,7 +31,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs") # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue # for uuid in importer_handler.new_uuids: -# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) +# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) if len(importer_handler.remaining_data) == 0: return redirect(url_for('watchlist.index')) @@ -45,7 +45,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore) # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue # for uuid in importer_handler.new_uuids: -# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) +# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) # XLSX importer @@ -70,7 +70,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue # for uuid in importer_handler.new_uuids: -# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) +# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) # Could be some remaining, or we could be on GET diff --git 
a/changedetectionio/blueprint/price_data_follower/__init__.py b/changedetectionio/blueprint/price_data_follower/__init__.py index c2c6e768..ecf60eea 100644 --- a/changedetectionio/blueprint/price_data_follower/__init__.py +++ b/changedetectionio/blueprint/price_data_follower/__init__.py @@ -4,7 +4,7 @@ from flask import Blueprint, flash, redirect, url_for from flask_login import login_required from changedetectionio.store import ChangeDetectionStore from changedetectionio import queuedWatchMetaData -from changedetectionio import worker_handler +from changedetectionio import worker_pool from queue import PriorityQueue PRICE_DATA_TRACK_ACCEPT = 'accepted' @@ -20,7 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT datastore.data['watching'][uuid]['processor'] = 'restock_diff' datastore.data['watching'][uuid].clear_watch() - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) return redirect(url_for("watchlist.index")) @login_required diff --git a/changedetectionio/blueprint/settings/__init__.py b/changedetectionio/blueprint/settings/__init__.py index 3d660f8b..4c35ddb5 100644 --- a/changedetectionio/blueprint/settings/__init__.py +++ b/changedetectionio/blueprint/settings/__init__.py @@ -83,7 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): # Adjust worker count if it changed if new_worker_count != old_worker_count: - from changedetectionio import worker_handler + from changedetectionio import worker_pool from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds # Check CPU core availability and warn if worker count is high @@ -92,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): 
flash(gettext("Warning: Worker count ({}) is close to or exceeds available CPU cores ({})").format( new_worker_count, cpu_count), 'warning') - result = worker_handler.adjust_async_worker_count( + result = worker_pool.adjust_async_worker_count( new_count=new_worker_count, update_q=update_q, notification_q=notification_q, diff --git a/changedetectionio/blueprint/ui/__init__.py b/changedetectionio/blueprint/ui/__init__.py index 296533ba..548f12e1 100644 --- a/changedetectionio/blueprint/ui/__init__.py +++ b/changedetectionio/blueprint/ui/__init__.py @@ -10,7 +10,7 @@ from changedetectionio.blueprint.ui.notification import construct_blueprint as c from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint from changedetectionio.blueprint.ui import diff, preview -def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True): +def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True): from flask import request, flash if op == 'delete': @@ -63,7 +63,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): # Recheck and require a full reprocessing - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) if emit_flash: flash(gettext("{} watches queued for rechecking").format(len(uuids))) @@ -114,7 +114,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: watch_check_update.send(watch_uuid=uuid) -def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update): +def construct_blueprint(datastore: 
ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update): ui_blueprint = Blueprint('ui', __name__, template_folder="templates") # Register the edit blueprint @@ -246,7 +246,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle new_uuid = datastore.clone(uuid) if not datastore.data['watching'].get(uuid).get('paused'): - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid})) flash(gettext('Cloned, you are editing the new watch.')) @@ -262,10 +262,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle if uuid: # Single watch - check if already queued or running - if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids(): + if worker_pool.is_watch_running(uuid) or uuid in update_q.get_queued_uuids(): flash(gettext("Watch is already queued or being checked.")) else: - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) flash(gettext("Queued 1 watch for rechecking.")) else: # Multiple watches - first count how many need to be queued @@ -284,7 +284,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle if len(watches_to_queue) < 20: # Get already queued/running UUIDs once (efficient) queued_uuids = set(update_q.get_queued_uuids()) - running_uuids = set(worker_handler.get_running_uuids()) + running_uuids = set(worker_pool.get_running_uuids()) # Filter out watches that are already queued or running watches_to_queue_filtered = [] @@ -294,7 +294,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle # Queue only the filtered watches for 
watch_uuid in watches_to_queue_filtered: - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) # Provide feedback about skipped watches skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered) @@ -310,7 +310,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle # 20+ watches - queue in background thread to avoid blocking HTTP response # Capture queued/running state before background thread queued_uuids = set(update_q.get_queued_uuids()) - running_uuids = set(worker_handler.get_running_uuids()) + running_uuids = set(worker_pool.get_running_uuids()) def queue_watches_background(): """Background thread to queue watches - discarded after completion.""" @@ -320,7 +320,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle for watch_uuid in watches_to_queue: # Check if already queued or running (state captured at start) if watch_uuid not in queued_uuids and watch_uuid not in running_uuids: - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) queued_count += 1 else: skipped_count += 1 @@ -349,7 +349,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle extra_data=extra_data, queuedWatchMetaData=queuedWatchMetaData, uuids=uuids, - worker_handler=worker_handler, + worker_pool=worker_pool, update_q=update_q, watch_check_update=watch_check_update, op=op, diff --git a/changedetectionio/blueprint/ui/edit.py b/changedetectionio/blueprint/ui/edit.py index a8b0d0b4..6db548b0 100644 --- a/changedetectionio/blueprint/ui/edit.py +++ b/changedetectionio/blueprint/ui/edit.py @@ -9,7 +9,7 @@ from 
jinja2 import Environment, FileSystemLoader from changedetectionio.store import ChangeDetectionStore from changedetectionio.auth_decorator import login_optionally_required from changedetectionio.time_handler import is_within_schedule -from changedetectionio import worker_handler +from changedetectionio import worker_pool def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData): edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates") @@ -283,7 +283,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe ############################# if not datastore.data['watching'][uuid].get('paused') and is_in_schedule: # Queue the watch for immediate recheck, with a higher priority - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) # Diff page [edit] link should go back to diff page if request.args.get("next") and request.args.get("next") == 'diff': @@ -314,7 +314,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe app_rss_token = datastore.data['settings']['application'].get('rss_access_token'), c = [f"processor-{watch.get('processor')}"] - if worker_handler.is_watch_running(uuid): + if worker_pool.is_watch_running(uuid): c.append('checking-now') template_args = { diff --git a/changedetectionio/blueprint/ui/views.py b/changedetectionio/blueprint/ui/views.py index 2e588c82..4673ab87 100644 --- a/changedetectionio/blueprint/ui/views.py +++ b/changedetectionio/blueprint/ui/views.py @@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, url_for, flash from flask_babel import gettext from changedetectionio.store import ChangeDetectionStore from changedetectionio.auth_decorator import login_optionally_required -from changedetectionio import worker_handler +from 
changedetectionio import worker_pool def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, watch_check_update): @@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe return redirect(url_for('ui.ui_edit.edit_page', uuid=new_uuid, unpause_on_save=1, tag=request.args.get('tag'))) else: # Straight into the queue. - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid})) flash(gettext("Watch added.")) return redirect(url_for('watchlist.index', tag=request.args.get('tag',''))) diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 310c4665..8c25133d 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -14,7 +14,7 @@ from pathlib import Path from changedetectionio.strtobool import strtobool from threading import Event from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue -from changedetectionio import worker_handler +from changedetectionio import worker_pool from flask import ( Flask, @@ -195,7 +195,7 @@ def _jinja2_filter_format_number_locale(value: float) -> str: @app.template_global('is_checking_now') def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"): - return worker_handler.is_watch_running(watch_obj['uuid']) + return worker_pool.is_watch_running(watch_obj['uuid']) @app.template_global('get_watch_queue_position') def _get_watch_queue_position(watch_obj): @@ -206,13 +206,13 @@ def _get_watch_queue_position(watch_obj): @app.template_global('get_current_worker_count') def _get_current_worker_count(): """Get the current number of operational workers""" - return worker_handler.get_worker_count() + return worker_pool.get_worker_count() @app.template_global('get_worker_status_info') def 
_get_worker_status_info(): """Get detailed worker status information for display""" - status = worker_handler.get_worker_status() - running_uuids = worker_handler.get_running_uuids() + status = worker_pool.get_worker_status() + running_uuids = worker_pool.get_running_uuids() return { 'count': status['worker_count'], @@ -801,7 +801,7 @@ def changedetection_app(config=None, datastore_o=None): # watchlist UI buttons etc import changedetectionio.blueprint.ui as ui - app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update)) + app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update)) import changedetectionio.blueprint.watchlist as watchlist app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='') @@ -838,10 +838,10 @@ def changedetection_app(config=None, datastore_o=None): expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) # Get basic status - status = worker_handler.get_worker_status() + status = worker_pool.get_worker_status() # Perform health check - health_result = worker_handler.check_worker_health( + health_result = worker_pool.check_worker_health( expected_count=expected_workers, update_q=update_q, notification_q=notification_q, @@ -905,7 +905,7 @@ def changedetection_app(config=None, datastore_o=None): # Can be overridden by ENV or use the default settings n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) logger.info(f"Starting {n_workers} workers during app initialization") - worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore) + worker_pool.start_workers(n_workers, update_q, notification_q, app, datastore) # Skip background threads in batch mode (just process queue and exit) batch_mode = app.config.get('batch_mode', 
False) @@ -1038,7 +1038,7 @@ def ticker_thread_check_time_launch_checks(): now = time.time() if now - last_health_check > 60: expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) - health_result = worker_handler.check_worker_health( + health_result = worker_pool.check_worker_health( expected_count=expected_workers, update_q=update_q, notification_q=notification_q, @@ -1057,7 +1057,7 @@ def ticker_thread_check_time_launch_checks(): continue # Get a list of watches by UUID that are currently fetching data - running_uuids = worker_handler.get_running_uuids() + running_uuids = worker_pool.get_running_uuids() # Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue} @@ -1163,7 +1163,7 @@ def ticker_thread_check_time_launch_checks(): priority = int(time.time()) # Into the queue with you - queued_successfully = worker_handler.queue_item_async_safe(update_q, + queued_successfully = worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}) ) diff --git a/changedetectionio/queue_handlers.py b/changedetectionio/queue_handlers.py index 1143d33e..4750484a 100644 --- a/changedetectionio/queue_handlers.py +++ b/changedetectionio/queue_handlers.py @@ -106,7 +106,7 @@ class RecheckPriorityQueue: except queue_module.Empty: # Queue is empty with timeout - expected behavior, re-raise without logging - raise + raise # noqa except Exception as e: # Re-raise without logging - caller (worker) will handle and log appropriately raise diff --git a/changedetectionio/realtime/events.py b/changedetectionio/realtime/events.py index 7c0397dd..f9176079 100644 --- a/changedetectionio/realtime/events.py +++ b/changedetectionio/realtime/events.py @@ -37,9 +37,9 @@ def register_watch_operation_handlers(socketio, datastore): # Import here to avoid circular imports from changedetectionio.flask_app import update_q 
from changedetectionio import queuedWatchMetaData - from changedetectionio import worker_handler + from changedetectionio import worker_pool - worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) + worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) logger.info(f"Socket.IO: Queued recheck for watch {uuid}") else: emit('operation_result', {'success': False, 'error': f'Unknown operation: {op}'}) diff --git a/changedetectionio/realtime/socket_server.py b/changedetectionio/realtime/socket_server.py index ae96e772..f975a0c2 100644 --- a/changedetectionio/realtime/socket_server.py +++ b/changedetectionio/realtime/socket_server.py @@ -145,10 +145,10 @@ def handle_watch_update(socketio, **kwargs): # Emit the watch update to all connected clients from changedetectionio.flask_app import update_q from changedetectionio.flask_app import _jinja2_filter_datetime - from changedetectionio import worker_handler + from changedetectionio import worker_pool # Get list of watches that are currently running - running_uuids = worker_handler.get_running_uuids() + running_uuids = worker_pool.get_running_uuids() # Get list of watches in the queue (efficient single-lock method) queue_list = update_q.get_queued_uuids() @@ -252,7 +252,7 @@ def init_socketio(app, datastore): def event_checkbox_operations(data): from changedetectionio.blueprint.ui import _handle_operations from changedetectionio import queuedWatchMetaData - from changedetectionio import worker_handler + from changedetectionio import worker_pool from changedetectionio.flask_app import update_q, watch_check_update import threading @@ -268,7 +268,7 @@ def init_socketio(app, datastore): uuids=data.get('uuids'), datastore=datastore, extra_data=data.get('extra_data'), - worker_handler=worker_handler, + worker_pool=worker_pool, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData, 
watch_check_update=watch_check_update, diff --git a/changedetectionio/tests/conftest.py b/changedetectionio/tests/conftest.py index 702af2aa..be5e8f39 100644 --- a/changedetectionio/tests/conftest.py +++ b/changedetectionio/tests/conftest.py @@ -29,6 +29,24 @@ def reportlog(pytestconfig): logger.remove(handler_id) +@pytest.fixture(scope="session") +def live_server_options(): + """Configure live_server to run in threaded mode for concurrent requests. + + CRITICAL: Without threaded=True, the test server is single-threaded and will + serialize all requests. This breaks tests that verify concurrent worker behavior. + + With threaded=True: + - Multiple workers can fetch from test server concurrently + - Test endpoints with time.sleep(delay) don't block other requests + - Real concurrency testing is possible + """ + return { + 'threaded': True, # Enable multi-threading for concurrent requests + 'port': 0, # Use random available port + } + + @pytest.fixture def environment(mocker): """Mock arrow.now() to return a fixed datetime for testing jinja2 time extension.""" @@ -187,6 +205,54 @@ def prepare_test_function(live_server, datastore_path): logger.debug(f"prepare_test_function: Reloaded datastore at {hex(id(datastore))}") logger.debug(f"prepare_test_function: Path {datastore.datastore_path}") + # Add test helper methods to the app for worker management + def set_workers(count): + """Set the number of workers for testing - brutal shutdown, no delays""" + from changedetectionio import worker_pool + from changedetectionio.flask_app import update_q, notification_q + + current_count = worker_pool.get_worker_count() + + # Special case: Setting to 0 means shutdown all workers brutally + if count == 0: + logger.debug(f"Brutally shutting down all {current_count} workers") + worker_pool.shutdown_workers() + return { + 'status': 'success', + 'message': f'Shutdown all {current_count} workers', + 'previous_count': current_count, + 'current_count': 0 + } + + # Adjust worker count (no 
delays, no verification) + result = worker_pool.adjust_async_worker_count( + count, + update_q=update_q, + notification_q=notification_q, + app=live_server.app, + datastore=datastore + ) + + return result + + def check_all_workers_alive(expected_count): + """Check that all expected workers are alive""" + from changedetectionio import worker_pool + from changedetectionio.flask_app import update_q, notification_q + result = worker_pool.check_worker_health( + expected_count, + update_q=update_q, + notification_q=notification_q, + app=live_server.app, + datastore=datastore + ) + assert result['status'] == 'healthy', f"Workers not healthy: {result['message']}" + return result + + # Attach helper methods to app for easy test access + live_server.app.set_workers = set_workers + live_server.app.check_all_workers_alive = check_all_workers_alive + yield # Cleanup: Clear watches and queue after test @@ -262,8 +328,8 @@ def app(request, datastore_path): # Shutdown workers gracefully before loguru cleanup try: - from changedetectionio import worker_handler - worker_handler.shutdown_workers() + from changedetectionio import worker_pool + worker_pool.shutdown_workers() except Exception: pass diff --git a/changedetectionio/tests/test_queue_handler.py b/changedetectionio/tests/test_queue_handler.py new file mode 100644 index 00000000..bc58106c --- /dev/null +++ b/changedetectionio/tests/test_queue_handler.py @@ -0,0 +1,41 @@ +import os +import time +from flask import url_for +from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output +from ..notification import valid_notification_formats +from loguru import logger + +def test_queue_system(client, live_server, measure_memory_usage, datastore_path): + """Test that multiple workers can process queue concurrently without blocking each other""" + # (pytest) Werkzeug's threaded server uses ThreadPoolExecutor with a default limit of around 40 threads (or min(32, os.cpu_count() + 4)). 
+ items = os.cpu_count() +3 + delay = 10 + # Auto-queue is off here. + live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True + + test_urls = [ + f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}" + for i in range(0, items) + ] + + # Import `items` URLs (one watch per worker we are about to start) + res = client.post( + url_for("imports.import_page"), + data={"urls": "\r\n".join(test_urls)}, + follow_redirects=True + ) + assert f"{items} Imported".encode('utf-8') in res.data + + # Start `items` workers so every imported watch can be fetched concurrently + client.application.set_workers(items) + + start = time.time() + res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) + + wait_for_all_checks(client) + + # all workers should be done within `delay` plus about 10 seconds of slack (they take time to 'see' something is in the queue too) + total_time = (time.time() - start) + logger.debug(f"All workers finished {items} items in less than {delay} seconds per job. {total_time}s total") + # if there was a bug in queue handler not running parallel, this would blow out to items*delay seconds + assert total_time < delay + 10, f"All workers finished {items} items in less than {delay} seconds per job, total time {total_time}s" diff --git a/changedetectionio/tests/util.py b/changedetectionio/tests/util.py index 6d0fb977..b5cb793a 100644 --- a/changedetectionio/tests/util.py +++ b/changedetectionio/tests/util.py @@ -140,13 +140,13 @@ def delete_all_watches(client=None): def wait_for_all_checks(client=None): """ Waits until the queue is empty and workers are idle. - Delegates to worker_handler.wait_for_all_checks for shared logic. + Delegates to worker_pool.wait_for_all_checks for shared logic. 
""" from changedetectionio.flask_app import update_q as global_update_q - from changedetectionio import worker_handler + from changedetectionio import worker_pool time.sleep(0.05) # Use the shared wait logic from worker_handler - return worker_handler.wait_for_all_checks(global_update_q, timeout=150) + return worker_pool.wait_for_all_checks(global_update_q, timeout=150) def wait_for_watch_history(client, min_history_count=2, timeout=10): """ diff --git a/changedetectionio/async_update_worker.py b/changedetectionio/worker.py similarity index 99% rename from changedetectionio/async_update_worker.py rename to changedetectionio/worker.py index ce8c3f03..59cf6164 100644 --- a/changedetectionio/async_update_worker.py +++ b/changedetectionio/worker.py @@ -54,7 +54,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec try: # Use async interface with custom executor to avoid thread pool exhaustion - # With 30+ workers, we need executor sized to match (see worker_handler.py) + # With 30+ workers, we need executor sized to match (see worker_pool.py) queued_item_data = await asyncio.wait_for( q.async_get(executor=executor), timeout=1.0 @@ -91,11 +91,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec uuid = queued_item_data.item.get('uuid') # RACE CONDITION FIX: Atomically claim this UUID for processing - from changedetectionio import worker_handler + from changedetectionio import worker_pool from changedetectionio.queuedWatchMetaData import PrioritizedItem # Try to claim the UUID atomically - prevents duplicate processing - if not worker_handler.claim_uuid_for_processing(uuid, worker_id): + if not worker_pool.claim_uuid_for_processing(uuid, worker_id): # Already being processed by another worker logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring") @@ -105,7 +105,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec # Re-queue with lower 
priority so it gets checked again after current processing finishes deferred_priority = max(1000, queued_item_data.priority * 10) deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item) - worker_handler.queue_item_async_safe(q, deferred_item, silent=True) + worker_pool.queue_item_async_safe(q, deferred_item, silent=True) logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check") continue @@ -490,7 +490,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec logger.error(f"Exception while cleaning/quit after calling browser: {e}") try: # Release UUID from processing (thread-safe) - worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id) + worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id) # Send completion signal if watch: diff --git a/changedetectionio/worker_handler.py b/changedetectionio/worker_pool.py similarity index 97% rename from changedetectionio/worker_handler.py rename to changedetectionio/worker_pool.py index 51aa2ecc..b193e1cb 100644 --- a/changedetectionio/worker_handler.py +++ b/changedetectionio/worker_pool.py @@ -91,7 +91,8 @@ class WorkerThread: self.thread.start() def stop(self): - """Stop the worker thread""" + """Stop the worker thread brutally - no waiting""" + # Try to stop the event loop if it exists if self.loop and self.running: try: # Signal the loop to stop @@ -99,8 +100,7 @@ class WorkerThread: except RuntimeError: pass - if self.thread and self.thread.is_alive(): - self.thread.join(timeout=2.0) + # Don't wait - thread is daemon and will die when needed def start_async_workers(n_workers, update_q, notification_q, app, datastore): @@ -125,7 +125,7 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore): async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None): """Start a single async worker with auto-restart capability""" - from 
changedetectionio.async_update_worker import async_update_worker + from changedetectionio.worker import async_update_worker # Check if we're in pytest environment - if so, be more gentle with logging import os @@ -337,7 +337,7 @@ def queue_item_async_safe(update_q, item, silent=False): def shutdown_workers(): - """Shutdown all async workers fast and aggressively""" + """Shutdown all async workers brutally - no delays, no waiting""" global worker_threads # Check if we're in pytest environment - if so, be more gentle with logging @@ -345,16 +345,17 @@ def shutdown_workers(): in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ if not in_pytest: - logger.info("Fast shutdown of async workers initiated...") + logger.info("Brutal shutdown of async workers initiated...") - # Stop all worker threads + # Stop all worker event loops for worker in worker_threads: worker.stop() + # Clear immediately - threads are daemon and will die worker_threads.clear() if not in_pytest: - logger.info("Async workers fast shutdown complete") + logger.info("Async workers brutal shutdown complete")