mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-03-28 22:57:57 +00:00
Compare commits
8 Commits
JSONP-supp
...
CLI-option
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
04e22f1b0a | ||
|
|
2a0e8cbf6c | ||
|
|
901d69af42 | ||
|
|
528c305928 | ||
|
|
a4357c7bb7 | ||
|
|
75db43fc09 | ||
|
|
f8c6c62107 | ||
|
|
4523918752 |
@@ -111,6 +111,32 @@ jobs:
|
|||||||
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
|
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
|
||||||
docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh'
|
docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh'
|
||||||
|
|
||||||
|
- name: Test CLI options
|
||||||
|
run: |
|
||||||
|
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
|
||||||
|
docker run --name test-cdio-cli-opts --network changedet-network test-changedetectionio bash -c 'changedetectionio/test_cli_opts.sh' &> cli-opts-output.txt
|
||||||
|
echo "=== CLI Options Test Output ==="
|
||||||
|
cat cli-opts-output.txt
|
||||||
|
|
||||||
|
- name: CLI Memory Test
|
||||||
|
run: |
|
||||||
|
echo "=== Checking CLI batch mode memory usage ==="
|
||||||
|
# Extract RSS memory value from output
|
||||||
|
RSS_MB=$(grep -oP "Memory consumption before worker shutdown: RSS=\K[\d.]+" cli-opts-output.txt | head -1 || echo "0")
|
||||||
|
echo "RSS Memory: ${RSS_MB} MB"
|
||||||
|
|
||||||
|
# Check if RSS is less than 100MB
|
||||||
|
if [ -n "$RSS_MB" ]; then
|
||||||
|
if (( $(echo "$RSS_MB < 100" | bc -l) )); then
|
||||||
|
echo "✓ Memory usage is acceptable: ${RSS_MB} MB < 100 MB"
|
||||||
|
else
|
||||||
|
echo "✗ Memory usage too high: ${RSS_MB} MB >= 100 MB"
|
||||||
|
exit 1
|
||||||
|
fi
|
||||||
|
else
|
||||||
|
echo "⚠ Could not extract memory usage, skipping check"
|
||||||
|
fi
|
||||||
|
|
||||||
- name: Extract memory report and logs
|
- name: Extract memory report and logs
|
||||||
if: always()
|
if: always()
|
||||||
uses: ./.github/actions/extract-memory-report
|
uses: ./.github/actions/extract-memory-report
|
||||||
@@ -125,6 +151,13 @@ jobs:
|
|||||||
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
|
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
|
||||||
path: output-logs
|
path: output-logs
|
||||||
|
|
||||||
|
- name: Store CLI test output
|
||||||
|
if: always()
|
||||||
|
uses: actions/upload-artifact@v6
|
||||||
|
with:
|
||||||
|
name: test-cdio-cli-opts-output-py${{ env.PYTHON_VERSION }}
|
||||||
|
path: cli-opts-output.txt
|
||||||
|
|
||||||
# Playwright tests
|
# Playwright tests
|
||||||
playwright-tests:
|
playwright-tests:
|
||||||
runs-on: ubuntu-latest
|
runs-on: ubuntu-latest
|
||||||
|
|||||||
@@ -6,19 +6,20 @@ __version__ = '0.52.9'
|
|||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
import getopt
|
||||||
import logging
|
import logging
|
||||||
import os
|
import os
|
||||||
import getopt
|
|
||||||
import platform
|
import platform
|
||||||
import signal
|
import signal
|
||||||
|
import threading
|
||||||
import sys
|
import time
|
||||||
|
|
||||||
# Eventlet completely removed - using threading mode for SocketIO
|
# Eventlet completely removed - using threading mode for SocketIO
|
||||||
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
|
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
|
||||||
from changedetectionio import store
|
# Note: store and changedetection_app are imported inside main() to avoid
|
||||||
from changedetectionio.flask_app import changedetection_app
|
# initialization before argument parsing (allows --help to work without loading everything)
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
# ==============================================================================
|
# ==============================================================================
|
||||||
# Multiprocessing Configuration - CRITICAL for Thread Safety
|
# Multiprocessing Configuration - CRITICAL for Thread Safety
|
||||||
@@ -83,11 +84,22 @@ def get_version():
|
|||||||
def sigshutdown_handler(_signo, _stack_frame):
|
def sigshutdown_handler(_signo, _stack_frame):
|
||||||
name = signal.Signals(_signo).name
|
name = signal.Signals(_signo).name
|
||||||
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')
|
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')
|
||||||
|
|
||||||
# Set exit flag immediately to stop all loops
|
# Set exit flag immediately to stop all loops
|
||||||
app.config.exit.set()
|
app.config.exit.set()
|
||||||
datastore.stop_thread = True
|
datastore.stop_thread = True
|
||||||
|
|
||||||
|
# Log memory consumption before shutting down workers (cross-platform)
|
||||||
|
try:
|
||||||
|
import psutil
|
||||||
|
process = psutil.Process()
|
||||||
|
mem_info = process.memory_info()
|
||||||
|
rss_mb = mem_info.rss / 1024 / 1024
|
||||||
|
vms_mb = mem_info.vms / 1024 / 1024
|
||||||
|
logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Could not retrieve memory stats: {str(e)}")
|
||||||
|
|
||||||
# Shutdown workers and queues immediately
|
# Shutdown workers and queues immediately
|
||||||
try:
|
try:
|
||||||
from changedetectionio import worker_handler
|
from changedetectionio import worker_handler
|
||||||
@@ -125,10 +137,51 @@ def main():
|
|||||||
global datastore
|
global datastore
|
||||||
global app
|
global app
|
||||||
|
|
||||||
|
# Early help/version check before any initialization
|
||||||
|
if '--help' in sys.argv or '-help' in sys.argv:
|
||||||
|
print('Usage: changedetection.py [options]')
|
||||||
|
print('')
|
||||||
|
print('Standard options:')
|
||||||
|
print(' -s SSL enable')
|
||||||
|
print(' -h HOST Listen host (default: 0.0.0.0)')
|
||||||
|
print(' -p PORT Listen port (default: 5000)')
|
||||||
|
print(' -d PATH Datastore path')
|
||||||
|
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
|
||||||
|
print(' -c Cleanup unused snapshots')
|
||||||
|
print(' -C Create datastore directory if it doesn\'t exist')
|
||||||
|
print('')
|
||||||
|
print('Add URLs on startup:')
|
||||||
|
print(' -u URL Add URL to watch (can be used multiple times)')
|
||||||
|
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
|
||||||
|
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
|
||||||
|
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
|
||||||
|
print(' Available options: processor, fetch_backend, headers, method, etc.')
|
||||||
|
print(' See model/Watch.py for all available options')
|
||||||
|
print('')
|
||||||
|
print('Recheck on startup:')
|
||||||
|
print(' -r all Queue all watches for recheck on startup')
|
||||||
|
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
|
||||||
|
print(' -r all N Queue all watches, wait for completion, repeat N times')
|
||||||
|
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
|
||||||
|
print('')
|
||||||
|
print('Batch mode:')
|
||||||
|
print(' -b Run in batch mode (process queue then exit)')
|
||||||
|
print(' Useful for CI/CD, cron jobs, or one-time checks')
|
||||||
|
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
|
||||||
|
print(' Use -p PORT to specify a different port if needed')
|
||||||
|
print('')
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
if '--version' in sys.argv or '-v' in sys.argv:
|
||||||
|
print(f'changedetection.io {__version__}')
|
||||||
|
sys.exit(0)
|
||||||
|
|
||||||
|
# Import heavy modules after help/version checks to keep startup fast for those flags
|
||||||
|
from changedetectionio import store
|
||||||
|
from changedetectionio.flask_app import changedetection_app
|
||||||
|
|
||||||
datastore_path = None
|
datastore_path = None
|
||||||
do_cleanup = False
|
do_cleanup = False
|
||||||
# Optional URL to watch since start
|
|
||||||
default_url = None
|
|
||||||
# Set a default logger level
|
# Set a default logger level
|
||||||
logger_level = 'DEBUG'
|
logger_level = 'DEBUG'
|
||||||
include_default_watches = True
|
include_default_watches = True
|
||||||
@@ -137,6 +190,13 @@ def main():
|
|||||||
port = int(os.environ.get('PORT', 5000))
|
port = int(os.environ.get('PORT', 5000))
|
||||||
ssl_mode = False
|
ssl_mode = False
|
||||||
|
|
||||||
|
# Lists for multiple URLs and their options
|
||||||
|
urls_to_add = []
|
||||||
|
url_options = {} # Key: index (0-based), Value: dict of options
|
||||||
|
recheck_watches = None # None, 'all', or list of UUIDs
|
||||||
|
recheck_repeat_count = 1 # Number of times to repeat recheck cycle
|
||||||
|
batch_mode = False # Run once then exit when queue is empty
|
||||||
|
|
||||||
# On Windows, create and use a default path.
|
# On Windows, create and use a default path.
|
||||||
if os.name == 'nt':
|
if os.name == 'nt':
|
||||||
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
|
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
|
||||||
@@ -145,10 +205,98 @@ def main():
|
|||||||
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
|
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
|
||||||
datastore_path = os.path.join(os.getcwd(), "../datastore")
|
datastore_path = os.path.join(os.getcwd(), "../datastore")
|
||||||
|
|
||||||
|
# Pre-process arguments to extract -u, -u<N>, and -r options before getopt
|
||||||
|
# This allows unlimited -u0, -u1, -u2, ... options without predefining them
|
||||||
|
cleaned_argv = ['changedetection.py'] # Start with program name
|
||||||
|
i = 1
|
||||||
|
while i < len(sys.argv):
|
||||||
|
arg = sys.argv[i]
|
||||||
|
|
||||||
|
# Handle -u (add URL)
|
||||||
|
if arg == '-u' and i + 1 < len(sys.argv):
|
||||||
|
urls_to_add.append(sys.argv[i + 1])
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle -u<N> (set options for URL at index N)
|
||||||
|
if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
|
||||||
|
idx = int(arg[2:])
|
||||||
|
if i + 1 < len(sys.argv):
|
||||||
|
try:
|
||||||
|
import json
|
||||||
|
url_options[idx] = json.loads(sys.argv[i + 1])
|
||||||
|
except json.JSONDecodeError as e:
|
||||||
|
print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
|
||||||
|
print(f'JSON decode error: {e}')
|
||||||
|
sys.exit(2)
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle -r (recheck watches)
|
||||||
|
if arg == '-r' and i + 1 < len(sys.argv):
|
||||||
|
recheck_arg = sys.argv[i + 1]
|
||||||
|
if recheck_arg.lower() == 'all':
|
||||||
|
recheck_watches = 'all'
|
||||||
|
else:
|
||||||
|
# Parse comma-separated list of UUIDs
|
||||||
|
recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]
|
||||||
|
|
||||||
|
# Check for optional repeat count as third argument
|
||||||
|
if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit():
|
||||||
|
recheck_repeat_count = int(sys.argv[i + 2])
|
||||||
|
if recheck_repeat_count < 1:
|
||||||
|
print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}')
|
||||||
|
sys.exit(2)
|
||||||
|
i += 3
|
||||||
|
else:
|
||||||
|
i += 2
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Handle -b (batch mode - run once and exit)
|
||||||
|
if arg == '-b':
|
||||||
|
batch_mode = True
|
||||||
|
i += 1
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Keep other arguments for getopt
|
||||||
|
cleaned_argv.append(arg)
|
||||||
|
i += 1
|
||||||
|
|
||||||
try:
|
try:
|
||||||
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
|
opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port")
|
||||||
except getopt.GetoptError:
|
except getopt.GetoptError as e:
|
||||||
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
|
print('Usage: changedetection.py [options]')
|
||||||
|
print('')
|
||||||
|
print('Standard options:')
|
||||||
|
print(' -s SSL enable')
|
||||||
|
print(' -h HOST Listen host (default: 0.0.0.0)')
|
||||||
|
print(' -p PORT Listen port (default: 5000)')
|
||||||
|
print(' -d PATH Datastore path')
|
||||||
|
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
|
||||||
|
print(' -c Cleanup unused snapshots')
|
||||||
|
print(' -C Create datastore directory if it doesn\'t exist')
|
||||||
|
print('')
|
||||||
|
print('Add URLs on startup:')
|
||||||
|
print(' -u URL Add URL to watch (can be used multiple times)')
|
||||||
|
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
|
||||||
|
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
|
||||||
|
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
|
||||||
|
print(' Available options: processor, fetch_backend, headers, method, etc.')
|
||||||
|
print(' See model/Watch.py for all available options')
|
||||||
|
print('')
|
||||||
|
print('Recheck on startup:')
|
||||||
|
print(' -r all Queue all watches for recheck on startup')
|
||||||
|
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
|
||||||
|
print(' -r all N Queue all watches, wait for completion, repeat N times')
|
||||||
|
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
|
||||||
|
print('')
|
||||||
|
print('Batch mode:')
|
||||||
|
print(' -b Run in batch mode (process queue then exit)')
|
||||||
|
print(' Useful for CI/CD, cron jobs, or one-time checks')
|
||||||
|
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
|
||||||
|
print(' Use -p PORT to specify a different port if needed')
|
||||||
|
print('')
|
||||||
|
print(f'Error: {e}')
|
||||||
sys.exit(2)
|
sys.exit(2)
|
||||||
|
|
||||||
create_datastore_dir = False
|
create_datastore_dir = False
|
||||||
@@ -173,10 +321,6 @@ def main():
|
|||||||
if opt == '-d':
|
if opt == '-d':
|
||||||
datastore_path = arg
|
datastore_path = arg
|
||||||
|
|
||||||
if opt == '-u':
|
|
||||||
default_url = arg
|
|
||||||
include_default_watches = False
|
|
||||||
|
|
||||||
# Cleanup (remove text files that arent in the index)
|
# Cleanup (remove text files that arent in the index)
|
||||||
if opt == '-c':
|
if opt == '-c':
|
||||||
do_cleanup = True
|
do_cleanup = True
|
||||||
@@ -188,6 +332,10 @@ def main():
|
|||||||
if opt == '-l':
|
if opt == '-l':
|
||||||
logger_level = int(arg) if arg.isdigit() else arg.upper()
|
logger_level = int(arg) if arg.isdigit() else arg.upper()
|
||||||
|
|
||||||
|
# If URLs are provided, don't include default watches
|
||||||
|
if urls_to_add:
|
||||||
|
include_default_watches = False
|
||||||
|
|
||||||
|
|
||||||
logger.success(f"changedetection.io version {get_version()} starting.")
|
logger.success(f"changedetection.io version {get_version()} starting.")
|
||||||
# Launch using SocketIO run method for proper integration (if enabled)
|
# Launch using SocketIO run method for proper integration (if enabled)
|
||||||
@@ -224,11 +372,16 @@ def main():
|
|||||||
logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)
|
logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)
|
||||||
|
|
||||||
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
|
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
|
||||||
app_config = {'datastore_path': datastore_path}
|
app_config = {
|
||||||
|
'datastore_path': datastore_path,
|
||||||
|
'batch_mode': batch_mode,
|
||||||
|
'recheck_watches': recheck_watches,
|
||||||
|
'recheck_repeat_count': recheck_repeat_count
|
||||||
|
}
|
||||||
|
|
||||||
if not os.path.isdir(app_config['datastore_path']):
|
if not os.path.isdir(app_config['datastore_path']):
|
||||||
if create_datastore_dir:
|
if create_datastore_dir:
|
||||||
os.mkdir(app_config['datastore_path'])
|
os.makedirs(app_config['datastore_path'], exist_ok=True)
|
||||||
else:
|
else:
|
||||||
logger.critical(
|
logger.critical(
|
||||||
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
|
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
|
||||||
@@ -249,11 +402,200 @@ def main():
|
|||||||
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
|
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
|
||||||
inject_datastore_into_plugins(datastore)
|
inject_datastore_into_plugins(datastore)
|
||||||
|
|
||||||
if default_url:
|
# Step 1: Add URLs with their options (if provided via -u flags)
|
||||||
datastore.add_watch(url = default_url)
|
added_watch_uuids = []
|
||||||
|
if urls_to_add:
|
||||||
|
logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
|
||||||
|
for idx, url in enumerate(urls_to_add):
|
||||||
|
extras = url_options.get(idx, {})
|
||||||
|
if extras:
|
||||||
|
logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
|
||||||
|
else:
|
||||||
|
logger.debug(f"Adding watch {idx}: {url}")
|
||||||
|
|
||||||
|
new_uuid = datastore.add_watch(url=url, extras=extras)
|
||||||
|
if new_uuid:
|
||||||
|
added_watch_uuids.append(new_uuid)
|
||||||
|
logger.success(f"Added watch: {url} (UUID: {new_uuid})")
|
||||||
|
else:
|
||||||
|
logger.error(f"Failed to add watch: {url}")
|
||||||
|
|
||||||
app = changedetection_app(app_config, datastore)
|
app = changedetection_app(app_config, datastore)
|
||||||
|
|
||||||
|
# Step 2: Queue newly added watches (if -u was provided in batch mode)
|
||||||
|
# This must happen AFTER app initialization so update_q is available
|
||||||
|
if batch_mode and added_watch_uuids:
|
||||||
|
from changedetectionio.flask_app import update_q
|
||||||
|
from changedetectionio import queuedWatchMetaData, worker_handler
|
||||||
|
|
||||||
|
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
|
||||||
|
for watch_uuid in added_watch_uuids:
|
||||||
|
try:
|
||||||
|
worker_handler.queue_item_async_safe(
|
||||||
|
update_q,
|
||||||
|
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||||
|
)
|
||||||
|
logger.debug(f"Queued newly added watch: {watch_uuid}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||||
|
|
||||||
|
# Step 3: Queue watches for recheck (if -r was provided)
|
||||||
|
# This must happen AFTER app initialization so update_q is available
|
||||||
|
if recheck_watches is not None:
|
||||||
|
from changedetectionio.flask_app import update_q
|
||||||
|
from changedetectionio import queuedWatchMetaData, worker_handler
|
||||||
|
|
||||||
|
watches_to_queue = []
|
||||||
|
if recheck_watches == 'all':
|
||||||
|
# Queue all watches, excluding those already queued in batch mode
|
||||||
|
all_watches = list(datastore.data['watching'].keys())
|
||||||
|
if batch_mode and added_watch_uuids:
|
||||||
|
# Exclude newly added watches that were already queued in batch mode
|
||||||
|
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
|
||||||
|
logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
|
||||||
|
else:
|
||||||
|
watches_to_queue = all_watches
|
||||||
|
logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
|
||||||
|
else:
|
||||||
|
# Queue specific UUIDs
|
||||||
|
watches_to_queue = recheck_watches
|
||||||
|
logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")
|
||||||
|
|
||||||
|
queued_count = 0
|
||||||
|
for watch_uuid in watches_to_queue:
|
||||||
|
if watch_uuid in datastore.data['watching']:
|
||||||
|
try:
|
||||||
|
worker_handler.queue_item_async_safe(
|
||||||
|
update_q,
|
||||||
|
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||||
|
)
|
||||||
|
queued_count += 1
|
||||||
|
logger.debug(f"Queued watch for recheck: {watch_uuid}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
|
||||||
|
|
||||||
|
logger.success(f"Successfully queued {queued_count} watches for recheck")
|
||||||
|
|
||||||
|
# Step 4: Setup batch mode monitor (if -b was provided)
|
||||||
|
if batch_mode:
|
||||||
|
from changedetectionio.flask_app import update_q
|
||||||
|
|
||||||
|
# Safety check: Ensure Flask app is not already running on this port
|
||||||
|
# Batch mode should never run alongside the web server
|
||||||
|
import socket
|
||||||
|
test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Try to bind to the configured host:port (no SO_REUSEADDR - strict check)
|
||||||
|
test_socket.bind((host, port))
|
||||||
|
test_socket.close()
|
||||||
|
logger.debug(f"Batch mode: Port {port} is available (Flask app not running)")
|
||||||
|
except OSError as e:
|
||||||
|
test_socket.close()
|
||||||
|
# errno 98 = EADDRINUSE (Linux)
|
||||||
|
# errno 48 = EADDRINUSE (macOS)
|
||||||
|
# errno 10048 = WSAEADDRINUSE (Windows)
|
||||||
|
if e.errno in (48, 98, 10048) or "Address already in use" in str(e) or "already in use" in str(e).lower():
|
||||||
|
logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use")
|
||||||
|
logger.critical(f"The Flask web server appears to be running on {host}:{port}")
|
||||||
|
logger.critical(f"Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)")
|
||||||
|
logger.critical(f"Please either stop the Flask web server, or use a different port with -p PORT")
|
||||||
|
sys.exit(1)
|
||||||
|
else:
|
||||||
|
# Some other socket error - log but continue (might be network configuration issue)
|
||||||
|
logger.warning(f"Port availability check failed with unexpected error: {e}")
|
||||||
|
logger.warning(f"Continuing with batch mode anyway - be aware of potential conflicts")
|
||||||
|
|
||||||
|
def queue_watches_for_recheck(datastore, iteration):
|
||||||
|
"""Helper function to queue watches for recheck"""
|
||||||
|
watches_to_queue = []
|
||||||
|
if recheck_watches == 'all':
|
||||||
|
all_watches = list(datastore.data['watching'].keys())
|
||||||
|
if batch_mode and added_watch_uuids and iteration == 1:
|
||||||
|
# Only exclude newly added watches on first iteration
|
||||||
|
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
|
||||||
|
else:
|
||||||
|
watches_to_queue = all_watches
|
||||||
|
logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches")
|
||||||
|
elif recheck_watches:
|
||||||
|
watches_to_queue = recheck_watches
|
||||||
|
logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches")
|
||||||
|
|
||||||
|
queued_count = 0
|
||||||
|
for watch_uuid in watches_to_queue:
|
||||||
|
if watch_uuid in datastore.data['watching']:
|
||||||
|
try:
|
||||||
|
worker_handler.queue_item_async_safe(
|
||||||
|
update_q,
|
||||||
|
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
|
||||||
|
)
|
||||||
|
queued_count += 1
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
|
||||||
|
logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches")
|
||||||
|
return queued_count
|
||||||
|
|
||||||
|
def batch_mode_monitor():
|
||||||
|
"""Monitor queue and workers, shutdown or repeat when work is complete"""
|
||||||
|
import time
|
||||||
|
|
||||||
|
# Track iterations if repeat mode is enabled
|
||||||
|
current_iteration = 1
|
||||||
|
total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1
|
||||||
|
|
||||||
|
if total_iterations > 1:
|
||||||
|
logger.info(f"Batch mode: Will repeat recheck {total_iterations} times")
|
||||||
|
else:
|
||||||
|
logger.info("Batch mode: Waiting for all queued items to complete...")
|
||||||
|
|
||||||
|
# Wait a bit for workers to start processing
|
||||||
|
time.sleep(3)
|
||||||
|
|
||||||
|
try:
|
||||||
|
while current_iteration <= total_iterations:
|
||||||
|
logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
|
||||||
|
|
||||||
|
# Use the shared wait_for_all_checks function
|
||||||
|
completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
|
||||||
|
|
||||||
|
if not completed:
|
||||||
|
logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")
|
||||||
|
|
||||||
|
logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed")
|
||||||
|
|
||||||
|
# Check if we need to repeat
|
||||||
|
if current_iteration < total_iterations:
|
||||||
|
logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...")
|
||||||
|
current_iteration += 1
|
||||||
|
|
||||||
|
# Re-queue watches for next iteration
|
||||||
|
queue_watches_for_recheck(datastore, current_iteration)
|
||||||
|
|
||||||
|
# Brief pause before continuing
|
||||||
|
time.sleep(2)
|
||||||
|
else:
|
||||||
|
# All iterations complete
|
||||||
|
logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown")
|
||||||
|
# Trigger shutdown
|
||||||
|
import os, signal
|
||||||
|
os.kill(os.getpid(), signal.SIGTERM)
|
||||||
|
return
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Batch mode monitor error: {e}")
|
||||||
|
logger.error(f"Initiating emergency shutdown")
|
||||||
|
import os, signal
|
||||||
|
os.kill(os.getpid(), signal.SIGTERM)
|
||||||
|
|
||||||
|
# Start monitor in background thread
|
||||||
|
monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
|
||||||
|
monitor_thread.start()
|
||||||
|
logger.info("Batch mode enabled: Will exit after all queued items are processed")
|
||||||
|
|
||||||
# Get the SocketIO instance from the Flask app (created in flask_app.py)
|
# Get the SocketIO instance from the Flask app (created in flask_app.py)
|
||||||
from changedetectionio.flask_app import socketio_server
|
from changedetectionio.flask_app import socketio_server
|
||||||
global socketio
|
global socketio
|
||||||
@@ -314,20 +656,33 @@ def main():
|
|||||||
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
|
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
|
||||||
|
|
||||||
|
|
||||||
# SocketIO instance is already initialized in flask_app.py
|
# In batch mode, skip starting the HTTP server - just keep workers running
|
||||||
if socketio_server:
|
if batch_mode:
|
||||||
if ssl_mode:
|
logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
|
||||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
logger.info("Batch mode: Main thread will wait for shutdown signal")
|
||||||
socketio.run(app, host=host, port=int(port), debug=False,
|
# Keep main thread alive until batch monitor triggers shutdown
|
||||||
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
|
try:
|
||||||
else:
|
while True:
|
||||||
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
|
time.sleep(1)
|
||||||
|
except KeyboardInterrupt:
|
||||||
|
logger.info("Batch mode: Keyboard interrupt received")
|
||||||
|
pass
|
||||||
else:
|
else:
|
||||||
# Run Flask app without Socket.IO if disabled
|
# Normal mode: Start HTTP server
|
||||||
logger.info("Starting Flask app without Socket.IO server")
|
# SocketIO instance is already initialized in flask_app.py
|
||||||
if ssl_mode:
|
if socketio_server:
|
||||||
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
if ssl_mode:
|
||||||
app.run(host=host, port=int(port), debug=False,
|
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||||
ssl_context=(ssl_cert_file, ssl_privkey_file))
|
socketio.run(app, host=host, port=int(port), debug=False,
|
||||||
|
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
|
||||||
|
else:
|
||||||
|
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
|
||||||
else:
|
else:
|
||||||
app.run(host=host, port=int(port), debug=False)
|
# Run Flask app without Socket.IO if disabled
|
||||||
|
logger.info("Starting Flask app without Socket.IO server")
|
||||||
|
if ssl_mode:
|
||||||
|
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
|
||||||
|
app.run(host=host, port=int(port), debug=False,
|
||||||
|
ssl_context=(ssl_cert_file, ssl_privkey_file))
|
||||||
|
else:
|
||||||
|
app.run(host=host, port=int(port), debug=False)
|
||||||
|
|||||||
@@ -89,11 +89,16 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
uuid = queued_item_data.item.get('uuid')
|
uuid = queued_item_data.item.get('uuid')
|
||||||
# RACE CONDITION FIX: Check if this UUID is already being processed by another worker
|
|
||||||
|
# RACE CONDITION FIX: Atomically claim this UUID for processing
|
||||||
from changedetectionio import worker_handler
|
from changedetectionio import worker_handler
|
||||||
from changedetectionio.queuedWatchMetaData import PrioritizedItem
|
from changedetectionio.queuedWatchMetaData import PrioritizedItem
|
||||||
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
|
|
||||||
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
|
# Try to claim the UUID atomically - prevents duplicate processing
|
||||||
|
if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
|
||||||
|
# Already being processed by another worker
|
||||||
|
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
|
||||||
|
|
||||||
# Sleep to avoid tight loop and give the other worker time to finish
|
# Sleep to avoid tight loop and give the other worker time to finish
|
||||||
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
|
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
|
||||||
|
|
||||||
@@ -105,9 +110,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
continue
|
continue
|
||||||
|
|
||||||
fetch_start_time = round(time.time())
|
fetch_start_time = round(time.time())
|
||||||
|
|
||||||
# Mark this UUID as being processed by this worker
|
|
||||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=True)
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
|
if uuid in list(datastore.data['watching'].keys()) and datastore.data['watching'][uuid].get('url'):
|
||||||
@@ -487,8 +489,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||||
try:
|
try:
|
||||||
# Mark UUID as no longer being processed by this worker
|
# Release UUID from processing (thread-safe)
|
||||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
|
||||||
|
|
||||||
# Send completion signal
|
# Send completion signal
|
||||||
if watch:
|
if watch:
|
||||||
|
|||||||
@@ -401,7 +401,10 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
||||||
# (instead of the global var)
|
# (instead of the global var)
|
||||||
app.config['DATASTORE'] = datastore_o
|
app.config['DATASTORE'] = datastore_o
|
||||||
|
|
||||||
|
# Store batch mode flag to skip background threads when running in batch mode
|
||||||
|
app.config['batch_mode'] = config.get('batch_mode', False) if config else False
|
||||||
|
|
||||||
# Store the signal in the app config to ensure it's accessible everywhere
|
# Store the signal in the app config to ensure it's accessible everywhere
|
||||||
app.config['watch_check_update_SIGNAL'] = watch_check_update
|
app.config['watch_check_update_SIGNAL'] = watch_check_update
|
||||||
|
|
||||||
@@ -805,6 +808,8 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
|
|
||||||
# Initialize Socket.IO server conditionally based on settings
|
# Initialize Socket.IO server conditionally based on settings
|
||||||
socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True)
|
socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True)
|
||||||
|
if socket_io_enabled and app.config.get('batch_mode'):
|
||||||
|
socket_io_enabled = False
|
||||||
if socket_io_enabled:
|
if socket_io_enabled:
|
||||||
from changedetectionio.realtime.socket_server import init_socketio
|
from changedetectionio.realtime.socket_server import init_socketio
|
||||||
global socketio_server
|
global socketio_server
|
||||||
@@ -902,14 +907,19 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
logger.info(f"Starting {n_workers} workers during app initialization")
|
logger.info(f"Starting {n_workers} workers during app initialization")
|
||||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||||
|
|
||||||
# @todo handle ctrl break
|
# Skip background threads in batch mode (just process queue and exit)
|
||||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
batch_mode = app.config.get('batch_mode', False)
|
||||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
if not batch_mode:
|
||||||
|
# @todo handle ctrl break
|
||||||
|
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||||
|
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||||
|
|
||||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
# Check for new release version, but not when running in test/build or pytest
|
# Check for new release version, but not when running in test/build or pytest
|
||||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||||
|
else:
|
||||||
|
logger.info("Batch mode: Skipping ticker thread, notification runner, and version checker")
|
||||||
|
|
||||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||||
# This avoids circular dependencies
|
# This avoids circular dependencies
|
||||||
|
|||||||
@@ -338,7 +338,6 @@ class ChangeDetectionStore:
|
|||||||
self.needs_write_urgent = True
|
self.needs_write_urgent = True
|
||||||
|
|
||||||
def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True):
|
def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True):
|
||||||
import requests
|
|
||||||
|
|
||||||
if extras is None:
|
if extras is None:
|
||||||
extras = {}
|
extras = {}
|
||||||
@@ -349,6 +348,8 @@ class ChangeDetectionStore:
|
|||||||
|
|
||||||
# Was it a share link? try to fetch the data
|
# Was it a share link? try to fetch the data
|
||||||
if (url.startswith("https://changedetection.io/share/")):
|
if (url.startswith("https://changedetection.io/share/")):
|
||||||
|
import requests
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = requests.request(method="GET",
|
r = requests.request(method="GET",
|
||||||
url=url,
|
url=url,
|
||||||
|
|||||||
243
changedetectionio/test_cli_opts.sh
Executable file
243
changedetectionio/test_cli_opts.sh
Executable file
@@ -0,0 +1,243 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
# Test script for CLI options - Parallel execution
|
||||||
|
# Tests -u, -uN, -r, -b flags
|
||||||
|
|
||||||
|
set -u # Exit on undefined variables
|
||||||
|
|
||||||
|
# Color output
|
||||||
|
RED='\033[0;31m'
|
||||||
|
GREEN='\033[0;32m'
|
||||||
|
YELLOW='\033[1;33m'
|
||||||
|
NC='\033[0m' # No Color
|
||||||
|
|
||||||
|
# Test results directory (for parallel safety)
|
||||||
|
TEST_RESULTS_DIR="/tmp/cli-test-results-$$"
|
||||||
|
mkdir -p "$TEST_RESULTS_DIR"
|
||||||
|
|
||||||
|
# Cleanup function
|
||||||
|
cleanup() {
|
||||||
|
echo ""
|
||||||
|
echo "=== Cleaning up test directories ==="
|
||||||
|
rm -rf /tmp/cli-test-* 2>/dev/null || true
|
||||||
|
rm -rf "$TEST_RESULTS_DIR" 2>/dev/null || true
|
||||||
|
# Kill any hanging processes
|
||||||
|
pkill -f "changedetection.py.*cli-test" 2>/dev/null || true
|
||||||
|
}
|
||||||
|
trap cleanup EXIT
|
||||||
|
|
||||||
|
# Helper to record test result
|
||||||
|
record_result() {
|
||||||
|
local test_num=$1
|
||||||
|
local status=$2 # pass or fail
|
||||||
|
local message=$3
|
||||||
|
|
||||||
|
echo "$status|$message" > "$TEST_RESULTS_DIR/test_${test_num}.result"
|
||||||
|
}
|
||||||
|
|
||||||
|
# Run a test in background
|
||||||
|
run_test() {
|
||||||
|
local test_num=$1
|
||||||
|
local test_name=$2
|
||||||
|
local test_func=$3
|
||||||
|
|
||||||
|
(
|
||||||
|
echo -e "${YELLOW}[Test $test_num]${NC} $test_name"
|
||||||
|
if $test_func "$test_num"; then
|
||||||
|
record_result "$test_num" "pass" "$test_name"
|
||||||
|
echo -e "${GREEN}✓ PASS${NC}: $test_name"
|
||||||
|
else
|
||||||
|
record_result "$test_num" "fail" "$test_name"
|
||||||
|
echo -e "${RED}✗ FAIL${NC}: $test_name"
|
||||||
|
fi
|
||||||
|
) &
|
||||||
|
}
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Test Functions (each runs independently)
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
test_help_flag() {
|
||||||
|
local test_id=$1
|
||||||
|
timeout 3 python3 changedetection.py --help 2>&1 | grep -q "Add URLs on startup"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_version_flag() {
|
||||||
|
local test_id=$1
|
||||||
|
timeout 3 python3 changedetection.py --version 2>&1 | grep -qE "changedetection.io [0-9]+\.[0-9]+"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_single_url() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-single-${test_id}-$$"
|
||||||
|
timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
|
||||||
|
[ -f "$dir/url-watches.json" ] && \
|
||||||
|
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ]
|
||||||
|
}
|
||||||
|
|
||||||
|
test_multiple_urls() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-multi-${test_id}-$$"
|
||||||
|
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-u https://github.com \
|
||||||
|
-u https://httpbin.org \
|
||||||
|
-b &>/dev/null
|
||||||
|
[ -f "$dir/url-watches.json" ] && \
|
||||||
|
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ]
|
||||||
|
}
|
||||||
|
|
||||||
|
test_url_with_options() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-opts-${test_id}-$$"
|
||||||
|
timeout 10 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-u0 '{"title":"Test Site","processor":"text_json_diff"}' \
|
||||||
|
-b &>/dev/null
|
||||||
|
[ -f "$dir/url-watches.json" ] && \
|
||||||
|
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_multiple_urls_with_options() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-multi-opts-${test_id}-$$"
|
||||||
|
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-u0 '{"title":"Site One"}' \
|
||||||
|
-u https://github.com \
|
||||||
|
-u1 '{"title":"Site Two"}' \
|
||||||
|
-b &>/dev/null
|
||||||
|
[ -f "$dir/url-watches.json" ] && \
|
||||||
|
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \
|
||||||
|
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_batch_mode_exit() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-batch-${test_id}-$$"
|
||||||
|
local start=$(date +%s)
|
||||||
|
timeout 15 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-b &>/dev/null
|
||||||
|
local end=$(date +%s)
|
||||||
|
local elapsed=$((end - start))
|
||||||
|
[ $elapsed -lt 14 ]
|
||||||
|
}
|
||||||
|
|
||||||
|
test_recheck_all() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-recheck-all-${test_id}-$$"
|
||||||
|
mkdir -p "$dir"
|
||||||
|
cat > "$dir/url-watches.json" << 'EOF'
|
||||||
|
{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}}
|
||||||
|
EOF
|
||||||
|
timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_recheck_specific() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$"
|
||||||
|
mkdir -p "$dir"
|
||||||
|
cat > "$dir/url-watches.json" << 'EOF'
|
||||||
|
{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}}
|
||||||
|
EOF
|
||||||
|
timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_combined_operations() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-combined-${test_id}-$$"
|
||||||
|
timeout 12 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-u https://github.com \
|
||||||
|
-r all \
|
||||||
|
-b &>/dev/null
|
||||||
|
[ -f "$dir/url-watches.json" ] && \
|
||||||
|
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ]
|
||||||
|
}
|
||||||
|
|
||||||
|
test_invalid_json() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-invalid-${test_id}-$$"
|
||||||
|
timeout 5 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-u0 'invalid json here' \
|
||||||
|
2>&1 | grep -qi "invalid json\|json decode error"
|
||||||
|
}
|
||||||
|
|
||||||
|
test_create_directory() {
|
||||||
|
local test_id=$1
|
||||||
|
local dir="/tmp/cli-test-create-${test_id}-$$/nested/path"
|
||||||
|
timeout 10 python3 changedetection.py -d "$dir" -C \
|
||||||
|
-u https://example.com \
|
||||||
|
-b &>/dev/null
|
||||||
|
[ -d "$dir" ]
|
||||||
|
}
|
||||||
|
|
||||||
|
# =============================================================================
|
||||||
|
# Main Test Execution
|
||||||
|
# =============================================================================
|
||||||
|
|
||||||
|
echo "=========================================="
|
||||||
|
echo " CLI Options Test Suite (Parallel)"
|
||||||
|
echo "=========================================="
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Launch all tests in parallel
|
||||||
|
run_test 1 "Help flag (--help) shows usage without initialization" test_help_flag
|
||||||
|
run_test 2 "Version flag (--version) displays version" test_version_flag
|
||||||
|
run_test 3 "Add single URL with -u flag" test_single_url
|
||||||
|
run_test 4 "Add multiple URLs with multiple -u flags" test_multiple_urls
|
||||||
|
run_test 5 "Add URL with JSON options using -u0" test_url_with_options
|
||||||
|
run_test 6 "Add multiple URLs with different options (-u0, -u1)" test_multiple_urls_with_options
|
||||||
|
run_test 7 "Batch mode (-b) exits automatically after processing" test_batch_mode_exit
|
||||||
|
run_test 8 "Recheck all watches with -r all" test_recheck_all
|
||||||
|
run_test 9 "Recheck specific watches with -r UUID" test_recheck_specific
|
||||||
|
run_test 10 "Combined: Add URLs and recheck all with -u and -r all" test_combined_operations
|
||||||
|
run_test 11 "Invalid JSON in -u0 option should show error" test_invalid_json
|
||||||
|
run_test 12 "Create datastore directory with -C flag" test_create_directory
|
||||||
|
|
||||||
|
# Wait for all tests to complete
|
||||||
|
echo ""
|
||||||
|
echo "Waiting for all tests to complete..."
|
||||||
|
wait
|
||||||
|
|
||||||
|
# Collect results
|
||||||
|
echo ""
|
||||||
|
echo "=========================================="
|
||||||
|
echo " Test Summary"
|
||||||
|
echo "=========================================="
|
||||||
|
|
||||||
|
TESTS_RUN=0
|
||||||
|
TESTS_PASSED=0
|
||||||
|
TESTS_FAILED=0
|
||||||
|
|
||||||
|
for result_file in "$TEST_RESULTS_DIR"/test_*.result; do
|
||||||
|
if [ -f "$result_file" ]; then
|
||||||
|
TESTS_RUN=$((TESTS_RUN + 1))
|
||||||
|
status=$(cut -d'|' -f1 < "$result_file")
|
||||||
|
if [ "$status" = "pass" ]; then
|
||||||
|
TESTS_PASSED=$((TESTS_PASSED + 1))
|
||||||
|
else
|
||||||
|
TESTS_FAILED=$((TESTS_FAILED + 1))
|
||||||
|
fi
|
||||||
|
fi
|
||||||
|
done
|
||||||
|
|
||||||
|
echo "Tests run: $TESTS_RUN"
|
||||||
|
echo -e "${GREEN}Tests passed: $TESTS_PASSED${NC}"
|
||||||
|
if [ $TESTS_FAILED -gt 0 ]; then
|
||||||
|
echo -e "${RED}Tests failed: $TESTS_FAILED${NC}"
|
||||||
|
else
|
||||||
|
echo -e "${GREEN}Tests failed: $TESTS_FAILED${NC}"
|
||||||
|
fi
|
||||||
|
echo "=========================================="
|
||||||
|
echo ""
|
||||||
|
|
||||||
|
# Exit with appropriate code
|
||||||
|
if [ $TESTS_FAILED -gt 0 ]; then
|
||||||
|
echo -e "${RED}Some tests failed!${NC}"
|
||||||
|
exit 1
|
||||||
|
else
|
||||||
|
echo -e "${GREEN}All tests passed!${NC}"
|
||||||
|
exit 0
|
||||||
|
fi
|
||||||
@@ -5,13 +5,11 @@ from threading import Thread
|
|||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import arrow
|
import arrow
|
||||||
from changedetectionio import changedetection_app
|
|
||||||
from changedetectionio import store
|
from changedetectionio import store
|
||||||
import os
|
import os
|
||||||
import sys
|
import sys
|
||||||
from loguru import logger
|
|
||||||
|
|
||||||
from changedetectionio.flask_app import init_app_secret
|
from changedetectionio.flask_app import init_app_secret, changedetection_app
|
||||||
from changedetectionio.tests.util import live_server_setup, new_live_server_setup
|
from changedetectionio.tests.util import live_server_setup, new_live_server_setup
|
||||||
|
|
||||||
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py
|
# https://github.com/pallets/flask/blob/1.1.2/examples/tutorial/tests/test_auth.py
|
||||||
|
|||||||
@@ -140,38 +140,13 @@ def delete_all_watches(client=None):
|
|||||||
def wait_for_all_checks(client=None):
|
def wait_for_all_checks(client=None):
|
||||||
"""
|
"""
|
||||||
Waits until the queue is empty and workers are idle.
|
Waits until the queue is empty and workers are idle.
|
||||||
Much faster than the original with adaptive timing.
|
Delegates to worker_handler.wait_for_all_checks for shared logic.
|
||||||
"""
|
"""
|
||||||
from changedetectionio.flask_app import update_q as global_update_q
|
from changedetectionio.flask_app import update_q as global_update_q
|
||||||
from changedetectionio import worker_handler
|
from changedetectionio import worker_handler
|
||||||
empty_since = None
|
|
||||||
attempt = 0
|
|
||||||
max_attempts = 150 # Still reasonable upper bound
|
|
||||||
|
|
||||||
while attempt < max_attempts:
|
# Use the shared wait logic from worker_handler
|
||||||
# Start with fast checks, slow down if needed
|
return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
|
||||||
if attempt < 10:
|
|
||||||
time.sleep(0.2) # Very fast initial checks
|
|
||||||
elif attempt < 30:
|
|
||||||
time.sleep(0.4) # Medium speed
|
|
||||||
else:
|
|
||||||
time.sleep(0.8) # Slower for persistent issues
|
|
||||||
|
|
||||||
q_length = global_update_q.qsize()
|
|
||||||
running_uuids = worker_handler.get_running_uuids()
|
|
||||||
any_workers_busy = len(running_uuids) > 0
|
|
||||||
|
|
||||||
if q_length == 0 and not any_workers_busy:
|
|
||||||
if empty_since is None:
|
|
||||||
empty_since = time.time()
|
|
||||||
# Brief stabilization period for async workers
|
|
||||||
elif time.time() - empty_since >= 0.3:
|
|
||||||
break
|
|
||||||
else:
|
|
||||||
empty_since = None
|
|
||||||
|
|
||||||
attempt += 1
|
|
||||||
time.sleep(0.3)
|
|
||||||
|
|
||||||
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||||
"""
|
"""
|
||||||
|
|||||||
@@ -17,6 +17,7 @@ worker_threads = [] # List of WorkerThread objects
|
|||||||
|
|
||||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||||
currently_processing_uuids = {}
|
currently_processing_uuids = {}
|
||||||
|
_uuid_processing_lock = threading.Lock() # Protects currently_processing_uuids
|
||||||
|
|
||||||
# Configuration - async workers only
|
# Configuration - async workers only
|
||||||
USE_ASYNC_WORKERS = True
|
USE_ASYNC_WORKERS = True
|
||||||
@@ -207,31 +208,80 @@ def get_worker_count():
|
|||||||
|
|
||||||
def get_running_uuids():
|
def get_running_uuids():
|
||||||
"""Get list of UUIDs currently being processed by async workers"""
|
"""Get list of UUIDs currently being processed by async workers"""
|
||||||
return list(currently_processing_uuids.keys())
|
with _uuid_processing_lock:
|
||||||
|
return list(currently_processing_uuids.keys())
|
||||||
|
|
||||||
|
|
||||||
|
def claim_uuid_for_processing(uuid, worker_id):
|
||||||
|
"""
|
||||||
|
Atomically check if UUID is available and claim it for processing.
|
||||||
|
|
||||||
|
This is thread-safe and prevents race conditions where multiple workers
|
||||||
|
try to process the same UUID simultaneously.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uuid: The watch UUID to claim
|
||||||
|
worker_id: The ID of the worker claiming this UUID
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
True if successfully claimed (UUID was not being processed)
|
||||||
|
False if already being processed by another worker
|
||||||
|
"""
|
||||||
|
with _uuid_processing_lock:
|
||||||
|
if uuid in currently_processing_uuids:
|
||||||
|
# Already being processed by another worker
|
||||||
|
return False
|
||||||
|
# Claim it atomically
|
||||||
|
currently_processing_uuids[uuid] = worker_id
|
||||||
|
logger.debug(f"Worker {worker_id} claimed UUID: {uuid}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
|
||||||
|
def release_uuid_from_processing(uuid, worker_id):
|
||||||
|
"""
|
||||||
|
Release a UUID from processing (thread-safe).
|
||||||
|
|
||||||
|
Args:
|
||||||
|
uuid: The watch UUID to release
|
||||||
|
worker_id: The ID of the worker releasing this UUID
|
||||||
|
"""
|
||||||
|
with _uuid_processing_lock:
|
||||||
|
# Only remove if this worker owns it (defensive)
|
||||||
|
if currently_processing_uuids.get(uuid) == worker_id:
|
||||||
|
currently_processing_uuids.pop(uuid, None)
|
||||||
|
logger.debug(f"Worker {worker_id} released UUID: {uuid}")
|
||||||
|
else:
|
||||||
|
logger.warning(f"Worker {worker_id} tried to release UUID {uuid} but doesn't own it (owned by {currently_processing_uuids.get(uuid, 'nobody')})")
|
||||||
|
|
||||||
|
|
||||||
def set_uuid_processing(uuid, worker_id=None, processing=True):
|
def set_uuid_processing(uuid, worker_id=None, processing=True):
|
||||||
"""Mark a UUID as being processed or completed by a specific worker"""
|
"""
|
||||||
global currently_processing_uuids
|
Mark a UUID as being processed or completed by a specific worker.
|
||||||
|
|
||||||
|
DEPRECATED: Use claim_uuid_for_processing() and release_uuid_from_processing() instead.
|
||||||
|
This function is kept for backward compatibility but doesn't provide atomic check-and-set.
|
||||||
|
"""
|
||||||
if processing:
|
if processing:
|
||||||
currently_processing_uuids[uuid] = worker_id
|
with _uuid_processing_lock:
|
||||||
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
currently_processing_uuids[uuid] = worker_id
|
||||||
|
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
|
||||||
else:
|
else:
|
||||||
currently_processing_uuids.pop(uuid, None)
|
release_uuid_from_processing(uuid, worker_id)
|
||||||
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
|
|
||||||
|
|
||||||
|
|
||||||
def is_watch_running(watch_uuid):
|
def is_watch_running(watch_uuid):
|
||||||
"""Check if a specific watch is currently being processed by any worker"""
|
"""Check if a specific watch is currently being processed by any worker"""
|
||||||
return watch_uuid in currently_processing_uuids
|
with _uuid_processing_lock:
|
||||||
|
return watch_uuid in currently_processing_uuids
|
||||||
|
|
||||||
|
|
||||||
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
|
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
|
||||||
"""Check if a specific watch is currently being processed by a different worker"""
|
"""Check if a specific watch is currently being processed by a different worker"""
|
||||||
if watch_uuid not in currently_processing_uuids:
|
with _uuid_processing_lock:
|
||||||
return False
|
if watch_uuid not in currently_processing_uuids:
|
||||||
processing_worker_id = currently_processing_uuids[watch_uuid]
|
return False
|
||||||
return processing_worker_id != current_worker_id
|
processing_worker_id = currently_processing_uuids[watch_uuid]
|
||||||
|
return processing_worker_id != current_worker_id
|
||||||
|
|
||||||
|
|
||||||
def queue_item_async_safe(update_q, item, silent=False):
|
def queue_item_async_safe(update_q, item, silent=False):
|
||||||
@@ -381,6 +431,53 @@ def get_worker_status():
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
def wait_for_all_checks(update_q, timeout=150):
|
||||||
|
"""
|
||||||
|
Wait for queue to be empty and all workers to be idle.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
update_q: The update queue to monitor
|
||||||
|
timeout: Maximum wait time in seconds (default 150 = 150 iterations * 0.2-0.8s)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if all checks completed, False if timeout
|
||||||
|
"""
|
||||||
|
import time
|
||||||
|
empty_since = None
|
||||||
|
attempt = 0
|
||||||
|
max_attempts = timeout
|
||||||
|
|
||||||
|
while attempt < max_attempts:
|
||||||
|
# Adaptive sleep - start fast, slow down if needed
|
||||||
|
if attempt < 10:
|
||||||
|
sleep_time = 0.2 # Very fast initial checks
|
||||||
|
elif attempt < 30:
|
||||||
|
sleep_time = 0.4 # Medium speed
|
||||||
|
else:
|
||||||
|
sleep_time = 0.8 # Slower for persistent issues
|
||||||
|
|
||||||
|
time.sleep(sleep_time)
|
||||||
|
|
||||||
|
q_length = update_q.qsize()
|
||||||
|
running_uuids = get_running_uuids()
|
||||||
|
any_workers_busy = len(running_uuids) > 0
|
||||||
|
|
||||||
|
if q_length == 0 and not any_workers_busy:
|
||||||
|
if empty_since is None:
|
||||||
|
empty_since = time.time()
|
||||||
|
# Brief stabilization period for async workers
|
||||||
|
elif time.time() - empty_since >= 0.3:
|
||||||
|
# Add small buffer for filesystem operations to complete
|
||||||
|
time.sleep(0.2)
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
empty_since = None
|
||||||
|
|
||||||
|
attempt += 1
|
||||||
|
|
||||||
|
return False # Timeout
|
||||||
|
|
||||||
|
|
||||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||||
"""
|
"""
|
||||||
Check if the expected number of async workers are running and restart any missing ones.
|
Check if the expected number of async workers are running and restart any missing ones.
|
||||||
|
|||||||
Reference in New Issue
Block a user