diff --git a/.github/workflows/test-stack-reusable-workflow.yml b/.github/workflows/test-stack-reusable-workflow.yml index 42da7df8..d63eeadd 100644 --- a/.github/workflows/test-stack-reusable-workflow.yml +++ b/.github/workflows/test-stack-reusable-workflow.yml @@ -111,6 +111,32 @@ jobs: docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh' + - name: Test CLI options + run: | + docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network + docker run --name test-cdio-cli-opts --network changedet-network test-changedetectionio bash -c 'changedetectionio/test_cli_opts.sh' &> cli-opts-output.txt + echo "=== CLI Options Test Output ===" + cat cli-opts-output.txt + + - name: CLI Memory Test + run: | + echo "=== Checking CLI batch mode memory usage ===" + # Extract RSS memory value from output + RSS_MB=$(grep -oP "Memory consumption before worker shutdown: RSS=\K[\d.]+" cli-opts-output.txt | head -1 || echo "0") + echo "RSS Memory: ${RSS_MB} MB" + + # Check if RSS is less than 100MB + if [ -n "$RSS_MB" ]; then + if (( $(echo "$RSS_MB < 100" | bc -l) )); then + echo "✓ Memory usage is acceptable: ${RSS_MB} MB < 100 MB" + else + echo "✗ Memory usage too high: ${RSS_MB} MB >= 100 MB" + exit 1 + fi + else + echo "⚠ Could not extract memory usage, skipping check" + fi + - name: Extract memory report and logs if: always() uses: ./.github/actions/extract-memory-report @@ -125,6 +151,13 @@ jobs: name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }} path: output-logs + - name: Store CLI test output + if: always() + uses: actions/upload-artifact@v6 + with: + name: test-cdio-cli-opts-output-py${{ env.PYTHON_VERSION }} + path: cli-opts-output.txt + # Playwright tests playwright-tests: runs-on: ubuntu-latest diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index 6b6f4937..5b0a8877 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -6,19 +6,20 @@ __version__ = '0.52.9' from changedetectionio.strtobool import strtobool from json.decoder import JSONDecodeError + +from loguru import logger +import getopt import logging import os -import getopt import platform import signal - -import sys +import threading +import time # Eventlet completely removed - using threading mode for SocketIO # This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts -from changedetectionio import store -from changedetectionio.flask_app import changedetection_app -from loguru import logger +# Note: store and changedetection_app are imported inside main() to avoid +# initialization before argument parsing (allows --help to work without loading everything) # ============================================================================== # Multiprocessing Configuration - CRITICAL for Thread Safety @@ -83,11 +84,22 @@ def get_version(): def sigshutdown_handler(_signo, _stack_frame): name = signal.Signals(_signo).name logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated') - + # Set exit flag immediately to stop all loops app.config.exit.set() datastore.stop_thread = True - + + # Log memory consumption before shutting down workers (cross-platform) + try: + import psutil + process = psutil.Process() + mem_info = process.memory_info() + rss_mb = mem_info.rss / 1024 / 1024 + vms_mb = mem_info.vms / 1024 / 1024 + logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB") + except Exception as e: + logger.warning(f"Could not retrieve memory stats: {str(e)}") + # Shutdown workers and queues immediately try: from changedetectionio import worker_handler @@ -125,10 +137,51 @@ def main(): global datastore global app + # Early help/version check before any initialization + if '--help' in sys.argv or '-help' in sys.argv: + print('Usage: changedetection.py [options]') + print('') + print('Standard options:') + print(' -s SSL enable') + print(' -h HOST Listen host (default: 0.0.0.0)') + print(' -p PORT Listen port (default: 5000)') + print(' -d PATH Datastore path') + print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)') + print(' -c Cleanup unused snapshots') + print(' -C Create datastore directory if it doesn\'t exist') + print('') + print('Add URLs on startup:') + print(' -u URL Add URL to watch (can be used multiple times)') + print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')') + print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)') + print(' -u2 \'JSON\' Set options for third -u URL, etc.') + print(' Available options: processor, fetch_backend, headers, method, etc.') + print(' See model/Watch.py for all available options') + print('') + print('Recheck on startup:') + print(' -r all Queue all watches for recheck on startup') + print(' -r UUID,... Queue specific watches (comma-separated UUIDs)') + print(' -r all N Queue all watches, wait for completion, repeat N times') + print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times') + print('') + print('Batch mode:') + print(' -b Run in batch mode (process queue then exit)') + print(' Useful for CI/CD, cron jobs, or one-time checks') + print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use') + print(' Use -p PORT to specify a different port if needed') + print('') + sys.exit(0) + + if '--version' in sys.argv or '-v' in sys.argv: + print(f'changedetection.io {__version__}') + sys.exit(0) + + # Import heavy modules after help/version checks to keep startup fast for those flags + from changedetectionio import store + from changedetectionio.flask_app import changedetection_app + datastore_path = None do_cleanup = False - # Optional URL to watch since start - default_url = None # Set a default logger level logger_level = 'DEBUG' include_default_watches = True @@ -137,6 +190,13 @@ def main(): port = int(os.environ.get('PORT', 5000)) ssl_mode = False + # Lists for multiple URLs and their options + urls_to_add = [] + url_options = {} # Key: index (0-based), Value: dict of options + recheck_watches = None # None, 'all', or list of UUIDs + recheck_repeat_count = 1 # Number of times to repeat recheck cycle + batch_mode = False # Run once then exit when queue is empty + # On Windows, create and use a default path. if os.name == 'nt': datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io') @@ -145,10 +205,98 @@ def main(): # Must be absolute so that send_from_directory doesnt try to make it relative to backend/ datastore_path = os.path.join(os.getcwd(), "../datastore") + # Pre-process arguments to extract -u, -u, and -r options before getopt + # This allows unlimited -u0, -u1, -u2, ... options without predefining them + cleaned_argv = ['changedetection.py'] # Start with program name + i = 1 + while i < len(sys.argv): + arg = sys.argv[i] + + # Handle -u (add URL) + if arg == '-u' and i + 1 < len(sys.argv): + urls_to_add.append(sys.argv[i + 1]) + i += 2 + continue + + # Handle -u (set options for URL at index N) + if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit(): + idx = int(arg[2:]) + if i + 1 < len(sys.argv): + try: + import json + url_options[idx] = json.loads(sys.argv[i + 1]) + except json.JSONDecodeError as e: + print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}') + print(f'JSON decode error: {e}') + sys.exit(2) + i += 2 + continue + + # Handle -r (recheck watches) + if arg == '-r' and i + 1 < len(sys.argv): + recheck_arg = sys.argv[i + 1] + if recheck_arg.lower() == 'all': + recheck_watches = 'all' + else: + # Parse comma-separated list of UUIDs + recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()] + + # Check for optional repeat count as third argument + if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit(): + recheck_repeat_count = int(sys.argv[i + 2]) + if recheck_repeat_count < 1: + print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}') + sys.exit(2) + i += 3 + else: + i += 2 + continue + + # Handle -b (batch mode - run once and exit) + if arg == '-b': + batch_mode = True + i += 1 + continue + + # Keep other arguments for getopt + cleaned_argv.append(arg) + i += 1 + try: - opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port") - except getopt.GetoptError: - print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]') + opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port") + except getopt.GetoptError as e: + print('Usage: changedetection.py [options]') + print('') + print('Standard options:') + print(' -s SSL enable') + print(' -h HOST Listen host (default: 0.0.0.0)') + print(' -p PORT Listen port (default: 5000)') + print(' -d PATH Datastore path') + print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)') + print(' -c Cleanup unused snapshots') + print(' -C Create datastore directory if it doesn\'t exist') + print('') + print('Add URLs on startup:') + print(' -u URL Add URL to watch (can be used multiple times)') + print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')') + print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)') + print(' -u2 \'JSON\' Set options for third -u URL, etc.') + print(' Available options: processor, fetch_backend, headers, method, etc.') + print(' See model/Watch.py for all available options') + print('') + print('Recheck on startup:') + print(' -r all Queue all watches for recheck on startup') + print(' -r UUID,... Queue specific watches (comma-separated UUIDs)') + print(' -r all N Queue all watches, wait for completion, repeat N times') + print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times') + print('') + print('Batch mode:') + print(' -b Run in batch mode (process queue then exit)') + print(' Useful for CI/CD, cron jobs, or one-time checks') + print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use') + print(' Use -p PORT to specify a different port if needed') + print('') + print(f'Error: {e}') sys.exit(2) create_datastore_dir = False @@ -173,10 +321,6 @@ def main(): if opt == '-d': datastore_path = arg - if opt == '-u': - default_url = arg - include_default_watches = False - # Cleanup (remove text files that arent in the index) if opt == '-c': do_cleanup = True @@ -188,6 +332,10 @@ def main(): if opt == '-l': logger_level = int(arg) if arg.isdigit() else arg.upper() + # If URLs are provided, don't include default watches + if urls_to_add: + include_default_watches = False + logger.success(f"changedetection.io version {get_version()} starting.") # Launch using SocketIO run method for proper integration (if enabled) @@ -224,11 +372,16 @@ def main(): logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING) # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore - app_config = {'datastore_path': datastore_path} + app_config = { + 'datastore_path': datastore_path, + 'batch_mode': batch_mode, + 'recheck_watches': recheck_watches, + 'recheck_repeat_count': recheck_repeat_count + } if not os.path.isdir(app_config['datastore_path']): if create_datastore_dir: - os.mkdir(app_config['datastore_path']) + os.makedirs(app_config['datastore_path'], exist_ok=True) else: logger.critical( f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'" @@ -249,11 +402,200 @@ def main(): from changedetectionio.pluggy_interface import inject_datastore_into_plugins inject_datastore_into_plugins(datastore) - if default_url: - datastore.add_watch(url = default_url) + # Step 1: Add URLs with their options (if provided via -u flags) + added_watch_uuids = [] + if urls_to_add: + logger.info(f"Adding {len(urls_to_add)} URL(s) from command line") + for idx, url in enumerate(urls_to_add): + extras = url_options.get(idx, {}) + if extras: + logger.debug(f"Adding watch {idx}: {url} with options: {extras}") + else: + logger.debug(f"Adding watch {idx}: {url}") + + new_uuid = datastore.add_watch(url=url, extras=extras) + if new_uuid: + added_watch_uuids.append(new_uuid) + logger.success(f"Added watch: {url} (UUID: {new_uuid})") + else: + logger.error(f"Failed to add watch: {url}") app = changedetection_app(app_config, datastore) + # Step 2: Queue newly added watches (if -u was provided in batch mode) + # This must happen AFTER app initialization so update_q is available + if batch_mode and added_watch_uuids: + from changedetectionio.flask_app import update_q + from changedetectionio import queuedWatchMetaData, worker_handler + + logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches") + for watch_uuid in added_watch_uuids: + try: + worker_handler.queue_item_async_safe( + update_q, + queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) + ) + logger.debug(f"Queued newly added watch: {watch_uuid}") + except Exception as e: + logger.error(f"Failed to queue watch {watch_uuid}: {e}") + + # Step 3: Queue watches for recheck (if -r was provided) + # This must happen AFTER app initialization so update_q is available + if recheck_watches is not None: + from changedetectionio.flask_app import update_q + from changedetectionio import queuedWatchMetaData, worker_handler + + watches_to_queue = [] + if recheck_watches == 'all': + # Queue all watches, excluding those already queued in batch mode + all_watches = list(datastore.data['watching'].keys()) + if batch_mode and added_watch_uuids: + # Exclude newly added watches that were already queued in batch mode + watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids] + logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)") + else: + watches_to_queue = all_watches + logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck") + else: + # Queue specific UUIDs + watches_to_queue = recheck_watches + logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck") + + queued_count = 0 + for watch_uuid in watches_to_queue: + if watch_uuid in datastore.data['watching']: + try: + worker_handler.queue_item_async_safe( + update_q, + queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) + ) + queued_count += 1 + logger.debug(f"Queued watch for recheck: {watch_uuid}") + except Exception as e: + logger.error(f"Failed to queue watch {watch_uuid}: {e}") + else: + logger.warning(f"Watch UUID not found in datastore: {watch_uuid}") + + logger.success(f"Successfully queued {queued_count} watches for recheck") + + # Step 4: Setup batch mode monitor (if -b was provided) + if batch_mode: + from changedetectionio.flask_app import update_q + + # Safety check: Ensure Flask app is not already running on this port + # Batch mode should never run alongside the web server + import socket + test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + + try: + # Try to bind to the configured host:port (no SO_REUSEADDR - strict check) + test_socket.bind((host, port)) + test_socket.close() + logger.debug(f"Batch mode: Port {port} is available (Flask app not running)") + except OSError as e: + test_socket.close() + # errno 98 = EADDRINUSE (Linux) + # errno 48 = EADDRINUSE (macOS) + # errno 10048 = WSAEADDRINUSE (Windows) + if e.errno in (48, 98, 10048) or "Address already in use" in str(e) or "already in use" in str(e).lower(): + logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use") + logger.critical(f"The Flask web server appears to be running on {host}:{port}") + logger.critical(f"Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)") + logger.critical(f"Please either stop the Flask web server, or use a different port with -p PORT") + sys.exit(1) + else: + # Some other socket error - log but continue (might be network configuration issue) + logger.warning(f"Port availability check failed with unexpected error: {e}") + logger.warning(f"Continuing with batch mode anyway - be aware of potential conflicts") + + def queue_watches_for_recheck(datastore, iteration): + """Helper function to queue watches for recheck""" + watches_to_queue = [] + if recheck_watches == 'all': + all_watches = list(datastore.data['watching'].keys()) + if batch_mode and added_watch_uuids and iteration == 1: + # Only exclude newly added watches on first iteration + watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids] + else: + watches_to_queue = all_watches + logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches") + elif recheck_watches: + watches_to_queue = recheck_watches + logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches") + + queued_count = 0 + for watch_uuid in watches_to_queue: + if watch_uuid in datastore.data['watching']: + try: + worker_handler.queue_item_async_safe( + update_q, + queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}) + ) + queued_count += 1 + except Exception as e: + logger.error(f"Failed to queue watch {watch_uuid}: {e}") + else: + logger.warning(f"Watch UUID not found in datastore: {watch_uuid}") + logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches") + return queued_count + + def batch_mode_monitor(): + """Monitor queue and workers, shutdown or repeat when work is complete""" + import time + + # Track iterations if repeat mode is enabled + current_iteration = 1 + total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1 + + if total_iterations > 1: + logger.info(f"Batch mode: Will repeat recheck {total_iterations} times") + else: + logger.info("Batch mode: Waiting for all queued items to complete...") + + # Wait a bit for workers to start processing + time.sleep(3) + + try: + while current_iteration <= total_iterations: + logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...") + + # Use the shared wait_for_all_checks function + completed = worker_handler.wait_for_all_checks(update_q, timeout=300) + + if not completed: + logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds") + + logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed") + + # Check if we need to repeat + if current_iteration < total_iterations: + logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...") + current_iteration += 1 + + # Re-queue watches for next iteration + queue_watches_for_recheck(datastore, current_iteration) + + # Brief pause before continuing + time.sleep(2) + else: + # All iterations complete + logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown") + # Trigger shutdown + import os, signal + os.kill(os.getpid(), signal.SIGTERM) + return + + except Exception as e: + logger.error(f"Batch mode monitor error: {e}") + logger.error(f"Initiating emergency shutdown") + import os, signal + os.kill(os.getpid(), signal.SIGTERM) + + # Start monitor in background thread + monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor") + monitor_thread.start() + logger.info("Batch mode enabled: Will exit after all queued items are processed") + # Get the SocketIO instance from the Flask app (created in flask_app.py) from changedetectionio.flask_app import socketio_server global socketio @@ -314,20 +656,33 @@ def main(): app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1) - # SocketIO instance is already initialized in flask_app.py - if socketio_server: - if ssl_mode: - logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") - socketio.run(app, host=host, port=int(port), debug=False, - ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True) - else: - socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True) + # In batch mode, skip starting the HTTP server - just keep workers running + if batch_mode: + logger.info("Batch mode: Skipping HTTP server startup, workers will process queue") + logger.info("Batch mode: Main thread will wait for shutdown signal") + # Keep main thread alive until batch monitor triggers shutdown + try: + while True: + time.sleep(1) + except KeyboardInterrupt: + logger.info("Batch mode: Keyboard interrupt received") + pass else: - # Run Flask app without Socket.IO if disabled - logger.info("Starting Flask app without Socket.IO server") - if ssl_mode: - logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") - app.run(host=host, port=int(port), debug=False, - ssl_context=(ssl_cert_file, ssl_privkey_file)) + # Normal mode: Start HTTP server + # SocketIO instance is already initialized in flask_app.py + if socketio_server: + if ssl_mode: + logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") + socketio.run(app, host=host, port=int(port), debug=False, + ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True) + else: + socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True) else: - app.run(host=host, port=int(port), debug=False) + # Run Flask app without Socket.IO if disabled + logger.info("Starting Flask app without Socket.IO server") + if ssl_mode: + logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") + app.run(host=host, port=int(port), debug=False, + ssl_context=(ssl_cert_file, ssl_privkey_file)) + else: + app.run(host=host, port=int(port), debug=False) diff --git a/changedetectionio/async_update_worker.py b/changedetectionio/async_update_worker.py index afb59901..cabed7c5 100644 --- a/changedetectionio/async_update_worker.py +++ b/changedetectionio/async_update_worker.py @@ -89,11 +89,16 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec continue uuid = queued_item_data.item.get('uuid') - # RACE CONDITION FIX: Check if this UUID is already being processed by another worker + + # RACE CONDITION FIX: Atomically claim this UUID for processing from changedetectionio import worker_handler from changedetectionio.queuedWatchMetaData import PrioritizedItem - if worker_handler.is_watch_running_by_another_worker(uuid, worker_id): - logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring") + + # Try to claim the UUID atomically - prevents duplicate processing + if not worker_handler.claim_uuid_for_processing(uuid, worker_id): + # Already being processed by another worker + logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring") + # Sleep to avoid tight loop and give the other worker time to finish await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED) @@ -105,9 +110,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec continue fetch_start_time = round(time.time()) - - # Mark this UUID as being processed by this worker - worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True) try: if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'): @@ -487,8 +489,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec except Exception as e: logger.error(f"Exception while cleaning/quit after calling browser: {e}") try: - # Mark UUID as no longer being processed by this worker - worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False) + # Release UUID from processing (thread-safe) + worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id) # Send completion signal if watch: diff --git a/changedetectionio/flask_app.py b/changedetectionio/flask_app.py index 4333cd2c..5a9d7adc 100644 --- a/changedetectionio/flask_app.py +++ b/changedetectionio/flask_app.py @@ -401,7 +401,10 @@ def changedetection_app(config=None, datastore_o=None): # so far just for read-only via tests, but this will be moved eventually to be the main source # (instead of the global var) app.config['DATASTORE'] = datastore_o - + + # Store batch mode flag to skip background threads when running in batch mode + app.config['batch_mode'] = config.get('batch_mode', False) if config else False + # Store the signal in the app config to ensure it's accessible everywhere app.config['watch_check_update_SIGNAL'] = watch_check_update @@ -805,6 +808,8 @@ def changedetection_app(config=None, datastore_o=None): # Initialize Socket.IO server conditionally based on settings socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True) + if socket_io_enabled and app.config.get('batch_mode'): + socket_io_enabled = False if socket_io_enabled: from changedetectionio.realtime.socket_server import init_socketio global socketio_server @@ -902,14 +907,19 @@ def changedetection_app(config=None, datastore_o=None): logger.info(f"Starting {n_workers} workers during app initialization") worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore) - # @todo handle ctrl break - ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start() - threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start() + # Skip background threads in batch mode (just process queue and exit) + batch_mode = app.config.get('batch_mode', False) + if not batch_mode: + # @todo handle ctrl break + ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start() + threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start() - in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ - # Check for new release version, but not when running in test/build or pytest - if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest: - threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start() + in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ + # Check for new release version, but not when running in test/build or pytest + if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest: + threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start() + else: + logger.info("Batch mode: Skipping ticker thread, notification runner, and version checker") # Return the Flask app - the Socket.IO will be attached to it but initialized separately # This avoids circular dependencies diff --git a/changedetectionio/store.py b/changedetectionio/store.py index 330c5948..296faf78 100644 --- a/changedetectionio/store.py +++ b/changedetectionio/store.py @@ -338,7 +338,6 @@ class ChangeDetectionStore: self.needs_write_urgent = True def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True): - import requests if extras is None: extras = {} @@ -349,6 +348,8 @@ class ChangeDetectionStore: # Was it a share link? try to fetch the data if (url.startswith("https://changedetection.io/share/")): + import requests + try: r = requests.request(method="GET", url=url, diff --git a/changedetectionio/test_cli_opts.sh b/changedetectionio/test_cli_opts.sh new file mode 100755 index 00000000..fc7e5e71 --- /dev/null +++ b/changedetectionio/test_cli_opts.sh @@ -0,0 +1,243 @@ +#!/bin/bash +# Test script for CLI options - Parallel execution +# Tests -u, -uN, -r, -b flags + +set -u # Exit on undefined variables + +# Color output +RED='\033[0;31m' +GREEN='\033[0;32m' +YELLOW='\033[1;33m' +NC='\033[0m' # No Color + +# Test results directory (for parallel safety) +TEST_RESULTS_DIR="/tmp/cli-test-results-$$" +mkdir -p "$TEST_RESULTS_DIR" + +# Cleanup function +cleanup() { + echo "" + echo "=== Cleaning up test directories ===" + rm -rf /tmp/cli-test-* 2>/dev/null || true + rm -rf "$TEST_RESULTS_DIR" 2>/dev/null || true + # Kill any hanging processes + pkill -f "changedetection.py.*cli-test" 2>/dev/null || true +} +trap cleanup EXIT + +# Helper to record test result +record_result() { + local test_num=$1 + local status=$2 # pass or fail + local message=$3 + + echo "$status|$message" > "$TEST_RESULTS_DIR/test_${test_num}.result" +} + +# Run a test in background +run_test() { + local test_num=$1 + local test_name=$2 + local test_func=$3 + + ( + echo -e "${YELLOW}[Test $test_num]${NC} $test_name" + if $test_func "$test_num"; then + record_result "$test_num" "pass" "$test_name" + echo -e "${GREEN}✓ PASS${NC}: $test_name" + else + record_result "$test_num" "fail" "$test_name" + echo -e "${RED}✗ FAIL${NC}: $test_name" + fi + ) & +} + +# ============================================================================= +# Test Functions (each runs independently) +# ============================================================================= + +test_help_flag() { + local test_id=$1 + timeout 3 python3 changedetection.py --help 2>&1 | grep -q "Add URLs on startup" +} + +test_version_flag() { + local test_id=$1 + timeout 3 python3 changedetection.py --version 2>&1 | grep -qE "changedetection.io [0-9]+\.[0-9]+" +} + +test_single_url() { + local test_id=$1 + local dir="/tmp/cli-test-single-${test_id}-$$" + timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null + [ -f "$dir/url-watches.json" ] && \ + [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ] +} + +test_multiple_urls() { + local test_id=$1 + local dir="/tmp/cli-test-multi-${test_id}-$$" + timeout 12 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -u https://github.com \ + -u https://httpbin.org \ + -b &>/dev/null + [ -f "$dir/url-watches.json" ] && \ + [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ] +} + +test_url_with_options() { + local test_id=$1 + local dir="/tmp/cli-test-opts-${test_id}-$$" + timeout 10 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -u0 '{"title":"Test Site","processor":"text_json_diff"}' \ + -b &>/dev/null + [ -f "$dir/url-watches.json" ] && \ + python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)" +} + +test_multiple_urls_with_options() { + local test_id=$1 + local dir="/tmp/cli-test-multi-opts-${test_id}-$$" + timeout 12 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -u0 '{"title":"Site One"}' \ + -u https://github.com \ + -u1 '{"title":"Site Two"}' \ + -b &>/dev/null + [ -f "$dir/url-watches.json" ] && \ + [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \ + python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)" +} + +test_batch_mode_exit() { + local test_id=$1 + local dir="/tmp/cli-test-batch-${test_id}-$$" + local start=$(date +%s) + timeout 15 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -b &>/dev/null + local end=$(date +%s) + local elapsed=$((end - start)) + [ $elapsed -lt 14 ] +} + +test_recheck_all() { + local test_id=$1 + local dir="/tmp/cli-test-recheck-all-${test_id}-$$" + mkdir -p "$dir" + cat > "$dir/url-watches.json" << 'EOF' +{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}} +EOF + timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all" +} + +test_recheck_specific() { + local test_id=$1 + local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$" + mkdir -p "$dir" + cat > "$dir/url-watches.json" << 'EOF' +{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}} +EOF + timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches" +} + +test_combined_operations() { + local test_id=$1 + local dir="/tmp/cli-test-combined-${test_id}-$$" + timeout 12 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -u https://github.com \ + -r all \ + -b &>/dev/null + [ -f "$dir/url-watches.json" ] && \ + [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] +} + +test_invalid_json() { + local test_id=$1 + local dir="/tmp/cli-test-invalid-${test_id}-$$" + timeout 5 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -u0 'invalid json here' \ + 2>&1 | grep -qi "invalid json\|json decode error" +} + +test_create_directory() { + local test_id=$1 + local dir="/tmp/cli-test-create-${test_id}-$$/nested/path" + timeout 10 python3 changedetection.py -d "$dir" -C \ + -u https://example.com \ + -b &>/dev/null + [ -d "$dir" ] +} + +# ============================================================================= +# Main Test Execution +# ============================================================================= + +echo "==========================================" +echo " CLI Options Test Suite (Parallel)" +echo "==========================================" +echo "" + +# Launch all tests in parallel +run_test 1 "Help flag (--help) shows usage without initialization" test_help_flag +run_test 2 "Version flag (--version) displays version" test_version_flag +run_test 3 "Add single URL with -u flag" test_single_url +run_test 4 "Add multiple URLs with multiple -u flags" test_multiple_urls +run_test 5 "Add URL with JSON options using -u0" test_url_with_options +run_test 6 "Add multiple URLs with different options (-u0, -u1)" test_multiple_urls_with_options +run_test 7 "Batch mode (-b) exits automatically after processing" test_batch_mode_exit +run_test 8 "Recheck all watches with -r all" test_recheck_all +run_test 9 "Recheck specific watches with -r UUID" test_recheck_specific +run_test 10 "Combined: Add URLs and recheck all with -u and -r all" test_combined_operations +run_test 11 "Invalid JSON in -u0 option should show error" test_invalid_json +run_test 12 "Create datastore directory with -C flag" test_create_directory + +# Wait for all tests to complete +echo "" +echo "Waiting for all tests to complete..." +wait + +# Collect results +echo "" +echo "==========================================" +echo " Test Summary" +echo "==========================================" + +TESTS_RUN=0 +TESTS_PASSED=0 +TESTS_FAILED=0 + +for result_file in "$TEST_RESULTS_DIR"/test_*.result; do + if [ -f "$result_file" ]; then + TESTS_RUN=$((TESTS_RUN + 1)) + status=$(cut -d'|' -f1 < "$result_file") + if [ "$status" = "pass" ]; then + TESTS_PASSED=$((TESTS_PASSED + 1)) + else + TESTS_FAILED=$((TESTS_FAILED + 1)) + fi + fi +done + +echo "Tests run: $TESTS_RUN" +echo -e "${GREEN}Tests passed: $TESTS_PASSED${NC}" +if [ $TESTS_FAILED -gt 0 ]; then + echo -e "${RED}Tests failed: $TESTS_FAILED${NC}" +else + echo -e "${GREEN}Tests failed: $TESTS_FAILED${NC}" +fi +echo "==========================================" +echo "" + +# Exit with appropriate code +if [ $TESTS_FAILED -gt 0 ]; then + echo -e "${RED}Some tests failed!${NC}" + exit 1 +else + echo -e "${GREEN}All tests passed!${NC}" + exit 0 +fi diff --git a/changedetectionio/tests/conftest.py b/changedetectionio/tests/conftest.py index 68f3dd03..702af2aa 100644 --- a/changedetectionio/tests/conftest.py +++ b/changedetectionio/tests/conftest.py @@ -5,13 +5,11 @@ from threading import Thread import pytest import arrow -from changedetectionio import changedetection_app from changedetectionio import store import os import sys -from loguru import logger -from changedetectionio.flask_app import init_app_secret +from changedetectionio.flask_app import init_app_secret, changedetection_app from changedetectionio.tests.util import live_server_setup, new_live_server_setup # https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py diff --git a/changedetectionio/tests/util.py b/changedetectionio/tests/util.py index 2b8dda98..a531d4a5 100644 --- a/changedetectionio/tests/util.py +++ b/changedetectionio/tests/util.py @@ -140,38 +140,13 @@ def delete_all_watches(client=None): def wait_for_all_checks(client=None): """ Waits until the queue is empty and workers are idle. - Much faster than the original with adaptive timing. + Delegates to worker_handler.wait_for_all_checks for shared logic. """ from changedetectionio.flask_app import update_q as global_update_q from changedetectionio import worker_handler - empty_since = None - attempt = 0 - max_attempts = 150 # Still reasonable upper bound - while attempt < max_attempts: - # Start with fast checks, slow down if needed - if attempt < 10: - time.sleep(0.2) # Very fast initial checks - elif attempt < 30: - time.sleep(0.4) # Medium speed - else: - time.sleep(0.8) # Slower for persistent issues - - q_length = global_update_q.qsize() - running_uuids = worker_handler.get_running_uuids() - any_workers_busy = len(running_uuids) > 0 - - if q_length == 0 and not any_workers_busy: - if empty_since is None: - empty_since = time.time() - # Brief stabilization period for async workers - elif time.time() - empty_since >= 0.3: - break - else: - empty_since = None - - attempt += 1 - time.sleep(0.3) + # Use the shared wait logic from worker_handler + return worker_handler.wait_for_all_checks(global_update_q, timeout=150) def wait_for_watch_history(client, min_history_count=2, timeout=10): """ diff --git a/changedetectionio/worker_handler.py b/changedetectionio/worker_handler.py index addcb6a5..51aa2ecc 100644 --- a/changedetectionio/worker_handler.py +++ b/changedetectionio/worker_handler.py @@ -17,6 +17,7 @@ worker_threads = [] # List of WorkerThread objects # Track currently processing UUIDs for async workers - maps {uuid: worker_id} currently_processing_uuids = {} +_uuid_processing_lock = threading.Lock() # Protects currently_processing_uuids # Configuration - async workers only USE_ASYNC_WORKERS = True @@ -207,31 +208,80 @@ def get_worker_count(): def get_running_uuids(): """Get list of UUIDs currently being processed by async workers""" - return list(currently_processing_uuids.keys()) + with _uuid_processing_lock: + return list(currently_processing_uuids.keys()) + + +def claim_uuid_for_processing(uuid, worker_id): + """ + Atomically check if UUID is available and claim it for processing. + + This is thread-safe and prevents race conditions where multiple workers + try to process the same UUID simultaneously. + + Args: + uuid: The watch UUID to claim + worker_id: The ID of the worker claiming this UUID + + Returns: + True if successfully claimed (UUID was not being processed) + False if already being processed by another worker + """ + with _uuid_processing_lock: + if uuid in currently_processing_uuids: + # Already being processed by another worker + return False + # Claim it atomically + currently_processing_uuids[uuid] = worker_id + logger.debug(f"Worker {worker_id} claimed UUID: {uuid}") + return True + + +def release_uuid_from_processing(uuid, worker_id): + """ + Release a UUID from processing (thread-safe). + + Args: + uuid: The watch UUID to release + worker_id: The ID of the worker releasing this UUID + """ + with _uuid_processing_lock: + # Only remove if this worker owns it (defensive) + if currently_processing_uuids.get(uuid) == worker_id: + currently_processing_uuids.pop(uuid, None) + logger.debug(f"Worker {worker_id} released UUID: {uuid}") + else: + logger.warning(f"Worker {worker_id} tried to release UUID {uuid} but doesn't own it (owned by {currently_processing_uuids.get(uuid, 'nobody')})") def set_uuid_processing(uuid, worker_id=None, processing=True): - """Mark a UUID as being processed or completed by a specific worker""" - global currently_processing_uuids + """ + Mark a UUID as being processed or completed by a specific worker. + + DEPRECATED: Use claim_uuid_for_processing() and release_uuid_from_processing() instead. + This function is kept for backward compatibility but doesn't provide atomic check-and-set. + """ if processing: - currently_processing_uuids[uuid] = worker_id - logger.debug(f"Worker {worker_id} started processing UUID: {uuid}") + with _uuid_processing_lock: + currently_processing_uuids[uuid] = worker_id + logger.debug(f"Worker {worker_id} started processing UUID: {uuid}") else: - currently_processing_uuids.pop(uuid, None) - logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}") + release_uuid_from_processing(uuid, worker_id) def is_watch_running(watch_uuid): """Check if a specific watch is currently being processed by any worker""" - return watch_uuid in currently_processing_uuids + with _uuid_processing_lock: + return watch_uuid in currently_processing_uuids def is_watch_running_by_another_worker(watch_uuid, current_worker_id): """Check if a specific watch is currently being processed by a different worker""" - if watch_uuid not in currently_processing_uuids: - return False - processing_worker_id = currently_processing_uuids[watch_uuid] - return processing_worker_id != current_worker_id + with _uuid_processing_lock: + if watch_uuid not in currently_processing_uuids: + return False + processing_worker_id = currently_processing_uuids[watch_uuid] + return processing_worker_id != current_worker_id def queue_item_async_safe(update_q, item, silent=False): @@ -381,6 +431,53 @@ def get_worker_status(): } +def wait_for_all_checks(update_q, timeout=150): + """ + Wait for queue to be empty and all workers to be idle. + + Args: + update_q: The update queue to monitor + timeout: Maximum wait time in seconds (default 150 = 150 iterations * 0.2-0.8s) + + Returns: + bool: True if all checks completed, False if timeout + """ + import time + empty_since = None + attempt = 0 + max_attempts = timeout + + while attempt < max_attempts: + # Adaptive sleep - start fast, slow down if needed + if attempt < 10: + sleep_time = 0.2 # Very fast initial checks + elif attempt < 30: + sleep_time = 0.4 # Medium speed + else: + sleep_time = 0.8 # Slower for persistent issues + + time.sleep(sleep_time) + + q_length = update_q.qsize() + running_uuids = get_running_uuids() + any_workers_busy = len(running_uuids) > 0 + + if q_length == 0 and not any_workers_busy: + if empty_since is None: + empty_since = time.time() + # Brief stabilization period for async workers + elif time.time() - empty_since >= 0.3: + # Add small buffer for filesystem operations to complete + time.sleep(0.2) + return True + else: + empty_since = None + + attempt += 1 + + return False # Timeout + + def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None): """ Check if the expected number of async workers are running and restart any missing ones.