mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-25 00:20:33 +00:00
Compare commits
3 Commits
dependabot
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4dec1e017b | ||
|
|
9d1743adbe | ||
|
|
f34d806b09 |
@@ -111,6 +111,32 @@ jobs:
|
||||
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
|
||||
docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh'
|
||||
|
||||
- name: Test CLI options
|
||||
run: |
|
||||
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
|
||||
docker run --name test-cdio-cli-opts --network changedet-network test-changedetectionio bash -c 'changedetectionio/test_cli_opts.sh' &> cli-opts-output.txt
|
||||
echo "=== CLI Options Test Output ==="
|
||||
cat cli-opts-output.txt
|
||||
|
||||
- name: CLI Memory Test
|
||||
run: |
|
||||
echo "=== Checking CLI batch mode memory usage ==="
|
||||
# Extract RSS memory value from output
|
||||
RSS_MB=$(grep -oP "Memory consumption before worker shutdown: RSS=\K[\d.]+" cli-opts-output.txt | head -1 || echo "0")
|
||||
echo "RSS Memory: ${RSS_MB} MB"
|
||||
|
||||
# Check if RSS is less than 100MB
|
||||
if [ -n "$RSS_MB" ]; then
|
||||
if (( $(echo "$RSS_MB < 100" | bc -l) )); then
|
||||
echo "✓ Memory usage is acceptable: ${RSS_MB} MB < 100 MB"
|
||||
else
|
||||
echo "✗ Memory usage too high: ${RSS_MB} MB >= 100 MB"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
echo "⚠ Could not extract memory usage, skipping check"
|
||||
fi
|
||||
|
||||
- name: Extract memory report and logs
|
||||
if: always()
|
||||
uses: ./.github/actions/extract-memory-report
|
||||
@@ -125,6 +151,13 @@ jobs:
|
||||
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
|
||||
path: output-logs
|
||||
|
||||
- name: Store CLI test output
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v6
|
||||
with:
|
||||
name: test-cdio-cli-opts-output-py${{ env.PYTHON_VERSION }}
|
||||
path: cli-opts-output.txt
|
||||
|
||||
# Playwright tests
|
||||
playwright-tests:
|
||||
runs-on: ubuntu-latest
|
||||
|
||||
@@ -6,19 +6,20 @@ __version__ = '0.52.9'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
from loguru import logger
|
||||
import getopt
|
||||
import logging
|
||||
import os
|
||||
import getopt
|
||||
import platform
|
||||
import signal
|
||||
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
|
||||
# Eventlet completely removed - using threading mode for SocketIO
|
||||
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
|
||||
from changedetectionio import store
|
||||
from changedetectionio.flask_app import changedetection_app
|
||||
from loguru import logger
|
||||
# Note: store and changedetection_app are imported inside main() to avoid
|
||||
# initialization before argument parsing (allows --help to work without loading everything)
|
||||
|
||||
# ==============================================================================
|
||||
# Multiprocessing Configuration - CRITICAL for Thread Safety
|
||||
@@ -83,11 +84,22 @@ def get_version():
|
||||
def sigshutdown_handler(_signo, _stack_frame):
|
||||
name = signal.Signals(_signo).name
|
||||
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')
|
||||
|
||||
|
||||
# Set exit flag immediately to stop all loops
|
||||
app.config.exit.set()
|
||||
datastore.stop_thread = True
|
||||
|
||||
|
||||
# Log memory consumption before shutting down workers (cross-platform)
|
||||
try:
|
||||
import psutil
|
||||
process = psutil.Process()
|
||||
mem_info = process.memory_info()
|
||||
rss_mb = mem_info.rss / 1024 / 1024
|
||||
vms_mb = mem_info.vms / 1024 / 1024
|
||||
logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
|
||||
except Exception as e:
|
||||
logger.warning(f"Could not retrieve memory stats: {str(e)}")
|
||||
|
||||
# Shutdown workers and queues immediately
|
||||
try:
|
||||
from changedetectionio import worker_handler
|
||||
@@ -125,10 +137,51 @@ def main():
|
||||
global datastore
|
||||
global app
|
||||
|
||||
# Early help/version check before any initialization
|
||||
if '--help' in sys.argv or '-help' in sys.argv:
|
||||
print('Usage: changedetection.py [options]')
|
||||
print('')
|
||||
print('Standard options:')
|
||||
print(' -s SSL enable')
|
||||
print(' -h HOST Listen host (default: 0.0.0.0)')
|
||||
print(' -p PORT Listen port (default: 5000)')
|
||||
print(' -d PATH Datastore path')
|
||||
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
|
||||
print(' -c Cleanup unused snapshots')
|
||||
print(' -C Create datastore directory if it doesn\'t exist')
|
||||
print('')
|
||||
print('Add URLs on startup:')
|
||||
print(' -u URL Add URL to watch (can be used multiple times)')
|
||||
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
|
||||
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
|
||||
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
|
||||
print(' Available options: processor, fetch_backend, headers, method, etc.')
|
||||
print(' See model/Watch.py for all available options')
|
||||
print('')
|
||||
print('Recheck on startup:')
|
||||
print(' -r all Queue all watches for recheck on startup')
|
||||
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
|
||||
print(' -r all N Queue all watches, wait for completion, repeat N times')
|
||||
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
|
||||
print('')
|
||||
print('Batch mode:')
|
||||
print(' -b Run in batch mode (process queue then exit)')
|
||||
print(' Useful for CI/CD, cron jobs, or one-time checks')
|
||||
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
|
||||
print(' Use -p PORT to specify a different port if needed')
|
||||
print('')
|
||||
sys.exit(0)
|
||||
|
||||
if '--version' in sys.argv or '-v' in sys.argv:
|
||||
print(f'changedetection.io {__version__}')
|
||||
sys.exit(0)
|
||||
|
||||
# Import heavy modules after help/version checks to keep startup fast for those flags
|
||||
from changedetectionio import store
|
||||
from changedetectionio.flask_app import changedetection_app
|
||||
|
||||
datastore_path = None
|
||||
do_cleanup = False
|
||||
# Optional URL to watch since start
|
||||
default_url = None
|
||||
# Set a default logger level
|
||||
logger_level = 'DEBUG'
|
||||
include_default_watches = True
|
||||
@@ -137,6 +190,13 @@ def main():
|
||||
port = int(os.environ.get('PORT', 5000))
|
||||
ssl_mode = False
|
||||
|
||||
# Lists for multiple URLs and their options
|
||||
urls_to_add = []
|
||||
url_options = {} # Key: index (0-based), Value: dict of options
|
||||
recheck_watches = None # None, 'all', or list of UUIDs
|
||||
recheck_repeat_count = 1 # Number of times to repeat recheck cycle
|
||||
batch_mode = False # Run once then exit when queue is empty
|
||||
|
||||
# On Windows, create and use a default path.
|
||||
if os.name == 'nt':
|
||||
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
|
||||
@@ -145,10 +205,98 @@ def main():
|
||||
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
|
||||
datastore_path = os.path.join(os.getcwd(), "../datastore")
|
||||
|
||||
# Pre-process arguments to extract -u, -u<N>, and -r options before getopt
|
||||
# This allows unlimited -u0, -u1, -u2, ... options without predefining them
|
||||
cleaned_argv = ['changedetection.py'] # Start with program name
|
||||
i = 1
|
||||
while i < len(sys.argv):
|
||||
arg = sys.argv[i]
|
||||
|
||||
# Handle -u (add URL)
|
||||
if arg == '-u' and i + 1 < len(sys.argv):
|
||||
urls_to_add.append(sys.argv[i + 1])
|
||||
i += 2
|
||||
continue
|
||||
|
||||
# Handle -u<N> (set options for URL at index N)
|
||||
if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
|
||||
idx = int(arg[2:])
|
||||
if i + 1 < len(sys.argv):
|
||||
try:
|
||||
import json
|
||||
url_options[idx] = json.loads(sys.argv[i + 1])
|
||||
except json.JSONDecodeError as e:
|
||||
print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
|
||||
print(f'JSON decode error: {e}')
|
||||
sys.exit(2)
|
||||
i += 2
|
||||
continue
|
||||
|
||||
# Handle -r (recheck watches)
|
||||
if arg == '-r' and i + 1 < len(sys.argv):
|
||||
recheck_arg = sys.argv[i + 1]
|
||||
if recheck_arg.lower() == 'all':
|
||||
recheck_watches = 'all'
|
||||
else:
|
||||
# Parse comma-separated list of UUIDs
|
||||
recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]
|
||||
|
||||
# Check for optional repeat count as third argument
|
||||
if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit():
|
||||
recheck_repeat_count = int(sys.argv[i + 2])
|
||||
if recheck_repeat_count < 1:
|
||||
print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}')
|
||||
sys.exit(2)
|
||||
i += 3
|
||||
else:
|
||||
i += 2
|
||||
continue
|
||||
|
||||
# Handle -b (batch mode - run once and exit)
|
||||
if arg == '-b':
|
||||
batch_mode = True
|
||||
i += 1
|
||||
continue
|
||||
|
||||
# Keep other arguments for getopt
|
||||
cleaned_argv.append(arg)
|
||||
i += 1
|
||||
|
||||
try:
|
||||
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
|
||||
except getopt.GetoptError:
|
||||
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
|
||||
opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port")
|
||||
except getopt.GetoptError as e:
|
||||
print('Usage: changedetection.py [options]')
|
||||
print('')
|
||||
print('Standard options:')
|
||||
print(' -s SSL enable')
|
||||
print(' -h HOST Listen host (default: 0.0.0.0)')
|
||||
print(' -p PORT Listen port (default: 5000)')
|
||||
print(' -d PATH Datastore path')
|
||||
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
|
||||
print(' -c Cleanup unused snapshots')
|
||||
print(' -C Create datastore directory if it doesn\'t exist')
|
||||
print('')
|
||||
print('Add URLs on startup:')
|
||||
print(' -u URL Add URL to watch (can be used multiple times)')
|
||||
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
|
||||
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
|
||||
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
|
||||
print(' Available options: processor, fetch_backend, headers, method, etc.')
|
||||
print(' See model/Watch.py for all available options')
|
||||
print('')
|
||||
print('Recheck on startup:')
|
||||
print(' -r all Queue all watches for recheck on startup')
|
||||
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
|
||||
print(' -r all N Queue all watches, wait for completion, repeat N times')
|
||||
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
|
||||
print('')
|
||||
print('Batch mode:')
|
||||
print(' -b Run in batch mode (process queue then exit)')
|
||||
print(' Useful for CI/CD, cron jobs, or one-time checks')
|
||||
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
|
||||
print(' Use -p PORT to specify a different port if needed')
|
||||
print('')
|
||||
print(f'Error: {e}')
|
||||
sys.exit(2)
|
||||
|
||||
create_datastore_dir = False
|
||||
@@ -173,10 +321,6 @@ def main():
|
||||
if opt == '-d':
|
||||
datastore_path = arg
|
||||
|
||||
if opt == '-u':
|
||||
default_url = arg
|
||||
include_default_watches = False
|
||||
|
||||
# Cleanup (remove text files that arent in the index)
|
||||
if opt == '-c':
|
||||
do_cleanup = True
|
||||
@@ -188,6 +332,10 @@ def main():
|
||||
if opt == '-l':
|
||||
logger_level = int(arg) if arg.isdigit() else arg.upper()
|
||||
|
||||
# If URLs are provided, don't include default watches
|
||||
if urls_to_add:
|
||||
include_default_watches = False
|
||||
|
||||
|
||||
logger.success(f"changedetection.io version {get_version()} starting.")
|
||||
# Launch using SocketIO run method for proper integration (if enabled)
|
||||
@@ -224,11 +372,16 @@ def main():
|
||||
logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)
|
||||
|
||||
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
|
||||
app_config = {'datastore_path': datastore_path}
|
||||
app_config = {
|
||||
'datastore_path': datastore_path,
|
||||
'batch_mode': batch_mode,
|
||||
'recheck_watches': recheck_watches,
|
||||
'recheck_repeat_count': recheck_repeat_count
|
||||
}
|
||||
|
||||
if not os.path.isdir(app_config['datastore_path']):
|
||||
if create_datastore_dir:
|
||||
os.mkdir(app_config['datastore_path'])
|
||||
os.makedirs(app_config['datastore_path'], exist_ok=True)
|
||||
else:
|
||||
logger.critical(
|
||||
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
|
||||
@@ -249,11 +402,200 @@ def main():
|
||||
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
|
||||
inject_datastore_into_plugins(datastore)
|
||||
|
||||
if default_url:
|
||||
datastore.add_watch(url = default_url)
|
||||
# Step 1: Add URLs with their options (if provided via -u flags)
|
||||
added_watch_uuids = []
|
||||
if urls_to_add:
|
||||
logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
|
||||
for idx, url in enumerate(urls_to_add):
|
||||
extras = url_options.get(idx, {})
|
||||
if extras:
|
||||
logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
|
||||
else:
|
||||
logger.debug(f"Adding watch {idx}: {url}")
|
||||
|
||||
new_uuid = datastore.add_watch(url=url, extras=extras)
|
||||
if new_uuid:
|
||||
added_watch_uuids.append(new_uuid)
|
||||
logger.success(f"Added watch: {url} (UUID: {new_uuid})")
|
||||
else:
|
||||
logger.error(f"Failed to add watch: {url}")
|
||||
|
||||
app = changedetection_app(app_config, datastore)
|
||||
|
||||
# Step 2: Queue newly added watches (if -u was provided in batch mode)
|
||||
# This must happen AFTER app initialization so update_q is available
|
||||
if batch_mode and added_watch_uuids:
|
||||
from changedetectionio.flask_app import update_q
|
||||
from changedetectionio import queuedWatchMetaData, worker_handler
|
||||
|
||||
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
|
||||
for watch_uuid in added_watch_uuids:
|
||||
try:
|
||||
worker_handler.queue_item_async_safe(
|
||||
update_q,
|
||||
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||
)
|
||||
logger.debug(f"Queued newly added watch: {watch_uuid}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||
|
||||
# Step 3: Queue watches for recheck (if -r was provided)
|
||||
# This must happen AFTER app initialization so update_q is available
|
||||
if recheck_watches is not None:
|
||||
from changedetectionio.flask_app import update_q
|
||||
from changedetectionio import queuedWatchMetaData, worker_handler
|
||||
|
||||
watches_to_queue = []
|
||||
if recheck_watches == 'all':
|
||||
# Queue all watches, excluding those already queued in batch mode
|
||||
all_watches = list(datastore.data['watching'].keys())
|
||||
if batch_mode and added_watch_uuids:
|
||||
# Exclude newly added watches that were already queued in batch mode
|
||||
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
|
||||
logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
|
||||
else:
|
||||
watches_to_queue = all_watches
|
||||
logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
|
||||
else:
|
||||
# Queue specific UUIDs
|
||||
watches_to_queue = recheck_watches
|
||||
logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")
|
||||
|
||||
queued_count = 0
|
||||
for watch_uuid in watches_to_queue:
|
||||
if watch_uuid in datastore.data['watching']:
|
||||
try:
|
||||
worker_handler.queue_item_async_safe(
|
||||
update_q,
|
||||
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||
)
|
||||
queued_count += 1
|
||||
logger.debug(f"Queued watch for recheck: {watch_uuid}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||
else:
|
||||
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
|
||||
|
||||
logger.success(f"Successfully queued {queued_count} watches for recheck")
|
||||
|
||||
# Step 4: Setup batch mode monitor (if -b was provided)
|
||||
if batch_mode:
|
||||
from changedetectionio.flask_app import update_q
|
||||
|
||||
# Safety check: Ensure Flask app is not already running on this port
|
||||
# Batch mode should never run alongside the web server
|
||||
import socket
|
||||
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
|
||||
try:
|
||||
# Try to bind to the configured host:port (no SO_REUSEADDR - strict check)
|
||||
test_socket.bind((host, port))
|
||||
test_socket.close()
|
||||
logger.debug(f"Batch mode: Port {port} is available (Flask app not running)")
|
||||
except OSError as e:
|
||||
test_socket.close()
|
||||
# errno 98 = EADDRINUSE (Linux)
|
||||
# errno 48 = EADDRINUSE (macOS)
|
||||
# errno 10048 = WSAEADDRINUSE (Windows)
|
||||
if e.errno in (48, 98, 10048) or "Address already in use" in str(e) or "already in use" in str(e).lower():
|
||||
logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use")
|
||||
logger.critical(f"The Flask web server appears to be running on {host}:{port}")
|
||||
logger.critical(f"Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)")
|
||||
logger.critical(f"Please either stop the Flask web server, or use a different port with -p PORT")
|
||||
sys.exit(1)
|
||||
else:
|
||||
# Some other socket error - log but continue (might be network configuration issue)
|
||||
logger.warning(f"Port availability check failed with unexpected error: {e}")
|
||||
logger.warning(f"Continuing with batch mode anyway - be aware of potential conflicts")
|
||||
|
||||
def queue_watches_for_recheck(datastore, iteration):
|
||||
"""Helper function to queue watches for recheck"""
|
||||
watches_to_queue = []
|
||||
if recheck_watches == 'all':
|
||||
all_watches = list(datastore.data['watching'].keys())
|
||||
if batch_mode and added_watch_uuids and iteration == 1:
|
||||
# Only exclude newly added watches on first iteration
|
||||
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
|
||||
else:
|
||||
watches_to_queue = all_watches
|
||||
logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches")
|
||||
elif recheck_watches:
|
||||
watches_to_queue = recheck_watches
|
||||
logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches")
|
||||
|
||||
queued_count = 0
|
||||
for watch_uuid in watches_to_queue:
|
||||
if watch_uuid in datastore.data['watching']:
|
||||
try:
|
||||
worker_handler.queue_item_async_safe(
|
||||
update_q,
|
||||
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||
)
|
||||
queued_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||
else:
|
||||
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
|
||||
logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches")
|
||||
return queued_count
|
||||
|
||||
def batch_mode_monitor():
|
||||
"""Monitor queue and workers, shutdown or repeat when work is complete"""
|
||||
import time
|
||||
|
||||
# Track iterations if repeat mode is enabled
|
||||
current_iteration = 1
|
||||
total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1
|
||||
|
||||
if total_iterations > 1:
|
||||
logger.info(f"Batch mode: Will repeat recheck {total_iterations} times")
|
||||
else:
|
||||
logger.info("Batch mode: Waiting for all queued items to complete...")
|
||||
|
||||
# Wait a bit for workers to start processing
|
||||
time.sleep(3)
|
||||
|
||||
try:
|
||||
while current_iteration <= total_iterations:
|
||||
logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
|
||||
|
||||
# Use the shared wait_for_all_checks function
|
||||
completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
|
||||
|
||||
if not completed:
|
||||
logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")
|
||||
|
||||
logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed")
|
||||
|
||||
# Check if we need to repeat
|
||||
if current_iteration < total_iterations:
|
||||
logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...")
|
||||
current_iteration += 1
|
||||
|
||||
# Re-queue watches for next iteration
|
||||
queue_watches_for_recheck(datastore, current_iteration)
|
||||
|
||||
# Brief pause before continuing
|
||||
time.sleep(2)
|
||||
else:
|
||||
# All iterations complete
|
||||
logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown")
|
||||
# Trigger shutdown
|
||||
import os, signal
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
return
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Batch mode monitor error: {e}")
|
||||
logger.error(f"Initiating emergency shutdown")
|
||||
import os, signal
|
||||
os.kill(os.getpid(), signal.SIGTERM)
|
||||
|
||||
# Start monitor in background thread
|
||||
monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
|
||||
monitor_thread.start()
|
||||
logger.info("Batch mode enabled: Will exit after all queued items are processed")
|
||||
|
||||
# Get the SocketIO instance from the Flask app (created in flask_app.py)
|
||||
from changedetectionio.flask_app import socketio_server
|
||||
global socketio
|
||||
@@ -314,20 +656,33 @@ def main():
|
||||
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
|
||||
|
||||
|
||||
# SocketIO instance is already initialized in flask_app.py
|
||||
if socketio_server:
|
||||
if ssl_mode:
|
||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||
socketio.run(app, host=host, port=int(port), debug=False,
|
||||
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
|
||||
else:
|
||||
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
|
||||
# In batch mode, skip starting the HTTP server - just keep workers running
|
||||
if batch_mode:
|
||||
logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
|
||||
logger.info("Batch mode: Main thread will wait for shutdown signal")
|
||||
# Keep main thread alive until batch monitor triggers shutdown
|
||||
try:
|
||||
while True:
|
||||
time.sleep(1)
|
||||
except KeyboardInterrupt:
|
||||
logger.info("Batch mode: Keyboard interrupt received")
|
||||
pass
|
||||
else:
|
||||
# Run Flask app without Socket.IO if disabled
|
||||
logger.info("Starting Flask app without Socket.IO server")
|
||||
if ssl_mode:
|
||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||
app.run(host=host, port=int(port), debug=False,
|
||||
ssl_context=(ssl_cert_file, ssl_privkey_file))
|
||||
# Normal mode: Start HTTP server
|
||||
# SocketIO instance is already initialized in flask_app.py
|
||||
if socketio_server:
|
||||
if ssl_mode:
|
||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||
socketio.run(app, host=host, port=int(port), debug=False,
|
||||
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
|
||||
else:
|
||||
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
|
||||
else:
|
||||
app.run(host=host, port=int(port), debug=False)
|
||||
# Run Flask app without Socket.IO if disabled
|
||||
logger.info("Starting Flask app without Socket.IO server")
|
||||
if ssl_mode:
|
||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||
app.run(host=host, port=int(port), debug=False,
|
||||
ssl_context=(ssl_cert_file, ssl_privkey_file))
|
||||
else:
|
||||
app.run(host=host, port=int(port), debug=False)
|
||||
|
||||
@@ -89,11 +89,16 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
continue
|
||||
|
||||
uuid = queued_item_data.item.get('uuid')
|
||||
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
|
||||
|
||||
# RACE CONDITION FIX: Atomically claim this UUID for processing
|
||||
from changedetectionio import worker_handler
|
||||
from changedetectionio.queuedWatchMetaData import PrioritizedItem
|
||||
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
|
||||
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
|
||||
|
||||
# Try to claim the UUID atomically - prevents duplicate processing
|
||||
if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
|
||||
# Already being processed by another worker
|
||||
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
|
||||
|
||||
# Sleep to avoid tight loop and give the other worker time to finish
|
||||
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
|
||||
|
||||
@@ -105,9 +110,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
continue
|
||||
|
||||
fetch_start_time = round(time.time())
|
||||
|
||||
# Mark this UUID as being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
|
||||
|
||||
try:
|
||||
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
|
||||
@@ -487,8 +489,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
try:
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
# Release UUID from processing (thread-safe)
|
||||
worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
|
||||
|
||||
# Send completion signal
|
||||
if watch:
|
||||
|
||||
@@ -401,7 +401,10 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
||||
# (instead of the global var)
|
||||
app.config['DATASTORE'] = datastore_o
|
||||
|
||||
|
||||
# Store batch mode flag to skip background threads when running in batch mode
|
||||
app.config['batch_mode'] = config.get('batch_mode', False) if config else False
|
||||
|
||||
# Store the signal in the app config to ensure it's accessible everywhere
|
||||
app.config['watch_check_update_SIGNAL'] = watch_check_update
|
||||
|
||||
@@ -805,6 +808,8 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
# Initialize Socket.IO server conditionally based on settings
|
||||
socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True)
|
||||
if socket_io_enabled and app.config.get('batch_mode'):
|
||||
socket_io_enabled = False
|
||||
if socket_io_enabled:
|
||||
from changedetectionio.realtime.socket_server import init_socketio
|
||||
global socketio_server
|
||||
@@ -902,14 +907,19 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
logger.info(f"Starting {n_workers} workers during app initialization")
|
||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||
# Skip background threads in batch mode (just process queue and exit)
|
||||
batch_mode = app.config.get('batch_mode', False)
|
||||
if not batch_mode:
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||
else:
|
||||
logger.info("Batch mode: Skipping ticker thread, notification runner, and version checker")
|
||||
|
||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||
# This avoids circular dependencies
|
||||
|
||||
@@ -338,7 +338,6 @@ class ChangeDetectionStore:
|
||||
self.needs_write_urgent = True
|
||||
|
||||
def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True):
|
||||
import requests
|
||||
|
||||
if extras is None:
|
||||
extras = {}
|
||||
@@ -349,6 +348,8 @@ class ChangeDetectionStore:
|
||||
|
||||
# Was it a share link? try to fetch the data
|
||||
if (url.startswith("https://changedetection.io/share/")):
|
||||
import requests
|
||||
|
||||
try:
|
||||
r = requests.request(method="GET",
|
||||
url=url,
|
||||
|
||||
243
changedetectionio/test_cli_opts.sh
Executable file
243
changedetectionio/test_cli_opts.sh
Executable file
@@ -0,0 +1,243 @@
|
||||
#!/bin/bash
|
||||
# Test script for CLI options - Parallel execution
|
||||
# Tests -u, -uN, -r, -b flags
|
||||
|
||||
set -u # Exit on undefined variables
|
||||
|
||||
# Color output
|
||||
RED='\033[0;31m'
|
||||
GREEN='\033[0;32m'
|
||||
YELLOW='\033[1;33m'
|
||||
NC='\033[0m' # No Color
|
||||
|
||||
# Test results directory (for parallel safety)
|
||||
TEST_RESULTS_DIR="/tmp/cli-test-results-$$"
|
||||
mkdir -p "$TEST_RESULTS_DIR"
|
||||
|
||||
# Cleanup function
|
||||
cleanup() {
|
||||
echo ""
|
||||
echo "=== Cleaning up test directories ==="
|
||||
rm -rf /tmp/cli-test-* 2>/dev/null || true
|
||||
rm -rf "$TEST_RESULTS_DIR" 2>/dev/null || true
|
||||
# Kill any hanging processes
|
||||
pkill -f "changedetection.py.*cli-test" 2>/dev/null || true
|
||||
}
|
||||
trap cleanup EXIT
|
||||
|
||||
# Helper to record test result
|
||||
record_result() {
|
||||
local test_num=$1
|
||||
local status=$2 # pass or fail
|
||||
local message=$3
|
||||
|
||||
echo "$status|$message" > "$TEST_RESULTS_DIR/test_${test_num}.result"
|
||||
}
|
||||
|
||||
# Run a test in background
|
||||
run_test() {
|
||||
local test_num=$1
|
||||
local test_name=$2
|
||||
local test_func=$3
|
||||
|
||||
(
|
||||
echo -e "${YELLOW}[Test $test_num]${NC} $test_name"
|
||||
if $test_func "$test_num"; then
|
||||
record_result "$test_num" "pass" "$test_name"
|
||||
echo -e "${GREEN}✓ PASS${NC}: $test_name"
|
||||
else
|
||||
record_result "$test_num" "fail" "$test_name"
|
||||
echo -e "${RED}✗ FAIL${NC}: $test_name"
|
||||
fi
|
||||
) &
|
||||
}
|
||||
|
||||
# =============================================================================
|
||||
# Test Functions (each runs independently)
|
||||
# =============================================================================
|
||||
|
||||
test_help_flag() {
|
||||
local test_id=$1
|
||||
timeout 3 python3 changedetection.py --help 2>&1 | grep -q "Add URLs on startup"
|
||||
}
|
||||
|
||||
test_version_flag() {
|
||||
local test_id=$1
|
||||
timeout 3 python3 changedetection.py --version 2>&1 | grep -qE "changedetection.io [0-9]+\.[0-9]+"
|
||||
}
|
||||
|
||||
test_single_url() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-single-${test_id}-$$"
|
||||
timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
|
||||
[ -f "$dir/url-watches.json" ] && \
|
||||
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ]
|
||||
}
|
||||
|
||||
test_multiple_urls() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-multi-${test_id}-$$"
|
||||
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-u https://github.com \
|
||||
-u https://httpbin.org \
|
||||
-b &>/dev/null
|
||||
[ -f "$dir/url-watches.json" ] && \
|
||||
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ]
|
||||
}
|
||||
|
||||
test_url_with_options() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-opts-${test_id}-$$"
|
||||
timeout 10 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-u0 '{"title":"Test Site","processor":"text_json_diff"}' \
|
||||
-b &>/dev/null
|
||||
[ -f "$dir/url-watches.json" ] && \
|
||||
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)"
|
||||
}
|
||||
|
||||
test_multiple_urls_with_options() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-multi-opts-${test_id}-$$"
|
||||
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-u0 '{"title":"Site One"}' \
|
||||
-u https://github.com \
|
||||
-u1 '{"title":"Site Two"}' \
|
||||
-b &>/dev/null
|
||||
[ -f "$dir/url-watches.json" ] && \
|
||||
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \
|
||||
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)"
|
||||
}
|
||||
|
||||
test_batch_mode_exit() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-batch-${test_id}-$$"
|
||||
local start=$(date +%s)
|
||||
timeout 15 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-b &>/dev/null
|
||||
local end=$(date +%s)
|
||||
local elapsed=$((end - start))
|
||||
[ $elapsed -lt 14 ]
|
||||
}
|
||||
|
||||
test_recheck_all() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-recheck-all-${test_id}-$$"
|
||||
mkdir -p "$dir"
|
||||
cat > "$dir/url-watches.json" << 'EOF'
|
||||
{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}}
|
||||
EOF
|
||||
timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all"
|
||||
}
|
||||
|
||||
test_recheck_specific() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$"
|
||||
mkdir -p "$dir"
|
||||
cat > "$dir/url-watches.json" << 'EOF'
|
||||
{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}}
|
||||
EOF
|
||||
timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches"
|
||||
}
|
||||
|
||||
test_combined_operations() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-combined-${test_id}-$$"
|
||||
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-u https://github.com \
|
||||
-r all \
|
||||
-b &>/dev/null
|
||||
[ -f "$dir/url-watches.json" ] && \
|
||||
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ]
|
||||
}
|
||||
|
||||
test_invalid_json() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-invalid-${test_id}-$$"
|
||||
timeout 5 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-u0 'invalid json here' \
|
||||
2>&1 | grep -qi "invalid json\|json decode error"
|
||||
}
|
||||
|
||||
test_create_directory() {
|
||||
local test_id=$1
|
||||
local dir="/tmp/cli-test-create-${test_id}-$$/nested/path"
|
||||
timeout 10 python3 changedetection.py -d "$dir" -C \
|
||||
-u https://example.com \
|
||||
-b &>/dev/null
|
||||
[ -d "$dir" ]
|
||||
}
|
||||
|
||||
# =============================================================================
|
||||
# Main Test Execution
|
||||
# =============================================================================
|
||||
|
||||
echo "=========================================="
|
||||
echo " CLI Options Test Suite (Parallel)"
|
||||
echo "=========================================="
|
||||
echo ""
|
||||
|
||||
# Launch all tests in parallel
|
||||
run_test 1 "Help flag (--help) shows usage without initialization" test_help_flag
|
||||
run_test 2 "Version flag (--version) displays version" test_version_flag
|
||||
run_test 3 "Add single URL with -u flag" test_single_url
|
||||
run_test 4 "Add multiple URLs with multiple -u flags" test_multiple_urls
|
||||
run_test 5 "Add URL with JSON options using -u0" test_url_with_options
|
||||
run_test 6 "Add multiple URLs with different options (-u0, -u1)" test_multiple_urls_with_options
|
||||
run_test 7 "Batch mode (-b) exits automatically after processing" test_batch_mode_exit
|
||||
run_test 8 "Recheck all watches with -r all" test_recheck_all
|
||||
run_test 9 "Recheck specific watches with -r UUID" test_recheck_specific
|
||||
run_test 10 "Combined: Add URLs and recheck all with -u and -r all" test_combined_operations
|
||||
run_test 11 "Invalid JSON in -u0 option should show error" test_invalid_json
|
||||
run_test 12 "Create datastore directory with -C flag" test_create_directory
|
||||
|
||||
# Wait for all tests to complete
|
||||
echo ""
|
||||
echo "Waiting for all tests to complete..."
|
||||
wait
|
||||
|
||||
# Collect results
|
||||
echo ""
|
||||
echo "=========================================="
|
||||
echo " Test Summary"
|
||||
echo "=========================================="
|
||||
|
||||
TESTS_RUN=0
|
||||
TESTS_PASSED=0
|
||||
TESTS_FAILED=0
|
||||
|
||||
for result_file in "$TEST_RESULTS_DIR"/test_*.result; do
|
||||
if [ -f "$result_file" ]; then
|
||||
TESTS_RUN=$((TESTS_RUN + 1))
|
||||
status=$(cut -d'|' -f1 < "$result_file")
|
||||
if [ "$status" = "pass" ]; then
|
||||
TESTS_PASSED=$((TESTS_PASSED + 1))
|
||||
else
|
||||
TESTS_FAILED=$((TESTS_FAILED + 1))
|
||||
fi
|
||||
fi
|
||||
done
|
||||
|
||||
echo "Tests run: $TESTS_RUN"
|
||||
echo -e "${GREEN}Tests passed: $TESTS_PASSED${NC}"
|
||||
if [ $TESTS_FAILED -gt 0 ]; then
|
||||
echo -e "${RED}Tests failed: $TESTS_FAILED${NC}"
|
||||
else
|
||||
echo -e "${GREEN}Tests failed: $TESTS_FAILED${NC}"
|
||||
fi
|
||||
echo "=========================================="
|
||||
echo ""
|
||||
|
||||
# Exit with appropriate code
|
||||
if [ $TESTS_FAILED -gt 0 ]; then
|
||||
echo -e "${RED}Some tests failed!${NC}"
|
||||
exit 1
|
||||
else
|
||||
echo -e "${GREEN}All tests passed!${NC}"
|
||||
exit 0
|
||||
fi
|
||||
@@ -5,13 +5,11 @@ from threading import Thread
|
||||
|
||||
import pytest
|
||||
import arrow
|
||||
from changedetectionio import changedetection_app
|
||||
from changedetectionio import store
|
||||
import os
|
||||
import sys
|
||||
from loguru import logger
|
||||
|
||||
from changedetectionio.flask_app import init_app_secret
|
||||
from changedetectionio.flask_app import init_app_secret, changedetection_app
|
||||
from changedetectionio.tests.util import live_server_setup, new_live_server_setup
|
||||
|
||||
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py
|
||||
|
||||
@@ -140,38 +140,13 @@ def delete_all_watches(client=None):
|
||||
def wait_for_all_checks(client=None):
|
||||
"""
|
||||
Waits until the queue is empty and workers are idle.
|
||||
Much faster than the original with adaptive timing.
|
||||
Delegates to worker_handler.wait_for_all_checks for shared logic.
|
||||
"""
|
||||
from changedetectionio.flask_app import update_q as global_update_q
|
||||
from changedetectionio import worker_handler
|
||||
empty_since = None
|
||||
attempt = 0
|
||||
max_attempts = 150 # Still reasonable upper bound
|
||||
|
||||
while attempt < max_attempts:
|
||||
# Start with fast checks, slow down if needed
|
||||
if attempt < 10:
|
||||
time.sleep(0.2) # Very fast initial checks
|
||||
elif attempt < 30:
|
||||
time.sleep(0.4) # Medium speed
|
||||
else:
|
||||
time.sleep(0.8) # Slower for persistent issues
|
||||
|
||||
q_length = global_update_q.qsize()
|
||||
running_uuids = worker_handler.get_running_uuids()
|
||||
any_workers_busy = len(running_uuids) > 0
|
||||
|
||||
if q_length == 0 and not any_workers_busy:
|
||||
if empty_since is None:
|
||||
empty_since = time.time()
|
||||
# Brief stabilization period for async workers
|
||||
elif time.time() - empty_since >= 0.3:
|
||||
break
|
||||
else:
|
||||
empty_since = None
|
||||
|
||||
attempt += 1
|
||||
time.sleep(0.3)
|
||||
# Use the shared wait logic from worker_handler
|
||||
return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
|
||||
|
||||
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||
"""
|
||||
|
||||
@@ -2857,7 +2857,7 @@ msgstr "Echtzeit-Updates offline"
|
||||
|
||||
#: changedetectionio/templates/base.html
|
||||
msgid "Select Language"
|
||||
msgstr "Wählen Sie Sprache aus"
|
||||
msgstr "Wählen Sie eine Sprache aus"
|
||||
|
||||
#: changedetectionio/templates/base.html
|
||||
msgid "Auto-detect from browser"
|
||||
|
||||
@@ -17,6 +17,7 @@ worker_threads = [] # List of WorkerThread objects
|
||||
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
_uuid_processing_lock = threading.Lock() # Protects currently_processing_uuids
|
||||
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
@@ -207,31 +208,80 @@ def get_worker_count():
|
||||
|
||||
def get_running_uuids():
|
||||
"""Get list of UUIDs currently being processed by async workers"""
|
||||
return list(currently_processing_uuids.keys())
|
||||
with _uuid_processing_lock:
|
||||
return list(currently_processing_uuids.keys())
|
||||
|
||||
|
||||
def claim_uuid_for_processing(uuid, worker_id):
|
||||
"""
|
||||
Atomically check if UUID is available and claim it for processing.
|
||||
|
||||
This is thread-safe and prevents race conditions where multiple workers
|
||||
try to process the same UUID simultaneously.
|
||||
|
||||
Args:
|
||||
uuid: The watch UUID to claim
|
||||
worker_id: The ID of the worker claiming this UUID
|
||||
|
||||
Returns:
|
||||
True if successfully claimed (UUID was not being processed)
|
||||
False if already being processed by another worker
|
||||
"""
|
||||
with _uuid_processing_lock:
|
||||
if uuid in currently_processing_uuids:
|
||||
# Already being processed by another worker
|
||||
return False
|
||||
# Claim it atomically
|
||||
currently_processing_uuids[uuid] = worker_id
|
||||
logger.debug(f"Worker {worker_id} claimed UUID: {uuid}")
|
||||
return True
|
||||
|
||||
|
||||
def release_uuid_from_processing(uuid, worker_id):
|
||||
"""
|
||||
Release a UUID from processing (thread-safe).
|
||||
|
||||
Args:
|
||||
uuid: The watch UUID to release
|
||||
worker_id: The ID of the worker releasing this UUID
|
||||
"""
|
||||
with _uuid_processing_lock:
|
||||
# Only remove if this worker owns it (defensive)
|
||||
if currently_processing_uuids.get(uuid) == worker_id:
|
||||
currently_processing_uuids.pop(uuid, None)
|
||||
logger.debug(f"Worker {worker_id} released UUID: {uuid}")
|
||||
else:
|
||||
logger.warning(f"Worker {worker_id} tried to release UUID {uuid} but doesn't own it (owned by {currently_processing_uuids.get(uuid, 'nobody')})")
|
||||
|
||||
|
||||
def set_uuid_processing(uuid, worker_id=None, processing=True):
|
||||
"""Mark a UUID as being processed or completed by a specific worker"""
|
||||
global currently_processing_uuids
|
||||
"""
|
||||
Mark a UUID as being processed or completed by a specific worker.
|
||||
|
||||
DEPRECATED: Use claim_uuid_for_processing() and release_uuid_from_processing() instead.
|
||||
This function is kept for backward compatibility but doesn't provide atomic check-and-set.
|
||||
"""
|
||||
if processing:
|
||||
currently_processing_uuids[uuid] = worker_id
|
||||
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
||||
with _uuid_processing_lock:
|
||||
currently_processing_uuids[uuid] = worker_id
|
||||
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
||||
else:
|
||||
currently_processing_uuids.pop(uuid, None)
|
||||
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
|
||||
release_uuid_from_processing(uuid, worker_id)
|
||||
|
||||
|
||||
def is_watch_running(watch_uuid):
|
||||
"""Check if a specific watch is currently being processed by any worker"""
|
||||
return watch_uuid in currently_processing_uuids
|
||||
with _uuid_processing_lock:
|
||||
return watch_uuid in currently_processing_uuids
|
||||
|
||||
|
||||
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
|
||||
"""Check if a specific watch is currently being processed by a different worker"""
|
||||
if watch_uuid not in currently_processing_uuids:
|
||||
return False
|
||||
processing_worker_id = currently_processing_uuids[watch_uuid]
|
||||
return processing_worker_id != current_worker_id
|
||||
with _uuid_processing_lock:
|
||||
if watch_uuid not in currently_processing_uuids:
|
||||
return False
|
||||
processing_worker_id = currently_processing_uuids[watch_uuid]
|
||||
return processing_worker_id != current_worker_id
|
||||
|
||||
|
||||
def queue_item_async_safe(update_q, item, silent=False):
|
||||
@@ -381,6 +431,53 @@ def get_worker_status():
|
||||
}
|
||||
|
||||
|
||||
def wait_for_all_checks(update_q, timeout=150):
|
||||
"""
|
||||
Wait for queue to be empty and all workers to be idle.
|
||||
|
||||
Args:
|
||||
update_q: The update queue to monitor
|
||||
timeout: Maximum wait time in seconds (default 150 = 150 iterations * 0.2-0.8s)
|
||||
|
||||
Returns:
|
||||
bool: True if all checks completed, False if timeout
|
||||
"""
|
||||
import time
|
||||
empty_since = None
|
||||
attempt = 0
|
||||
max_attempts = timeout
|
||||
|
||||
while attempt < max_attempts:
|
||||
# Adaptive sleep - start fast, slow down if needed
|
||||
if attempt < 10:
|
||||
sleep_time = 0.2 # Very fast initial checks
|
||||
elif attempt < 30:
|
||||
sleep_time = 0.4 # Medium speed
|
||||
else:
|
||||
sleep_time = 0.8 # Slower for persistent issues
|
||||
|
||||
time.sleep(sleep_time)
|
||||
|
||||
q_length = update_q.qsize()
|
||||
running_uuids = get_running_uuids()
|
||||
any_workers_busy = len(running_uuids) > 0
|
||||
|
||||
if q_length == 0 and not any_workers_busy:
|
||||
if empty_since is None:
|
||||
empty_since = time.time()
|
||||
# Brief stabilization period for async workers
|
||||
elif time.time() - empty_since >= 0.3:
|
||||
# Add small buffer for filesystem operations to complete
|
||||
time.sleep(0.2)
|
||||
return True
|
||||
else:
|
||||
empty_since = None
|
||||
|
||||
attempt += 1
|
||||
|
||||
return False # Timeout
|
||||
|
||||
|
||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Check if the expected number of async workers are running and restart any missing ones.
|
||||
|
||||
@@ -42,7 +42,7 @@ orjson~=3.11
|
||||
# jq not available on Windows so must be installed manually
|
||||
|
||||
# Notification library
|
||||
apprise==1.9.6
|
||||
apprise==1.9.7
|
||||
|
||||
diff_match_patch
|
||||
|
||||
|
||||
Reference in New Issue
Block a user