Compare commits

..

10 Commits

Author SHA1 Message Date
dgtlmoon
bee6713f12 WIP
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-22 19:54:28 +01:00
dgtlmoon
ef6208fdbd tweaks 2026-01-22 17:36:29 +01:00
dgtlmoon
1ef29ab1b8 Small tweak 2026-01-22 17:19:12 +01:00
dgtlmoon
445ce88114 Misc performance improvements 2026-01-22 17:16:40 +01:00
dgtlmoon
eb83a253f4 Merge branch 'master' into datastore-refactor 2026-01-22 16:22:27 +01:00
dgtlmoon
746990391a test fix
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-19 18:58:39 +01:00
dgtlmoon
1661f7b85b tweak 2026-01-19 18:55:16 +01:00
dgtlmoon
1f37b358b1 Merge branch 'master' into datastore-refactor 2026-01-19 18:52:06 +01:00
dgtlmoon
639c53f0f8 left out pypi file 2026-01-19 13:11:57 +01:00
dgtlmoon
48e8295433 Big refactor to save watches as their own datafile with some agnostic data store backend 2026-01-19 12:59:10 +01:00
20 changed files with 2874 additions and 1779 deletions

View File

@@ -61,8 +61,8 @@ jobs:
# --- API test ---
# This also means that the docs/api-spec.yml was shipped and could be read
test -f /tmp/url-watches.json
API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/url-watches.json)
test -f /tmp/changedetection.json
API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/changedetection.json)
echo Test API KEY is $API_KEY
curl -X POST "http://127.0.0.1:10000/api/v1/watch" \
-H "x-api-key: ${API_KEY}" \

View File

@@ -111,32 +111,6 @@ jobs:
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
docker run --name test-cdio-basic-tests --network changedet-network test-changedetectionio bash -c 'cd changedetectionio && ./run_basic_tests.sh'
- name: Test CLI options
run: |
docker network inspect changedet-network >/dev/null 2>&1 || docker network create changedet-network
docker run --name test-cdio-cli-opts --network changedet-network test-changedetectionio bash -c 'changedetectionio/test_cli_opts.sh' &> cli-opts-output.txt
echo "=== CLI Options Test Output ==="
cat cli-opts-output.txt
- name: CLI Memory Test
run: |
echo "=== Checking CLI batch mode memory usage ==="
# Extract RSS memory value from output
RSS_MB=$(grep -oP "Memory consumption before worker shutdown: RSS=\K[\d.]+" cli-opts-output.txt | head -1 || echo "0")
echo "RSS Memory: ${RSS_MB} MB"
# Check if RSS is less than 100MB
if [ -n "$RSS_MB" ]; then
if (( $(echo "$RSS_MB < 100" | bc -l) )); then
echo "✓ Memory usage is acceptable: ${RSS_MB} MB < 100 MB"
else
echo "✗ Memory usage too high: ${RSS_MB} MB >= 100 MB"
exit 1
fi
else
echo "⚠ Could not extract memory usage, skipping check"
fi
- name: Extract memory report and logs
if: always()
uses: ./.github/actions/extract-memory-report
@@ -151,13 +125,6 @@ jobs:
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
path: output-logs
- name: Store CLI test output
if: always()
uses: actions/upload-artifact@v6
with:
name: test-cdio-cli-opts-output-py${{ env.PYTHON_VERSION }}
path: cli-opts-output.txt
# Playwright tests
playwright-tests:
runs-on: ubuntu-latest

View File

@@ -9,6 +9,7 @@ recursive-include changedetectionio/notification *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/realtime *
recursive-include changedetectionio/static *
recursive-include changedetectionio/store *
recursive-include changedetectionio/templates *
recursive-include changedetectionio/tests *
recursive-include changedetectionio/translations *

View File

@@ -6,20 +6,19 @@ __version__ = '0.52.9'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
from loguru import logger
import getopt
import logging
import os
import getopt
import platform
import signal
import threading
import time
import sys
# Eventlet completely removed - using threading mode for SocketIO
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
# Note: store and changedetection_app are imported inside main() to avoid
# initialization before argument parsing (allows --help to work without loading everything)
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app
from loguru import logger
# ==============================================================================
# Multiprocessing Configuration - CRITICAL for Thread Safety
@@ -84,22 +83,11 @@ def get_version():
def sigshutdown_handler(_signo, _stack_frame):
name = signal.Signals(_signo).name
logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')
# Set exit flag immediately to stop all loops
app.config.exit.set()
datastore.stop_thread = True
# Log memory consumption before shutting down workers (cross-platform)
try:
import psutil
process = psutil.Process()
mem_info = process.memory_info()
rss_mb = mem_info.rss / 1024 / 1024
vms_mb = mem_info.vms / 1024 / 1024
logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
except Exception as e:
logger.warning(f"Could not retrieve memory stats: {str(e)}")
# Shutdown workers and queues immediately
try:
from changedetectionio import worker_handler
@@ -124,12 +112,12 @@ def sigshutdown_handler(_signo, _stack_frame):
except Exception as e:
logger.error(f"Error shutting down Socket.IO server: {str(e)}")
# Save data quickly
# Save data quickly - force immediate save using abstract method
try:
datastore.sync_to_json()
logger.success('Fast sync to disk complete.')
datastore.force_save_all()
logger.success('Fast sync to storage complete.')
except Exception as e:
logger.error(f"Error syncing to disk: {str(e)}")
logger.error(f"Error syncing to storage: {str(e)}")
sys.exit()
@@ -137,47 +125,10 @@ def main():
global datastore
global app
# Early help/version check before any initialization
if '--help' in sys.argv or '-help' in sys.argv:
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print('')
sys.exit(0)
if '--version' in sys.argv or '-v' in sys.argv:
print(f'changedetection.io {__version__}')
sys.exit(0)
# Import heavy modules after help/version checks to keep startup fast for those flags
from changedetectionio import store
from changedetectionio.flask_app import changedetection_app
datastore_path = None
do_cleanup = False
# Optional URL to watch since start
default_url = None
# Set a default logger level
logger_level = 'DEBUG'
include_default_watches = True
@@ -186,12 +137,6 @@ def main():
port = int(os.environ.get('PORT', 5000))
ssl_mode = False
# Lists for multiple URLs and their options
urls_to_add = []
url_options = {} # Key: index (0-based), Value: dict of options
recheck_watches = None # None, 'all', or list of UUIDs
batch_mode = False # Run once then exit when queue is empty
# On Windows, create and use a default path.
if os.name == 'nt':
datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
@@ -200,85 +145,10 @@ def main():
# Must be absolute so that send_from_directory doesnt try to make it relative to backend/
datastore_path = os.path.join(os.getcwd(), "../datastore")
# Pre-process arguments to extract -u, -u<N>, and -r options before getopt
# This allows unlimited -u0, -u1, -u2, ... options without predefining them
cleaned_argv = ['changedetection.py'] # Start with program name
i = 1
while i < len(sys.argv):
arg = sys.argv[i]
# Handle -u (add URL)
if arg == '-u' and i + 1 < len(sys.argv):
urls_to_add.append(sys.argv[i + 1])
i += 2
continue
# Handle -u<N> (set options for URL at index N)
if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
idx = int(arg[2:])
if i + 1 < len(sys.argv):
try:
import json
url_options[idx] = json.loads(sys.argv[i + 1])
except json.JSONDecodeError as e:
print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
print(f'JSON decode error: {e}')
sys.exit(2)
i += 2
continue
# Handle -r (recheck watches)
if arg == '-r' and i + 1 < len(sys.argv):
recheck_arg = sys.argv[i + 1]
if recheck_arg.lower() == 'all':
recheck_watches = 'all'
else:
# Parse comma-separated list of UUIDs
recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]
i += 2
continue
# Handle -b (batch mode - run once and exit)
if arg == '-b':
batch_mode = True
i += 1
continue
# Keep other arguments for getopt
cleaned_argv.append(arg)
i += 1
try:
opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port")
except getopt.GetoptError as e:
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print('')
print(f'Error: {e}')
opts, args = getopt.getopt(sys.argv[1:], "6Ccsd:h:p:l:u:", "port")
except getopt.GetoptError:
print('backend.py -s SSL enable -h [host] -p [port] -d [datastore path] -u [default URL to watch] -l [debug level - TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL]')
sys.exit(2)
create_datastore_dir = False
@@ -303,6 +173,10 @@ def main():
if opt == '-d':
datastore_path = arg
if opt == '-u':
default_url = arg
include_default_watches = False
# Cleanup (remove text files that arent in the index)
if opt == '-c':
do_cleanup = True
@@ -314,10 +188,6 @@ def main():
if opt == '-l':
logger_level = int(arg) if arg.isdigit() else arg.upper()
# If URLs are provided, don't include default watches
if urls_to_add:
include_default_watches = False
logger.success(f"changedetection.io version {get_version()} starting.")
# Launch using SocketIO run method for proper integration (if enabled)
@@ -354,14 +224,11 @@ def main():
logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)
# isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
app_config = {
'datastore_path': datastore_path,
'batch_mode': batch_mode
}
app_config = {'datastore_path': datastore_path}
if not os.path.isdir(app_config['datastore_path']):
if create_datastore_dir:
os.makedirs(app_config['datastore_path'], exist_ok=True)
os.mkdir(app_config['datastore_path'])
else:
logger.critical(
f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
@@ -382,132 +249,11 @@ def main():
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
inject_datastore_into_plugins(datastore)
# Step 1: Add URLs with their options (if provided via -u flags)
added_watch_uuids = []
if urls_to_add:
logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
for idx, url in enumerate(urls_to_add):
extras = url_options.get(idx, {})
if extras:
logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
else:
logger.debug(f"Adding watch {idx}: {url}")
new_uuid = datastore.add_watch(url=url, extras=extras)
if new_uuid:
added_watch_uuids.append(new_uuid)
logger.success(f"Added watch: {url} (UUID: {new_uuid})")
else:
logger.error(f"Failed to add watch: {url}")
if default_url:
datastore.add_watch(url = default_url)
app = changedetection_app(app_config, datastore)
# Step 2: Queue newly added watches (if -u was provided in batch mode)
# This must happen AFTER app initialization so update_q is available
if batch_mode and added_watch_uuids:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
for watch_uuid in added_watch_uuids:
try:
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
logger.debug(f"Queued newly added watch: {watch_uuid}")
except Exception as e:
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
# Step 3: Queue watches for recheck (if -r was provided)
# This must happen AFTER app initialization so update_q is available
if recheck_watches is not None:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
watches_to_queue = []
if recheck_watches == 'all':
# Queue all watches, excluding those already queued in batch mode
all_watches = list(datastore.data['watching'].keys())
if batch_mode and added_watch_uuids:
# Exclude newly added watches that were already queued in batch mode
watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
else:
watches_to_queue = all_watches
logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
else:
# Queue specific UUIDs
watches_to_queue = recheck_watches
logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")
queued_count = 0
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_handler.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
queued_count += 1
logger.debug(f"Queued watch for recheck: {watch_uuid}")
except Exception as e:
logger.error(f"Failed to queue watch {watch_uuid}: {e}")
else:
logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
logger.success(f"Successfully queued {queued_count} watches for recheck")
# Step 4: Setup batch mode monitor (if -b was provided)
if batch_mode:
from changedetectionio.flask_app import update_q
def batch_mode_monitor():
"""Monitor queue and workers, shutdown when all work is complete"""
import time
logger.info("Batch mode: Waiting for all queued items to complete...")
# Wait a bit for workers to start processing
time.sleep(3)
idle_start = None
idle_threshold = 3 # Seconds to wait after queue is empty
while True:
try:
queue_size = update_q.qsize()
running_uuids = worker_handler.get_running_uuids()
running_count = len(running_uuids)
logger.info(f"Batch mode: Queue size: {queue_size}, Running workers: {running_count}, Running UUIDs: {running_uuids}")
if queue_size == 0 and running_count == 0:
if idle_start is None:
idle_start = time.time()
logger.info(f"Batch mode: Queue empty and workers idle, waiting {idle_threshold}s before shutdown...")
elif time.time() - idle_start >= idle_threshold:
logger.success("Batch mode: All work completed, initiating shutdown")
# Trigger shutdown
import os, signal
os.kill(os.getpid(), signal.SIGTERM)
return
else:
if idle_start is not None:
logger.info(f"Batch mode: Activity detected (queue: {queue_size}, running: {running_count})")
idle_start = None
time.sleep(2)
except Exception as e:
logger.error(f"Batch mode monitor error: {e}")
time.sleep(2)
# Start monitor in background thread
monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
monitor_thread.start()
logger.info("Batch mode enabled: Will exit after all queued items are processed")
# Get the SocketIO instance from the Flask app (created in flask_app.py)
from changedetectionio.flask_app import socketio_server
global socketio
@@ -568,33 +314,20 @@ def main():
app.wsgi_app = ProxyFix(app.wsgi_app, x_prefix=1, x_host=1)
# In batch mode, skip starting the HTTP server - just keep workers running
if batch_mode:
logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
logger.info("Batch mode: Main thread will wait for shutdown signal")
# Keep main thread alive until batch monitor triggers shutdown
try:
while True:
time.sleep(1)
except KeyboardInterrupt:
logger.info("Batch mode: Keyboard interrupt received")
pass
else:
# Normal mode: Start HTTP server
# SocketIO instance is already initialized in flask_app.py
if socketio_server:
if ssl_mode:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
socketio.run(app, host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
else:
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
# SocketIO instance is already initialized in flask_app.py
if socketio_server:
if ssl_mode:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
socketio.run(app, host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
else:
# Run Flask app without Socket.IO if disabled
logger.info("Starting Flask app without Socket.IO server")
if ssl_mode:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
app.run(host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file))
else:
app.run(host=host, port=int(port), debug=False)
socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
else:
# Run Flask app without Socket.IO if disabled
logger.info("Starting Flask app without Socket.IO server")
if ssl_mode:
logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
app.run(host=host, port=int(port), debug=False,
ssl_context=(ssl_cert_file, ssl_privkey_file))
else:
app.run(host=host, port=int(port), debug=False)

View File

@@ -27,11 +27,23 @@ def create_backup(datastore_path, watches: dict):
compression=zipfile.ZIP_DEFLATED,
compresslevel=8) as zipObj:
# Add the index
zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json")
# Add the settings file (supports both formats)
# New format: changedetection.json
changedetection_json = os.path.join(datastore_path, "changedetection.json")
if os.path.isfile(changedetection_json):
zipObj.write(changedetection_json, arcname="changedetection.json")
logger.debug("Added changedetection.json to backup")
# Add the flask app secret
zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt")
# Legacy format: url-watches.json (for backward compatibility)
url_watches_json = os.path.join(datastore_path, "url-watches.json")
if os.path.isfile(url_watches_json):
zipObj.write(url_watches_json, arcname="url-watches.json")
logger.debug("Added url-watches.json to backup")
# Add the flask app secret (if it exists)
secret_file = os.path.join(datastore_path, "secret.txt")
if os.path.isfile(secret_file):
zipObj.write(secret_file, arcname="secret.txt")
# Add any data in the watch data directory.
for uuid, w in watches.items():
@@ -90,8 +102,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(gettext("Maximum number of backups reached, please remove some"), "error")
return redirect(url_for('backups.index'))
# Be sure we're written fresh
datastore.sync_to_json()
# Be sure we're written fresh - force immediate save using abstract method
datastore.force_save_all()
zip_thread = threading.Thread(
target=create_backup,
args=(datastore.datastore_path, datastore.data.get("watching")),

View File

@@ -62,7 +62,7 @@ class import_url_list(Importer):
extras = None
if processor:
extras = {'processor': processor}
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, write_to_disk_now=False, extras=extras)
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, save_immediately=False, extras=extras)
if new_uuid:
# Straight into the queue.
@@ -129,7 +129,7 @@ class import_distill_io_json(Importer):
new_uuid = datastore.add_watch(url=d['uri'].strip(),
tag=",".join(d.get('tags', [])),
extras=extras,
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
@@ -204,7 +204,7 @@ class import_xlsx_wachete(Importer):
new_uuid = datastore.add_watch(url=data['url'].strip(),
extras=extras,
tag=data.get('folder'),
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
self.new_uuids.append(new_uuid)
@@ -287,7 +287,7 @@ class import_xlsx_custom(Importer):
new_uuid = datastore.add_watch(url=url,
extras=extras,
tag=tags,
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
self.new_uuids.append(new_uuid)

View File

@@ -24,6 +24,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid]['paused'] = True
datastore.mark_watch_dirty(uuid)
if emit_flash:
flash(gettext("{} watches paused").format(len(uuids)))
@@ -31,6 +32,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid.strip()]['paused'] = False
datastore.mark_watch_dirty(uuid)
if emit_flash:
flash(gettext("{} watches unpaused").format(len(uuids)))
@@ -45,6 +47,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid]['notification_muted'] = True
datastore.mark_watch_dirty(uuid)
if emit_flash:
flash(gettext("{} watches muted").format(len(uuids)))
@@ -52,6 +55,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid]['notification_muted'] = False
datastore.mark_watch_dirty(uuid)
if emit_flash:
flash(gettext("{} watches un-muted").format(len(uuids)))
@@ -67,6 +71,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid]["last_error"] = False
datastore.mark_watch_dirty(uuid)
if emit_flash:
flash(gettext("{} watches errors cleared").format(len(uuids)))

View File

@@ -401,10 +401,7 @@ def changedetection_app(config=None, datastore_o=None):
# so far just for read-only via tests, but this will be moved eventually to be the main source
# (instead of the global var)
app.config['DATASTORE'] = datastore_o
# Store batch mode flag to skip background threads when running in batch mode
app.config['batch_mode'] = config.get('batch_mode', False) if config else False
# Store the signal in the app config to ensure it's accessible everywhere
app.config['watch_check_update_SIGNAL'] = watch_check_update
@@ -905,19 +902,14 @@ def changedetection_app(config=None, datastore_o=None):
logger.info(f"Starting {n_workers} workers during app initialization")
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
# Skip background threads in batch mode (just process queue and exit)
batch_mode = app.config.get('batch_mode', False)
if not batch_mode:
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
else:
logger.info("Batch mode: Skipping ticker thread, notification runner, and version checker")
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
# This avoids circular dependencies

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,972 @@
import shutil
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_safe_valid_url
from flask import (
flash
)
from flask_babel import gettext
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
from ..model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from copy import deepcopy, copy
from os import path, unlink
from threading import Lock
import json
import os
import re
import secrets
import sys
import threading
import time
import uuid as uuid_builder
from loguru import logger
from blinker import signal
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
from ..processors import get_custom_watch_obj_for_processor
from ..processors.restock_diff import Restock
# Import the base class and helpers
from .file_saving_datastore import FileSavingDataStore, load_all_watches, save_watch_atomic, save_json_atomic
from .updates import DatastoreUpdatesMixin
from .legacy_loader import has_legacy_datastore
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :)
# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change
class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
__version_check = True
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
# Initialize parent class
super().__init__()
# Should only be active for docker
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
self.datastore_path = datastore_path
self.needs_write = False
self.start_time = time.time()
self.stop_thread = False
self.save_version_copy_json_db(version_tag)
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
def save_version_copy_json_db(self, version_tag):
"""
Create version-tagged backup of changedetection.json.
This is called on version upgrades to preserve a backup in case
the new version has issues.
"""
import re
version_text = re.sub(r'\D+', '-', version_tag)
db_path = os.path.join(self.datastore_path, "changedetection.json")
db_path_version_backup = os.path.join(self.datastore_path, f"changedetection-{version_text}.json")
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
from shutil import copyfile
logger.info(f"Backing up changedetection.json due to new version to '{db_path_version_backup}'.")
copyfile(db_path, db_path_version_backup)
def _load_settings(self):
"""
Load settings from storage.
File backend implementation: reads from changedetection.json
Returns:
dict: Settings data loaded from storage
"""
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
logger.info(f"Loading settings from {changedetection_json}")
if HAS_ORJSON:
with open(changedetection_json, 'rb') as f:
return orjson.loads(f.read())
else:
with open(changedetection_json, 'r', encoding='utf-8') as f:
return json.load(f)
def _apply_settings(self, settings_data):
"""
Apply loaded settings data to internal data structure.
Args:
settings_data: Dictionary loaded from changedetection.json
"""
# Apply top-level fields
if 'app_guid' in settings_data:
self.__data['app_guid'] = settings_data['app_guid']
if 'build_sha' in settings_data:
self.__data['build_sha'] = settings_data['build_sha']
if 'version_tag' in settings_data:
self.__data['version_tag'] = settings_data['version_tag']
# Apply settings sections
if 'settings' in settings_data:
if 'headers' in settings_data['settings']:
self.__data['settings']['headers'].update(settings_data['settings']['headers'])
if 'requests' in settings_data['settings']:
self.__data['settings']['requests'].update(settings_data['settings']['requests'])
if 'application' in settings_data['settings']:
self.__data['settings']['application'].update(settings_data['settings']['application'])
def _rehydrate_tags(self):
"""Rehydrate tag entities from stored data."""
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(
uuid, tag, processor_override='restock_diff'
)
logger.info(f"Tag: {uuid} {tag['title']}")
def _load_state(self):
"""
Load complete datastore state from storage.
Orchestrates loading of settings and watches using polymorphic methods.
"""
# Load settings
settings_data = self._load_settings()
self._apply_settings(settings_data)
# Load watches (polymorphic - parent class method)
self._load_watches()
# Rehydrate tags
self._rehydrate_tags()
def reload_state(self, datastore_path, include_default_watches, version_tag):
"""
Load datastore from storage or create new one.
Supports two scenarios:
1. NEW format: changedetection.json exists → load and run updates if needed
2. EMPTY: No changedetection.json → create new OR trigger migration from legacy
Note: Legacy url-watches.json migration happens in update_26, not here.
"""
logger.info(f"Datastore path is '{datastore_path}'")
# Initialize data structure
self.__data = App.model()
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
# Base definition for all watchers (deepcopy part of #569)
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))
# Load build SHA if available (Docker deployments)
if path.isfile('changedetectionio/source.txt'):
with open('changedetectionio/source.txt') as f:
self.__data['build_sha'] = f.read()
# Check if datastore already exists
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
if os.path.exists(changedetection_json):
# Load existing datastore (changedetection.json + watch.json files)
logger.info("Loading existing datastore")
try:
self._load_state()
except Exception as e:
logger.critical(f"Failed to load datastore: {e}")
raise
# Run schema updates if needed
# Pass current schema version from loaded datastore (defaults to 0 if not set)
current_schema = self.__data['settings']['application'].get('schema_version', 0)
self.run_updates(current_schema_version=current_schema)
else:
# No datastore yet - check if this is a fresh install or legacy migration
# Generate app_guid FIRST (required for all operations)
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else:
self.__data['app_guid'] = str(uuid_builder.uuid4())
# Generate RSS access token
self.__data['settings']['application']['rss_access_token'] = secrets.token_hex(16)
# Generate API access token
self.__data['settings']['application']['api_access_token'] = secrets.token_hex(16)
# Check if legacy datastore exists (url-watches.json)
if has_legacy_datastore(self.datastore_path):
# Legacy datastore detected - trigger migration
logger.critical(f"Legacy datastore detected at {self.datastore_path}/url-watches.json")
logger.critical("Migration will be triggered via update_26")
# Load the legacy datastore to get its schema_version
from .legacy_loader import load_legacy_format
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
legacy_data = load_legacy_format(legacy_path)
if not legacy_data:
raise Exception("Failed to load legacy datastore from url-watches.json")
# Get the schema version from legacy datastore (defaults to 0 if not present)
legacy_schema_version = legacy_data.get('settings', {}).get('application', {}).get('schema_version', 0)
logger.info(f"Legacy datastore schema version: {legacy_schema_version}")
# Set our schema version to match the legacy one
self.__data['settings']['application']['schema_version'] = legacy_schema_version
# update_26 will load the legacy data again and migrate to new format
# Only run updates AFTER the legacy schema version (e.g., if legacy is at 25, only run 26+)
self.run_updates(current_schema_version=legacy_schema_version)
else:
# Fresh install - create new datastore
logger.critical(f"No datastore found, creating new datastore at {self.datastore_path}")
# Set schema version to latest (no updates needed)
updates_available = self.get_updates_available()
self.__data['settings']['application']['schema_version'] = updates_available.pop() if updates_available else 26
# Add default watches if requested
if include_default_watches:
self.add_watch(
url='https://news.ycombinator.com/',
tag='Tech news',
extras={'fetch_backend': 'html_requests'}
)
self.add_watch(
url='https://changedetection.io/CHANGELOG.txt',
tag='changedetection.io',
extras={'fetch_backend': 'html_requests'}
)
# Create changedetection.json immediately
try:
self._save_settings()
logger.info("Created changedetection.json for new datastore")
except Exception as e:
logger.error(f"Failed to create initial changedetection.json: {e}")
# Set version tag
self.__data['version_tag'] = version_tag
# Validate proxies.json if it exists
_ = self.proxy_list # Just to test parsing
# Ensure app_guid exists (for datastores loaded from existing files)
if 'app_guid' not in self.__data:
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else:
self.__data['app_guid'] = str(uuid_builder.uuid4())
self.mark_settings_dirty()
# Ensure RSS access token exists
if not self.__data['settings']['application'].get('rss_access_token'):
secret = secrets.token_hex(16)
self.__data['settings']['application']['rss_access_token'] = secret
self.mark_settings_dirty()
# Ensure API access token exists
if not self.__data['settings']['application'].get('api_access_token'):
secret = secrets.token_hex(16)
self.__data['settings']['application']['api_access_token'] = secret
self.mark_settings_dirty()
# Handle password reset lockfile
password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock")
if path.isfile(password_reset_lockfile):
self.remove_password()
unlink(password_reset_lockfile)
# Start the background save thread
self.start_save_thread()
def rehydrate_entity(self, uuid, entity, processor_override=None):
"""Set the dict back to the dict Watch object"""
entity['uuid'] = uuid
if processor_override:
watch_class = get_custom_watch_obj_for_processor(processor_override)
entity['processor'] = processor_override
else:
watch_class = get_custom_watch_obj_for_processor(entity.get('processor'))
if entity.get('uuid') != 'text_json_diff':
logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")
entity = watch_class(datastore_path=self.datastore_path, default=entity)
return entity
# ============================================================================
# FileSavingDataStore Abstract Method Implementations
# ============================================================================
def _watch_exists(self, uuid):
"""Check if watch exists in datastore."""
return uuid in self.__data['watching']
def _get_watch_dict(self, uuid):
"""Get watch as dictionary."""
return dict(self.__data['watching'][uuid])
def _build_settings_data(self):
"""
Build settings data structure for saving.
Returns:
dict: Settings data ready for serialization
"""
return {
'note': 'Settings file - watches are stored in individual {uuid}/watch.json files',
'app_guid': self.__data['app_guid'],
'settings': self.__data['settings'],
'build_sha': self.__data.get('build_sha'),
'version_tag': self.__data.get('version_tag')
}
def _save_settings(self):
"""
Save settings to storage.
File backend implementation: saves to changedetection.json
Implementation of abstract method from FileSavingDataStore.
Uses the generic save_json_atomic helper.
Raises:
OSError: If disk is full or other I/O error
"""
settings_data = self._build_settings_data()
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
save_json_atomic(changedetection_json, settings_data, label="settings", max_size_mb=10)
def _load_watches(self):
"""
Load all watches from storage.
File backend implementation: reads individual watch.json files
Implementation of abstract method from FileSavingDataStore.
Delegates to helper function and stores results in internal data structure.
"""
watching, watch_hashes = load_all_watches(
self.datastore_path,
self.rehydrate_entity,
self._compute_hash
)
# Store loaded data
self.__data['watching'] = watching
self._watch_hashes = watch_hashes
def _delete_watch(self, uuid):
"""
Delete a watch from storage.
File backend implementation: deletes entire {uuid}/ directory recursively.
Implementation of abstract method from FileSavingDataStore.
Args:
uuid: Watch UUID to delete
"""
watch_dir = os.path.join(self.datastore_path, uuid)
if os.path.exists(watch_dir):
shutil.rmtree(watch_dir)
logger.info(f"Deleted watch directory: {watch_dir}")
# ============================================================================
# Watch Management Methods
# ============================================================================
def set_last_viewed(self, uuid, timestamp):
logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}")
self.data['watching'][uuid].update({'last_viewed': int(timestamp)})
self.mark_watch_dirty(uuid)
watch_check_update = signal('watch_check_update')
if watch_check_update:
watch_check_update.send(watch_uuid=uuid)
def remove_password(self):
self.__data['settings']['application']['password'] = False
self.mark_settings_dirty()
def update_watch(self, uuid, update_obj):
# It's possible that the watch could be deleted before update
if not self.__data['watching'].get(uuid):
return
with self.lock:
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
for dict_key, d in self.generic_definition.items():
if isinstance(d, dict):
if update_obj is not None and dict_key in update_obj:
self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
del (update_obj[dict_key])
self.__data['watching'][uuid].update(update_obj)
self.mark_watch_dirty(uuid)
@property
def threshold_seconds(self):
seconds = 0
for m, n in Watch.mtable.items():
x = self.__data['settings']['requests']['time_between_check'].get(m)
if x:
seconds += x * n
return seconds
@property
def unread_changes_count(self):
unread_changes_count = 0
for uuid, watch in self.__data['watching'].items():
if watch.history_n >= 2 and watch.viewed == False:
unread_changes_count += 1
return unread_changes_count
@property
def data(self):
# Re #152, Return env base_url if not overriden
# Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services
# like 'Join', so it's always best to atleast set something obvious so that they are not broken.
active_base_url = BASE_URL_NOT_SET_TEXT
if self.__data['settings']['application'].get('base_url'):
active_base_url = self.__data['settings']['application'].get('base_url')
elif os.getenv('BASE_URL'):
active_base_url = os.getenv('BASE_URL')
# I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable
# even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. or something
d = self.__data
d['settings']['application']['active_base_url'] = active_base_url.strip('" ')
return d
# Delete a single watch by UUID
def delete(self, uuid):
"""
Delete a watch by UUID.
Uses abstracted storage method for backend-agnostic deletion.
Supports 'all' to delete all watches (mainly for testing).
Args:
uuid: Watch UUID to delete, or 'all' to delete all watches
"""
with self.lock:
if uuid == 'all':
# Delete all watches - capture UUIDs first before modifying dict
all_uuids = list(self.__data['watching'].keys())
for watch_uuid in all_uuids:
# Delete from storage using polymorphic method
try:
self._delete_watch(watch_uuid)
except Exception as e:
logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}")
# Clean up tracking data
self._watch_hashes.pop(watch_uuid, None)
self._dirty_watches.discard(watch_uuid)
# Send delete signal
watch_delete_signal = signal('watch_deleted')
if watch_delete_signal:
watch_delete_signal.send(watch_uuid=watch_uuid)
# Clear the dict
self.__data['watching'] = {}
# Mainly used for testing to allow all items to flush before running next test
time.sleep(1)
else:
# Delete single watch from storage using polymorphic method
try:
self._delete_watch(uuid)
except Exception as e:
logger.error(f"Failed to delete watch {uuid} from storage: {e}")
# Remove from watching dict
del self.data['watching'][uuid]
# Clean up tracking data
self._watch_hashes.pop(uuid, None)
self._dirty_watches.discard(uuid)
# Send delete signal
watch_delete_signal = signal('watch_deleted')
if watch_delete_signal:
watch_delete_signal.send(watch_uuid=uuid)
self.needs_write_urgent = True
# Clone a watch by UUID
def clone(self, uuid):
url = self.data['watching'][uuid].get('url')
extras = deepcopy(self.data['watching'][uuid])
new_uuid = self.add_watch(url=url, extras=extras)
watch = self.data['watching'][new_uuid]
return new_uuid
def url_exists(self, url):
# Probably their should be dict...
for watch in self.data['watching'].values():
if watch['url'].lower() == url.lower():
return True
return False
# Remove a watchs data but keep the entry (URL etc)
def clear_watch_history(self, uuid):
self.__data['watching'][uuid].clear_watch()
self.needs_write_urgent = True
def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True):
import requests
if extras is None:
extras = {}
# Incase these are copied across, assume it's a reference and deepcopy()
apply_extras = deepcopy(extras)
apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
# Was it a share link? try to fetch the data
if (url.startswith("https://changedetection.io/share/")):
try:
r = requests.request(method="GET",
url=url,
# So we know to return the JSON instead of the human-friendly "help" page
headers={'App-Guid': self.__data['app_guid']},
timeout=5.0) # 5 second timeout to prevent blocking
res = r.json()
# List of permissible attributes we accept from the wild internet
for k in [
'body',
'browser_steps',
'css_filter',
'extract_text',
'headers',
'ignore_text',
'include_filters',
'method',
'paused',
'previous_md5',
'processor',
'subtractive_selectors',
'tag',
'tags',
'text_should_not_be_present',
'title',
'trigger_text',
'url',
'use_page_title_in_list',
'webdriver_js_execute_code',
]:
if res.get(k):
if k != 'css_filter':
apply_extras[k] = res[k]
else:
# We renamed the field and made it a list
apply_extras['include_filters'] = [res['css_filter']]
except Exception as e:
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
flash(gettext("Error fetching metadata for {}").format(url), 'error')
return False
if not is_safe_valid_url(url):
flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error')
return None
if tag and type(tag) == str:
# Then it's probably a string of the actual tag by name, split and add it
for t in tag.split(','):
# for each stripped tag, add tag as UUID
for a_t in t.split(','):
tag_uuid = self.add_tag(a_t)
apply_extras['tags'].append(tag_uuid)
# Or if UUIDs given directly
if tag_uuids:
for t in tag_uuids:
apply_extras['tags'] = list(set(apply_extras['tags'] + [t.strip()]))
# Make any uuids unique
if apply_extras.get('tags'):
apply_extras['tags'] = list(set(apply_extras.get('tags')))
# If the processor also has its own Watch implementation
watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor'))
new_watch = watch_class(datastore_path=self.datastore_path, url=url)
new_uuid = new_watch.get('uuid')
logger.debug(f"Adding URL '{url}' - {new_uuid}")
for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']:
if k in apply_extras:
del apply_extras[k]
if not apply_extras.get('date_created'):
apply_extras['date_created'] = int(time.time())
new_watch.update(apply_extras)
new_watch.ensure_data_dir_exists()
self.__data['watching'][new_uuid] = new_watch
if save_immediately:
# Save immediately using polymorphic method
try:
self.save_watch(new_uuid, force=True)
logger.debug(f"Saved new watch {new_uuid}")
except Exception as e:
logger.error(f"Failed to save new watch {new_uuid}: {e}")
# Mark dirty for retry
self.mark_watch_dirty(new_uuid)
else:
self.mark_watch_dirty(new_uuid)
logger.debug(f"Added '{url}'")
return new_uuid
def _watch_resource_exists(self, watch_uuid, resource_name):
"""
Check if a watch-related resource exists.
File backend implementation: checks if file exists in watch directory.
Args:
watch_uuid: Watch UUID
resource_name: Name of resource (e.g., "last-screenshot.png")
Returns:
bool: True if resource exists
"""
resource_path = os.path.join(self.datastore_path, watch_uuid, resource_name)
return path.isfile(resource_path)
def visualselector_data_is_ready(self, watch_uuid):
"""
Check if visual selector data (screenshot + elements) is ready.
Returns:
bool: True if both screenshot and elements data exist
"""
has_screenshot = self._watch_resource_exists(watch_uuid, "last-screenshot.png")
has_elements = self._watch_resource_exists(watch_uuid, "elements.deflate")
return has_screenshot and has_elements
# Old sync_to_json and save_datastore methods removed - now handled by FileSavingDataStore parent class
# Go through the datastore path and remove any snapshots that are not mentioned in the index
# This usually is not used, but can be handy.
def remove_unused_snapshots(self):
logger.info("Removing snapshots from datastore that are not in the index..")
index = []
for uuid in self.data['watching']:
for id in self.data['watching'][uuid].history:
index.append(self.data['watching'][uuid].history[str(id)])
import pathlib
# Only in the sub-directories
for uuid in self.data['watching']:
for item in pathlib.Path(self.datastore_path).rglob(uuid + "/*.txt"):
if not str(item) in index:
logger.info(f"Removing {item}")
unlink(item)
@property
def proxy_list(self):
proxy_list = {}
proxy_list_file = os.path.join(self.datastore_path, 'proxies.json')
# Load from external config file
if path.isfile(proxy_list_file):
if HAS_ORJSON:
# orjson.loads() expects UTF-8 encoded bytes #3611
with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f:
proxy_list = orjson.loads(f.read())
else:
with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f:
proxy_list = json.load(f)
# Mapping from UI config if available
extras = self.data['settings']['requests'].get('extra_proxies')
if extras:
i = 0
for proxy in extras:
i += 0
if proxy.get('proxy_name') and proxy.get('proxy_url'):
k = "ui-" + str(i) + proxy.get('proxy_name')
proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')}
if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')):
proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''}
return proxy_list if len(proxy_list) else None
def get_preferred_proxy_for_watch(self, uuid):
"""
Returns the preferred proxy by ID key
:param uuid: UUID
:return: proxy "key" id
"""
if self.proxy_list is None:
return None
# If it's a valid one
watch = self.data['watching'].get(uuid)
if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch.get('proxy') == "no-proxy":
return None
if watch.get('proxy') and watch.get('proxy') in list(self.proxy_list.keys()):
return watch.get('proxy')
# not valid (including None), try the system one
else:
system_proxy_id = self.data['settings']['requests'].get('proxy')
# Is not None and exists
if self.proxy_list.get(system_proxy_id):
return system_proxy_id
# Fallback - Did not resolve anything, or doesnt exist, use the first available
if system_proxy_id is None or not self.proxy_list.get(system_proxy_id):
first_default = list(self.proxy_list)[0]
return first_default
return None
@property
def has_extra_headers_file(self):
filepath = os.path.join(self.datastore_path, 'headers.txt')
return os.path.isfile(filepath)
def get_all_base_headers(self):
headers = {}
# Global app settings
headers.update(self.data['settings'].get('headers', {}))
return headers
def get_all_headers_in_textfile_for_watch(self, uuid):
from ..model.App import parse_headers_from_text_file
headers = {}
# Global in /datastore/headers.txt
filepath = os.path.join(self.datastore_path, 'headers.txt')
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
watch = self.data['watching'].get(uuid)
if watch:
# In /datastore/xyz-xyz/headers.txt
filepath = os.path.join(watch.watch_data_dir, 'headers.txt')
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
# In /datastore/tag-name.txt
tags = self.get_all_tags_for_watch(uuid=uuid)
for tag_uuid, tag in tags.items():
fname = "headers-" + re.sub(r'[\W_]', '', tag.get('title')).lower().strip() + ".txt"
filepath = os.path.join(self.datastore_path, fname)
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
return headers
def get_tag_overrides_for_watch(self, uuid, attr):
tags = self.get_all_tags_for_watch(uuid=uuid)
ret = []
if tags:
for tag_uuid, tag in tags.items():
if attr in tag and tag[attr]:
ret = [*ret, *tag[attr]]
return ret
def add_tag(self, title):
# If name exists, return that
n = title.strip().lower()
logger.debug(f">>> Adding new tag - '{n}'")
if not n:
return False
for uuid, tag in self.__data['settings']['application'].get('tags', {}).items():
if n == tag.get('title', '').lower().strip():
logger.warning(f"Tag '{title}' already exists, skipping creation.")
return uuid
# Eventually almost everything todo with a watch will apply as a Tag
# So we use the same model as a Watch
with self.lock:
from ..model import Tag
new_tag = Tag.model(datastore_path=self.datastore_path, default={
'title': title.strip(),
'date_created': int(time.time())
})
new_uuid = new_tag.get('uuid')
self.__data['settings']['application']['tags'][new_uuid] = new_tag
self.mark_settings_dirty()
return new_uuid
def get_all_tags_for_watch(self, uuid):
"""This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet"""
watch = self.data['watching'].get(uuid)
# Should return a dict of full tag info linked by UUID
if watch:
return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags', []))
return {}
@property
def extra_browsers(self):
res = []
p = list(filter(
lambda s: (s.get('browser_name') and s.get('browser_connection_url')),
self.__data['settings']['requests'].get('extra_browsers', [])))
if p:
for i in p:
res.append(("extra_browser_" + i['browser_name'], i['browser_name']))
return res
def tag_exists_by_name(self, tag_name):
# Check if any tag dictionary has a 'title' attribute matching the provided tag_name
tags = self.__data['settings']['application']['tags'].values()
return next((v for v in tags if v.get('title', '').lower() == tag_name.lower()),
None)
def any_watches_have_processor_by_name(self, processor_name):
for watch in self.data['watching'].values():
if watch.get('processor') == processor_name:
return True
return False
def search_watches_for_url(self, query, tag_limit=None, partial=False):
"""Search watches by URL, title, or error messages
Args:
query (str): Search term to match against watch URLs, titles, and error messages
tag_limit (str, optional): Optional tag name to limit search results
partial: (bool, optional): sub-string matching
Returns:
list: List of UUIDs of watches that match the search criteria
"""
matching_uuids = []
query = query.lower().strip()
tag = self.tag_exists_by_name(tag_limit) if tag_limit else False
for uuid, watch in self.data['watching'].items():
# Filter by tag if requested
if tag_limit:
if not tag.get('uuid') in watch.get('tags', []):
continue
# Search in URL, title, or error messages
if partial:
if ((watch.get('title') and query in watch.get('title').lower()) or
query in watch.get('url', '').lower() or
(watch.get('last_error') and query in watch.get('last_error').lower())):
matching_uuids.append(uuid)
else:
if ((watch.get('title') and query == watch.get('title').lower()) or
query == watch.get('url', '').lower() or
(watch.get('last_error') and query == watch.get('last_error').lower())):
matching_uuids.append(uuid)
return matching_uuids
def get_unique_notification_tokens_available(self):
# Ask each type of watch if they have any extra notification token to add to the validation
extra_notification_tokens = {}
watch_processors_checked = set()
for watch_uuid, watch in self.__data['watching'].items():
processor = watch.get('processor')
if processor not in watch_processors_checked:
extra_notification_tokens.update(watch.extra_notification_token_values())
watch_processors_checked.add(processor)
return extra_notification_tokens
def get_unique_notification_token_placeholders_available(self):
# The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice
extra_notification_tokens = []
watch_processors_checked = set()
for watch_uuid, watch in self.__data['watching'].items():
processor = watch.get('processor')
if processor not in watch_processors_checked:
extra_notification_tokens += watch.extra_notification_token_placeholder_info()
watch_processors_checked.add(processor)
return extra_notification_tokens
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.mark_settings_dirty()
return notification_url
# Schema update methods moved to store/updates.py (DatastoreUpdatesMixin)
# This includes: get_updates_available(), run_updates(), and update_1() through update_26()

View File

@@ -0,0 +1,100 @@
"""
Base classes for the datastore.
This module defines the abstract interfaces that all datastore implementations must follow.
"""
from abc import ABC, abstractmethod
from threading import Lock
from loguru import logger
class DataStore(ABC):
"""
Abstract base class for all datastore implementations.
Defines the core interface that all datastores must implement for:
- Loading and saving data
- Managing watches
- Handling settings
- Providing data access
"""
lock = Lock()
datastore_path = None
@abstractmethod
def reload_state(self, datastore_path, include_default_watches, version_tag):
"""
Load data from persistent storage.
Args:
datastore_path: Path to the datastore directory
include_default_watches: Whether to create default watches if none exist
version_tag: Application version string
"""
pass
@abstractmethod
def add_watch(self, url, **kwargs):
"""
Add a new watch.
Args:
url: URL to watch
**kwargs: Additional watch parameters
Returns:
UUID of the created watch
"""
pass
@abstractmethod
def update_watch(self, uuid, update_obj):
"""
Update an existing watch.
Args:
uuid: Watch UUID
update_obj: Dictionary of fields to update
"""
pass
@abstractmethod
def delete(self, uuid):
"""
Delete a watch.
Args:
uuid: Watch UUID to delete
"""
pass
@property
@abstractmethod
def data(self):
"""
Access to the underlying data structure.
Returns:
Dictionary containing all datastore data
"""
pass
@abstractmethod
def force_save_all(self):
"""
Force immediate synchronous save of all data to storage.
This is the abstract method for forcing a complete save.
Different backends implement this differently:
- File backend: Mark all watches/settings dirty, then save
- Redis backend: SAVE command or pipeline flush
- SQL backend: COMMIT transaction
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
pass

View File

@@ -0,0 +1,897 @@
"""
File-based datastore with individual watch persistence and dirty tracking.
This module provides the FileSavingDataStore abstract class that implements:
- Individual watch.json file persistence
- Hash-based change detection (only save what changed)
- Periodic audit scan (catches unmarked changes)
- Background save thread with batched parallel saves
- Atomic file writes safe for NFS/NAS
"""
import glob
import hashlib
import json
import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from distutils.util import strtobool
from threading import Thread
from loguru import logger
from .base import DataStore
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
# Fsync configuration: Force file data to disk for crash safety
# Default False to match legacy behavior (write-and-rename without fsync)
# Set to True for mission-critical deployments requiring crash consistency
FORCE_FSYNC_DATA_IS_CRITICAL = bool(strtobool(os.getenv('FORCE_FSYNC_DATA_IS_CRITICAL', 'False')))
# Save interval configuration: How often the background thread saves dirty items
# Default 10 seconds - increase for less frequent saves, decrease for more frequent
DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS = int(os.getenv('DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS', '10'))
# Rolling audit configuration: Scans a fraction of watches each cycle
# Default: Run audit every 10s, split into 5 shards
# Full audit completes every 50s (10s × 5 shards)
# With 56k watches: 56k / 5 = ~11k watches per cycle (~60ms vs 316ms for all)
# Handles dynamic watch count - recalculates shard boundaries each cycle
DATASTORE_AUDIT_INTERVAL_SECONDS = int(os.getenv('DATASTORE_AUDIT_INTERVAL_SECONDS', '10'))
DATASTORE_AUDIT_SHARDS = int(os.getenv('DATASTORE_AUDIT_SHARDS', '5'))
# ============================================================================
# Helper Functions for Atomic File Operations
# ============================================================================
def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10):
"""
Save JSON data to disk using atomic write pattern.
Generic helper for saving any JSON data (settings, watches, etc.) with:
- Atomic write (temp file + rename)
- Directory fsync for crash consistency (only for new files)
- Size validation
- Proper error handling
Args:
file_path: Full path to target JSON file
data_dict: Dictionary to serialize
label: Human-readable label for error messages (e.g., "watch", "settings")
max_size_mb: Maximum allowed file size in MB
Raises:
ValueError: If serialized data exceeds max_size_mb
OSError: If disk is full (ENOSPC) or other I/O error
"""
# Check if file already exists (before we start writing)
# Directory fsync only needed for NEW files to persist the filename
file_exists = os.path.exists(file_path)
# Ensure parent directory exists
parent_dir = os.path.dirname(file_path)
os.makedirs(parent_dir, exist_ok=True)
# Create temp file in same directory (required for NFS atomicity)
fd, temp_path = tempfile.mkstemp(
suffix='.tmp',
prefix='json-',
dir=parent_dir,
text=False
)
fd_closed = False
try:
# Serialize data
t0 = time.time()
if HAS_ORJSON:
data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2)
else:
data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8')
serialize_ms = (time.time() - t0) * 1000
# Safety check: validate size
MAX_SIZE = max_size_mb * 1024 * 1024
data_size = len(data)
if data_size > MAX_SIZE:
raise ValueError(
f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB "
f"(max: {max_size_mb}MB). This indicates a bug or data corruption."
)
# Write to temp file
t1 = time.time()
os.write(fd, data)
write_ms = (time.time() - t1) * 1000
# Optional fsync: Force file data to disk for crash safety
# Only if FORCE_FSYNC_DATA_IS_CRITICAL=True (default: False, matches legacy behavior)
t2 = time.time()
if FORCE_FSYNC_DATA_IS_CRITICAL:
os.fsync(fd)
file_fsync_ms = (time.time() - t2) * 1000
os.close(fd)
fd_closed = True
# Atomic rename
t3 = time.time()
os.replace(temp_path, file_path)
rename_ms = (time.time() - t3) * 1000
# Sync directory to ensure filename metadata is durable
# OPTIMIZATION: Only needed for NEW files. Existing files already have
# directory entry persisted, so we only need file fsync for data durability.
dir_fsync_ms = 0
if not file_exists:
try:
dir_fd = os.open(parent_dir, os.O_RDONLY)
try:
t4 = time.time()
os.fsync(dir_fd)
dir_fsync_ms = (time.time() - t4) * 1000
finally:
os.close(dir_fd)
except (OSError, AttributeError):
# Windows doesn't support fsync on directories
pass
# Log timing breakdown for slow saves
# total_ms = serialize_ms + write_ms + file_fsync_ms + rename_ms + dir_fsync_ms
# if total_ms: # Log if save took more than 10ms
# file_status = "new" if not file_exists else "update"
# logger.trace(
# f"Save timing breakdown ({total_ms:.1f}ms total, {file_status}): "
# f"serialize={serialize_ms:.1f}ms, write={write_ms:.1f}ms, "
# f"file_fsync={file_fsync_ms:.1f}ms, rename={rename_ms:.1f}ms, "
# f"dir_fsync={dir_fsync_ms:.1f}ms, using_orjson={HAS_ORJSON}"
# )
except OSError as e:
# Cleanup temp file
if not fd_closed:
try:
os.close(fd)
except:
pass
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
# Provide helpful error messages
if e.errno == 28: # ENOSPC
raise OSError(f"Disk full: Cannot save {label}") from e
elif e.errno == 122: # EDQUOT
raise OSError(f"Disk quota exceeded: Cannot save {label}") from e
else:
raise OSError(f"I/O error saving {label}: {e}") from e
except Exception as e:
# Cleanup temp file
if not fd_closed:
try:
os.close(fd)
except:
pass
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
raise e
def save_watch_atomic(watch_dir, uuid, watch_dict):
"""
Save a watch to disk using atomic write pattern.
Convenience wrapper around save_json_atomic for watches.
Args:
watch_dir: Directory for this watch (e.g., /datastore/{uuid})
uuid: Watch UUID (for logging)
watch_dict: Dictionary representation of the watch
Raises:
ValueError: If serialized data exceeds 10MB (indicates bug or corruption)
OSError: If disk is full (ENOSPC) or other I/O error
"""
watch_json = os.path.join(watch_dir, "watch.json")
save_json_atomic(watch_json, watch_dict, label=f"watch {uuid}", max_size_mb=10)
def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
"""
Load a watch from its JSON file.
Args:
watch_json: Path to the watch.json file
uuid: Watch UUID
rehydrate_entity_func: Function to convert dict to Watch object
Returns:
Tuple of (Watch object, raw_data_dict) or (None, None) if failed
The raw_data_dict is needed to compute the hash before rehydration
"""
try:
# Check file size before reading
file_size = os.path.getsize(watch_json)
MAX_WATCH_SIZE = 10 * 1024 * 1024 # 10MB
if file_size > MAX_WATCH_SIZE:
logger.critical(
f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: "
f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). "
f"File: {watch_json}. This indicates a bug or data corruption. "
f"Watch will be skipped."
)
return None, None
if HAS_ORJSON:
with open(watch_json, 'rb') as f:
watch_data = orjson.loads(f.read())
else:
with open(watch_json, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
if watch_data.get('time_schedule_limit'):
del watch_data['time_schedule_limit']
if watch_data.get('time_between_check'):
del watch_data['time_between_check']
# Return both the raw data and the rehydrated watch
# Raw data is needed to compute hash before rehydration changes anything
watch_obj = rehydrate_entity_func(uuid, watch_data)
return watch_obj, watch_data
except json.JSONDecodeError as e:
logger.critical(
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
except ValueError as e:
# orjson raises ValueError for invalid JSON
if "invalid json" in str(e).lower() or HAS_ORJSON:
logger.critical(
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
# Re-raise if it's not a JSON parsing error
raise
except FileNotFoundError:
logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
return None, None
except Exception as e:
logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
return None, None
def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
"""
Load all watches from individual watch.json files.
SYNCHRONOUS loading: Blocks until all watches are loaded.
This ensures data consistency - web server won't accept requests
until all watches are available. Progress logged every 100 watches.
Args:
datastore_path: Path to the datastore directory
rehydrate_entity_func: Function to convert dict to Watch object
compute_hash_func: Function to compute hash from raw watch dict
Returns:
Tuple of (watching_dict, hashes_dict)
- watching_dict: uuid -> Watch object
- hashes_dict: uuid -> hash string (computed from raw data)
"""
start_time = time.time()
logger.info("Loading watches from individual watch.json files...")
watching = {}
watch_hashes = {}
if not os.path.exists(datastore_path):
return watching, watch_hashes
# Find all watch.json files using glob (faster than manual directory traversal)
glob_start = time.time()
watch_files = glob.glob(os.path.join(datastore_path, "*", "watch.json"))
glob_time = time.time() - glob_start
total = len(watch_files)
logger.debug(f"Found {total} watch.json files in {glob_time:.3f}s")
loaded = 0
failed = 0
for watch_json in watch_files:
# Extract UUID from path: /datastore/{uuid}/watch.json
uuid_dir = os.path.basename(os.path.dirname(watch_json))
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch and raw_data:
watching[uuid_dir] = watch
# Compute hash from raw data BEFORE rehydration to match saved hash
watch_hashes[uuid_dir] = compute_hash_func(raw_data)
loaded += 1
if loaded % 100 == 0:
logger.info(f"Loaded {loaded}/{total} watches...")
else:
# load_watch_from_file already logged the specific error
failed += 1
elapsed = time.time() - start_time
if failed > 0:
logger.critical(
f"LOAD COMPLETE: {loaded} watches loaded successfully, "
f"{failed} watches FAILED to load (corrupted or invalid) "
f"in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)"
)
else:
logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)")
return watching, watch_hashes
# ============================================================================
# FileSavingDataStore Class
# ============================================================================
class FileSavingDataStore(DataStore):
"""
Abstract datastore that provides file persistence with change tracking.
Features:
- Individual watch.json files (one per watch)
- Dirty tracking: Only saves items that have changed
- Hash-based change detection: Prevents unnecessary writes
- Background save thread: Non-blocking persistence
- Two-tier urgency: Standard (60s) and urgent (immediate) saves
Subclasses must implement:
- rehydrate_entity(): Convert dict to Watch object
- Access to internal __data structure for watch management
"""
needs_write = False
needs_write_urgent = False
stop_thread = False
# Change tracking
_dirty_watches = set() # Watch UUIDs that need saving
_dirty_settings = False # Settings changed
_watch_hashes = {} # UUID -> SHA256 hash for change detection
# Health monitoring
_last_save_time = 0 # Timestamp of last successful save
_last_audit_time = 0 # Timestamp of last audit scan
_save_cycle_count = 0 # Number of save cycles completed
_total_saves = 0 # Total watches saved (lifetime)
_save_errors = 0 # Total save errors (lifetime)
_audit_count = 0 # Number of audit scans completed
_audit_found_changes = 0 # Total unmarked changes found by audits
_audit_shard_index = 0 # Current shard being audited (rolling audit)
def __init__(self):
super().__init__()
self.save_data_thread = None
self._last_save_time = time.time()
self._last_audit_time = time.time()
def mark_watch_dirty(self, uuid):
"""
Mark a watch as needing save.
Args:
uuid: Watch UUID
"""
with self.lock:
self._dirty_watches.add(uuid)
dirty_count = len(self._dirty_watches)
# Backpressure detection - warn if dirty set grows too large
if dirty_count > 1000:
logger.critical(
f"BACKPRESSURE WARNING: {dirty_count} watches pending save! "
f"Save thread may not be keeping up with write rate. "
f"This could indicate disk I/O bottleneck or save thread failure."
)
elif dirty_count > 500:
logger.warning(
f"Dirty watch count high: {dirty_count} watches pending save. "
f"Monitoring for potential backpressure."
)
self.needs_write = True
def mark_settings_dirty(self):
"""Mark settings as needing save."""
with self.lock:
self._dirty_settings = True
self.needs_write = True
def _compute_hash(self, watch_dict):
"""
Compute SHA256 hash of watch for change detection.
Args:
watch_dict: Dictionary representation of watch
Returns:
Hex string of SHA256 hash
"""
# Use orjson for deterministic serialization if available
if HAS_ORJSON:
json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS)
else:
json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False)
json_bytes = json_str.encode('utf-8')
return hashlib.sha256(json_bytes).hexdigest()
def save_watch(self, uuid, force=False, watch_dict=None, current_hash=None):
"""
Save a single watch if it has changed (polymorphic method).
Args:
uuid: Watch UUID
force: If True, skip hash check and save anyway
watch_dict: Pre-computed watch dictionary (optimization)
current_hash: Pre-computed hash (optimization)
Returns:
True if saved, False if skipped (unchanged)
"""
if not self._watch_exists(uuid):
logger.warning(f"Cannot save watch {uuid} - does not exist")
return False
# Get watch dict if not provided
if watch_dict is None:
watch_dict = self._get_watch_dict(uuid)
# Compute hash if not provided
if current_hash is None:
current_hash = self._compute_hash(watch_dict)
# Skip save if unchanged (unless forced)
if not force and current_hash == self._watch_hashes.get(uuid):
return False
try:
self._save_watch(uuid, watch_dict)
self._watch_hashes[uuid] = current_hash
logger.debug(f"Saved watch {uuid}")
return True
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
raise
def _save_watch(self, uuid, watch_dict):
"""
Save a single watch to storage (polymorphic).
Backend-specific implementation. Subclasses override for different storage:
- File backend: Writes to {uuid}/watch.json
- Redis backend: SET watch:{uuid}
- SQL backend: UPDATE watches WHERE uuid=?
Args:
uuid: Watch UUID
watch_dict: Dictionary representation of watch
"""
# Default file implementation
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
def _save_settings(self):
"""
Save settings to storage (polymorphic).
Subclasses must implement for their backend.
- File: changedetection.json
- Redis: SET settings
- SQL: UPDATE settings table
"""
raise NotImplementedError("Subclass must implement _save_settings")
def _load_watches(self):
"""
Load all watches from storage (polymorphic).
Subclasses must implement for their backend.
- File: Read individual watch.json files
- Redis: SCAN watch:* keys
- SQL: SELECT * FROM watches
"""
raise NotImplementedError("Subclass must implement _load_watches")
def _delete_watch(self, uuid):
"""
Delete a watch from storage (polymorphic).
Subclasses must implement for their backend.
- File: Delete {uuid}/ directory recursively
- Redis: DEL watch:{uuid}
- SQL: DELETE FROM watches WHERE uuid=?
Args:
uuid: Watch UUID to delete
"""
raise NotImplementedError("Subclass must implement _delete_watch")
def _save_dirty_items(self):
"""
Save dirty watches and settings.
This is the core optimization: instead of saving the entire datastore,
we only save watches that were marked dirty and settings if changed.
"""
start_time = time.time()
# Capture dirty sets under lock
with self.lock:
dirty_watches = list(self._dirty_watches)
dirty_settings = self._dirty_settings
self._dirty_watches.clear()
self._dirty_settings = False
if not dirty_watches and not dirty_settings:
return
logger.trace(f"Saving {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}")
# Save each dirty watch using the polymorphic save method
saved_count = 0
error_count = 0
skipped_unchanged = 0
# Process in batches of 50, using thread pool for parallel saves
BATCH_SIZE = 50
MAX_WORKERS = 20 # Number of parallel save threads
def save_single_watch(uuid):
"""Helper function for thread pool execution."""
try:
# Check if watch still exists (might have been deleted)
if not self._watch_exists(uuid):
# Watch was deleted, remove hash
self._watch_hashes.pop(uuid, None)
return {'status': 'deleted', 'uuid': uuid}
# Pre-check hash to avoid unnecessary save_watch() calls
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
if current_hash == self._watch_hashes.get(uuid):
# Watch hasn't actually changed, skip
return {'status': 'unchanged', 'uuid': uuid}
# Pass pre-computed values to avoid redundant serialization/hashing
if self.save_watch(uuid, force=True, watch_dict=watch_dict, current_hash=current_hash):
return {'status': 'saved', 'uuid': uuid}
else:
return {'status': 'skipped', 'uuid': uuid}
except Exception as e:
logger.error(f"Error saving watch {uuid}: {e}")
return {'status': 'error', 'uuid': uuid, 'error': e}
# Process dirty watches in batches
for batch_start in range(0, len(dirty_watches), BATCH_SIZE):
batch = dirty_watches[batch_start:batch_start + BATCH_SIZE]
batch_num = (batch_start // BATCH_SIZE) + 1
total_batches = (len(dirty_watches) + BATCH_SIZE - 1) // BATCH_SIZE
if len(dirty_watches) > BATCH_SIZE:
logger.trace(f"Save batch {batch_num}/{total_batches} ({len(batch)} watches)")
# Use thread pool to save watches in parallel
with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor:
# Submit all save tasks
future_to_uuid = {executor.submit(save_single_watch, uuid): uuid for uuid in batch}
# Collect results as they complete
for future in as_completed(future_to_uuid):
result = future.result()
status = result['status']
if status == 'saved':
saved_count += 1
elif status == 'unchanged':
skipped_unchanged += 1
elif status == 'error':
error_count += 1
# Re-mark for retry
with self.lock:
self._dirty_watches.add(result['uuid'])
# 'deleted' and 'skipped' don't need special handling
# Save settings if changed
if dirty_settings:
try:
self._save_settings()
logger.debug("Saved settings")
except Exception as e:
logger.error(f"Failed to save settings: {e}")
error_count += 1
with self.lock:
self._dirty_settings = True
# Update metrics
elapsed = time.time() - start_time
self._save_cycle_count += 1
self._total_saves += saved_count
self._save_errors += error_count
self._last_save_time = time.time()
# Log performance metrics
if saved_count > 0:
avg_time_per_watch = (elapsed / saved_count) * 1000 # milliseconds
skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else ""
parallel_msg = f" [parallel: {MAX_WORKERS} workers]" if saved_count > 1 else ""
logger.info(
f"Successfully saved {saved_count} watches in {elapsed:.2f}s "
f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}){parallel_msg}. "
f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)"
)
elif skipped_unchanged > 0:
logger.debug(f"Save cycle: {skipped_unchanged} watches verified unchanged (hash match), nothing saved")
if error_count > 0:
logger.error(f"Save cycle completed with {error_count} errors")
self.needs_write = False
self.needs_write_urgent = False
def _watch_exists(self, uuid):
"""
Check if watch exists. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
bool
"""
raise NotImplementedError("Subclass must implement _watch_exists")
def _get_watch_dict(self, uuid):
"""
Get watch as dictionary. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
Dictionary representation of watch
"""
raise NotImplementedError("Subclass must implement _get_watch_dict")
def _audit_all_watches(self):
"""
Rolling audit: Scans a fraction of watches to detect unmarked changes.
Instead of scanning ALL watches at once, this scans 1/N shards per cycle.
The shard rotates each cycle, completing a full audit every N cycles.
Handles dynamic watch count - recalculates shard boundaries each cycle,
so newly added watches will be audited in subsequent cycles.
Benefits:
- Lower CPU per cycle (56k / 5 = ~11k watches vs all 56k)
- More frequent audits overall (every 50s vs every 10s)
- Spreads load evenly across time
"""
audit_start = time.time()
# Get list of all watch UUIDs (read-only, no lock needed)
try:
all_uuids = list(self.data['watching'].keys())
except (KeyError, AttributeError, RuntimeError):
# Data structure not ready or being modified
return
if not all_uuids:
return
total_watches = len(all_uuids)
# Calculate this cycle's shard boundaries
# Example: 56,278 watches / 5 shards = 11,255 watches per shard
# Shard 0: [0:11255], Shard 1: [11255:22510], etc.
shard_size = (total_watches + DATASTORE_AUDIT_SHARDS - 1) // DATASTORE_AUDIT_SHARDS
start_idx = self._audit_shard_index * shard_size
end_idx = min(start_idx + shard_size, total_watches)
# Handle wrap-around (shouldn't happen normally, but defensive)
if start_idx >= total_watches:
self._audit_shard_index = 0
start_idx = 0
end_idx = min(shard_size, total_watches)
# Audit only this shard's watches
shard_uuids = all_uuids[start_idx:end_idx]
changes_found = 0
errors = 0
for uuid in shard_uuids:
try:
# Get current watch dict and compute hash
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
stored_hash = self._watch_hashes.get(uuid)
# If hash changed and not already marked dirty, mark it
if current_hash != stored_hash:
with self.lock:
if uuid not in self._dirty_watches:
self._dirty_watches.add(uuid)
changes_found += 1
logger.warning(
f"Audit detected unmarked change in watch {uuid[:8]}... "
f"(hash changed but not marked dirty)"
)
self.needs_write = True
except Exception as e:
errors += 1
logger.trace(f"Audit error for watch {uuid[:8]}...: {e}")
audit_elapsed = (time.time() - audit_start) * 1000 # milliseconds
# Advance to next shard (wrap around after last shard)
self._audit_shard_index = (self._audit_shard_index + 1) % DATASTORE_AUDIT_SHARDS
# Update metrics
self._audit_count += 1
self._audit_found_changes += changes_found
self._last_audit_time = time.time()
if changes_found > 0:
logger.warning(
f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS} found {changes_found} "
f"unmarked changes in {len(shard_uuids)}/{total_watches} watches ({audit_elapsed:.1f}ms)"
)
else:
logger.trace(
f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS}: "
f"{len(shard_uuids)}/{total_watches} watches checked, 0 changes ({audit_elapsed:.1f}ms)"
)
def save_datastore(self):
"""
Background thread that periodically saves dirty items and audits watches.
Runs two independent cycles:
1. Save dirty items every DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS (default 10s)
2. Rolling audit: every DATASTORE_AUDIT_INTERVAL_SECONDS (default 10s)
- Scans 1/DATASTORE_AUDIT_SHARDS watches per cycle (default 1/5)
- Full audit completes every 50s (10s × 5 shards)
- Automatically handles new/deleted watches
Uses 0.5s sleep intervals for responsiveness to urgent saves.
"""
while True:
if self.stop_thread:
# Graceful shutdown: flush any remaining dirty items before stopping
if self.needs_write or self._dirty_watches or self._dirty_settings:
logger.warning("Datastore save thread stopping - flushing remaining dirty items...")
try:
self._save_dirty_items()
logger.info("Graceful shutdown complete - all data saved")
except Exception as e:
logger.critical(f"FAILED to save dirty items during shutdown: {e}")
else:
logger.info("Datastore save thread stopping - no dirty items")
return
# Check if it's time to run audit scan (every N seconds)
if time.time() - self._last_audit_time >= DATASTORE_AUDIT_INTERVAL_SECONDS:
try:
self._audit_all_watches()
except Exception as e:
logger.error(f"Error in audit cycle: {e}")
# Save dirty items if needed
if self.needs_write or self.needs_write_urgent:
try:
self._save_dirty_items()
except Exception as e:
logger.error(f"Error in save cycle: {e}")
# Timer with early break for urgent saves
# Each iteration is 0.5 seconds, so iterations = DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2
for i in range(DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2):
time.sleep(0.5)
if self.stop_thread or self.needs_write_urgent:
break
def start_save_thread(self):
"""Start the background save thread."""
if not self.save_data_thread or not self.save_data_thread.is_alive():
self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
self.save_data_thread.start()
logger.info("Datastore save thread started")
def force_save_all(self):
"""
Force immediate synchronous save of all changes to storage.
File backend implementation of the abstract force_save_all() method.
Marks all watches and settings as dirty, then saves immediately.
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
logger.info("Force saving all data to storage...")
# Mark everything as dirty to ensure complete save
for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid)
self.mark_settings_dirty()
# Save immediately (synchronous)
self._save_dirty_items()
logger.success("All data saved to storage")
def get_health_status(self):
"""
Get datastore health status for monitoring.
Returns:
dict with health metrics and status
"""
now = time.time()
time_since_last_save = now - self._last_save_time
with self.lock:
dirty_count = len(self._dirty_watches)
is_thread_alive = self.save_data_thread and self.save_data_thread.is_alive()
# Determine health status
if not is_thread_alive:
status = "CRITICAL"
message = "Save thread is DEAD"
elif time_since_last_save > 300: # 5 minutes
status = "WARNING"
message = f"No save activity for {time_since_last_save:.0f}s"
elif dirty_count > 1000:
status = "WARNING"
message = f"High backpressure: {dirty_count} watches pending"
elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01:
status = "WARNING"
message = f"High error rate: {self._save_errors} errors"
else:
status = "HEALTHY"
message = "Operating normally"
return {
"status": status,
"message": message,
"thread_alive": is_thread_alive,
"dirty_watches": dirty_count,
"dirty_settings": self._dirty_settings,
"last_save_seconds_ago": int(time_since_last_save),
"save_cycles": self._save_cycle_count,
"total_saves": self._total_saves,
"total_errors": self._save_errors,
"error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2)
}

View File

@@ -0,0 +1,66 @@
"""
Legacy format loader for url-watches.json.
Provides functions to detect and load from the legacy monolithic JSON format.
Used during migration (update_26) to transition to individual watch.json files.
"""
import os
import json
from loguru import logger
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
def has_legacy_datastore(datastore_path):
"""
Check if a legacy url-watches.json file exists.
This is used by update_26 to determine if migration is needed.
Args:
datastore_path: Path to datastore directory
Returns:
bool: True if url-watches.json exists
"""
url_watches_json = os.path.join(datastore_path, "url-watches.json")
return os.path.exists(url_watches_json)
def load_legacy_format(json_store_path):
"""
Load datastore from legacy url-watches.json format.
Args:
json_store_path: Full path to url-watches.json file
Returns:
dict: Loaded datastore data with 'watching', 'settings', etc.
None: If file doesn't exist or loading failed
"""
logger.info(f"Loading from legacy format: {json_store_path}")
if not os.path.isfile(json_store_path):
logger.warning(f"Legacy file not found: {json_store_path}")
return None
try:
if HAS_ORJSON:
with open(json_store_path, 'rb') as f:
data = orjson.loads(f.read())
else:
with open(json_store_path, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"Loaded {len(data.get('watching', {}))} watches from legacy format")
return data
except Exception as e:
logger.error(f"Failed to load legacy format: {e}")
return None

View File

@@ -0,0 +1,690 @@
"""
Schema update migrations for the datastore.
This module contains all schema version upgrade methods (update_1 through update_N).
These are mixed into ChangeDetectionStore to keep the main store file focused.
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
"""
import os
import re
import shutil
import tarfile
import time
from loguru import logger
from copy import deepcopy
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
from ..processors.restock_diff import Restock
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from .file_saving_datastore import save_watch_atomic
def create_backup_tarball(datastore_path, update_number):
"""
Create a tarball backup of the entire datastore structure before running an update.
Includes:
- All {uuid}/watch.json files
- changedetection.json (settings, if it exists)
- url-watches.json (legacy format, if it exists)
- Directory structure preserved
Args:
datastore_path: Path to datastore directory
update_number: Update number being applied
Returns:
str: Path to created tarball, or None if backup failed
Restoration:
To restore from a backup:
cd /path/to/datastore
tar -xzf before-update-N-timestamp.tar.gz
This will restore all watch.json files and settings to their pre-update state.
"""
timestamp = int(time.time())
backup_filename = f"before-update-{update_number}-{timestamp}.tar.gz"
backup_path = os.path.join(datastore_path, backup_filename)
try:
logger.info(f"Creating backup tarball: {backup_filename}")
with tarfile.open(backup_path, "w:gz") as tar:
# Backup changedetection.json if it exists (new format)
changedetection_json = os.path.join(datastore_path, "changedetection.json")
if os.path.isfile(changedetection_json):
tar.add(changedetection_json, arcname="changedetection.json")
logger.debug("Added changedetection.json to backup")
# Backup url-watches.json if it exists (legacy format)
url_watches_json = os.path.join(datastore_path, "url-watches.json")
if os.path.isfile(url_watches_json):
tar.add(url_watches_json, arcname="url-watches.json")
logger.debug("Added url-watches.json to backup")
# Backup all watch directories with their watch.json files
# This preserves the UUID directory structure
watch_count = 0
for entry in os.listdir(datastore_path):
entry_path = os.path.join(datastore_path, entry)
# Skip if not a directory
if not os.path.isdir(entry_path):
continue
# Skip hidden directories and backup directories
if entry.startswith('.') or entry.startswith('before-update-'):
continue
# Check if this directory has a watch.json (indicates it's a watch UUID directory)
watch_json = os.path.join(entry_path, "watch.json")
if os.path.isfile(watch_json):
# Add the watch.json file preserving directory structure
tar.add(watch_json, arcname=f"{entry}/watch.json")
watch_count += 1
if watch_count % 100 == 0:
logger.debug(f"Backed up {watch_count} watch.json files...")
logger.success(f"Backup created: {backup_filename} ({watch_count} watches)")
return backup_path
except Exception as e:
logger.error(f"Failed to create backup tarball: {e}")
# Try to clean up partial backup
if os.path.exists(backup_path):
try:
os.unlink(backup_path)
except:
pass
return None
class DatastoreUpdatesMixin:
"""
Mixin class containing all schema update methods.
This class is inherited by ChangeDetectionStore to provide schema migration functionality.
Each update_N method upgrades the schema from version N-1 to version N.
"""
def get_updates_available(self):
"""
Discover all available update methods.
Returns:
list: Sorted list of update version numbers (e.g., [1, 2, 3, ..., 26])
"""
import inspect
updates_available = []
for i, o in inspect.getmembers(self, predicate=inspect.ismethod):
m = re.search(r'update_(\d+)$', i)
if m:
updates_available.append(int(m.group(1)))
updates_available.sort()
return updates_available
def run_updates(self, current_schema_version=None):
"""
Run all pending schema updates sequentially.
Args:
current_schema_version: Optional current schema version. If provided, only run updates
greater than this version. If None, uses the schema version from
the datastore. If no schema version exists in datastore and it appears
to be a fresh install, sets to latest update number (no updates needed).
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
Process:
1. Get list of available updates
2. For each update > current schema version:
- Create backup of datastore
- Run update method
- Update schema version
- Mark settings and watches dirty
3. If any update fails, stop processing
4. Save all changes immediately
"""
updates_available = self.get_updates_available()
# Determine current schema version
if current_schema_version is None:
# Check if schema_version exists in datastore
current_schema_version = self.data['settings']['application'].get('schema_version')
if current_schema_version is None:
# No schema version found - could be a fresh install or very old datastore
# If this is a fresh/new config with no watches, assume it's up-to-date
# and set to latest update number (no updates needed)
if len(self.data['watching']) == 0:
# Get the highest update number from available update methods
latest_update = updates_available[-1] if updates_available else 0
logger.info(f"No schema version found and no watches exist - assuming fresh install, setting schema_version to {latest_update}")
self.data['settings']['application']['schema_version'] = latest_update
self.mark_settings_dirty()
return # No updates needed for fresh install
else:
# Has watches but no schema version - likely old datastore, run all updates
logger.warning("No schema version found but watches exist - running all updates from version 0")
current_schema_version = 0
logger.info(f"Current schema version: {current_schema_version}")
updates_ran = []
for update_n in updates_available:
if update_n > current_schema_version:
logger.critical(f"Applying update_{update_n}")
# Create tarball backup of entire datastore structure
# This includes all watch.json files, settings, and preserves directory structure
backup_path = create_backup_tarball(self.datastore_path, update_n)
if backup_path:
logger.info(f"Backup created at: {backup_path}")
else:
logger.warning("Backup creation failed, but continuing with update")
try:
update_method = getattr(self, f"update_{update_n}")()
except Exception as e:
logger.error(f"Error while trying update_{update_n}")
logger.error(e)
# Don't run any more updates
return
else:
# Bump the version, important
self.data['settings']['application']['schema_version'] = update_n
self.mark_settings_dirty()
# CRITICAL: Mark all watches as dirty so changes are persisted
# Most updates modify watches, and in the new individual watch.json structure,
# we need to ensure those changes are saved
logger.info(f"Marking all {len(self.data['watching'])} watches as dirty after update_{update_n} (so that it saves them to disk)")
for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid)
# Save changes immediately after each update (more resilient than batching)
logger.critical(f"Saving all changes after update_{update_n}")
try:
self._save_dirty_items()
logger.success(f"Update {update_n} changes saved successfully")
except Exception as e:
logger.error(f"Failed to save update_{update_n} changes: {e}")
# Don't raise - update already ran, but changes might not be persisted
# The update will try to run again on next startup
# Track which updates ran
updates_ran.append(update_n)
# ============================================================================
# Individual Update Methods
# ============================================================================
def update_1(self):
"""Convert minutes to seconds on settings and each watch."""
if self.data['settings']['requests'].get('minutes_between_check'):
self.data['settings']['requests']['time_between_check']['minutes'] = self.data['settings']['requests']['minutes_between_check']
# Remove the default 'hours' that is set from the model
self.data['settings']['requests']['time_between_check']['hours'] = None
for uuid, watch in self.data['watching'].items():
if 'minutes_between_check' in watch:
# Only upgrade individual watch time if it was set
if watch.get('minutes_between_check', False):
self.data['watching'][uuid]['time_between_check']['minutes'] = watch['minutes_between_check']
def update_2(self):
"""
Move the history list to a flat text file index.
Better than SQLite because this list is only appended to, and works across NAS / NFS type setups.
"""
# @todo test running this on a newly updated one (when this already ran)
for uuid, watch in self.data['watching'].items():
history = []
if watch.get('history', False):
for d, p in watch['history'].items():
d = int(d) # Used to be keyed as str, we'll fix this now too
history.append("{},{}\n".format(d, p))
if len(history):
target_path = os.path.join(self.datastore_path, uuid)
if os.path.exists(target_path):
with open(os.path.join(target_path, "history.txt"), "w") as f:
f.writelines(history)
else:
logger.warning(f"Datastore history directory {target_path} does not exist, skipping history import.")
# No longer needed, dynamically pulled from the disk when needed.
# But we should set it back to a empty dict so we don't break if this schema runs on an earlier version.
# In the distant future we can remove this entirely
self.data['watching'][uuid]['history'] = {}
def update_3(self):
"""We incorrectly stored last_changed when there was not a change, and then confused the output list table."""
# see https://github.com/dgtlmoon/changedetection.io/pull/835
return
def update_4(self):
"""`last_changed` not needed, we pull that information from the history.txt index."""
for uuid, watch in self.data['watching'].items():
try:
# Remove it from the struct
del(watch['last_changed'])
except:
continue
return
def update_5(self):
"""
If the watch notification body, title look the same as the global one, unset it, so the watch defaults back to using the main settings.
In other words - the watch notification_title and notification_body are not needed if they are the same as the default one.
"""
current_system_body = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
current_system_title = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
for uuid, watch in self.data['watching'].items():
try:
watch_body = watch.get('notification_body', '')
if watch_body and watch_body.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_body:
# Looks the same as the default one, so unset it
watch['notification_body'] = None
watch_title = watch.get('notification_title', '')
if watch_title and watch_title.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_title:
# Looks the same as the default one, so unset it
watch['notification_title'] = None
except Exception as e:
continue
return
def update_7(self):
"""
We incorrectly used common header overrides that should only apply to Requests.
These are now handled in content_fetcher::html_requests and shouldnt be passed to Playwright/Selenium.
"""
# These were hard-coded in early versions
for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']:
if self.data['settings']['headers'].get(v):
del self.data['settings']['headers'][v]
def update_8(self):
"""Convert filters to a list of filters css_filter -> include_filters."""
for uuid, watch in self.data['watching'].items():
try:
existing_filter = watch.get('css_filter', '')
if existing_filter:
watch['include_filters'] = [existing_filter]
except:
continue
return
def update_9(self):
"""Convert old static notification tokens to jinja2 tokens."""
# Each watch
# only { } not {{ or }}
r = r'(?<!{){(?!{)(\w+)(?<!})}(?!})'
for uuid, watch in self.data['watching'].items():
try:
n_body = watch.get('notification_body', '')
if n_body:
watch['notification_body'] = re.sub(r, r'{{\1}}', n_body)
n_title = watch.get('notification_title')
if n_title:
watch['notification_title'] = re.sub(r, r'{{\1}}', n_title)
n_urls = watch.get('notification_urls')
if n_urls:
for i, url in enumerate(n_urls):
watch['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
except:
continue
# System wide
n_body = self.data['settings']['application'].get('notification_body')
if n_body:
self.data['settings']['application']['notification_body'] = re.sub(r, r'{{\1}}', n_body)
n_title = self.data['settings']['application'].get('notification_title')
if n_body:
self.data['settings']['application']['notification_title'] = re.sub(r, r'{{\1}}', n_title)
n_urls = self.data['settings']['application'].get('notification_urls')
if n_urls:
for i, url in enumerate(n_urls):
self.data['settings']['application']['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
return
def update_10(self):
"""Some setups may have missed the correct default, so it shows the wrong config in the UI, although it will default to system-wide."""
for uuid, watch in self.data['watching'].items():
try:
if not watch.get('fetch_backend', ''):
watch['fetch_backend'] = 'system'
except:
continue
return
def update_12(self):
"""Create tag objects and their references from existing tag text."""
i = 0
for uuid, watch in self.data['watching'].items():
# Split out and convert old tag string
tag = watch.get('tag')
if tag:
tag_uuids = []
for t in tag.split(','):
tag_uuids.append(self.add_tag(title=t))
self.data['watching'][uuid]['tags'] = tag_uuids
def update_13(self):
"""#1775 - Update 11 did not update the records correctly when adding 'date_created' values for sorting."""
i = 0
for uuid, watch in self.data['watching'].items():
if not watch.get('date_created'):
self.data['watching'][uuid]['date_created'] = i
i += 1
return
def update_14(self):
"""#1774 - protect xpath1 against migration."""
for awatch in self.data["watching"]:
if self.data["watching"][awatch]['include_filters']:
for num, selector in enumerate(self.data["watching"][awatch]['include_filters']):
if selector.startswith('/'):
self.data["watching"][awatch]['include_filters'][num] = 'xpath1:' + selector
if selector.startswith('xpath:'):
self.data["watching"][awatch]['include_filters'][num] = selector.replace('xpath:', 'xpath1:', 1)
def update_15(self):
"""Use more obvious default time setting."""
for uuid in self.data["watching"]:
if self.data["watching"][uuid]['time_between_check'] == self.data['settings']['requests']['time_between_check']:
# What the old logic was, which was pretty confusing
self.data["watching"][uuid]['time_between_check_use_default'] = True
elif all(value is None or value == 0 for value in self.data["watching"][uuid]['time_between_check'].values()):
self.data["watching"][uuid]['time_between_check_use_default'] = True
else:
# Something custom here
self.data["watching"][uuid]['time_between_check_use_default'] = False
def update_16(self):
"""Correctly set datatype for older installs where 'tag' was string and update_12 did not catch it."""
for uuid, watch in self.data['watching'].items():
if isinstance(watch.get('tags'), str):
self.data['watching'][uuid]['tags'] = []
def update_17(self):
"""Migrate old 'in_stock' values to the new Restock."""
for uuid, watch in self.data['watching'].items():
if 'in_stock' in watch:
watch['restock'] = Restock({'in_stock': watch.get('in_stock')})
del watch['in_stock']
def update_18(self):
"""Migrate old restock settings."""
for uuid, watch in self.data['watching'].items():
if not watch.get('restock_settings'):
# So we enable price following by default
self.data['watching'][uuid]['restock_settings'] = {'follow_price_changes': True}
# Migrate and cleanoff old value
self.data['watching'][uuid]['restock_settings']['in_stock_processing'] = 'in_stock_only' if watch.get(
'in_stock_only') else 'all_changes'
if self.data['watching'][uuid].get('in_stock_only'):
del (self.data['watching'][uuid]['in_stock_only'])
def update_19(self):
"""Compress old elements.json to elements.deflate, saving disk, this compression is pretty fast."""
import zlib
for uuid, watch in self.data['watching'].items():
json_path = os.path.join(self.datastore_path, uuid, "elements.json")
deflate_path = os.path.join(self.datastore_path, uuid, "elements.deflate")
if os.path.exists(json_path):
with open(json_path, "rb") as f_j:
with open(deflate_path, "wb") as f_d:
logger.debug(f"Compressing {str(json_path)} to {str(deflate_path)}..")
f_d.write(zlib.compress(f_j.read()))
os.unlink(json_path)
def update_20(self):
"""Migrate extract_title_as_title to use_page_title_in_list."""
for uuid, watch in self.data['watching'].items():
if self.data['watching'][uuid].get('extract_title_as_title'):
self.data['watching'][uuid]['use_page_title_in_list'] = self.data['watching'][uuid].get('extract_title_as_title')
del self.data['watching'][uuid]['extract_title_as_title']
if self.data['settings']['application'].get('extract_title_as_title'):
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def update_21(self):
"""Migrate timezone to scheduler_timezone_default."""
if self.data['settings']['application'].get('timezone'):
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
del self.data['settings']['application']['timezone']
def update_23(self):
"""Some notification formats got the wrong name type."""
def re_run(formats):
sys_n_format = self.data['settings']['application'].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == sys_n_format), None)
if key_exists_as_value: # key of "Plain text"
logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['notification_format'] = key_exists_as_value
for uuid, watch in self.data['watching'].items():
n_format = self.data['watching'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
for uuid, tag in self.data['settings']['application']['tags'].items():
n_format = self.data['settings']['application']['tags'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(
f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['tags'][uuid][
'notification_format'] = key_exists_as_value # should be 'text' or whatever
from ..notification import valid_notification_formats
formats = deepcopy(valid_notification_formats)
re_run(formats)
# And in previous versions, it was "text" instead of Plain text, Markdown instead of "Markdown to HTML"
formats['text'] = 'Text'
formats['markdown'] = 'Markdown'
re_run(formats)
def update_24(self):
"""RSS types should be inline with the same names as notification types."""
rss_format = self.data['settings']['application'].get('rss_content_format')
if not rss_format or 'text' in rss_format:
# might have been 'plaintext, 'plain text' or something
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
elif 'html' in rss_format:
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
else:
# safe fallback to text
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
def update_25(self):
"""Different processors now hold their own history.txt."""
for uuid, watch in self.data['watching'].items():
processor = self.data['watching'][uuid].get('processor')
if processor != 'text_json_diff':
old_history_txt = os.path.join(self.datastore_path, "history.txt")
target_history_name = f"history-{processor}.txt"
if os.path.isfile(old_history_txt) and not os.path.isfile(target_history_name):
new_history_txt = os.path.join(self.datastore_path, target_history_name)
logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...")
shutil.move(old_history_txt, new_history_txt)
def update_26(self):
"""
Migration: Individual watch persistence (COPY-based, safe rollback).
Loads legacy url-watches.json format and migrates to:
- {uuid}/watch.json (per watch)
- changedetection.json (settings only)
IMPORTANT:
- A tarball backup (before-update-26-timestamp.tar.gz) is created before migration
- url-watches.json is LEFT INTACT for rollback safety
- Users can roll back by simply downgrading to the previous version
- Or restore from tarball: tar -xzf before-update-26-*.tar.gz
This is a dedicated migration release - users upgrade at their own pace.
"""
logger.critical("=" * 80)
logger.critical("Running migration: Individual watch persistence (update_26)")
logger.critical("COPY-based migration: url-watches.json will remain intact for rollback")
logger.critical("=" * 80)
# Check if already migrated
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
if os.path.exists(changedetection_json):
logger.info("Migration already completed (changedetection.json exists), skipping")
return
# Check if we need to load legacy data
from .legacy_loader import has_legacy_datastore, load_legacy_format
if not has_legacy_datastore(self.datastore_path):
logger.info("No legacy datastore found, nothing to migrate")
return
# Load legacy data from url-watches.json
logger.critical("Loading legacy datastore from url-watches.json...")
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
legacy_data = load_legacy_format(legacy_path)
if not legacy_data:
raise Exception("Failed to load legacy datastore from url-watches.json")
# Populate settings from legacy data
logger.info("Populating settings from legacy data...")
if 'settings' in legacy_data:
self.data['settings'] = legacy_data['settings']
if 'app_guid' in legacy_data:
self.data['app_guid'] = legacy_data['app_guid']
if 'build_sha' in legacy_data:
self.data['build_sha'] = legacy_data['build_sha']
if 'version_tag' in legacy_data:
self.data['version_tag'] = legacy_data['version_tag']
# Rehydrate watches from legacy data
logger.info("Rehydrating watches from legacy data...")
self.data['watching'] = {}
for uuid, watch_data in legacy_data.get('watching', {}).items():
try:
self.data['watching'][uuid] = self.rehydrate_entity(uuid, watch_data)
except Exception as e:
logger.error(f"Failed to rehydrate watch {uuid}: {e}")
raise Exception(f"Migration failed: Could not rehydrate watch {uuid}. Error: {e}")
watch_count = len(self.data['watching'])
logger.success(f"Loaded {watch_count} watches from legacy format")
# Phase 1: Save all watches to individual files
logger.critical(f"Phase 1/4: Saving {watch_count} watches to individual watch.json files...")
saved_count = 0
for uuid, watch in self.data['watching'].items():
try:
watch_dict = dict(watch)
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
# Initialize hash
self._watch_hashes[uuid] = self._compute_hash(watch_dict)
saved_count += 1
if saved_count % 100 == 0:
logger.info(f" Progress: {saved_count}/{watch_count} watches saved...")
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
raise Exception(
f"Migration failed: Could not save watch {uuid}. "
f"url-watches.json remains intact, safe to retry. Error: {e}"
)
logger.critical(f"Phase 1 complete: Saved {saved_count} watches")
# Phase 2: Verify all files exist
logger.critical("Phase 2/4: Verifying all watch.json files were created...")
missing = []
for uuid in self.data['watching'].keys():
watch_json = os.path.join(self.datastore_path, uuid, "watch.json")
if not os.path.isfile(watch_json):
missing.append(uuid)
if missing:
raise Exception(
f"Migration failed: {len(missing)} watch files missing: {missing[:5]}... "
f"url-watches.json remains intact, safe to retry."
)
logger.critical(f"Phase 2 complete: Verified {watch_count} watch files")
# Phase 3: Create new settings file
logger.critical("Phase 3/4: Creating changedetection.json...")
try:
self._save_settings()
except Exception as e:
logger.error(f"Failed to create changedetection.json: {e}")
raise Exception(
f"Migration failed: Could not create changedetection.json. "
f"url-watches.json remains intact, safe to retry. Error: {e}"
)
# Phase 4: Verify settings file exists
logger.critical("Phase 4/4: Verifying changedetection.json exists...")
if not os.path.isfile(changedetection_json):
raise Exception(
"Migration failed: changedetection.json not found after save. "
"url-watches.json remains intact, safe to retry."
)
logger.critical("Phase 4 complete: Verified changedetection.json exists")
# Success! Now reload from new format
logger.critical("Reloading datastore from new format...")
self._load_state()
logger.success("Datastore reloaded from new format successfully")
logger.critical("=" * 80)
logger.critical("MIGRATION COMPLETED SUCCESSFULLY!")
logger.critical("=" * 80)
logger.info("")
logger.info("New format:")
logger.info(f" - {watch_count} individual watch.json files created")
logger.info(f" - changedetection.json created (settings only)")
logger.info("")
logger.info("Rollback safety:")
logger.info(" - url-watches.json preserved for rollback")
logger.info(" - To rollback: downgrade to previous version and restart")
logger.info(" - No manual file operations needed")
logger.info("")
logger.info("Optional cleanup (after testing new version):")
logger.info(f" - rm {os.path.join(self.datastore_path, 'url-watches.json')}")
logger.info("")
# Schema version will be updated by run_updates()

View File

@@ -1,243 +0,0 @@
#!/bin/bash
# Test script for CLI options - Parallel execution
# Tests -u, -uN, -r, -b flags
set -u # Exit on undefined variables
# Color output
RED='\033[0;31m'
GREEN='\033[0;32m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color
# Test results directory (for parallel safety)
TEST_RESULTS_DIR="/tmp/cli-test-results-$$"
mkdir -p "$TEST_RESULTS_DIR"
# Cleanup function
cleanup() {
echo ""
echo "=== Cleaning up test directories ==="
rm -rf /tmp/cli-test-* 2>/dev/null || true
rm -rf "$TEST_RESULTS_DIR" 2>/dev/null || true
# Kill any hanging processes
pkill -f "changedetection.py.*cli-test" 2>/dev/null || true
}
trap cleanup EXIT
# Helper to record test result
record_result() {
local test_num=$1
local status=$2 # pass or fail
local message=$3
echo "$status|$message" > "$TEST_RESULTS_DIR/test_${test_num}.result"
}
# Run a test in background
run_test() {
local test_num=$1
local test_name=$2
local test_func=$3
(
echo -e "${YELLOW}[Test $test_num]${NC} $test_name"
if $test_func "$test_num"; then
record_result "$test_num" "pass" "$test_name"
echo -e "${GREEN}✓ PASS${NC}: $test_name"
else
record_result "$test_num" "fail" "$test_name"
echo -e "${RED}✗ FAIL${NC}: $test_name"
fi
) &
}
# =============================================================================
# Test Functions (each runs independently)
# =============================================================================
test_help_flag() {
local test_id=$1
timeout 3 python3 changedetection.py --help 2>&1 | grep -q "Add URLs on startup"
}
test_version_flag() {
local test_id=$1
timeout 3 python3 changedetection.py --version 2>&1 | grep -qE "changedetection.io [0-9]+\.[0-9]+"
}
test_single_url() {
local test_id=$1
local dir="/tmp/cli-test-single-${test_id}-$$"
timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ]
}
test_multiple_urls() {
local test_id=$1
local dir="/tmp/cli-test-multi-${test_id}-$$"
timeout 12 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u https://github.com \
-u https://httpbin.org \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ]
}
test_url_with_options() {
local test_id=$1
local dir="/tmp/cli-test-opts-${test_id}-$$"
timeout 10 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u0 '{"title":"Test Site","processor":"text_json_diff"}' \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)"
}
test_multiple_urls_with_options() {
local test_id=$1
local dir="/tmp/cli-test-multi-opts-${test_id}-$$"
timeout 12 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u0 '{"title":"Site One"}' \
-u https://github.com \
-u1 '{"title":"Site Two"}' \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)"
}
test_batch_mode_exit() {
local test_id=$1
local dir="/tmp/cli-test-batch-${test_id}-$$"
local start=$(date +%s)
timeout 15 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-b &>/dev/null
local end=$(date +%s)
local elapsed=$((end - start))
[ $elapsed -lt 14 ]
}
test_recheck_all() {
local test_id=$1
local dir="/tmp/cli-test-recheck-all-${test_id}-$$"
mkdir -p "$dir"
cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}}
EOF
timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all"
}
test_recheck_specific() {
local test_id=$1
local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$"
mkdir -p "$dir"
cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}}
EOF
timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches"
}
test_combined_operations() {
local test_id=$1
local dir="/tmp/cli-test-combined-${test_id}-$$"
timeout 12 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u https://github.com \
-r all \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ]
}
test_invalid_json() {
local test_id=$1
local dir="/tmp/cli-test-invalid-${test_id}-$$"
timeout 5 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u0 'invalid json here' \
2>&1 | grep -qi "invalid json\|json decode error"
}
test_create_directory() {
local test_id=$1
local dir="/tmp/cli-test-create-${test_id}-$$/nested/path"
timeout 10 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-b &>/dev/null
[ -d "$dir" ]
}
# =============================================================================
# Main Test Execution
# =============================================================================
echo "=========================================="
echo " CLI Options Test Suite (Parallel)"
echo "=========================================="
echo ""
# Launch all tests in parallel
run_test 1 "Help flag (--help) shows usage without initialization" test_help_flag
run_test 2 "Version flag (--version) displays version" test_version_flag
run_test 3 "Add single URL with -u flag" test_single_url
run_test 4 "Add multiple URLs with multiple -u flags" test_multiple_urls
run_test 5 "Add URL with JSON options using -u0" test_url_with_options
run_test 6 "Add multiple URLs with different options (-u0, -u1)" test_multiple_urls_with_options
run_test 7 "Batch mode (-b) exits automatically after processing" test_batch_mode_exit
run_test 8 "Recheck all watches with -r all" test_recheck_all
run_test 9 "Recheck specific watches with -r UUID" test_recheck_specific
run_test 10 "Combined: Add URLs and recheck all with -u and -r all" test_combined_operations
run_test 11 "Invalid JSON in -u0 option should show error" test_invalid_json
run_test 12 "Create datastore directory with -C flag" test_create_directory
# Wait for all tests to complete
echo ""
echo "Waiting for all tests to complete..."
wait
# Collect results
echo ""
echo "=========================================="
echo " Test Summary"
echo "=========================================="
TESTS_RUN=0
TESTS_PASSED=0
TESTS_FAILED=0
for result_file in "$TEST_RESULTS_DIR"/test_*.result; do
if [ -f "$result_file" ]; then
TESTS_RUN=$((TESTS_RUN + 1))
status=$(cut -d'|' -f1 < "$result_file")
if [ "$status" = "pass" ]; then
TESTS_PASSED=$((TESTS_PASSED + 1))
else
TESTS_FAILED=$((TESTS_FAILED + 1))
fi
fi
done
echo "Tests run: $TESTS_RUN"
echo -e "${GREEN}Tests passed: $TESTS_PASSED${NC}"
if [ $TESTS_FAILED -gt 0 ]; then
echo -e "${RED}Tests failed: $TESTS_FAILED${NC}"
else
echo -e "${GREEN}Tests failed: $TESTS_FAILED${NC}"
fi
echo "=========================================="
echo ""
# Exit with appropriate code
if [ $TESTS_FAILED -gt 0 ]; then
echo -e "${RED}Some tests failed!${NC}"
exit 1
else
echo -e "${GREEN}All tests passed!${NC}"
exit 0
fi

View File

@@ -53,11 +53,21 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path):
backup = ZipFile(io.BytesIO(res.data))
l = backup.namelist()
uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
newlist = list(filter(uuid4hex.match, l)) # Read Note below
# Check for UUID-based txt files (history and snapshot)
uuid4hex_txt = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
txt_files = list(filter(uuid4hex_txt.match, l))
# Should be two txt files in the archive (history and the snapshot)
assert len(newlist) == 2
assert len(txt_files) == 2
# Check for watch.json files (new format)
uuid4hex_json = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}/watch\.json$', re.I)
json_files = list(filter(uuid4hex_json.match, l))
# Should be one watch.json file in the archive (the imported watch)
assert len(json_files) == 1, f"Expected 1 watch.json file, found {len(json_files)}: {json_files}"
# Check for changedetection.json (settings file)
assert 'changedetection.json' in l, "changedetection.json should be in backup"
# Get the latest one
res = client.get(

View File

@@ -59,11 +59,29 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
# Wait for the sync DB save to happen
time.sleep(2)
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
# Check which format is being used
datastore_path = live_server.app.config['DATASTORE'].datastore_path
changedetection_json = os.path.join(datastore_path, 'changedetection.json')
url_watches_json = os.path.join(datastore_path, 'url-watches.json')
json_obj = None
with open(json_db_file, 'r', encoding='utf-8') as f:
json_obj = json.load(f)
json_obj = {'watching': {}}
if os.path.exists(changedetection_json):
# New format: individual watch.json files
logger.info("Testing with new format (changedetection.json + individual watch.json)")
# Load each watch.json file
for uuid in live_server.app.config['DATASTORE'].data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.isfile(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
json_obj['watching'][uuid] = json.load(f)
else:
# Legacy format: url-watches.json
logger.info("Testing with legacy format (url-watches.json)")
with open(url_watches_json, 'r', encoding='utf-8') as f:
json_obj = json.load(f)
# assert the right amount of watches was found in the JSON
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
@@ -88,7 +106,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
# Find the snapshot one
for fname in files_in_watch_dir:
if fname != 'history.txt' and 'html' not in fname:
if fname != 'history.txt' and fname != 'watch.json' and 'html' not in fname:
if strtobool(os.getenv("TEST_WITH_BROTLI")):
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
@@ -105,11 +123,23 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
assert json_obj['watching'][w]['title'], "Watch should have a title set"
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
# With new format, we also have watch.json, so 4 files total
if os.path.exists(changedetection_json):
assert len(files_in_watch_dir) == 4, "Should be four files in the dir with new format: watch.json, html.br snapshot, history.txt and the extracted text snapshot"
else:
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir with legacy format: html.br snapshot, history.txt and the extracted text snapshot"
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
with open(json_db_file, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
# Check that 'default' Watch vars aren't accidentally being saved
if os.path.exists(changedetection_json):
# New format: check all individual watch.json files
for uuid in json_obj['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
with open(watch_json_file, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), f"'default' probably shouldnt be here in {watch_json_file}, it came from when the 'default' Watch vars were accidently being saved"
else:
# Legacy format: check url-watches.json
with open(url_watches_json, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -142,10 +142,14 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watches_with_body = 0
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['body']==body_value:
# Read individual watch.json files
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
if watch_data.get('body') == body_value:
watches_with_body += 1
# Should be only one with body set
@@ -225,10 +229,14 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_
wait_for_all_checks(client)
watches_with_method = 0
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['method'] == 'PATCH':
# Read individual watch.json files
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
if watch_data.get('method') == 'PATCH':
watches_with_method += 1
# Should be only one with method set to PATCH

View File

@@ -2857,7 +2857,7 @@ msgstr "Echtzeit-Updates offline"
#: changedetectionio/templates/base.html
msgid "Select Language"
msgstr "Wählen Sie eine Sprache aus"
msgstr "Wählen Sie Sprache aus"
#: changedetectionio/templates/base.html
msgid "Auto-detect from browser"

View File

@@ -42,7 +42,7 @@ orjson~=3.11
# jq not available on Windows so must be installed manually
# Notification library
apprise==1.9.7
apprise==1.9.6
diff_match_patch