Compare commits

...

8 Commits

Author SHA1 Message Date
dgtlmoon
04e22f1b0a Be sure Batch Mode cant run when flask mode is running
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-24 13:42:21 +01:00
dgtlmoon
2a0e8cbf6c Repeat checks 2026-01-24 13:40:38 +01:00
dgtlmoon
901d69af42 Better cross-worker watch UUID management 2026-01-24 13:29:20 +01:00
dgtlmoon
528c305928 setup tweaks 2026-01-24 13:13:10 +01:00
dgtlmoon
a4357c7bb7 fi ximports 2026-01-24 13:13:00 +01:00
dgtlmoon
75db43fc09 batch mode fixes 2026-01-23 12:14:44 +01:00
dgtlmoon
f8c6c62107 Adding CLI options 2026-01-23 11:37:06 +01:00
dgtlmoon
4523918752 Adding memory info stats to output log on app shutdown 2026-01-23 10:34:02 +01:00
9 changed files with 810 additions and 96 deletions

View File

@@ -111,6 +111,32 @@ jobs:
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh' docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh'
- name: Test CLI options
run: |
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
docker run --name test-cdio-cli-opts --network changedet-network test-changedetectionio bash -c 'changedetectionio/test_cli_opts.sh' &> cli-opts-output.txt
echo "=== CLI Options Test Output ==="
cat cli-opts-output.txt
- name: CLI Memory Test
run: |
echo "=== Checking CLI batch mode memory usage ==="
# Extract RSS memory value from output
RSS_MB=$(grep -oP "Memory consumption before worker shutdown: RSS=\K[\d.]+" cli-opts-output.txt | head -1 || echo "0")
echo "RSS Memory: ${RSS_MB} MB"
# Check if RSS is less than 100MB
if [ -n "$RSS_MB" ]; then
if (( $(echo "$RSS_MB < 100" | bc -l) )); then
echo "✓ Memory usage is acceptable: ${RSS_MB} MB < 100 MB"
else
echo "✗ Memory usage too high: ${RSS_MB} MB >= 100 MB"
exit 1
fi
else
echo "⚠ Could not extract memory usage, skipping check"
fi
- name: Extract memory report and logs - name: Extract memory report and logs
if: always() if: always()
uses: ./.github/actions/extract-memory-report uses: ./.github/actions/extract-memory-report
@@ -125,6 +151,13 @@ jobs:
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }} name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
path: output-logs path: output-logs
- name: Store CLI test output
if: always()
uses: actions/upload-artifact@v6
with:
name: test-cdio-cli-opts-output-py${{ env.PYTHON_VERSION }}
path: cli-opts-output.txt
# Playwright tests # Playwright tests
playwright-tests: playwright-tests:
runs-on: ubuntu-latest runs-on: ubuntu-latest

View File

@@ -6,19 +6,20 @@ __version__ = '0.52.9'
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError
from loguru import logger
import getopt
import logging import logging
import os import os
import getopt
import platform import platform
import signal import signal
import threading
import sys import time
# Eventlet completely removed - using threading mode for SocketIO # Eventlet completely removed - using threading mode for SocketIO
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts # This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
from changedetectionio import store # Note: store and changedetection_app are imported inside main() to avoid
from changedetectionio.flask_app import changedetection_app # initialization before argument parsing (allows --help to work without loading everything)
from loguru import logger
# ============================================================================== # ==============================================================================
# Multiprocessing Configuration - CRITICAL for Thread Safety # Multiprocessing Configuration - CRITICAL for Thread Safety
@@ -83,11 +84,22 @@ def get_version():
def sigshutdown_handler(_signo, _stack_frame): def sigshutdown_handler(_signo, _stack_frame):
name = signal.Signals(_signo).name name = signal.Signals(_signo).name
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated') logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')
# Set exit flag immediately to stop all loops # Set exit flag immediately to stop all loops
app.config.exit.set() app.config.exit.set()
datastore.stop_thread = True datastore.stop_thread = True
# Log memory consumption before shutting down workers (cross-platform)
try:
import psutil
process = psutil.Process()
mem_info = process.memory_info()
rss_mb = mem_info.rss / 1024 / 1024
vms_mb = mem_info.vms / 1024 / 1024
logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
except Exception as e:
logger.warning(f"Could not retrieve memory stats: {str(e)}")
# Shutdown workers and queues immediately # Shutdown workers and queues immediately
try: try:
from changedetectionio import worker_handler from changedetectionio import worker_handler
@@ -125,10 +137,51 @@ def main():
global datastore global datastore
global app global app
# Early help/version check before any initialization
if '--help' in sys.argv or '-help' in sys.argv:
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print(' -r all N Queue all watches, wait for completion, repeat N times')
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
print(' Use -p PORT to specify a different port if needed')
print('')
sys.exit(0)
if '--version' in sys.argv or '-v' in sys.argv:
print(f'changedetection.io {__version__}')
sys.exit(0)
# Import heavy modules after help/version checks to keep startup fast for those flags
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app
datastore_path = None datastore_path = None
do_cleanup = False do_cleanup = False
# Optional URL to watch since start
default_url = None
# Set a default logger level # Set a default logger level
logger_level = 'DEBUG' logger_level = 'DEBUG'
include_default_watches = True include_default_watches = True
@@ -137,6 +190,13 @@ def main():
port = int(os.environ.get('PORT', 5000)) port = int(os.environ.get('PORT', 5000))
ssl_mode = False ssl_mode = False
# Lists for multiple URLs and their options
urls_to_add = []
url_options = {} # Key: index (0-based), Value: dict of options
recheck_watches = None # None, 'all', or list of UUIDs
recheck_repeat_count = 1 # Number of times to repeat recheck cycle
batch_mode = False # Run once then exit when queue is empty
# On Windows, create and use a default path. # On Windows, create and use a default path.
if os.name == 'nt': if os.name == 'nt':
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io') datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
@@ -145,10 +205,98 @@ def main():
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/ # Must be absolute so that send_from_directory doesnt try to make it relative to backend/
datastore_path = os.path.join(os.getcwd(), "../datastore") datastore_path = os.path.join(os.getcwd(), "../datastore")
# Pre-process arguments to extract -u, -u<N>, and -r options before getopt
# This allows unlimited -u0, -u1, -u2, ... options without predefining them
cleaned_argv = ['changedetection.py'] # Start with program name
i = 1
while i < len(sys.argv):
arg = sys.argv[i]
# Handle -u (add URL)
if arg == '-u' and i + 1 < len(sys.argv):
urls_to_add.append(sys.argv[i + 1])
i += 2
continue
# Handle -u<N> (set options for URL at index N)
if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
idx = int(arg[2:])
if i + 1 < len(sys.argv):
try:
import json
url_options[idx] = json.loads(sys.argv[i + 1])
except json.JSONDecodeError as e:
print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
print(f'JSON decode error: {e}')
sys.exit(2)
i += 2
continue
# Handle -r (recheck watches)
if arg == '-r' and i + 1 < len(sys.argv):
recheck_arg = sys.argv[i + 1]
if recheck_arg.lower() == 'all':
recheck_watches = 'all'
else:
# Parse comma-separated list of UUIDs
recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]
# Check for optional repeat count as third argument
if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit():
recheck_repeat_count = int(sys.argv[i + 2])
if recheck_repeat_count < 1:
print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}')
sys.exit(2)
i += 3
else:
i += 2
continue
# Handle -b (batch mode - run once and exit)
if arg == '-b':
batch_mode = True
i += 1
continue
# Keep other arguments for getopt
cleaned_argv.append(arg)
i += 1
try: try:
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port") opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port")
except getopt.GetoptError: except getopt.GetoptError as e:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]') print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print(' -r all N Queue all watches, wait for completion, repeat N times')
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
print(' Use -p PORT to specify a different port if needed')
print('')
print(f'Error: {e}')
sys.exit(2) sys.exit(2)
create_datastore_dir = False create_datastore_dir = False
@@ -173,10 +321,6 @@ def main():
if opt == '-d': if opt == '-d':
datastore_path = arg datastore_path = arg
if opt == '-u':
default_url = arg
include_default_watches = False
# Cleanup (remove text files that arent in the index) # Cleanup (remove text files that arent in the index)
if opt == '-c': if opt == '-c':
do_cleanup = True do_cleanup = True
@@ -188,6 +332,10 @@ def main():
if opt == '-l': if opt == '-l':
logger_level = int(arg) if arg.isdigit() else arg.upper() logger_level = int(arg) if arg.isdigit() else arg.upper()
# If URLs are provided, don't include default watches
if urls_to_add:
include_default_watches = False
logger.success(f"changedetection.io version {get_version()} starting.") logger.success(f"changedetection.io version {get_version()} starting.")
# Launch using SocketIO run method for proper integration (if enabled) # Launch using SocketIO run method for proper integration (if enabled)
@@ -224,11 +372,16 @@ def main():
logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING) logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
app_config = {'datastore_path': datastore_path} app_config = {
'datastore_path': datastore_path,
'batch_mode': batch_mode,
'recheck_watches': recheck_watches,
'recheck_repeat_count': recheck_repeat_count
}
if not os.path.isdir(app_config['datastore_path']): if not os.path.isdir(app_config['datastore_path']):
if create_datastore_dir: if create_datastore_dir:
os.mkdir(app_config['datastore_path']) os.makedirs(app_config['datastore_path'], exist_ok=True)
else: else:
logger.critical( logger.critical(
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'" f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
@@ -249,11 +402,200 @@ def main():
from changedetectionio.pluggy_interface import inject_datastore_into_plugins from changedetectionio.pluggy_interface import inject_datastore_into_plugins
inject_datastore_into_plugins(datastore) inject_datastore_into_plugins(datastore)
if default_url: # Step 1: Add URLs with their options (if provided via -u flags)
datastore.add_watch(url = default_url) added_watch_uuids = []
if urls_to_add:
logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
for idx, url in enumerate(urls_to_add):
extras = url_options.get(idx, {})
if extras:
logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
else:
logger.debug(f"Adding watch {idx}: {url}")
new_uuid = datastore.add_watch(url=url, extras=extras)
if new_uuid:
added_watch_uuids.append(new_uuid)
logger.success(f"Added watch: {url} (UUID: {new_uuid})")
else:
logger.error(f"Failed to add watch: {url}")
app = changedetection_app(app_config, datastore) app = changedetection_app(app_config, datastore)
# Step 2: Queue newly added watches (if -u was provided in batch mode)
# This must happen AFTER app initialization so update_q is available
if batch_mode and added_watch_uuids:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
for watch_uuid in added_watch_uuids:
try:
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
logger.debug(f"Queued newly added watch: {watch_uuid}")
except Exception as e:
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
# Step 3: Queue watches for recheck (if -r was provided)
# This must happen AFTER app initialization so update_q is available
if recheck_watches is not None:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
watches_to_queue = []
if recheck_watches == 'all':
# Queue all watches, excluding those already queued in batch mode
all_watches = list(datastore.data['watching'].keys())
if batch_mode and added_watch_uuids:
# Exclude newly added watches that were already queued in batch mode
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
else:
watches_to_queue = all_watches
logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
else:
# Queue specific UUIDs
watches_to_queue = recheck_watches
logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")
queued_count = 0
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
queued_count += 1
logger.debug(f"Queued watch for recheck: {watch_uuid}")
except Exception as e:
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
else:
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
logger.success(f"Successfully queued {queued_count} watches for recheck")
# Step 4: Setup batch mode monitor (if -b was provided)
if batch_mode:
from changedetectionio.flask_app import update_q
# Safety check: Ensure Flask app is not already running on this port
# Batch mode should never run alongside the web server
import socket
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
# Try to bind to the configured host:port (no SO_REUSEADDR - strict check)
test_socket.bind((host, port))
test_socket.close()
logger.debug(f"Batch mode: Port {port} is available (Flask app not running)")
except OSError as e:
test_socket.close()
# errno 98 = EADDRINUSE (Linux)
# errno 48 = EADDRINUSE (macOS)
# errno 10048 = WSAEADDRINUSE (Windows)
if e.errno in (48, 98, 10048) or "Address already in use" in str(e) or "already in use" in str(e).lower():
logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use")
logger.critical(f"The Flask web server appears to be running on {host}:{port}")
logger.critical(f"Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)")
logger.critical(f"Please either stop the Flask web server, or use a different port with -p PORT")
sys.exit(1)
else:
# Some other socket error - log but continue (might be network configuration issue)
logger.warning(f"Port availability check failed with unexpected error: {e}")
logger.warning(f"Continuing with batch mode anyway - be aware of potential conflicts")
def queue_watches_for_recheck(datastore, iteration):
    """Queue one recheck iteration's watches onto the update queue.

    Closes over ``recheck_watches``, ``batch_mode`` and
    ``added_watch_uuids`` from the enclosing ``main()``.

    :param datastore: the ChangeDetectionStore holding ``data['watching']``
    :param iteration: 1-based iteration number (affects exclusion logic)
    :return: number of watches actually queued
    """
    watches_to_queue = []
    if recheck_watches == 'all':
        all_watches = list(datastore.data['watching'].keys())
        if batch_mode and added_watch_uuids and iteration == 1:
            # Only exclude newly added watches on first iteration
            # (they were already queued right after being added via -u)
            watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
        else:
            watches_to_queue = all_watches
        logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches")
    elif recheck_watches:
        # Explicit comma-separated UUID list supplied via -r
        watches_to_queue = recheck_watches
        logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches")
    queued_count = 0
    for watch_uuid in watches_to_queue:
        if watch_uuid in datastore.data['watching']:
            try:
                worker_handler.queue_item_async_safe(
                    update_q,
                    queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                )
                queued_count += 1
            except Exception as e:
                # Best-effort: a single failed enqueue does not abort the batch
                logger.error(f"Failed to queue watch {watch_uuid}: {e}")
        else:
            logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
    logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches")
    return queued_count
def batch_mode_monitor():
    """Monitor queue and workers, shutdown or repeat when work is complete.

    Runs in a daemon thread.  Closes over ``recheck_watches``,
    ``recheck_repeat_count``, ``datastore`` and ``update_q`` from the
    enclosing ``main()``.  When the final iteration finishes it sends
    SIGTERM to its own process so shutdown goes through the normal
    signal-handler path.
    """
    import time
    # Track iterations if repeat mode is enabled
    current_iteration = 1
    total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1
    if total_iterations > 1:
        logger.info(f"Batch mode: Will repeat recheck {total_iterations} times")
    else:
        logger.info("Batch mode: Waiting for all queued items to complete...")
    # Wait a bit for workers to start processing
    time.sleep(3)
    try:
        while current_iteration <= total_iterations:
            logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
            # Use the shared wait_for_all_checks function
            completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
            if not completed:
                # NOTE(review): a timeout is logged but still counted as a
                # finished iteration below — confirm this is intentional.
                logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")
            logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed")
            # Check if we need to repeat
            if current_iteration < total_iterations:
                logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...")
                current_iteration += 1
                # Re-queue watches for next iteration
                queue_watches_for_recheck(datastore, current_iteration)
                # Brief pause before continuing
                time.sleep(2)
            else:
                # All iterations complete
                logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown")
                # Trigger shutdown
                import os, signal
                os.kill(os.getpid(), signal.SIGTERM)
                return
    except Exception as e:
        logger.error(f"Batch mode monitor error: {e}")
        logger.error(f"Initiating emergency shutdown")
        import os, signal
        os.kill(os.getpid(), signal.SIGTERM)
# Start monitor in background thread
monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
monitor_thread.start()
logger.info("Batch mode enabled: Will exit after all queued items are processed")
# Get the SocketIO instance from the Flask app (created in flask_app.py) # Get the SocketIO instance from the Flask app (created in flask_app.py)
from changedetectionio.flask_app import socketio_server from changedetectionio.flask_app import socketio_server
global socketio global socketio
@@ -314,20 +656,33 @@ def main():
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1) app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
# SocketIO instance is already initialized in flask_app.py # In batch mode, skip starting the HTTP server - just keep workers running
if socketio_server: if batch_mode:
if ssl_mode: logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") logger.info("Batch mode: Main thread will wait for shutdown signal")
socketio.run(app, host=host, port=int(port), debug=False, # Keep main thread alive until batch monitor triggers shutdown
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True) try:
else: while True:
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True) time.sleep(1)
except KeyboardInterrupt:
logger.info("Batch mode: Keyboard interrupt received")
pass
else: else:
# Run Flask app without Socket.IO if disabled # Normal mode: Start HTTP server
logger.info("Starting Flask app without Socket.IO server") # SocketIO instance is already initialized in flask_app.py
if ssl_mode: if socketio_server:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}") if ssl_mode:
app.run(host=host, port=int(port), debug=False, logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
ssl_context=(ssl_cert_file, ssl_privkey_file)) socketio.run(app, host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
else:
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
else: else:
app.run(host=host, port=int(port), debug=False) # Run Flask app without Socket.IO if disabled
logger.info("Starting Flask app without Socket.IO server")
if ssl_mode:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
app.run(host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file))
else:
app.run(host=host, port=int(port), debug=False)

View File

@@ -89,11 +89,16 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
continue continue
uuid = queued_item_data.item.get('uuid') uuid = queued_item_data.item.get('uuid')
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
# RACE CONDITION FIX: Atomically claim this UUID for processing
from changedetectionio import worker_handler from changedetectionio import worker_handler
from changedetectionio.queuedWatchMetaData import PrioritizedItem from changedetectionio.queuedWatchMetaData import PrioritizedItem
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring") # Try to claim the UUID atomically - prevents duplicate processing
if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
# Already being processed by another worker
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
# Sleep to avoid tight loop and give the other worker time to finish # Sleep to avoid tight loop and give the other worker time to finish
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED) await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
@@ -105,9 +110,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
continue continue
fetch_start_time = round(time.time()) fetch_start_time = round(time.time())
# Mark this UUID as being processed by this worker
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
try: try:
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'): if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
@@ -487,8 +489,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
except Exception as e: except Exception as e:
logger.error(f"Exception while cleaning/quit after calling browser: {e}") logger.error(f"Exception while cleaning/quit after calling browser: {e}")
try: try:
# Mark UUID as no longer being processed by this worker # Release UUID from processing (thread-safe)
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False) worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
# Send completion signal # Send completion signal
if watch: if watch:

View File

@@ -401,7 +401,10 @@ def changedetection_app(config=None, datastore_o=None):
# so far just for read-only via tests, but this will be moved eventually to be the main source # so far just for read-only via tests, but this will be moved eventually to be the main source
# (instead of the global var) # (instead of the global var)
app.config['DATASTORE'] = datastore_o app.config['DATASTORE'] = datastore_o
# Store batch mode flag to skip background threads when running in batch mode
app.config['batch_mode'] = config.get('batch_mode', False) if config else False
# Store the signal in the app config to ensure it's accessible everywhere # Store the signal in the app config to ensure it's accessible everywhere
app.config['watch_check_update_SIGNAL'] = watch_check_update app.config['watch_check_update_SIGNAL'] = watch_check_update
@@ -805,6 +808,8 @@ def changedetection_app(config=None, datastore_o=None):
# Initialize Socket.IO server conditionally based on settings # Initialize Socket.IO server conditionally based on settings
socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True) socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True)
if socket_io_enabled and app.config.get('batch_mode'):
socket_io_enabled = False
if socket_io_enabled: if socket_io_enabled:
from changedetectionio.realtime.socket_server import init_socketio from changedetectionio.realtime.socket_server import init_socketio
global socketio_server global socketio_server
@@ -902,14 +907,19 @@ def changedetection_app(config=None, datastore_o=None):
logger.info(f"Starting {n_workers} workers during app initialization") logger.info(f"Starting {n_workers} workers during app initialization")
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore) worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
# @todo handle ctrl break # Skip background threads in batch mode (just process queue and exit)
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start() batch_mode = app.config.get('batch_mode', False)
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start() if not batch_mode:
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest # Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest: if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start() threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
else:
logger.info("Batch mode: Skipping ticker thread, notification runner, and version checker")
# Return the Flask app - the Socket.IO will be attached to it but initialized separately # Return the Flask app - the Socket.IO will be attached to it but initialized separately
# This avoids circular dependencies # This avoids circular dependencies

View File

@@ -338,7 +338,6 @@ class ChangeDetectionStore:
self.needs_write_urgent = True self.needs_write_urgent = True
def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True): def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True):
import requests
if extras is None: if extras is None:
extras = {} extras = {}
@@ -349,6 +348,8 @@ class ChangeDetectionStore:
# Was it a share link? try to fetch the data # Was it a share link? try to fetch the data
if (url.startswith("https://changedetection.io/share/")): if (url.startswith("https://changedetection.io/share/")):
import requests
try: try:
r = requests.request(method="GET", r = requests.request(method="GET",
url=url, url=url,

View File

@@ -0,0 +1,243 @@
#!/bin/bash
# Test script for CLI options - Parallel execution
# Tests -u, -uN, -r, -b flags

set -u  # Treat expansion of unset variables as an error (aborts the script)

# ANSI color codes for terminal output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Per-run results directory; $$ (this shell's PID) keeps parallel
# invocations of the suite from overwriting each other's result files.
TEST_RESULTS_DIR="/tmp/cli-test-results-$$"
mkdir -p "$TEST_RESULTS_DIR"
# Cleanup function, runs on script exit (see `trap cleanup EXIT` below).
# Removes only artifacts belonging to THIS run: every test directory created
# by this script embeds $$ in its name, so scoping the glob to "-$$" avoids
# deleting directories of a concurrently running suite (the previous
# /tmp/cli-test-* glob clobbered other runs).
cleanup() {
    echo ""
    echo "=== Cleaning up test directories ==="
    rm -rf /tmp/cli-test-*-$$ 2>/dev/null || true
    rm -rf "$TEST_RESULTS_DIR" 2>/dev/null || true
    # Kill any hanging changedetection processes spawned by the tests
    pkill -f "changedetection.py.*cli-test" 2>/dev/null || true
}
trap cleanup EXIT
# Persist one test outcome into the shared results directory so the
# parallel subshells can report back to the parent.
# $1 = test number, $2 = "pass" or "fail", $3 = human-readable message
record_result() {
    local num="$1"
    local outcome="$2"
    local msg="$3"
    printf '%s|%s\n' "$outcome" "$msg" > "$TEST_RESULTS_DIR/test_${num}.result"
}
# Launch one test in a background subshell so all tests run in parallel.
# $1 = test number, $2 = display name, $3 = name of the test function to call.
# The subshell writes pass/fail via record_result; the parent must `wait`
# for all background jobs before reading the result files.
run_test() {
    local test_num=$1
    local test_name=$2
    local test_func=$3

    (
        echo -e "${YELLOW}[Test $test_num]${NC} $test_name"
        # The test function's exit status decides pass/fail
        if $test_func "$test_num"; then
            record_result "$test_num" "pass" "$test_name"
            echo -e "${GREEN}✓ PASS${NC}: $test_name"
        else
            record_result "$test_num" "fail" "$test_name"
            echo -e "${RED}✗ FAIL${NC}: $test_name"
        fi
    ) &
}
# =============================================================================
# Test Functions (each runs independently)
# =============================================================================
# --help must print the usage text (including the -u option description)
# quickly and without initialising a datastore; 3s timeout guards a hang.
test_help_flag() {
    local test_id=$1
    local usage_output
    usage_output=$(timeout 3 python3 changedetection.py --help 2>&1)
    grep -q "Add URLs on startup" <<< "$usage_output"
}
# --version must print a "changedetection.io X.Y" style version string.
test_version_flag() {
    local test_id=$1
    local version_output
    version_output=$(timeout 3 python3 changedetection.py --version 2>&1)
    grep -qE "changedetection.io [0-9]+\.[0-9]+" <<< "$version_output"
}
# -u adds one URL, -C creates the datastore dir, -b (batch mode) exits after
# processing. Verify exactly one watch was persisted to url-watches.json.
test_single_url() {
    local test_id=$1
    local dir="/tmp/cli-test-single-${test_id}-$$"
    timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
    # Datastore file must exist and contain exactly 1 entry under "watching"
    [ -f "$dir/url-watches.json" ] && \
    [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ]
}
# Repeated -u flags must each add a watch; expect 3 persisted watches.
test_multiple_urls() {
    local test_id=$1
    local dir="/tmp/cli-test-multi-${test_id}-$$"
    timeout 12 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -u https://github.com \
        -u https://httpbin.org \
        -b &>/dev/null
    [ -f "$dir/url-watches.json" ] && \
    [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ]
}
# -u0 supplies a JSON options blob for the first -u URL (index 0).
# Verify the persisted watch picked up the "title" from the JSON.
test_url_with_options() {
    local test_id=$1
    local dir="/tmp/cli-test-opts-${test_id}-$$"
    timeout 10 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -u0 '{"title":"Test Site","processor":"text_json_diff"}' \
        -b &>/dev/null
    # exit 0 only if some watch carries the title from the -u0 JSON
    [ -f "$dir/url-watches.json" ] && \
    python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)"
}
# -u0 / -u1 attach per-URL JSON options by positional index of the -u flags.
# Verify both watches exist and each got its own title.
test_multiple_urls_with_options() {
    local test_id=$1
    local dir="/tmp/cli-test-multi-opts-${test_id}-$$"
    timeout 12 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -u0 '{"title":"Site One"}' \
        -u https://github.com \
        -u1 '{"title":"Site Two"}' \
        -b &>/dev/null
    [ -f "$dir/url-watches.json" ] && \
    [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \
    python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)"
}
# -b must make the app exit on its own after processing. If it hangs,
# `timeout 15` kills it, elapsed lands at ~15s, and the < 14s check fails.
test_batch_mode_exit() {
    local test_id=$1
    local dir="/tmp/cli-test-batch-${test_id}-$$"
    local start=$(date +%s)
    timeout 15 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -b &>/dev/null
    local end=$(date +%s)
    local elapsed=$((end - start))
    [ $elapsed -lt 14 ]
}
# Seed a datastore with one pre-existing watch, then verify `-r all`
# logs that it is queuing every watch for recheck.
test_recheck_all() {
    local test_id=$1
    local dir="/tmp/cli-test-recheck-all-${test_id}-$$"
    mkdir -p "$dir"
    # Heredoc body must stay at column 0 (plain <<, not <<-); 'EOF' is
    # quoted so no expansion happens inside the JSON.
    cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}}
EOF
    timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all"
}
# Seed a datastore with two watches, then verify `-r uuid-1,uuid-2`
# (comma-separated UUID list) logs that it queued exactly those two.
test_recheck_specific() {
    local test_id=$1
    local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$"
    mkdir -p "$dir"
    # Heredoc body must stay at column 0; quoted 'EOF' disables expansion.
    cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}}
EOF
    timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches"
}
# -u (add) and -r all (recheck) combined in one batch invocation;
# both added watches must be persisted.
test_combined_operations() {
    local test_id=$1
    local dir="/tmp/cli-test-combined-${test_id}-$$"
    timeout 12 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -u https://github.com \
        -r all \
        -b &>/dev/null
    [ -f "$dir/url-watches.json" ] && \
    [ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ]
}
# Malformed JSON passed to -u0 must surface an error message; match
# case-insensitively on either phrasing of the JSON error.
test_invalid_json() {
    local test_id=$1
    local dir="/tmp/cli-test-invalid-${test_id}-$$"
    timeout 5 python3 changedetection.py -d "$dir" -C \
        -u https://example.com \
        -u0 'invalid json here' \
        2>&1 | grep -qi "invalid json\|json decode error"
}
# -C must create the datastore directory, including nested parents.
test_create_directory() {
    local test_id=$1
    local target_dir="/tmp/cli-test-create-${test_id}-$$/nested/path"
    timeout 10 python3 changedetection.py -d "$target_dir" -C -u https://example.com -b &>/dev/null
    # Pass only if the nested path now exists on disk
    [ -d "$target_dir" ]
}
# =============================================================================
# Main Test Execution
# =============================================================================

echo "=========================================="
echo "  CLI Options Test Suite (Parallel)"
echo "=========================================="
echo ""

# Launch all tests in parallel; each run_test backgrounds a subshell
run_test 1 "Help flag (--help) shows usage without initialization" test_help_flag
run_test 2 "Version flag (--version) displays version" test_version_flag
run_test 3 "Add single URL with -u flag" test_single_url
run_test 4 "Add multiple URLs with multiple -u flags" test_multiple_urls
run_test 5 "Add URL with JSON options using -u0" test_url_with_options
run_test 6 "Add multiple URLs with different options (-u0, -u1)" test_multiple_urls_with_options
run_test 7 "Batch mode (-b) exits automatically after processing" test_batch_mode_exit
run_test 8 "Recheck all watches with -r all" test_recheck_all
run_test 9 "Recheck specific watches with -r UUID" test_recheck_specific
run_test 10 "Combined: Add URLs and recheck all with -u and -r all" test_combined_operations
run_test 11 "Invalid JSON in -u0 option should show error" test_invalid_json
run_test 12 "Create datastore directory with -C flag" test_create_directory

# Block until every background test subshell has finished
echo ""
echo "Waiting for all tests to complete..."
wait

# Collect results from the per-test files written by record_result
echo ""
echo "=========================================="
echo "  Test Summary"
echo "=========================================="

TESTS_RUN=0
TESTS_PASSED=0
TESTS_FAILED=0

for result_file in "$TEST_RESULTS_DIR"/test_*.result; do
    # -f guards the case where the glob matched nothing (no results written)
    if [ -f "$result_file" ]; then
        TESTS_RUN=$((TESTS_RUN + 1))
        # Result file format is "status|message"; take the status field
        status=$(cut -d'|' -f1 < "$result_file")
        if [ "$status" = "pass" ]; then
            TESTS_PASSED=$((TESTS_PASSED + 1))
        else
            TESTS_FAILED=$((TESTS_FAILED + 1))
        fi
    fi
done

echo "Tests run: $TESTS_RUN"
echo -e "${GREEN}Tests passed: $TESTS_PASSED${NC}"
if [ $TESTS_FAILED -gt 0 ]; then
    echo -e "${RED}Tests failed: $TESTS_FAILED${NC}"
else
    echo -e "${GREEN}Tests failed: $TESTS_FAILED${NC}"
fi
echo "=========================================="
echo ""

# Exit non-zero if anything failed so CI marks the job red
if [ $TESTS_FAILED -gt 0 ]; then
    echo -e "${RED}Some tests failed!${NC}"
    exit 1
else
    echo -e "${GREEN}All tests passed!${NC}"
    exit 0
fi

View File

@@ -5,13 +5,11 @@ from threading import Thread
import pytest import pytest
import arrow import arrow
from changedetectionio import changedetection_app
from changedetectionio import store from changedetectionio import store
import os import os
import sys import sys
from loguru import logger
from changedetectionio.flask_app import init_app_secret from changedetectionio.flask_app import init_app_secret, changedetection_app
from changedetectionio.tests.util import live_server_setup, new_live_server_setup from changedetectionio.tests.util import live_server_setup, new_live_server_setup
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py # https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py

View File

@@ -140,38 +140,13 @@ def delete_all_watches(client=None):
def wait_for_all_checks(client=None): def wait_for_all_checks(client=None):
""" """
Waits until the queue is empty and workers are idle. Waits until the queue is empty and workers are idle.
Much faster than the original with adaptive timing. Delegates to worker_handler.wait_for_all_checks for shared logic.
""" """
from changedetectionio.flask_app import update_q as global_update_q from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_handler from changedetectionio import worker_handler
empty_since = None
attempt = 0
max_attempts = 150 # Still reasonable upper bound
while attempt < max_attempts: # Use the shared wait logic from worker_handler
# Start with fast checks, slow down if needed return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
if attempt < 10:
time.sleep(0.2) # Very fast initial checks
elif attempt < 30:
time.sleep(0.4) # Medium speed
else:
time.sleep(0.8) # Slower for persistent issues
q_length = global_update_q.qsize()
running_uuids = worker_handler.get_running_uuids()
any_workers_busy = len(running_uuids) > 0
if q_length == 0 and not any_workers_busy:
if empty_since is None:
empty_since = time.time()
# Brief stabilization period for async workers
elif time.time() - empty_since >= 0.3:
break
else:
empty_since = None
attempt += 1
time.sleep(0.3)
def wait_for_watch_history(client, min_history_count=2, timeout=10): def wait_for_watch_history(client, min_history_count=2, timeout=10):
""" """

View File

@@ -17,6 +17,7 @@ worker_threads = [] # List of WorkerThread objects
# Track currently processing UUIDs for async workers - maps {uuid: worker_id} # Track currently processing UUIDs for async workers - maps {uuid: worker_id}
currently_processing_uuids = {} currently_processing_uuids = {}
_uuid_processing_lock = threading.Lock() # Protects currently_processing_uuids
# Configuration - async workers only # Configuration - async workers only
USE_ASYNC_WORKERS = True USE_ASYNC_WORKERS = True
@@ -207,31 +208,80 @@ def get_worker_count():
def get_running_uuids(): def get_running_uuids():
"""Get list of UUIDs currently being processed by async workers""" """Get list of UUIDs currently being processed by async workers"""
return list(currently_processing_uuids.keys()) with _uuid_processing_lock:
return list(currently_processing_uuids.keys())
def claim_uuid_for_processing(uuid, worker_id):
    """
    Atomically check if UUID is available and claim it for processing.

    This is thread-safe and prevents race conditions where multiple workers
    try to process the same UUID simultaneously.

    Args:
        uuid: The watch UUID to claim
        worker_id: The ID of the worker claiming this UUID

    Returns:
        True if successfully claimed (UUID was not being processed)
        False if already being processed by another worker
    """
    # One lock covers both the membership test and the insert so the
    # check-and-set is atomic across worker threads.
    with _uuid_processing_lock:
        if uuid in currently_processing_uuids:
            # Already being processed by another worker
            return False
        # Claim it atomically
        currently_processing_uuids[uuid] = worker_id
        logger.debug(f"Worker {worker_id} claimed UUID: {uuid}")
        return True
def release_uuid_from_processing(uuid, worker_id):
    """
    Release a UUID from processing (thread-safe).

    Only the worker that currently owns the UUID may release it; a release
    attempt by any other worker is logged and ignored, so a slow worker
    cannot free a claim that was since re-taken by another worker.

    Args:
        uuid: The watch UUID to release
        worker_id: The ID of the worker releasing this UUID
    """
    with _uuid_processing_lock:
        # Only remove if this worker owns it (defensive)
        if currently_processing_uuids.get(uuid) == worker_id:
            currently_processing_uuids.pop(uuid, None)
            logger.debug(f"Worker {worker_id} released UUID: {uuid}")
        else:
            logger.warning(f"Worker {worker_id} tried to release UUID {uuid} but doesn't own it (owned by {currently_processing_uuids.get(uuid, 'nobody')})")
def set_uuid_processing(uuid, worker_id=None, processing=True): def set_uuid_processing(uuid, worker_id=None, processing=True):
"""Mark a UUID as being processed or completed by a specific worker""" """
global currently_processing_uuids Mark a UUID as being processed or completed by a specific worker.
DEPRECATED: Use claim_uuid_for_processing() and release_uuid_from_processing() instead.
This function is kept for backward compatibility but doesn't provide atomic check-and-set.
"""
if processing: if processing:
currently_processing_uuids[uuid] = worker_id with _uuid_processing_lock:
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}") currently_processing_uuids[uuid] = worker_id
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
else: else:
currently_processing_uuids.pop(uuid, None) release_uuid_from_processing(uuid, worker_id)
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
def is_watch_running(watch_uuid): def is_watch_running(watch_uuid):
"""Check if a specific watch is currently being processed by any worker""" """Check if a specific watch is currently being processed by any worker"""
return watch_uuid in currently_processing_uuids with _uuid_processing_lock:
return watch_uuid in currently_processing_uuids
def is_watch_running_by_another_worker(watch_uuid, current_worker_id): def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
"""Check if a specific watch is currently being processed by a different worker""" """Check if a specific watch is currently being processed by a different worker"""
if watch_uuid not in currently_processing_uuids: with _uuid_processing_lock:
return False if watch_uuid not in currently_processing_uuids:
processing_worker_id = currently_processing_uuids[watch_uuid] return False
return processing_worker_id != current_worker_id processing_worker_id = currently_processing_uuids[watch_uuid]
return processing_worker_id != current_worker_id
def queue_item_async_safe(update_q, item, silent=False): def queue_item_async_safe(update_q, item, silent=False):
@@ -381,6 +431,53 @@ def get_worker_status():
} }
def wait_for_all_checks(update_q, timeout=150):
    """
    Wait for queue to be empty and all workers to be idle.

    Args:
        update_q: The update queue to monitor
        timeout: Maximum number of polling iterations (NOT wall-clock
            seconds) - each iteration sleeps 0.2-0.8s, so the default
            of 150 iterations allows roughly up to ~2 minutes of waiting.

    Returns:
        bool: True if all checks completed, False if timeout
    """
    import time

    empty_since = None
    attempt = 0
    max_attempts = timeout  # NOTE: iteration count, not seconds

    while attempt < max_attempts:
        # Adaptive sleep - start fast, slow down if needed
        if attempt < 10:
            sleep_time = 0.2  # Very fast initial checks
        elif attempt < 30:
            sleep_time = 0.4  # Medium speed
        else:
            sleep_time = 0.8  # Slower for persistent issues

        time.sleep(sleep_time)

        q_length = update_q.qsize()
        running_uuids = get_running_uuids()
        any_workers_busy = len(running_uuids) > 0

        if q_length == 0 and not any_workers_busy:
            if empty_since is None:
                empty_since = time.time()
            # Brief stabilization period for async workers: require the
            # idle state to hold for >= 0.3s before declaring success
            elif time.time() - empty_since >= 0.3:
                # Add small buffer for filesystem operations to complete
                time.sleep(0.2)
                return True
        else:
            empty_since = None

        attempt += 1

    return False  # Timeout
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None): def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
""" """
Check if the expected number of async workers are running and restart any missing ones. Check if the expected number of async workers are running and restart any missing ones.