Compare commits

...

8 Commits

Author SHA1 Message Date
dgtlmoon
746990391a test fix
Some checks are pending
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Waiting to run
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Blocked by required conditions
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Blocked by required conditions
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Waiting to run
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Waiting to run
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Waiting to run
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Waiting to run
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Waiting to run
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Waiting to run
ChangeDetection.io App Test / lint-code (push) Waiting to run
ChangeDetection.io App Test / test-application-3-10 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-11 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-12 (push) Blocked by required conditions
ChangeDetection.io App Test / test-application-3-13 (push) Blocked by required conditions
2026-01-19 18:58:39 +01:00
dgtlmoon
1661f7b85b tweak 2026-01-19 18:55:16 +01:00
dgtlmoon
1f37b358b1 Merge branch 'master' into datastore-refactor 2026-01-19 18:52:06 +01:00
dgtlmoon
151e603af7 API - Validation improvements (#3782)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-19 18:16:25 +01:00
dgtlmoon
7311af4b58 i18n - zh traditional chinese autodetect from browser fix 2026-01-19 16:28:25 +01:00
dgtlmoon
af193e8d7a UI - Fixes for search dialog #3778 (#3781) 2026-01-19 16:18:23 +01:00
dgtlmoon
639c53f0f8 left out pypi file 2026-01-19 13:11:57 +01:00
dgtlmoon
48e8295433 Big refactor to save watches as their own datafile with some agnostic data store backend 2026-01-19 12:59:10 +01:00
22 changed files with 3929 additions and 1193 deletions

View File

@@ -61,8 +61,8 @@ jobs:
# --- API test ---
# This also means that the docs/api-spec.yml was shipped and could be read
test -f /tmp/url-watches.json
API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/url-watches.json)
test -f /tmp/changedetection.json
API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/changedetection.json)
echo Test API KEY is $API_KEY
curl -X POST "http://127.0.0.1:10000/api/v1/watch" \
-H "x-api-key: ${API_KEY}" \

View File

@@ -9,6 +9,7 @@ recursive-include changedetectionio/notification *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/realtime *
recursive-include changedetectionio/static *
recursive-include changedetectionio/store *
recursive-include changedetectionio/templates *
recursive-include changedetectionio/tests *
recursive-include changedetectionio/translations *

View File

@@ -112,12 +112,12 @@ def sigshutdown_handler(_signo, _stack_frame):
except Exception as e:
logger.error(f"Error shutting down Socket.IO server: {str(e)}")
# Save data quickly
# Save data quickly - force immediate save using abstract method
try:
datastore.sync_to_json()
logger.success('Fast sync to disk complete.')
datastore.force_save_all()
logger.success('Fast sync to storage complete.')
except Exception as e:
logger.error(f"Error syncing to disk: {str(e)}")
logger.error(f"Error syncing to storage: {str(e)}")
sys.exit()

View File

@@ -68,13 +68,17 @@ class Watch(Resource):
import time
from copy import deepcopy
watch = None
for _ in range(20):
# Retry up to 20 times if dict is being modified
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
for attempt in range(20):
try:
watch = deepcopy(self.datastore.data['watching'].get(uuid))
break
except RuntimeError:
# Incase dict changed, try again
time.sleep(0.01)
# Dict changed during deepcopy, retry after yielding to scheduler
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
if attempt < 19: # Don't yield on last attempt
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
if not watch:
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
@@ -126,17 +130,31 @@ class Watch(Resource):
if request.json.get('proxy'):
plist = self.datastore.proxy_list
if not request.json.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
if not plist or request.json.get('proxy') not in plist:
proxy_list_str = ', '.join(plist) if plist else 'none configured'
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(request.json)
if validation_error:
return validation_error, 400
# XSS etc protection
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
return "Invalid URL", 400
# XSS etc protection - validate URL if it's being updated
if 'url' in request.json:
new_url = request.json.get('url')
# URL must be a non-empty string
if new_url is None:
return "URL cannot be null", 400
if not isinstance(new_url, str):
return "URL must be a string", 400
if not new_url.strip():
return "URL cannot be empty or whitespace only", 400
if not is_safe_valid_url(new_url.strip()):
return "Invalid or unsupported URL format. URL must use http://, https://, or ftp:// protocol", 400
# Handle processor-config-* fields separately (save to JSON, not datastore)
from changedetectionio import processors
@@ -232,6 +250,10 @@ class WatchSingleHistory(Resource):
if timestamp == 'latest':
timestamp = list(watch.history.keys())[-1]
# Validate that the timestamp exists in history
if timestamp not in watch.history:
abort(404, message=f"No history snapshot found for timestamp '{timestamp}'")
if request.args.get('html'):
content = watch.get_fetched_html(timestamp)
if content:
@@ -419,8 +441,9 @@ class CreateWatch(Resource):
if json_data.get('proxy'):
plist = self.datastore.proxy_list
if not json_data.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
if not plist or json_data.get('proxy') not in plist:
proxy_list_str = ', '.join(plist) if plist else 'none configured'
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(json_data)

View File

@@ -27,11 +27,23 @@ def create_backup(datastore_path, watches: dict):
compression=zipfile.ZIP_DEFLATED,
compresslevel=8) as zipObj:
# Add the index
zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json")
# Add the settings file (supports both formats)
# New format: changedetection.json
changedetection_json = os.path.join(datastore_path, "changedetection.json")
if os.path.isfile(changedetection_json):
zipObj.write(changedetection_json, arcname="changedetection.json")
logger.debug("Added changedetection.json to backup")
# Add the flask app secret
zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt")
# Legacy format: url-watches.json (for backward compatibility)
url_watches_json = os.path.join(datastore_path, "url-watches.json")
if os.path.isfile(url_watches_json):
zipObj.write(url_watches_json, arcname="url-watches.json")
logger.debug("Added url-watches.json to backup")
# Add the flask app secret (if it exists)
secret_file = os.path.join(datastore_path, "secret.txt")
if os.path.isfile(secret_file):
zipObj.write(secret_file, arcname="secret.txt")
# Add any data in the watch data directory.
for uuid, w in watches.items():
@@ -90,8 +102,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(gettext("Maximum number of backups reached, please remove some"), "error")
return redirect(url_for('backups.index'))
# Be sure we're written fresh
datastore.sync_to_json()
# Be sure we're written fresh - force immediate save using abstract method
datastore.force_save_all()
zip_thread = threading.Thread(
target=create_backup,
args=(datastore.datastore_path, datastore.data.get("watching")),

View File

@@ -62,7 +62,7 @@ class import_url_list(Importer):
extras = None
if processor:
extras = {'processor': processor}
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, write_to_disk_now=False, extras=extras)
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, save_immediately=False, extras=extras)
if new_uuid:
# Straight into the queue.
@@ -129,7 +129,7 @@ class import_distill_io_json(Importer):
new_uuid = datastore.add_watch(url=d['uri'].strip(),
tag=",".join(d.get('tags', [])),
extras=extras,
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
@@ -204,7 +204,7 @@ class import_xlsx_wachete(Importer):
new_uuid = datastore.add_watch(url=data['url'].strip(),
extras=extras,
tag=data.get('folder'),
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
self.new_uuids.append(new_uuid)
@@ -287,7 +287,7 @@ class import_xlsx_custom(Importer):
new_uuid = datastore.add_watch(url=url,
extras=extras,
tag=tags,
write_to_disk_now=False)
save_immediately=False)
if new_uuid:
# Straight into the queue.
self.new_uuids.append(new_uuid)

View File

@@ -400,11 +400,27 @@ def changedetection_app(config=None, datastore_o=None):
language_codes = get_language_codes()
def get_locale():
# Locale aliases: map browser language codes to translation directory names
# This handles cases where browsers send standard codes (e.g., zh-TW)
# but our translations use more specific codes (e.g., zh_Hant_TW)
locale_aliases = {
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
}
# 1. Try to get locale from session (user explicitly selected)
if 'locale' in session:
return session['locale']
# 2. Fall back to Accept-Language header
return request.accept_languages.best_match(language_codes)
# Get the best match from browser's Accept-Language header
browser_locale = request.accept_languages.best_match(language_codes + list(locale_aliases.keys()))
# 3. Check if we need to map the browser locale to our internal locale
if browser_locale in locale_aliases:
return locale_aliases[browser_locale]
return browser_locale
# Initialize Babel with locale selector
babel = Babel(app, locale_selector=get_locale)

View File

@@ -69,6 +69,19 @@
}
});
// Handle Enter key in search input
if (searchInput) {
searchInput.addEventListener('keydown', function(e) {
if (e.key === 'Enter') {
e.preventDefault();
if (searchForm) {
// Trigger form submission programmatically
searchForm.dispatchEvent(new Event('submit'));
}
}
});
}
// Handle form submission
if (searchForm) {
searchForm.addEventListener('submit', function(e) {
@@ -88,8 +101,8 @@
params.append('tags', tags);
}
// Navigate to search results
window.location.href = '?' + params.toString();
// Navigate to search results (always redirect to watchlist home)
window.location.href = '/?' + params.toString();
});
}
});

File diff suppressed because it is too large Load Diff

View File

@@ -0,0 +1,958 @@
import shutil
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_safe_valid_url
from flask import (
flash
)
from flask_babel import gettext
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
from ..model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from copy import deepcopy, copy
from os import path, unlink
from threading import Lock
import json
import os
import re
import secrets
import sys
import threading
import time
import uuid as uuid_builder
from loguru import logger
from blinker import signal
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
from ..processors import get_custom_watch_obj_for_processor
from ..processors.restock_diff import Restock
# Import the base class and helpers
from .file_saving_datastore import FileSavingDataStore, load_all_watches, save_watch_atomic, save_json_atomic
from .updates import DatastoreUpdatesMixin
from .legacy_loader import has_legacy_datastore
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :)
# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change
class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
__version_check = True
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
# Initialize parent class
super().__init__()
# Should only be active for docker
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
self.datastore_path = datastore_path
self.needs_write = False
self.start_time = time.time()
self.stop_thread = False
self.save_version_copy_json_db(version_tag)
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
def save_version_copy_json_db(self, version_tag):
"""
Create version-tagged backup of changedetection.json.
This is called on version upgrades to preserve a backup in case
the new version has issues.
"""
import re
version_text = re.sub(r'\D+', '-', version_tag)
db_path = os.path.join(self.datastore_path, "changedetection.json")
db_path_version_backup = os.path.join(self.datastore_path, f"changedetection-{version_text}.json")
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
from shutil import copyfile
logger.info(f"Backing up changedetection.json due to new version to '{db_path_version_backup}'.")
copyfile(db_path, db_path_version_backup)
def _load_settings(self):
"""
Load settings from storage.
File backend implementation: reads from changedetection.json
Returns:
dict: Settings data loaded from storage
"""
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
logger.info(f"Loading settings from {changedetection_json}")
if HAS_ORJSON:
with open(changedetection_json, 'rb') as f:
return orjson.loads(f.read())
else:
with open(changedetection_json, 'r', encoding='utf-8') as f:
return json.load(f)
def _apply_settings(self, settings_data):
"""
Apply loaded settings data to internal data structure.
Args:
settings_data: Dictionary loaded from changedetection.json
"""
# Apply top-level fields
if 'app_guid' in settings_data:
self.__data['app_guid'] = settings_data['app_guid']
if 'build_sha' in settings_data:
self.__data['build_sha'] = settings_data['build_sha']
if 'version_tag' in settings_data:
self.__data['version_tag'] = settings_data['version_tag']
# Apply settings sections
if 'settings' in settings_data:
if 'headers' in settings_data['settings']:
self.__data['settings']['headers'].update(settings_data['settings']['headers'])
if 'requests' in settings_data['settings']:
self.__data['settings']['requests'].update(settings_data['settings']['requests'])
if 'application' in settings_data['settings']:
self.__data['settings']['application'].update(settings_data['settings']['application'])
def _rehydrate_tags(self):
"""Rehydrate tag entities from stored data."""
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(
uuid, tag, processor_override='restock_diff'
)
logger.info(f"Tag: {uuid} {tag['title']}")
def _load_state(self):
"""
Load complete datastore state from storage.
Orchestrates loading of settings and watches using polymorphic methods.
"""
# Load settings
settings_data = self._load_settings()
self._apply_settings(settings_data)
# Load watches (polymorphic - parent class method)
self._load_watches()
# Rehydrate tags
self._rehydrate_tags()
def reload_state(self, datastore_path, include_default_watches, version_tag):
"""
Load datastore from storage or create new one.
Supports two scenarios:
1. NEW format: changedetection.json exists → load and run updates if needed
2. EMPTY: No changedetection.json → create new OR trigger migration from legacy
Note: Legacy url-watches.json migration happens in update_26, not here.
"""
logger.info(f"Datastore path is '{datastore_path}'")
# Initialize data structure
self.__data = App.model()
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
# Base definition for all watchers (deepcopy part of #569)
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))
# Load build SHA if available (Docker deployments)
if path.isfile('changedetectionio/source.txt'):
with open('changedetectionio/source.txt') as f:
self.__data['build_sha'] = f.read()
# Check if datastore already exists
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
if os.path.exists(changedetection_json):
# Load existing datastore (changedetection.json + watch.json files)
logger.info("Loading existing datastore")
try:
self._load_state()
except Exception as e:
logger.critical(f"Failed to load datastore: {e}")
raise
# Run schema updates if needed
self.run_updates()
else:
# No datastore yet - check if this is a fresh install or legacy migration
# Generate app_guid FIRST (required for all operations)
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else:
self.__data['app_guid'] = str(uuid_builder.uuid4())
# Generate RSS access token
self.__data['settings']['application']['rss_access_token'] = secrets.token_hex(16)
# Generate API access token
self.__data['settings']['application']['api_access_token'] = secrets.token_hex(16)
# Check if legacy datastore exists (url-watches.json)
if has_legacy_datastore(self.datastore_path):
# Legacy datastore detected - trigger migration
logger.critical(f"Legacy datastore detected at {self.datastore_path}/url-watches.json")
logger.critical("Migration will be triggered via update_26")
# Set schema version to 0 to trigger ALL updates including update_26
self.__data['settings']['application']['schema_version'] = 0
# update_26 will load the legacy data and migrate to new format
# Data will be loaded into memory during update_26, no need to add default watches
self.run_updates()
else:
# Fresh install - create new datastore
logger.critical(f"No datastore found, creating new datastore at {self.datastore_path}")
# Set schema version to latest (no updates needed)
updates_available = self.get_updates_available()
self.__data['settings']['application']['schema_version'] = updates_available.pop() if updates_available else 26
# Add default watches if requested
if include_default_watches:
self.add_watch(
url='https://news.ycombinator.com/',
tag='Tech news',
extras={'fetch_backend': 'html_requests'}
)
self.add_watch(
url='https://changedetection.io/CHANGELOG.txt',
tag='changedetection.io',
extras={'fetch_backend': 'html_requests'}
)
# Create changedetection.json immediately
try:
self._save_settings()
logger.info("Created changedetection.json for new datastore")
except Exception as e:
logger.error(f"Failed to create initial changedetection.json: {e}")
# Set version tag
self.__data['version_tag'] = version_tag
# Validate proxies.json if it exists
_ = self.proxy_list # Just to test parsing
# Ensure app_guid exists (for datastores loaded from existing files)
if 'app_guid' not in self.__data:
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
else:
self.__data['app_guid'] = str(uuid_builder.uuid4())
self.mark_settings_dirty()
# Ensure RSS access token exists
if not self.__data['settings']['application'].get('rss_access_token'):
secret = secrets.token_hex(16)
self.__data['settings']['application']['rss_access_token'] = secret
self.mark_settings_dirty()
# Ensure API access token exists
if not self.__data['settings']['application'].get('api_access_token'):
secret = secrets.token_hex(16)
self.__data['settings']['application']['api_access_token'] = secret
self.mark_settings_dirty()
# Handle password reset lockfile
password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock")
if path.isfile(password_reset_lockfile):
self.remove_password()
unlink(password_reset_lockfile)
# Start the background save thread
self.start_save_thread()
def rehydrate_entity(self, uuid, entity, processor_override=None):
"""Set the dict back to the dict Watch object"""
entity['uuid'] = uuid
if processor_override:
watch_class = get_custom_watch_obj_for_processor(processor_override)
entity['processor'] = processor_override
else:
watch_class = get_custom_watch_obj_for_processor(entity.get('processor'))
if entity.get('uuid') != 'text_json_diff':
logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")
entity = watch_class(datastore_path=self.datastore_path, default=entity)
return entity
# ============================================================================
# FileSavingDataStore Abstract Method Implementations
# ============================================================================
def _watch_exists(self, uuid):
"""Check if watch exists in datastore."""
return uuid in self.__data['watching']
def _get_watch_dict(self, uuid):
"""Get watch as dictionary."""
return dict(self.__data['watching'][uuid])
def _build_settings_data(self):
"""
Build settings data structure for saving.
Returns:
dict: Settings data ready for serialization
"""
return {
'note': 'Settings file - watches are stored in individual {uuid}/watch.json files',
'app_guid': self.__data['app_guid'],
'settings': self.__data['settings'],
'build_sha': self.__data.get('build_sha'),
'version_tag': self.__data.get('version_tag')
}
def _save_settings(self):
"""
Save settings to storage.
File backend implementation: saves to changedetection.json
Implementation of abstract method from FileSavingDataStore.
Uses the generic save_json_atomic helper.
Raises:
OSError: If disk is full or other I/O error
"""
settings_data = self._build_settings_data()
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
save_json_atomic(changedetection_json, settings_data, label="settings", max_size_mb=10)
def _load_watches(self):
"""
Load all watches from storage.
File backend implementation: reads individual watch.json files
Implementation of abstract method from FileSavingDataStore.
Delegates to helper function and stores results in internal data structure.
"""
watching, watch_hashes = load_all_watches(
self.datastore_path,
self.rehydrate_entity,
self._compute_hash
)
# Store loaded data
self.__data['watching'] = watching
self._watch_hashes = watch_hashes
def _delete_watch(self, uuid):
"""
Delete a watch from storage.
File backend implementation: deletes entire {uuid}/ directory recursively.
Implementation of abstract method from FileSavingDataStore.
Args:
uuid: Watch UUID to delete
"""
watch_dir = os.path.join(self.datastore_path, uuid)
if os.path.exists(watch_dir):
shutil.rmtree(watch_dir)
logger.info(f"Deleted watch directory: {watch_dir}")
# ============================================================================
# Watch Management Methods
# ============================================================================
def set_last_viewed(self, uuid, timestamp):
logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}")
self.data['watching'][uuid].update({'last_viewed': int(timestamp)})
self.mark_watch_dirty(uuid)
watch_check_update = signal('watch_check_update')
if watch_check_update:
watch_check_update.send(watch_uuid=uuid)
def remove_password(self):
self.__data['settings']['application']['password'] = False
self.mark_settings_dirty()
def update_watch(self, uuid, update_obj):
# It's possible that the watch could be deleted before update
if not self.__data['watching'].get(uuid):
return
with self.lock:
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
for dict_key, d in self.generic_definition.items():
if isinstance(d, dict):
if update_obj is not None and dict_key in update_obj:
self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
del (update_obj[dict_key])
self.__data['watching'][uuid].update(update_obj)
self.mark_watch_dirty(uuid)
@property
def threshold_seconds(self):
seconds = 0
for m, n in Watch.mtable.items():
x = self.__data['settings']['requests']['time_between_check'].get(m)
if x:
seconds += x * n
return seconds
@property
def unread_changes_count(self):
unread_changes_count = 0
for uuid, watch in self.__data['watching'].items():
if watch.history_n >= 2 and watch.viewed == False:
unread_changes_count += 1
return unread_changes_count
@property
def data(self):
# Re #152, Return env base_url if not overriden
# Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services
# like 'Join', so it's always best to atleast set something obvious so that they are not broken.
active_base_url = BASE_URL_NOT_SET_TEXT
if self.__data['settings']['application'].get('base_url'):
active_base_url = self.__data['settings']['application'].get('base_url')
elif os.getenv('BASE_URL'):
active_base_url = os.getenv('BASE_URL')
# I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable
# even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. or something
d = self.__data
d['settings']['application']['active_base_url'] = active_base_url.strip('" ')
return d
# Delete a single watch by UUID
def delete(self, uuid):
"""
Delete a watch by UUID.
Uses abstracted storage method for backend-agnostic deletion.
Supports 'all' to delete all watches (mainly for testing).
Args:
uuid: Watch UUID to delete, or 'all' to delete all watches
"""
with self.lock:
if uuid == 'all':
# Delete all watches - capture UUIDs first before modifying dict
all_uuids = list(self.__data['watching'].keys())
for watch_uuid in all_uuids:
# Delete from storage using polymorphic method
try:
self._delete_watch(watch_uuid)
except Exception as e:
logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}")
# Clean up tracking data
self._watch_hashes.pop(watch_uuid, None)
self._dirty_watches.discard(watch_uuid)
# Send delete signal
watch_delete_signal = signal('watch_deleted')
if watch_delete_signal:
watch_delete_signal.send(watch_uuid=watch_uuid)
# Clear the dict
self.__data['watching'] = {}
# Mainly used for testing to allow all items to flush before running next test
time.sleep(1)
else:
# Delete single watch from storage using polymorphic method
try:
self._delete_watch(uuid)
except Exception as e:
logger.error(f"Failed to delete watch {uuid} from storage: {e}")
# Remove from watching dict
del self.data['watching'][uuid]
# Clean up tracking data
self._watch_hashes.pop(uuid, None)
self._dirty_watches.discard(uuid)
# Send delete signal
watch_delete_signal = signal('watch_deleted')
if watch_delete_signal:
watch_delete_signal.send(watch_uuid=uuid)
self.needs_write_urgent = True
# Clone a watch by UUID
def clone(self, uuid):
url = self.data['watching'][uuid].get('url')
extras = deepcopy(self.data['watching'][uuid])
new_uuid = self.add_watch(url=url, extras=extras)
watch = self.data['watching'][new_uuid]
return new_uuid
def url_exists(self, url):
# Probably their should be dict...
for watch in self.data['watching'].values():
if watch['url'].lower() == url.lower():
return True
return False
# Remove a watchs data but keep the entry (URL etc)
def clear_watch_history(self, uuid):
self.__data['watching'][uuid].clear_watch()
self.needs_write_urgent = True
def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True):
import requests
if extras is None:
extras = {}
# Incase these are copied across, assume it's a reference and deepcopy()
apply_extras = deepcopy(extras)
apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
# Was it a share link? try to fetch the data
if (url.startswith("https://changedetection.io/share/")):
try:
r = requests.request(method="GET",
url=url,
# So we know to return the JSON instead of the human-friendly "help" page
headers={'App-Guid': self.__data['app_guid']},
timeout=5.0) # 5 second timeout to prevent blocking
res = r.json()
# List of permissible attributes we accept from the wild internet
for k in [
'body',
'browser_steps',
'css_filter',
'extract_text',
'headers',
'ignore_text',
'include_filters',
'method',
'paused',
'previous_md5',
'processor',
'subtractive_selectors',
'tag',
'tags',
'text_should_not_be_present',
'title',
'trigger_text',
'url',
'use_page_title_in_list',
'webdriver_js_execute_code',
]:
if res.get(k):
if k != 'css_filter':
apply_extras[k] = res[k]
else:
# We renamed the field and made it a list
apply_extras['include_filters'] = [res['css_filter']]
except Exception as e:
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
flash(gettext("Error fetching metadata for {}").format(url), 'error')
return False
if not is_safe_valid_url(url):
flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error')
return None
if tag and type(tag) == str:
# Then it's probably a string of the actual tag by name, split and add it
for t in tag.split(','):
# for each stripped tag, add tag as UUID
for a_t in t.split(','):
tag_uuid = self.add_tag(a_t)
apply_extras['tags'].append(tag_uuid)
# Or if UUIDs given directly
if tag_uuids:
for t in tag_uuids:
apply_extras['tags'] = list(set(apply_extras['tags'] + [t.strip()]))
# Make any uuids unique
if apply_extras.get('tags'):
apply_extras['tags'] = list(set(apply_extras.get('tags')))
# If the processor also has its own Watch implementation
watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor'))
new_watch = watch_class(datastore_path=self.datastore_path, url=url)
new_uuid = new_watch.get('uuid')
logger.debug(f"Adding URL '{url}' - {new_uuid}")
for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']:
if k in apply_extras:
del apply_extras[k]
if not apply_extras.get('date_created'):
apply_extras['date_created'] = int(time.time())
new_watch.update(apply_extras)
new_watch.ensure_data_dir_exists()
self.__data['watching'][new_uuid] = new_watch
if save_immediately:
# Save immediately using polymorphic method
try:
self.save_watch(new_uuid, force=True)
logger.debug(f"Saved new watch {new_uuid}")
except Exception as e:
logger.error(f"Failed to save new watch {new_uuid}: {e}")
# Mark dirty for retry
self.mark_watch_dirty(new_uuid)
else:
self.mark_watch_dirty(new_uuid)
logger.debug(f"Added '{url}'")
return new_uuid
def _watch_resource_exists(self, watch_uuid, resource_name):
"""
Check if a watch-related resource exists.
File backend implementation: checks if file exists in watch directory.
Args:
watch_uuid: Watch UUID
resource_name: Name of resource (e.g., "last-screenshot.png")
Returns:
bool: True if resource exists
"""
resource_path = os.path.join(self.datastore_path, watch_uuid, resource_name)
return path.isfile(resource_path)
def visualselector_data_is_ready(self, watch_uuid):
"""
Check if visual selector data (screenshot + elements) is ready.
Returns:
bool: True if both screenshot and elements data exist
"""
has_screenshot = self._watch_resource_exists(watch_uuid, "last-screenshot.png")
has_elements = self._watch_resource_exists(watch_uuid, "elements.deflate")
return has_screenshot and has_elements
# Old sync_to_json and save_datastore methods removed - now handled by FileSavingDataStore parent class
# Go through the datastore path and remove any snapshots that are not mentioned in the index
# This usually is not used, but can be handy.
def remove_unused_snapshots(self):
logger.info("Removing snapshots from datastore that are not in the index..")
index = []
for uuid in self.data['watching']:
for id in self.data['watching'][uuid].history:
index.append(self.data['watching'][uuid].history[str(id)])
import pathlib
# Only in the sub-directories
for uuid in self.data['watching']:
for item in pathlib.Path(self.datastore_path).rglob(uuid + "/*.txt"):
if not str(item) in index:
logger.info(f"Removing {item}")
unlink(item)
@property
def proxy_list(self):
proxy_list = {}
proxy_list_file = os.path.join(self.datastore_path, 'proxies.json')
# Load from external config file
if path.isfile(proxy_list_file):
if HAS_ORJSON:
# orjson.loads() expects UTF-8 encoded bytes #3611
with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f:
proxy_list = orjson.loads(f.read())
else:
with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f:
proxy_list = json.load(f)
# Mapping from UI config if available
extras = self.data['settings']['requests'].get('extra_proxies')
if extras:
i = 0
for proxy in extras:
i += 0
if proxy.get('proxy_name') and proxy.get('proxy_url'):
k = "ui-" + str(i) + proxy.get('proxy_name')
proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')}
if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')):
proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''}
return proxy_list if len(proxy_list) else None
def get_preferred_proxy_for_watch(self, uuid):
"""
Returns the preferred proxy by ID key
:param uuid: UUID
:return: proxy "key" id
"""
if self.proxy_list is None:
return None
# If it's a valid one
watch = self.data['watching'].get(uuid)
if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch.get('proxy') == "no-proxy":
return None
if watch.get('proxy') and watch.get('proxy') in list(self.proxy_list.keys()):
return watch.get('proxy')
# not valid (including None), try the system one
else:
system_proxy_id = self.data['settings']['requests'].get('proxy')
# Is not None and exists
if self.proxy_list.get(system_proxy_id):
return system_proxy_id
# Fallback - Did not resolve anything, or doesnt exist, use the first available
if system_proxy_id is None or not self.proxy_list.get(system_proxy_id):
first_default = list(self.proxy_list)[0]
return first_default
return None
@property
def has_extra_headers_file(self):
filepath = os.path.join(self.datastore_path, 'headers.txt')
return os.path.isfile(filepath)
def get_all_base_headers(self):
headers = {}
# Global app settings
headers.update(self.data['settings'].get('headers', {}))
return headers
def get_all_headers_in_textfile_for_watch(self, uuid):
from ..model.App import parse_headers_from_text_file
headers = {}
# Global in /datastore/headers.txt
filepath = os.path.join(self.datastore_path, 'headers.txt')
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
watch = self.data['watching'].get(uuid)
if watch:
# In /datastore/xyz-xyz/headers.txt
filepath = os.path.join(watch.watch_data_dir, 'headers.txt')
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
# In /datastore/tag-name.txt
tags = self.get_all_tags_for_watch(uuid=uuid)
for tag_uuid, tag in tags.items():
fname = "headers-" + re.sub(r'[\W_]', '', tag.get('title')).lower().strip() + ".txt"
filepath = os.path.join(self.datastore_path, fname)
try:
if os.path.isfile(filepath):
headers.update(parse_headers_from_text_file(filepath))
except Exception as e:
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
return headers
def get_tag_overrides_for_watch(self, uuid, attr):
tags = self.get_all_tags_for_watch(uuid=uuid)
ret = []
if tags:
for tag_uuid, tag in tags.items():
if attr in tag and tag[attr]:
ret = [*ret, *tag[attr]]
return ret
def add_tag(self, title):
# If name exists, return that
n = title.strip().lower()
logger.debug(f">>> Adding new tag - '{n}'")
if not n:
return False
for uuid, tag in self.__data['settings']['application'].get('tags', {}).items():
if n == tag.get('title', '').lower().strip():
logger.warning(f"Tag '{title}' already exists, skipping creation.")
return uuid
# Eventually almost everything todo with a watch will apply as a Tag
# So we use the same model as a Watch
with self.lock:
from ..model import Tag
new_tag = Tag.model(datastore_path=self.datastore_path, default={
'title': title.strip(),
'date_created': int(time.time())
})
new_uuid = new_tag.get('uuid')
self.__data['settings']['application']['tags'][new_uuid] = new_tag
self.mark_settings_dirty()
return new_uuid
def get_all_tags_for_watch(self, uuid):
"""This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet"""
watch = self.data['watching'].get(uuid)
# Should return a dict of full tag info linked by UUID
if watch:
return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags', []))
return {}
@property
def extra_browsers(self):
res = []
p = list(filter(
lambda s: (s.get('browser_name') and s.get('browser_connection_url')),
self.__data['settings']['requests'].get('extra_browsers', [])))
if p:
for i in p:
res.append(("extra_browser_" + i['browser_name'], i['browser_name']))
return res
def tag_exists_by_name(self, tag_name):
# Check if any tag dictionary has a 'title' attribute matching the provided tag_name
tags = self.__data['settings']['application']['tags'].values()
return next((v for v in tags if v.get('title', '').lower() == tag_name.lower()),
None)
def any_watches_have_processor_by_name(self, processor_name):
for watch in self.data['watching'].values():
if watch.get('processor') == processor_name:
return True
return False
def search_watches_for_url(self, query, tag_limit=None, partial=False):
"""Search watches by URL, title, or error messages
Args:
query (str): Search term to match against watch URLs, titles, and error messages
tag_limit (str, optional): Optional tag name to limit search results
partial: (bool, optional): sub-string matching
Returns:
list: List of UUIDs of watches that match the search criteria
"""
matching_uuids = []
query = query.lower().strip()
tag = self.tag_exists_by_name(tag_limit) if tag_limit else False
for uuid, watch in self.data['watching'].items():
# Filter by tag if requested
if tag_limit:
if not tag.get('uuid') in watch.get('tags', []):
continue
# Search in URL, title, or error messages
if partial:
if ((watch.get('title') and query in watch.get('title').lower()) or
query in watch.get('url', '').lower() or
(watch.get('last_error') and query in watch.get('last_error').lower())):
matching_uuids.append(uuid)
else:
if ((watch.get('title') and query == watch.get('title').lower()) or
query == watch.get('url', '').lower() or
(watch.get('last_error') and query == watch.get('last_error').lower())):
matching_uuids.append(uuid)
return matching_uuids
def get_unique_notification_tokens_available(self):
# Ask each type of watch if they have any extra notification token to add to the validation
extra_notification_tokens = {}
watch_processors_checked = set()
for watch_uuid, watch in self.__data['watching'].items():
processor = watch.get('processor')
if processor not in watch_processors_checked:
extra_notification_tokens.update(watch.extra_notification_token_values())
watch_processors_checked.add(processor)
return extra_notification_tokens
def get_unique_notification_token_placeholders_available(self):
# The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice
extra_notification_tokens = []
watch_processors_checked = set()
for watch_uuid, watch in self.__data['watching'].items():
processor = watch.get('processor')
if processor not in watch_processors_checked:
extra_notification_tokens += watch.extra_notification_token_placeholder_info()
watch_processors_checked.add(processor)
return extra_notification_tokens
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.mark_settings_dirty()
return notification_url
# Schema update methods moved to store/updates.py (DatastoreUpdatesMixin)
# This includes: get_updates_available(), run_updates(), and update_1() through update_26()

View File

@@ -0,0 +1,100 @@
"""
Base classes for the datastore.
This module defines the abstract interfaces that all datastore implementations must follow.
"""
from abc import ABC, abstractmethod
from threading import Lock
from loguru import logger
class DataStore(ABC):
"""
Abstract base class for all datastore implementations.
Defines the core interface that all datastores must implement for:
- Loading and saving data
- Managing watches
- Handling settings
- Providing data access
"""
lock = Lock()
datastore_path = None
@abstractmethod
def reload_state(self, datastore_path, include_default_watches, version_tag):
"""
Load data from persistent storage.
Args:
datastore_path: Path to the datastore directory
include_default_watches: Whether to create default watches if none exist
version_tag: Application version string
"""
pass
@abstractmethod
def add_watch(self, url, **kwargs):
"""
Add a new watch.
Args:
url: URL to watch
**kwargs: Additional watch parameters
Returns:
UUID of the created watch
"""
pass
@abstractmethod
def update_watch(self, uuid, update_obj):
"""
Update an existing watch.
Args:
uuid: Watch UUID
update_obj: Dictionary of fields to update
"""
pass
@abstractmethod
def delete(self, uuid):
"""
Delete a watch.
Args:
uuid: Watch UUID to delete
"""
pass
@property
@abstractmethod
def data(self):
"""
Access to the underlying data structure.
Returns:
Dictionary containing all datastore data
"""
pass
@abstractmethod
def force_save_all(self):
"""
Force immediate synchronous save of all data to storage.
This is the abstract method for forcing a complete save.
Different backends implement this differently:
- File backend: Mark all watches/settings dirty, then save
- Redis backend: SAVE command or pipeline flush
- SQL backend: COMMIT transaction
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
pass

View File

@@ -0,0 +1,698 @@
"""
File-based datastore with individual watch persistence and dirty tracking.
This module provides the FileSavingDataStore abstract class that implements:
- Individual watch.json file persistence
- Hash-based change detection (only save what changed)
- Background save thread with dirty tracking
- Atomic file writes safe for NFS/NAS
"""
import hashlib
import json
import os
import tempfile
import time
from threading import Thread
from loguru import logger
from .base import DataStore
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
# ============================================================================
# Helper Functions for Atomic File Operations
# ============================================================================
def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10):
"""
Save JSON data to disk using atomic write pattern.
Generic helper for saving any JSON data (settings, watches, etc.) with:
- Atomic write (temp file + rename)
- Directory fsync for crash consistency
- Size validation
- Proper error handling
Args:
file_path: Full path to target JSON file
data_dict: Dictionary to serialize
label: Human-readable label for error messages (e.g., "watch", "settings")
max_size_mb: Maximum allowed file size in MB
Raises:
ValueError: If serialized data exceeds max_size_mb
OSError: If disk is full (ENOSPC) or other I/O error
"""
# Ensure parent directory exists
parent_dir = os.path.dirname(file_path)
os.makedirs(parent_dir, exist_ok=True)
# Create temp file in same directory (required for NFS atomicity)
fd, temp_path = tempfile.mkstemp(
suffix='.tmp',
prefix='json-',
dir=parent_dir,
text=False
)
fd_closed = False
try:
# Serialize data
if HAS_ORJSON:
data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2)
else:
data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8')
# Safety check: validate size
MAX_SIZE = max_size_mb * 1024 * 1024
data_size = len(data)
if data_size > MAX_SIZE:
raise ValueError(
f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB "
f"(max: {max_size_mb}MB). This indicates a bug or data corruption."
)
# Write to temp file
os.write(fd, data)
os.fsync(fd) # Force file data to disk
os.close(fd)
fd_closed = True
# Atomic rename
os.replace(temp_path, file_path)
# Sync directory to ensure filename metadata is durable
try:
dir_fd = os.open(parent_dir, os.O_RDONLY)
try:
os.fsync(dir_fd)
finally:
os.close(dir_fd)
except (OSError, AttributeError):
# Windows doesn't support fsync on directories
pass
except OSError as e:
# Cleanup temp file
if not fd_closed:
try:
os.close(fd)
except:
pass
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
# Provide helpful error messages
if e.errno == 28: # ENOSPC
raise OSError(f"Disk full: Cannot save {label}") from e
elif e.errno == 122: # EDQUOT
raise OSError(f"Disk quota exceeded: Cannot save {label}") from e
else:
raise OSError(f"I/O error saving {label}: {e}") from e
except Exception as e:
# Cleanup temp file
if not fd_closed:
try:
os.close(fd)
except:
pass
if os.path.exists(temp_path):
try:
os.unlink(temp_path)
except:
pass
raise e
def save_watch_atomic(watch_dir, uuid, watch_dict):
"""
Save a watch to disk using atomic write pattern.
Convenience wrapper around save_json_atomic for watches.
Args:
watch_dir: Directory for this watch (e.g., /datastore/{uuid})
uuid: Watch UUID (for logging)
watch_dict: Dictionary representation of the watch
Raises:
ValueError: If serialized data exceeds 10MB (indicates bug or corruption)
OSError: If disk is full (ENOSPC) or other I/O error
"""
watch_json = os.path.join(watch_dir, "watch.json")
save_json_atomic(watch_json, watch_dict, label=f"watch {uuid}", max_size_mb=10)
def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
"""
Load a watch from its JSON file.
Args:
watch_json: Path to the watch.json file
uuid: Watch UUID
rehydrate_entity_func: Function to convert dict to Watch object
Returns:
Tuple of (Watch object, raw_data_dict) or (None, None) if failed
The raw_data_dict is needed to compute the hash before rehydration
"""
try:
# Check file size before reading
file_size = os.path.getsize(watch_json)
MAX_WATCH_SIZE = 10 * 1024 * 1024 # 10MB
if file_size > MAX_WATCH_SIZE:
logger.critical(
f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: "
f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). "
f"File: {watch_json}. This indicates a bug or data corruption. "
f"Watch will be skipped."
)
return None, None
if HAS_ORJSON:
with open(watch_json, 'rb') as f:
watch_data = orjson.loads(f.read())
else:
with open(watch_json, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
# Return both the raw data and the rehydrated watch
# Raw data is needed to compute hash before rehydration changes anything
watch_obj = rehydrate_entity_func(uuid, watch_data)
return watch_obj, watch_data
except json.JSONDecodeError as e:
logger.critical(
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
except ValueError as e:
# orjson raises ValueError for invalid JSON
if "invalid json" in str(e).lower() or HAS_ORJSON:
logger.critical(
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
f"File: {watch_json}. Error: {e}. "
f"Watch will be skipped and may need manual recovery from backup."
)
return None, None
# Re-raise if it's not a JSON parsing error
raise
except FileNotFoundError:
logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
return None, None
except Exception as e:
logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
return None, None
def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
"""
Load all watches from individual watch.json files.
SYNCHRONOUS loading: Blocks until all watches are loaded.
This ensures data consistency - web server won't accept requests
until all watches are available. Progress logged every 100 watches.
Args:
datastore_path: Path to the datastore directory
rehydrate_entity_func: Function to convert dict to Watch object
compute_hash_func: Function to compute hash from raw watch dict
Returns:
Tuple of (watching_dict, hashes_dict)
- watching_dict: uuid -> Watch object
- hashes_dict: uuid -> hash string (computed from raw data)
"""
logger.info("Loading watches from individual watch.json files...")
watching = {}
watch_hashes = {}
# Find all UUID directories
if not os.path.exists(datastore_path):
return watching, watch_hashes
# Get all directories that look like UUIDs
try:
all_items = os.listdir(datastore_path)
except Exception as e:
logger.error(f"Failed to list datastore directory: {e}")
return watching, watch_hashes
uuid_dirs = [
d for d in all_items
if os.path.isdir(os.path.join(datastore_path, d))
and not d.startswith('.') # Skip hidden dirs
and d not in ['__pycache__'] # Skip Python cache dirs
]
# First pass: count directories with watch.json files
watch_dirs = []
for uuid_dir in uuid_dirs:
watch_json = os.path.join(datastore_path, uuid_dir, "watch.json")
if os.path.isfile(watch_json):
watch_dirs.append(uuid_dir)
total = len(watch_dirs)
loaded = 0
failed = 0
for uuid_dir in watch_dirs:
watch_json = os.path.join(datastore_path, uuid_dir, "watch.json")
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch and raw_data:
watching[uuid_dir] = watch
# Compute hash from raw data BEFORE rehydration to match saved hash
watch_hashes[uuid_dir] = compute_hash_func(raw_data)
loaded += 1
if loaded % 100 == 0:
logger.info(f"Loaded {loaded}/{total} watches...")
else:
# load_watch_from_file already logged the specific error
failed += 1
if failed > 0:
logger.critical(
f"LOAD COMPLETE: {loaded} watches loaded successfully, "
f"{failed} watches FAILED to load (corrupted or invalid)"
)
else:
logger.info(f"Loaded {loaded} watches from disk")
return watching, watch_hashes
# ============================================================================
# FileSavingDataStore Class
# ============================================================================
class FileSavingDataStore(DataStore):
"""
Abstract datastore that provides file persistence with change tracking.
Features:
- Individual watch.json files (one per watch)
- Dirty tracking: Only saves items that have changed
- Hash-based change detection: Prevents unnecessary writes
- Background save thread: Non-blocking persistence
- Two-tier urgency: Standard (60s) and urgent (immediate) saves
Subclasses must implement:
- rehydrate_entity(): Convert dict to Watch object
- Access to internal __data structure for watch management
"""
needs_write = False
needs_write_urgent = False
stop_thread = False
# Change tracking
_dirty_watches = set() # Watch UUIDs that need saving
_dirty_settings = False # Settings changed
_watch_hashes = {} # UUID -> SHA256 hash for change detection
# Health monitoring
_last_save_time = 0 # Timestamp of last successful save
_save_cycle_count = 0 # Number of save cycles completed
_total_saves = 0 # Total watches saved (lifetime)
_save_errors = 0 # Total save errors (lifetime)
def __init__(self):
super().__init__()
self.save_data_thread = None
self._last_save_time = time.time()
def _compute_hash(self, watch_dict):
"""
Compute SHA256 hash of watch for change detection.
Args:
watch_dict: Dictionary representation of watch
Returns:
Hex string of SHA256 hash
"""
# Use orjson for deterministic serialization if available
if HAS_ORJSON:
json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS)
else:
json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False)
json_bytes = json_str.encode('utf-8')
return hashlib.sha256(json_bytes).hexdigest()
def mark_watch_dirty(self, uuid):
"""
Mark a watch as needing save.
Args:
uuid: Watch UUID
"""
with self.lock:
self._dirty_watches.add(uuid)
dirty_count = len(self._dirty_watches)
# Backpressure detection - warn if dirty set grows too large
if dirty_count > 1000:
logger.critical(
f"BACKPRESSURE WARNING: {dirty_count} watches pending save! "
f"Save thread may not be keeping up with write rate. "
f"This could indicate disk I/O bottleneck or save thread failure."
)
elif dirty_count > 500:
logger.warning(
f"Dirty watch count high: {dirty_count} watches pending save. "
f"Monitoring for potential backpressure."
)
self.needs_write = True
def mark_settings_dirty(self):
"""Mark settings as needing save."""
with self.lock:
self._dirty_settings = True
self.needs_write = True
def save_watch(self, uuid, force=False):
"""
Save a single watch if it has changed (polymorphic method).
This is the high-level save method that handles:
- Hash computation and change detection
- Calling the backend-specific save implementation
- Updating the hash cache
Args:
uuid: Watch UUID
force: If True, skip hash check and save anyway
Returns:
True if saved, False if skipped (unchanged)
"""
if not self._watch_exists(uuid):
logger.warning(f"Cannot save watch {uuid} - does not exist")
return False
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
# Skip save if unchanged (unless forced)
if not force and current_hash == self._watch_hashes.get(uuid):
#logger.debug(f"Watch {uuid} unchanged, skipping save")
return False
try:
self._save_watch(uuid, watch_dict)
self._watch_hashes[uuid] = current_hash
logger.debug(f"Saved watch {uuid}")
return True
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
raise
def _save_watch(self, uuid, watch_dict):
"""
Save a single watch to storage (polymorphic).
Backend-specific implementation. Subclasses override for different storage:
- File backend: Writes to {uuid}/watch.json
- Redis backend: SET watch:{uuid}
- SQL backend: UPDATE watches WHERE uuid=?
Args:
uuid: Watch UUID
watch_dict: Dictionary representation of watch
"""
# Default file implementation
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
def _save_settings(self):
"""
Save settings to storage (polymorphic).
Subclasses must implement for their backend.
- File: changedetection.json
- Redis: SET settings
- SQL: UPDATE settings table
"""
raise NotImplementedError("Subclass must implement _save_settings")
def _load_watches(self):
"""
Load all watches from storage (polymorphic).
Subclasses must implement for their backend.
- File: Read individual watch.json files
- Redis: SCAN watch:* keys
- SQL: SELECT * FROM watches
"""
raise NotImplementedError("Subclass must implement _load_watches")
def _delete_watch(self, uuid):
"""
Delete a watch from storage (polymorphic).
Subclasses must implement for their backend.
- File: Delete {uuid}/ directory recursively
- Redis: DEL watch:{uuid}
- SQL: DELETE FROM watches WHERE uuid=?
Args:
uuid: Watch UUID to delete
"""
raise NotImplementedError("Subclass must implement _delete_watch")
def _save_dirty_items(self):
"""
Save only items that have changed.
This is the core optimization: instead of saving the entire datastore,
we only save watches that were marked dirty and settings if changed.
"""
start_time = time.time()
# Capture dirty sets under lock
with self.lock:
dirty_watches = list(self._dirty_watches)
dirty_settings = self._dirty_settings
self._dirty_watches.clear()
self._dirty_settings = False
if not dirty_watches and not dirty_settings:
return
logger.debug(f"Checking {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}")
# Save each dirty watch using the polymorphic save method
saved_count = 0
error_count = 0
skipped_unchanged = 0
for uuid in dirty_watches:
# Check if watch still exists (might have been deleted)
if not self._watch_exists(uuid):
# Watch was deleted, remove hash
self._watch_hashes.pop(uuid, None)
continue
# Pre-check hash to avoid unnecessary save_watch() calls
watch_dict = self._get_watch_dict(uuid)
current_hash = self._compute_hash(watch_dict)
if current_hash == self._watch_hashes.get(uuid):
# Watch hasn't actually changed, skip
skipped_unchanged += 1
continue
try:
if self.save_watch(uuid, force=True): # force=True since we already checked hash
saved_count += 1
except Exception as e:
error_count += 1
# Re-mark for retry
with self.lock:
self._dirty_watches.add(uuid)
# Save settings if changed
if dirty_settings:
try:
self._save_settings()
logger.debug("Saved settings")
except Exception as e:
logger.error(f"Failed to save settings: {e}")
error_count += 1
with self.lock:
self._dirty_settings = True
# Update metrics
elapsed = time.time() - start_time
self._save_cycle_count += 1
self._total_saves += saved_count
self._save_errors += error_count
self._last_save_time = time.time()
# Log performance metrics
if saved_count > 0:
avg_time_per_watch = (elapsed / saved_count) * 1000 # milliseconds
skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else ""
logger.info(
f"Successfully saved {saved_count} watches in {elapsed:.2f}s "
f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}). "
f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)"
)
elif skipped_unchanged > 0:
logger.debug(f"Save cycle: {skipped_unchanged} watches unchanged, nothing saved")
if error_count > 0:
logger.error(f"Save cycle completed with {error_count} errors")
self.needs_write = False
self.needs_write_urgent = False
def _watch_exists(self, uuid):
"""
Check if watch exists. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
bool
"""
raise NotImplementedError("Subclass must implement _watch_exists")
def _get_watch_dict(self, uuid):
"""
Get watch as dictionary. Subclass must implement.
Args:
uuid: Watch UUID
Returns:
Dictionary representation of watch
"""
raise NotImplementedError("Subclass must implement _get_watch_dict")
def save_datastore(self):
"""
Background thread that periodically saves dirty items.
Runs every 60 seconds (with 0.5s sleep intervals for responsiveness),
or immediately when needs_write_urgent is set.
"""
while True:
if self.stop_thread:
# Graceful shutdown: flush any remaining dirty items before stopping
if self.needs_write or self._dirty_watches or self._dirty_settings:
logger.warning("Datastore save thread stopping - flushing remaining dirty items...")
try:
self._save_dirty_items()
logger.info("Graceful shutdown complete - all data saved")
except Exception as e:
logger.critical(f"FAILED to save dirty items during shutdown: {e}")
else:
logger.info("Datastore save thread stopping - no dirty items")
return
if self.needs_write or self.needs_write_urgent:
try:
self._save_dirty_items()
except Exception as e:
logger.error(f"Error in save cycle: {e}")
# 60 second timer with early break for urgent saves
for i in range(120):
time.sleep(0.5)
if self.stop_thread or self.needs_write_urgent:
break
def start_save_thread(self):
"""Start the background save thread."""
if not self.save_data_thread or not self.save_data_thread.is_alive():
self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
self.save_data_thread.start()
logger.info("Datastore save thread started")
def force_save_all(self):
"""
Force immediate synchronous save of all changes to storage.
File backend implementation of the abstract force_save_all() method.
Marks all watches and settings as dirty, then saves immediately.
Used by:
- Backup creation (ensure everything is saved before backup)
- Shutdown (ensure all changes are persisted)
- Manual save operations
"""
logger.info("Force saving all data to storage...")
# Mark everything as dirty to ensure complete save
for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid)
self.mark_settings_dirty()
# Save immediately (synchronous)
self._save_dirty_items()
logger.success("All data saved to storage")
def get_health_status(self):
"""
Get datastore health status for monitoring.
Returns:
dict with health metrics and status
"""
now = time.time()
time_since_last_save = now - self._last_save_time
with self.lock:
dirty_count = len(self._dirty_watches)
is_thread_alive = self.save_data_thread and self.save_data_thread.is_alive()
# Determine health status
if not is_thread_alive:
status = "CRITICAL"
message = "Save thread is DEAD"
elif time_since_last_save > 300: # 5 minutes
status = "WARNING"
message = f"No save activity for {time_since_last_save:.0f}s"
elif dirty_count > 1000:
status = "WARNING"
message = f"High backpressure: {dirty_count} watches pending"
elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01:
status = "WARNING"
message = f"High error rate: {self._save_errors} errors"
else:
status = "HEALTHY"
message = "Operating normally"
return {
"status": status,
"message": message,
"thread_alive": is_thread_alive,
"dirty_watches": dirty_count,
"dirty_settings": self._dirty_settings,
"last_save_seconds_ago": int(time_since_last_save),
"save_cycles": self._save_cycle_count,
"total_saves": self._total_saves,
"total_errors": self._save_errors,
"error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2)
}

View File

@@ -0,0 +1,66 @@
"""
Legacy format loader for url-watches.json.
Provides functions to detect and load from the legacy monolithic JSON format.
Used during migration (update_26) to transition to individual watch.json files.
"""
import os
import json
from loguru import logger
# Try to import orjson for faster JSON serialization
try:
import orjson
HAS_ORJSON = True
except ImportError:
HAS_ORJSON = False
def has_legacy_datastore(datastore_path):
"""
Check if a legacy url-watches.json file exists.
This is used by update_26 to determine if migration is needed.
Args:
datastore_path: Path to datastore directory
Returns:
bool: True if url-watches.json exists
"""
url_watches_json = os.path.join(datastore_path, "url-watches.json")
return os.path.exists(url_watches_json)
def load_legacy_format(json_store_path):
"""
Load datastore from legacy url-watches.json format.
Args:
json_store_path: Full path to url-watches.json file
Returns:
dict: Loaded datastore data with 'watching', 'settings', etc.
None: If file doesn't exist or loading failed
"""
logger.info(f"Loading from legacy format: {json_store_path}")
if not os.path.isfile(json_store_path):
logger.warning(f"Legacy file not found: {json_store_path}")
return None
try:
if HAS_ORJSON:
with open(json_store_path, 'rb') as f:
data = orjson.loads(f.read())
else:
with open(json_store_path, 'r', encoding='utf-8') as f:
data = json.load(f)
logger.info(f"Loaded {len(data.get('watching', {}))} watches from legacy format")
return data
except Exception as e:
logger.error(f"Failed to load legacy format: {e}")
return None

View File

@@ -0,0 +1,660 @@
"""
Schema update migrations for the datastore.
This module contains all schema version upgrade methods (update_1 through update_N).
These are mixed into ChangeDetectionStore to keep the main store file focused.
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
"""
import os
import re
import shutil
import tarfile
import time
from loguru import logger
from copy import deepcopy
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
from ..processors.restock_diff import Restock
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
from .file_saving_datastore import save_watch_atomic
def create_backup_tarball(datastore_path, update_number):
"""
Create a tarball backup of the entire datastore structure before running an update.
Includes:
- All {uuid}/watch.json files
- changedetection.json (settings, if it exists)
- url-watches.json (legacy format, if it exists)
- Directory structure preserved
Args:
datastore_path: Path to datastore directory
update_number: Update number being applied
Returns:
str: Path to created tarball, or None if backup failed
Restoration:
To restore from a backup:
cd /path/to/datastore
tar -xzf before-update-N-timestamp.tar.gz
This will restore all watch.json files and settings to their pre-update state.
"""
timestamp = int(time.time())
backup_filename = f"before-update-{update_number}-{timestamp}.tar.gz"
backup_path = os.path.join(datastore_path, backup_filename)
try:
logger.info(f"Creating backup tarball: {backup_filename}")
with tarfile.open(backup_path, "w:gz") as tar:
# Backup changedetection.json if it exists (new format)
changedetection_json = os.path.join(datastore_path, "changedetection.json")
if os.path.isfile(changedetection_json):
tar.add(changedetection_json, arcname="changedetection.json")
logger.debug("Added changedetection.json to backup")
# Backup url-watches.json if it exists (legacy format)
url_watches_json = os.path.join(datastore_path, "url-watches.json")
if os.path.isfile(url_watches_json):
tar.add(url_watches_json, arcname="url-watches.json")
logger.debug("Added url-watches.json to backup")
# Backup all watch directories with their watch.json files
# This preserves the UUID directory structure
watch_count = 0
for entry in os.listdir(datastore_path):
entry_path = os.path.join(datastore_path, entry)
# Skip if not a directory
if not os.path.isdir(entry_path):
continue
# Skip hidden directories and backup directories
if entry.startswith('.') or entry.startswith('before-update-'):
continue
# Check if this directory has a watch.json (indicates it's a watch UUID directory)
watch_json = os.path.join(entry_path, "watch.json")
if os.path.isfile(watch_json):
# Add the watch.json file preserving directory structure
tar.add(watch_json, arcname=f"{entry}/watch.json")
watch_count += 1
if watch_count % 100 == 0:
logger.debug(f"Backed up {watch_count} watch.json files...")
logger.success(f"Backup created: {backup_filename} ({watch_count} watches)")
return backup_path
except Exception as e:
logger.error(f"Failed to create backup tarball: {e}")
# Try to clean up partial backup
if os.path.exists(backup_path):
try:
os.unlink(backup_path)
except:
pass
return None
class DatastoreUpdatesMixin:
"""
Mixin class containing all schema update methods.
This class is inherited by ChangeDetectionStore to provide schema migration functionality.
Each update_N method upgrades the schema from version N-1 to version N.
"""
def get_updates_available(self):
"""
Discover all available update methods.
Returns:
list: Sorted list of update version numbers (e.g., [1, 2, 3, ..., 26])
"""
import inspect
updates_available = []
for i, o in inspect.getmembers(self, predicate=inspect.ismethod):
m = re.search(r'update_(\d+)$', i)
if m:
updates_available.append(int(m.group(1)))
updates_available.sort()
return updates_available
def run_updates(self):
"""
Run all pending schema updates sequentially.
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
Process:
1. Get list of available updates
2. For each update > current schema version:
- Create backup of datastore
- Run update method
- Update schema version
- Mark settings and watches dirty
3. If any update fails, stop processing
4. Save all changes immediately
"""
updates_available = self.get_updates_available()
updates_ran = []
for update_n in updates_available:
if update_n > self.data['settings']['application']['schema_version']:
logger.critical(f"Applying update_{update_n}")
# Create tarball backup of entire datastore structure
# This includes all watch.json files, settings, and preserves directory structure
backup_path = create_backup_tarball(self.datastore_path, update_n)
if backup_path:
logger.info(f"Backup created at: {backup_path}")
else:
logger.warning("Backup creation failed, but continuing with update")
try:
update_method = getattr(self, f"update_{update_n}")()
except Exception as e:
logger.error(f"Error while trying update_{update_n}")
logger.error(e)
# Don't run any more updates
return
else:
# Bump the version, important
self.data['settings']['application']['schema_version'] = update_n
self.mark_settings_dirty()
# CRITICAL: Mark all watches as dirty so changes are persisted
# Most updates modify watches, and in the new individual watch.json structure,
# we need to ensure those changes are saved
logger.info(f"Marking all {len(self.data['watching'])} watches as dirty after update_{update_n}")
for uuid in self.data['watching'].keys():
self.mark_watch_dirty(uuid)
# Track which updates ran
updates_ran.append(update_n)
# If any updates ran, save all changes immediately
if updates_ran:
logger.critical(f"Saving all changes after running {len(updates_ran)} update(s): {updates_ran}")
try:
self._save_dirty_items()
logger.success("All update changes saved successfully")
except Exception as e:
logger.error(f"Failed to save updates: {e}")
# Don't raise - updates already ran, just log the error
# ============================================================================
# Individual Update Methods
# ============================================================================
def update_1(self):
"""Convert minutes to seconds on settings and each watch."""
if self.data['settings']['requests'].get('minutes_between_check'):
self.data['settings']['requests']['time_between_check']['minutes'] = self.data['settings']['requests']['minutes_between_check']
# Remove the default 'hours' that is set from the model
self.data['settings']['requests']['time_between_check']['hours'] = None
for uuid, watch in self.data['watching'].items():
if 'minutes_between_check' in watch:
# Only upgrade individual watch time if it was set
if watch.get('minutes_between_check', False):
self.data['watching'][uuid]['time_between_check']['minutes'] = watch['minutes_between_check']
def update_2(self):
"""
Move the history list to a flat text file index.
Better than SQLite because this list is only appended to, and works across NAS / NFS type setups.
"""
# @todo test running this on a newly updated one (when this already ran)
for uuid, watch in self.data['watching'].items():
history = []
if watch.get('history', False):
for d, p in watch['history'].items():
d = int(d) # Used to be keyed as str, we'll fix this now too
history.append("{},{}\n".format(d, p))
if len(history):
target_path = os.path.join(self.datastore_path, uuid)
if os.path.exists(target_path):
with open(os.path.join(target_path, "history.txt"), "w") as f:
f.writelines(history)
else:
logger.warning(f"Datastore history directory {target_path} does not exist, skipping history import.")
# No longer needed, dynamically pulled from the disk when needed.
# But we should set it back to a empty dict so we don't break if this schema runs on an earlier version.
# In the distant future we can remove this entirely
self.data['watching'][uuid]['history'] = {}
def update_3(self):
"""We incorrectly stored last_changed when there was not a change, and then confused the output list table."""
# see https://github.com/dgtlmoon/changedetection.io/pull/835
return
def update_4(self):
"""`last_changed` not needed, we pull that information from the history.txt index."""
for uuid, watch in self.data['watching'].items():
try:
# Remove it from the struct
del(watch['last_changed'])
except:
continue
return
def update_5(self):
"""
If the watch notification body, title look the same as the global one, unset it, so the watch defaults back to using the main settings.
In other words - the watch notification_title and notification_body are not needed if they are the same as the default one.
"""
current_system_body = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
current_system_title = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
for uuid, watch in self.data['watching'].items():
try:
watch_body = watch.get('notification_body', '')
if watch_body and watch_body.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_body:
# Looks the same as the default one, so unset it
watch['notification_body'] = None
watch_title = watch.get('notification_title', '')
if watch_title and watch_title.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_title:
# Looks the same as the default one, so unset it
watch['notification_title'] = None
except Exception as e:
continue
return
def update_7(self):
"""
We incorrectly used common header overrides that should only apply to Requests.
These are now handled in content_fetcher::html_requests and shouldnt be passed to Playwright/Selenium.
"""
# These were hard-coded in early versions
for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']:
if self.data['settings']['headers'].get(v):
del self.data['settings']['headers'][v]
def update_8(self):
"""Convert filters to a list of filters css_filter -> include_filters."""
for uuid, watch in self.data['watching'].items():
try:
existing_filter = watch.get('css_filter', '')
if existing_filter:
watch['include_filters'] = [existing_filter]
except:
continue
return
def update_9(self):
"""Convert old static notification tokens to jinja2 tokens."""
# Each watch
# only { } not {{ or }}
r = r'(?<!{){(?!{)(\w+)(?<!})}(?!})'
for uuid, watch in self.data['watching'].items():
try:
n_body = watch.get('notification_body', '')
if n_body:
watch['notification_body'] = re.sub(r, r'{{\1}}', n_body)
n_title = watch.get('notification_title')
if n_title:
watch['notification_title'] = re.sub(r, r'{{\1}}', n_title)
n_urls = watch.get('notification_urls')
if n_urls:
for i, url in enumerate(n_urls):
watch['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
except:
continue
# System wide
n_body = self.data['settings']['application'].get('notification_body')
if n_body:
self.data['settings']['application']['notification_body'] = re.sub(r, r'{{\1}}', n_body)
n_title = self.data['settings']['application'].get('notification_title')
if n_body:
self.data['settings']['application']['notification_title'] = re.sub(r, r'{{\1}}', n_title)
n_urls = self.data['settings']['application'].get('notification_urls')
if n_urls:
for i, url in enumerate(n_urls):
self.data['settings']['application']['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
return
def update_10(self):
"""Some setups may have missed the correct default, so it shows the wrong config in the UI, although it will default to system-wide."""
for uuid, watch in self.data['watching'].items():
try:
if not watch.get('fetch_backend', ''):
watch['fetch_backend'] = 'system'
except:
continue
return
def update_12(self):
"""Create tag objects and their references from existing tag text."""
i = 0
for uuid, watch in self.data['watching'].items():
# Split out and convert old tag string
tag = watch.get('tag')
if tag:
tag_uuids = []
for t in tag.split(','):
tag_uuids.append(self.add_tag(title=t))
self.data['watching'][uuid]['tags'] = tag_uuids
def update_13(self):
"""#1775 - Update 11 did not update the records correctly when adding 'date_created' values for sorting."""
i = 0
for uuid, watch in self.data['watching'].items():
if not watch.get('date_created'):
self.data['watching'][uuid]['date_created'] = i
i += 1
return
def update_14(self):
"""#1774 - protect xpath1 against migration."""
for awatch in self.data["watching"]:
if self.data["watching"][awatch]['include_filters']:
for num, selector in enumerate(self.data["watching"][awatch]['include_filters']):
if selector.startswith('/'):
self.data["watching"][awatch]['include_filters'][num] = 'xpath1:' + selector
if selector.startswith('xpath:'):
self.data["watching"][awatch]['include_filters'][num] = selector.replace('xpath:', 'xpath1:', 1)
def update_15(self):
"""Use more obvious default time setting."""
for uuid in self.data["watching"]:
if self.data["watching"][uuid]['time_between_check'] == self.data['settings']['requests']['time_between_check']:
# What the old logic was, which was pretty confusing
self.data["watching"][uuid]['time_between_check_use_default'] = True
elif all(value is None or value == 0 for value in self.data["watching"][uuid]['time_between_check'].values()):
self.data["watching"][uuid]['time_between_check_use_default'] = True
else:
# Something custom here
self.data["watching"][uuid]['time_between_check_use_default'] = False
def update_16(self):
"""Correctly set datatype for older installs where 'tag' was string and update_12 did not catch it."""
for uuid, watch in self.data['watching'].items():
if isinstance(watch.get('tags'), str):
self.data['watching'][uuid]['tags'] = []
def update_17(self):
"""Migrate old 'in_stock' values to the new Restock."""
for uuid, watch in self.data['watching'].items():
if 'in_stock' in watch:
watch['restock'] = Restock({'in_stock': watch.get('in_stock')})
del watch['in_stock']
def update_18(self):
"""Migrate old restock settings."""
for uuid, watch in self.data['watching'].items():
if not watch.get('restock_settings'):
# So we enable price following by default
self.data['watching'][uuid]['restock_settings'] = {'follow_price_changes': True}
# Migrate and cleanoff old value
self.data['watching'][uuid]['restock_settings']['in_stock_processing'] = 'in_stock_only' if watch.get(
'in_stock_only') else 'all_changes'
if self.data['watching'][uuid].get('in_stock_only'):
del (self.data['watching'][uuid]['in_stock_only'])
def update_19(self):
"""Compress old elements.json to elements.deflate, saving disk, this compression is pretty fast."""
import zlib
for uuid, watch in self.data['watching'].items():
json_path = os.path.join(self.datastore_path, uuid, "elements.json")
deflate_path = os.path.join(self.datastore_path, uuid, "elements.deflate")
if os.path.exists(json_path):
with open(json_path, "rb") as f_j:
with open(deflate_path, "wb") as f_d:
logger.debug(f"Compressing {str(json_path)} to {str(deflate_path)}..")
f_d.write(zlib.compress(f_j.read()))
os.unlink(json_path)
def update_20(self):
"""Migrate extract_title_as_title to use_page_title_in_list."""
for uuid, watch in self.data['watching'].items():
if self.data['watching'][uuid].get('extract_title_as_title'):
self.data['watching'][uuid]['use_page_title_in_list'] = self.data['watching'][uuid].get('extract_title_as_title')
del self.data['watching'][uuid]['extract_title_as_title']
if self.data['settings']['application'].get('extract_title_as_title'):
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def update_21(self):
"""Migrate timezone to scheduler_timezone_default."""
if self.data['settings']['application'].get('timezone'):
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
del self.data['settings']['application']['timezone']
def update_23(self):
"""Some notification formats got the wrong name type."""
def re_run(formats):
sys_n_format = self.data['settings']['application'].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == sys_n_format), None)
if key_exists_as_value: # key of "Plain text"
logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['notification_format'] = key_exists_as_value
for uuid, watch in self.data['watching'].items():
n_format = self.data['watching'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
for uuid, tag in self.data['settings']['application']['tags'].items():
n_format = self.data['settings']['application']['tags'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(
f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['tags'][uuid][
'notification_format'] = key_exists_as_value # should be 'text' or whatever
from ..notification import valid_notification_formats
formats = deepcopy(valid_notification_formats)
re_run(formats)
# And in previous versions, it was "text" instead of Plain text, Markdown instead of "Markdown to HTML"
formats['text'] = 'Text'
formats['markdown'] = 'Markdown'
re_run(formats)
def update_24(self):
"""RSS types should be inline with the same names as notification types."""
rss_format = self.data['settings']['application'].get('rss_content_format')
if not rss_format or 'text' in rss_format:
# might have been 'plaintext, 'plain text' or something
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
elif 'html' in rss_format:
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
else:
# safe fallback to text
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
def update_25(self):
"""Different processors now hold their own history.txt."""
for uuid, watch in self.data['watching'].items():
processor = self.data['watching'][uuid].get('processor')
if processor != 'text_json_diff':
old_history_txt = os.path.join(self.datastore_path, "history.txt")
target_history_name = f"history-{processor}.txt"
if os.path.isfile(old_history_txt) and not os.path.isfile(target_history_name):
new_history_txt = os.path.join(self.datastore_path, target_history_name)
logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...")
shutil.move(old_history_txt, new_history_txt)
def update_26(self):
"""
Migration: Individual watch persistence (COPY-based, safe rollback).
Loads legacy url-watches.json format and migrates to:
- {uuid}/watch.json (per watch)
- changedetection.json (settings only)
IMPORTANT:
- A tarball backup (before-update-26-timestamp.tar.gz) is created before migration
- url-watches.json is LEFT INTACT for rollback safety
- Users can roll back by simply downgrading to the previous version
- Or restore from tarball: tar -xzf before-update-26-*.tar.gz
This is a dedicated migration release - users upgrade at their own pace.
"""
logger.critical("=" * 80)
logger.critical("Running migration: Individual watch persistence (update_26)")
logger.critical("COPY-based migration: url-watches.json will remain intact for rollback")
logger.critical("=" * 80)
# Check if already migrated
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
if os.path.exists(changedetection_json):
logger.info("Migration already completed (changedetection.json exists), skipping")
return
# Check if we need to load legacy data
from .legacy_loader import has_legacy_datastore, load_legacy_format
if not has_legacy_datastore(self.datastore_path):
logger.info("No legacy datastore found, nothing to migrate")
return
# Load legacy data from url-watches.json
logger.critical("Loading legacy datastore from url-watches.json...")
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
legacy_data = load_legacy_format(legacy_path)
if not legacy_data:
raise Exception("Failed to load legacy datastore from url-watches.json")
# Populate settings from legacy data
logger.info("Populating settings from legacy data...")
if 'settings' in legacy_data:
self.data['settings'] = legacy_data['settings']
if 'app_guid' in legacy_data:
self.data['app_guid'] = legacy_data['app_guid']
if 'build_sha' in legacy_data:
self.data['build_sha'] = legacy_data['build_sha']
if 'version_tag' in legacy_data:
self.data['version_tag'] = legacy_data['version_tag']
# Rehydrate watches from legacy data
logger.info("Rehydrating watches from legacy data...")
self.data['watching'] = {}
for uuid, watch_data in legacy_data.get('watching', {}).items():
try:
self.data['watching'][uuid] = self.rehydrate_entity(uuid, watch_data)
except Exception as e:
logger.error(f"Failed to rehydrate watch {uuid}: {e}")
raise Exception(f"Migration failed: Could not rehydrate watch {uuid}. Error: {e}")
watch_count = len(self.data['watching'])
logger.success(f"Loaded {watch_count} watches from legacy format")
# Phase 1: Save all watches to individual files
logger.critical(f"Phase 1/4: Saving {watch_count} watches to individual watch.json files...")
saved_count = 0
for uuid, watch in self.data['watching'].items():
try:
watch_dict = dict(watch)
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
# Initialize hash
self._watch_hashes[uuid] = self._compute_hash(watch_dict)
saved_count += 1
if saved_count % 100 == 0:
logger.info(f" Progress: {saved_count}/{watch_count} watches saved...")
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
raise Exception(
f"Migration failed: Could not save watch {uuid}. "
f"url-watches.json remains intact, safe to retry. Error: {e}"
)
logger.critical(f"Phase 1 complete: Saved {saved_count} watches")
# Phase 2: Verify all files exist
logger.critical("Phase 2/4: Verifying all watch.json files were created...")
missing = []
for uuid in self.data['watching'].keys():
watch_json = os.path.join(self.datastore_path, uuid, "watch.json")
if not os.path.isfile(watch_json):
missing.append(uuid)
if missing:
raise Exception(
f"Migration failed: {len(missing)} watch files missing: {missing[:5]}... "
f"url-watches.json remains intact, safe to retry."
)
logger.critical(f"Phase 2 complete: Verified {watch_count} watch files")
# Phase 3: Create new settings file
logger.critical("Phase 3/4: Creating changedetection.json...")
try:
self._save_settings()
except Exception as e:
logger.error(f"Failed to create changedetection.json: {e}")
raise Exception(
f"Migration failed: Could not create changedetection.json. "
f"url-watches.json remains intact, safe to retry. Error: {e}"
)
# Phase 4: Verify settings file exists
logger.critical("Phase 4/4: Verifying changedetection.json exists...")
if not os.path.isfile(changedetection_json):
raise Exception(
"Migration failed: changedetection.json not found after save. "
"url-watches.json remains intact, safe to retry."
)
logger.critical("Phase 4 complete: Verified changedetection.json exists")
# Success! Now reload from new format
logger.critical("Reloading datastore from new format...")
self._load_state()
logger.success("Datastore reloaded from new format successfully")
logger.critical("=" * 80)
logger.critical("MIGRATION COMPLETED SUCCESSFULLY!")
logger.critical("=" * 80)
logger.info("")
logger.info("New format:")
logger.info(f" - {watch_count} individual watch.json files created")
logger.info(f" - changedetection.json created (settings only)")
logger.info("")
logger.info("Rollback safety:")
logger.info(" - url-watches.json preserved for rollback")
logger.info(" - To rollback: downgrade to previous version and restart")
logger.info(" - No manual file operations needed")
logger.info("")
logger.info("Optional cleanup (after testing new version):")
logger.info(f" - rm {os.path.join(self.datastore_path, 'url-watches.json')}")
logger.info("")
# Schema version will be updated by run_updates()

View File

@@ -58,7 +58,7 @@ def is_valid_uuid(val):
def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
@@ -506,7 +506,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Enable password check and diff page access bypass
@@ -548,3 +548,172 @@ def test_api_conflict_UI_password(client, live_server, measure_memory_usage, dat
assert len(res.json)
def test_api_url_validation(client, live_server, measure_memory_usage, datastore_path):
"""
Test URL validation for edge cases in both CREATE and UPDATE endpoints.
Addresses security issues where empty/null/invalid URLs could bypass validation.
This test ensures that:
- CREATE endpoint rejects null, empty, and invalid URLs
- UPDATE endpoint rejects attempts to change URL to null, empty, or invalid
- UPDATE endpoint allows updating other fields without touching URL
- URL validation properly checks protocol, format, and safety
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: CREATE with null URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": None}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with null URL should fail"
# Test 2: CREATE with empty string URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": ""}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with empty string URL should fail"
assert b'Invalid or unsupported URL' in res.data or b'required' in res.data.lower()
# Test 3: CREATE with whitespace-only URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": " "}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with whitespace-only URL should fail"
# Test 4: CREATE with invalid protocol should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "javascript:alert(1)"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with javascript: protocol should fail"
# Test 5: CREATE with missing protocol should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "example.com"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch without protocol should fail"
# Test 6: CREATE with valid URL should succeed (baseline)
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Valid URL test"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 201, "Creating watch with valid URL should succeed"
assert is_valid_uuid(res.json.get('uuid'))
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 7: UPDATE to null URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": None}),
)
assert res.status_code == 400, "Updating watch URL to null should fail"
# Accept either OpenAPI validation error or our custom validation error
assert b'URL cannot be null' in res.data or b'OpenAPI validation failed' in res.data or b'validation error' in res.data.lower()
# Test 8: UPDATE to empty string URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": ""}),
)
assert res.status_code == 400, "Updating watch URL to empty string should fail"
# Accept either our custom validation error or OpenAPI/schema validation error
assert b'URL cannot be empty' in res.data or b'OpenAPI validation' in res.data or b'Invalid or unsupported URL' in res.data
# Test 9: UPDATE to whitespace-only URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": " \t\n "}),
)
assert res.status_code == 400, "Updating watch URL to whitespace should fail"
# Accept either our custom validation error or generic validation error
assert b'URL cannot be empty' in res.data or b'Invalid or unsupported URL' in res.data or b'validation' in res.data.lower()
# Test 10: UPDATE to invalid protocol should fail (javascript:)
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": "javascript:alert(document.domain)"}),
)
assert res.status_code == 400, "Updating watch URL to XSS attempt should fail"
assert b'Invalid or unsupported URL' in res.data or b'protocol' in res.data.lower()
# Test 11: UPDATE to file:// protocol should fail (unless ALLOW_FILE_URI is set)
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": "file:///etc/passwd"}),
)
assert res.status_code == 400, "Updating watch URL to file:// should fail by default"
# Test 12: UPDATE other fields without URL should succeed
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"title": "Updated title without URL change"}),
)
assert res.status_code == 200, "Updating other fields without URL should succeed"
# Test 13: Verify URL is still valid after non-URL update
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.json.get('url') == test_url, "URL should remain unchanged"
assert res.json.get('title') == "Updated title without URL change"
# Test 14: UPDATE to valid different URL should succeed
new_valid_url = test_url + "?new=param"
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": new_valid_url}),
)
assert res.status_code == 200, "Updating to valid different URL should succeed"
# Test 15: Verify URL was actually updated
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.json.get('url') == new_valid_url, "URL should be updated to new valid URL"
# Test 16: CREATE with XSS in URL parameters should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "http://example.com?xss=<script>alert(1)</script>"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
# This should fail because of suspicious characters check
assert res.status_code == 400, "Creating watch with XSS in URL params should fail"
# Cleanup
client.delete(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key},
)
delete_all_watches(client)

View File

@@ -0,0 +1,805 @@
#!/usr/bin/env python3
"""
Comprehensive security and edge case tests for the API.
Tests critical areas that were identified as gaps in the existing test suite.
"""
import time
import json
import threading
import uuid as uuid_module
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
import os
def set_original_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def is_valid_uuid(val):
try:
uuid_module.UUID(str(val))
return True
except ValueError:
return False
# ============================================================================
# TIER 1: CRITICAL SECURITY TESTS
# ============================================================================
def test_api_path_traversal_in_uuids(client, live_server, measure_memory_usage, datastore_path):
"""
Test that path traversal attacks via UUID parameter are blocked.
Addresses CVE-like vulnerabilities where ../../../ in UUID could access arbitrary files.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create a valid watch first
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Valid watch"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
valid_uuid = res.json.get('uuid')
# Test 1: Path traversal with ../../../
res = client.get(
f"/api/v1/watch/../../etc/passwd",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Path traversal should be rejected"
# Test 2: Encoded path traversal
res = client.get(
"/api/v1/watch/..%2F..%2F..%2Fetc%2Fpasswd",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Encoded path traversal should be rejected"
# Test 3: Double-encoded path traversal
res = client.get(
"/api/v1/watch/%2e%2e%2f%2e%2e%2f%2e%2e%2f",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Double-encoded traversal should be rejected"
# Test 4: Try to access datastore file
res = client.get(
"/api/v1/watch/../url-watches.json",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Access to datastore should be blocked"
# Test 5: Null byte injection
res = client.get(
f"/api/v1/watch/{valid_uuid}%00.json",
headers={'x-api-key': api_key}
)
# Should either work (ignoring null byte) or reject - but not crash
assert res.status_code in [200, 400, 404]
# Test 6: DELETE with path traversal
res = client.delete(
"/api/v1/watch/../../datastore/url-watches.json",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404, 405], "DELETE with traversal should be blocked (405=method not allowed is also acceptable)"
# Cleanup
client.delete(url_for("watch", uuid=valid_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_injection_via_headers_and_proxy(client, live_server, measure_memory_usage, datastore_path):
"""
Test that injection attacks via headers and proxy fields are properly sanitized.
Addresses XSS and injection vulnerabilities.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: XSS in headers
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": {
"User-Agent": "<script>alert(1)</script>",
"X-Custom": "'; DROP TABLE watches; --"
}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Headers are metadata used for HTTP requests, not HTML rendering
# Storing them as-is is expected behavior
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
# Verify headers are stored (API returns JSON, not HTML, so no XSS risk)
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
assert res.status_code == 200
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Null bytes in headers
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": {"X-Test": "value\x00null"}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle null bytes gracefully (reject or sanitize)
assert res.status_code in [201, 400]
# Test 3: Malformed proxy string
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"proxy": "http://evil.com:8080@victim.com"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject invalid proxy format
assert res.status_code == 400
# Test 4: Control characters in notification title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_title": "Test\r\nInjected-Header: value"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept but sanitize control characters
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_large_payload_dos(client, live_server, measure_memory_usage, datastore_path):
"""
Test that excessively large payloads are rejected to prevent DoS.
Addresses memory leak issues found in changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Huge ignore_text array
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"ignore_text": ["a" * 10000] * 100 # 1MB of data
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should either accept (with limits) or reject
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Massive headers object
huge_headers = {f"X-Header-{i}": "x" * 1000 for i in range(100)}
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": huge_headers
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject or truncate
assert res.status_code in [201, 400, 413]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Huge browser_steps array
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click", "selector": "#test" * 1000, "optional_value": ""}
] * 100
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject or limit
assert res.status_code in [201, 400, 413]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 4: Extremely long title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "x" * 100000 # 100KB title
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject (exceeds maxLength: 5000)
assert res.status_code == 400
delete_all_watches(client)
def test_api_utf8_encoding_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test UTF-8 encoding edge cases that have caused bugs on Windows.
Addresses 18+ encoding bugs from changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Unicode in title (should work)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "Test 中文 Ελληνικά 日本語 🔥"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
# Verify it round-trips correctly
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
assert res.status_code == 200
assert "中文" in res.json.get('title')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Unicode in URL query parameters
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url + "?search=日本語"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle URL encoding properly
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Null byte in title (should be rejected or sanitized)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "Test\x00Title"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle gracefully
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 4: BOM (Byte Order Mark) in title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "\ufeffTest with BOM"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_concurrency_race_conditions(client, live_server, measure_memory_usage, datastore_path):
"""
Test concurrent API requests to detect race conditions.
Addresses 20+ concurrency bugs from changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create a watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Concurrency test"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 1: Concurrent updates to same watch
# Note: Flask test client is not thread-safe, so we test sequential updates instead
# Real concurrency issues would be caught in integration tests with actual HTTP requests
results = []
for i in range(10):
try:
r = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"title": f"Title {i}"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
results.append(r.status_code)
except Exception as e:
results.append(str(e))
# All updates should succeed (200) without crashes
assert all(r == 200 for r in results), f"Some updates failed: {results}"
# Test 2: Update while watch is being checked
# Queue a recheck
client.get(
url_for("watch", uuid=watch_uuid, recheck=True),
headers={'x-api-key': api_key}
)
# Immediately update it
res = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"title": "Updated during check"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should succeed without error
assert res.status_code == 200
# Test 3: Delete watch that's being processed
# Create another watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
watch_uuid2 = res.json.get('uuid')
# Queue it for checking
client.get(url_for("watch", uuid=watch_uuid2, recheck=True), headers={'x-api-key': api_key})
# Immediately delete it
res = client.delete(url_for("watch", uuid=watch_uuid2), headers={'x-api-key': api_key})
# Should succeed or return appropriate error
assert res.status_code in [204, 404, 400]
# Cleanup
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
# ============================================================================
# TIER 2: IMPORTANT FUNCTIONALITY TESTS
# ============================================================================
def test_api_time_validation_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test time_between_check validation edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Zero interval
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": 0}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "Zero interval should be rejected"
# Test 2: Negative interval
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": -100}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "Negative interval should be rejected"
# Test 3: All fields null with use_default=false
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"weeks": None, "days": None, "hours": None, "minutes": None, "seconds": None}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "All null intervals should be rejected when not using default"
# Test 4: Extremely large interval (overflow risk)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"weeks": 999999999}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should either accept (with limits) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 5: Valid minimal interval (should work)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": 60}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_browser_steps_validation(client, live_server, measure_memory_usage, datastore_path):
"""
Test browser_steps validation for invalid operations and structures.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Empty browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "", "selector": "", "optional_value": ""}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (empty is valid as null)
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Invalid operation type
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "invalid_operation", "selector": "#test", "optional_value": ""}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (validation happens at runtime) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Missing required fields in browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click"} # Missing selector and optional_value
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected due to schema validation
assert res.status_code == 400
# Test 4: Extra fields in browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click", "selector": "#test", "optional_value": "", "extra_field": "value"}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected due to additionalProperties: false
assert res.status_code == 400
delete_all_watches(client)
def test_api_queue_manipulation(client, live_server, measure_memory_usage, datastore_path):
"""
Test queue behavior under stress and edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Create many watches rapidly
watch_uuids = []
for i in range(20):
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": f"Watch {i}"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
if res.status_code == 201:
watch_uuids.append(res.json.get('uuid'))
assert len(watch_uuids) == 20, "Should be able to create 20 watches"
# Test 2: Recheck all when watches exist
res = client.get(
url_for("createwatch", recheck_all='1'),
headers={'x-api-key': api_key},
)
# Should return success (200 or 202 for background processing)
assert res.status_code in [200, 202]
# Test 3: Verify queue doesn't overflow with moderate load
# The app has MAX_QUEUE_SIZE = 5000, we're well below that
wait_for_all_checks(client)
# Cleanup
for uuid in watch_uuids:
client.delete(url_for("watch", uuid=uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
# ============================================================================
# TIER 3: EDGE CASES & POLISH
# ============================================================================
def test_api_history_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test history API with invalid timestamps and edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create watch and generate history
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 1: Get history with invalid timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="invalid"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Invalid timestamp should return 404"
# Test 2: Future timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="9999999999"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Future timestamp should return 404"
# Test 3: Negative timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="-1"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Negative timestamp should return 404"
# Test 4: Diff with reversed timestamps (from > to)
# First get actual timestamps
res = client.get(
url_for("watchhistory", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
if len(res.json) >= 2:
timestamps = sorted(res.json.keys())
# Try reversed order
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp=timestamps[-1], to_timestamp=timestamps[0]),
headers={'x-api-key': api_key}
)
# Should either work (show reverse diff) or return error
assert res.status_code in [200, 400]
# Cleanup
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_notification_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test notification configuration edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Invalid notification URL
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_urls": ["invalid://url", "ftp://test.com"]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (apprise validates at runtime) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Invalid notification format
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_format": "invalid_format"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected by schema
assert res.status_code == 400
# Test 3: Empty notification arrays
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_urls": []
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (empty is valid)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_tag_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test tag/group API edge cases including XSS and path traversal.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Test 1: Empty tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": ""}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected (empty title)
assert res.status_code == 400
# Test 2: XSS in tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "<script>alert(1)</script>"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept but sanitize
if res.status_code == 201:
tag_uuid = res.json.get('uuid')
# Verify title is stored safely
res = client.get(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Should be escaped or sanitized
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Test 3: Path traversal in tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "../../etc/passwd"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (it's just a string, not a path)
if res.status_code == 201:
tag_uuid = res.json.get('uuid')
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Test 4: Very long tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "x" * 10000}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected (exceeds maxLength)
assert res.status_code == 400
def test_api_authentication_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test API authentication edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Missing API key
res = client.get(url_for("createwatch"))
assert res.status_code == 403, "Missing API key should be forbidden"
# Test 2: Invalid API key
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "invalid_key_12345"}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 3: API key with special characters
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "key<script>alert(1)</script>"}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 4: Very long API key
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "x" * 10000}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 5: Case sensitivity of API key
wrong_case_key = api_key.upper() if api_key.islower() else api_key.lower()
res = client.get(
url_for("createwatch"),
headers={'x-api-key': wrong_case_key}
)
# Should be forbidden (keys are case-sensitive)
assert res.status_code == 403, "Wrong case API key should be forbidden"
# Test 6: Valid API key should work
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200, "Valid API key should work"

View File

@@ -53,11 +53,21 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path):
backup = ZipFile(io.BytesIO(res.data))
l = backup.namelist()
uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
newlist = list(filter(uuid4hex.match, l)) # Read Note below
# Check for UUID-based txt files (history and snapshot)
uuid4hex_txt = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
txt_files = list(filter(uuid4hex_txt.match, l))
# Should be two txt files in the archive (history and the snapshot)
assert len(newlist) == 2
assert len(txt_files) == 2
# Check for watch.json files (new format)
uuid4hex_json = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}/watch\.json$', re.I)
json_files = list(filter(uuid4hex_json.match, l))
# Should be one watch.json file in the archive (the imported watch)
assert len(json_files) == 1, f"Expected 1 watch.json file, found {len(json_files)}: {json_files}"
# Check for changedetection.json (settings file)
assert 'changedetection.json' in l, "changedetection.json should be in backup"
# Get the latest one
res = client.get(

View File

@@ -59,11 +59,29 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
# Wait for the sync DB save to happen
time.sleep(2)
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
# Check which format is being used
datastore_path = live_server.app.config['DATASTORE'].datastore_path
changedetection_json = os.path.join(datastore_path, 'changedetection.json')
url_watches_json = os.path.join(datastore_path, 'url-watches.json')
json_obj = None
with open(json_db_file, 'r', encoding='utf-8') as f:
json_obj = json.load(f)
json_obj = {'watching': {}}
if os.path.exists(changedetection_json):
# New format: individual watch.json files
logger.info("Testing with new format (changedetection.json + individual watch.json)")
# Load each watch.json file
for uuid in live_server.app.config['DATASTORE'].data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.isfile(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
json_obj['watching'][uuid] = json.load(f)
else:
# Legacy format: url-watches.json
logger.info("Testing with legacy format (url-watches.json)")
with open(url_watches_json, 'r', encoding='utf-8') as f:
json_obj = json.load(f)
# assert the right amount of watches was found in the JSON
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
@@ -88,7 +106,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
# Find the snapshot one
for fname in files_in_watch_dir:
if fname != 'history.txt' and 'html' not in fname:
if fname != 'history.txt' and fname != 'watch.json' and 'html' not in fname:
if strtobool(os.getenv("TEST_WITH_BROTLI")):
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
@@ -105,11 +123,23 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
assert json_obj['watching'][w]['title'], "Watch should have a title set"
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
# With new format, we also have watch.json, so 4 files total
if os.path.exists(changedetection_json):
assert len(files_in_watch_dir) == 4, "Should be four files in the dir with new format: watch.json, html.br snapshot, history.txt and the extracted text snapshot"
else:
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir with legacy format: html.br snapshot, history.txt and the extracted text snapshot"
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
with open(json_db_file, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
# Check that 'default' Watch vars aren't accidentally being saved
if os.path.exists(changedetection_json):
# New format: check all individual watch.json files
for uuid in json_obj['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
with open(watch_json_file, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), f"'default' probably shouldnt be here in {watch_json_file}, it came from when the 'default' Watch vars were accidently being saved"
else:
# Legacy format: check url-watches.json
with open(url_watches_json, 'r', encoding='utf-8') as f:
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -325,3 +325,274 @@ def test_time_unit_translations(client, live_server, measure_memory_usage, datas
assert b"Time Between Check" not in res.data, "Should not have English 'Time Between Check'"
assert "Chrome 請求".encode() not in res.data, "Should not have incorrect 'Chrome 請求' (Chrome requests)"
assert "使用預設通知".encode() not in res.data, "Should not have incorrect '使用預設通知' (Use default notification)"
def test_accept_language_header_zh_tw(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending zh-TW in Accept-Language header get Traditional Chinese.
This tests the locale alias mapping for issue #3779.
"""
from flask import url_for
# Clear any session data to simulate a fresh visitor
with client.session_transaction() as sess:
sess.clear()
# Request the index page with zh-TW in Accept-Language header (what browsers send)
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Traditional Chinese content, not Simplified Chinese
# Traditional: 選擇語言, Simplified: 选择语言
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' (Select Language)"
assert '选择语言'.encode() not in res.data, "Should not get Simplified Chinese '选择语言'"
# Check HTML lang attribute uses BCP 47 format
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected BCP 47 language tag zh-Hant-TW in HTML"
# Check that the correct flag icon is shown (Taiwan flag for Traditional Chinese)
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
"Expected Taiwan flag 'fi fi-tw' for Traditional Chinese"
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' not in res.data, \
"Should not show China flag 'fi fi-cn' for Traditional Chinese"
# Verify we're getting Traditional Chinese text throughout the page
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Check Traditional Chinese translations (not English)
assert "小時".encode() in res.data, "Expected Traditional Chinese '小時' for Hours"
assert "分鐘".encode() in res.data, "Expected Traditional Chinese '分鐘' for Minutes"
assert b"Hours" not in res.data or "小時".encode() in res.data, "Should have Traditional Chinese, not English"
def test_accept_language_header_en_variants(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending en-GB and en-US in Accept-Language header get the correct English variant.
This ensures the locale selector works properly for English variants.
"""
from flask import url_for
# Test 1: British English (en-GB)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en-GB,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
# Check HTML lang attribute uses BCP 47 format with hyphen
assert b'<html lang="en-GB"' in res.data, "Expected BCP 47 language tag en-GB in HTML"
# Check that the correct flag icon is shown (UK flag for en-GB)
assert b'<span class="fi fi-gb fis" id="language-selector-flag">' in res.data, \
"Expected UK flag 'fi fi-gb' for British English"
# Test 2: American English (en-US)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en-US,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
# Check HTML lang attribute uses BCP 47 format with hyphen
assert b'<html lang="en-US"' in res.data, "Expected BCP 47 language tag en-US in HTML"
# Check that the correct flag icon is shown (US flag for en-US)
assert b'<span class="fi fi-us fis" id="language-selector-flag">' in res.data, \
"Expected US flag 'fi fi-us' for American English"
# Test 3: Generic 'en' should fall back to one of the English variants
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content (either variant is fine)
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
def test_accept_language_header_zh_simplified(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending zh or zh-CN in Accept-Language header get Simplified Chinese.
This ensures Simplified Chinese still works correctly and doesn't get confused with Traditional.
"""
from flask import url_for
# Test 1: Generic 'zh' should get Simplified Chinese
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Simplified Chinese content, not Traditional Chinese
# Simplified: 选择语言, Traditional: 選擇語言
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' (Select Language)"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese '選擇語言'"
# Check HTML lang attribute
assert b'<html lang="zh"' in res.data, "Expected language tag zh in HTML"
# Check that the correct flag icon is shown (China flag for Simplified Chinese)
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
"Expected China flag 'fi fi-cn' for Simplified Chinese"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
"Should not show Taiwan flag 'fi fi-tw' for Simplified Chinese"
# Test 2: 'zh-CN' should also get Simplified Chinese
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Simplified Chinese content
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' with zh-CN header"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese with zh-CN header"
# Check that the correct flag icon is shown (China flag for zh-CN)
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
"Expected China flag 'fi fi-cn' for zh-CN header"
# Verify Simplified Chinese in settings page
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Check Simplified Chinese translations (not Traditional or English)
# Simplified: 小时, Traditional: 小時
assert "小时".encode() in res.data, "Expected Simplified Chinese '小时' for Hours"
assert "分钟".encode() in res.data, "Expected Simplified Chinese '分钟' for Minutes"
assert "".encode() in res.data, "Expected Simplified Chinese '' for Seconds"
# Make sure it's not Traditional Chinese
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時'"
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘'"
def test_session_locale_overrides_accept_language(client, live_server, measure_memory_usage, datastore_path):
"""
Test that session locale preference overrides browser Accept-Language header.
Scenario:
1. Browser auto-detects zh-TW (Traditional Chinese) from Accept-Language header
2. User explicitly selects Korean language
3. On subsequent page loads, Korean should be shown (not Traditional Chinese)
even though the Accept-Language header still says zh-TW
This tests the session override behavior for issue #3779.
"""
from flask import url_for
# Step 1: Clear session and make first request with zh-TW header (auto-detect)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should initially get Traditional Chinese from auto-detect
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' from auto-detect"
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected zh-Hant-TW language tag"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
"Expected Taiwan flag 'fi fi-tw' from auto-detect"
# Step 2: User explicitly selects Korean language
res = client.get(
url_for("set_language", locale="ko"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Browser still sends zh-TW
follow_redirects=True
)
assert res.status_code == 200
# Step 3: Make another request with same zh-TW header
# Session should override the Accept-Language header
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still sending zh-TW!
follow_redirects=True
)
assert res.status_code == 200
# Should now get Korean (session overrides auto-detect)
# Korean: 언어 선택, Traditional Chinese: 選擇語言
assert '언어 선택'.encode() in res.data, "Expected Korean '언어 선택' (Select Language) from session"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese when Korean is set in session"
# Check HTML lang attribute is Korean
assert b'<html lang="ko"' in res.data, "Expected Korean language tag 'ko' in HTML"
# Check that Korean flag is shown (not Taiwan flag)
assert b'<span class="fi fi-kr fis" id="language-selector-flag">' in res.data, \
"Expected Korean flag 'fi fi-kr' from session preference"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
"Should not show Taiwan flag when Korean is set in session"
# Verify Korean text on settings page as well
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still zh-TW!
follow_redirects=True
)
assert res.status_code == 200
# Check Korean translations (not Traditional Chinese or English)
# Korean: 시간 (Hours), 분 (Minutes), 초 (Seconds)
# Traditional Chinese: 小時, 分鐘, 秒
assert "시간".encode() in res.data, "Expected Korean '시간' for Hours"
assert "".encode() in res.data, "Expected Korean '' for Minutes"
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時' when Korean is set"
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘' when Korean is set"

View File

@@ -142,10 +142,14 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watches_with_body = 0
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['body']==body_value:
# Read individual watch.json files
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
if watch_data.get('body') == body_value:
watches_with_body += 1
# Should be only one with body set
@@ -225,10 +229,14 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_
wait_for_all_checks(client)
watches_with_method = 0
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
app_struct = json.load(f)
for uuid in app_struct['watching']:
if app_struct['watching'][uuid]['method'] == 'PATCH':
# Read individual watch.json files
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
with open(watch_json_file, 'r', encoding='utf-8') as f:
watch_data = json.load(f)
if watch_data.get('method') == 'PATCH':
watches_with_method += 1
# Should be only one with method set to PATCH

View File

@@ -64,6 +64,19 @@ def is_safe_valid_url(test_url):
import re
import validators
# Validate input type first - must be a non-empty string
if test_url is None:
logger.warning('URL validation failed: URL is None')
return False
if not isinstance(test_url, str):
logger.warning(f'URL validation failed: URL must be a string, got {type(test_url).__name__}')
return False
if not test_url.strip():
logger.warning('URL validation failed: URL is empty or whitespace only')
return False
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'

View File

@@ -183,15 +183,30 @@ components:
properties:
weeks:
type: integer
minimum: 0
maximum: 52000
nullable: true
days:
type: integer
minimum: 0
maximum: 365000
nullable: true
hours:
type: integer
minimum: 0
maximum: 8760000
nullable: true
minutes:
type: integer
minimum: 0
maximum: 525600000
nullable: true
seconds:
type: integer
description: Time intervals between checks
minimum: 0
maximum: 31536000000
nullable: true
description: Time intervals between checks. All fields must be non-negative. At least one non-zero value required when not using default settings.
time_between_check_use_default:
type: boolean
default: true
@@ -200,7 +215,9 @@ components:
type: array
items:
type: string
description: Notification URLs for this web page change monitor (watch)
maxLength: 1000
maxItems: 100
description: Notification URLs for this web page change monitor (watch). Maximum 100 URLs.
notification_title:
type: string
description: Custom notification title
@@ -224,14 +241,19 @@ components:
operation:
type: string
maxLength: 5000
nullable: true
selector:
type: string
maxLength: 5000
nullable: true
optional_value:
type: string
maxLength: 5000
nullable: true
required: [operation, selector, optional_value]
description: Browser automation steps
additionalProperties: false
maxItems: 100
description: Browser automation steps. Maximum 100 steps allowed.
processor:
type: string
enum: [restock_diff, text_json_diff]