Compare commits

..

7 Commits

Author SHA1 Message Date
dgtlmoon
063ee38099 Merge branch 'master' into processor-plugin-improvements
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-18 00:56:30 +01:00
dgtlmoon
5007b8201e Merge branch 'master' into processor-plugin-improvements
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-15 23:14:30 +01:00
dgtlmoon
7e853a4b46 Configurable paackages 2026-01-14 19:01:30 +01:00
dgtlmoon
4dc5301de4 Downgrade for osint plugin and handle missing processors better 2026-01-14 18:22:23 +01:00
dgtlmoon
edf0989cd4 Put the type badge first in the list 2026-01-14 17:34:54 +01:00
dgtlmoon
423b546948 Refactor of processor extra configs 2026-01-14 17:29:20 +01:00
dgtlmoon
c1c810a79a Misc fixes for processor plugins 2026-01-14 16:59:20 +01:00
29 changed files with 901 additions and 2036 deletions

View File

@@ -132,6 +132,15 @@ ENV LOGGER_LEVEL="$LOGGER_LEVEL"
ENV LC_ALL=en_US.UTF-8
WORKDIR /app
# Copy and set up entrypoint script for installing extra packages
COPY docker-entrypoint.sh /docker-entrypoint.sh
RUN chmod +x /docker-entrypoint.sh
# Set entrypoint to handle EXTRA_PACKAGES env var
ENTRYPOINT ["/docker-entrypoint.sh"]
# Default command (can be overridden in docker-compose.yml)
CMD ["python", "./changedetection.py", "-d", "/datastore"]

View File

@@ -16,7 +16,6 @@ recursive-include changedetectionio/widgets *
prune changedetectionio/static/package-lock.json
prune changedetectionio/static/styles/node_modules
prune changedetectionio/static/styles/package-lock.json
include changedetectionio/favicon_utils.py
include changedetection.py
include requirements.txt
include README-pip.md

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.52.7'
__version__ = '0.52.6'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -2,7 +2,6 @@ import os
import threading
from changedetectionio.validate_url import is_safe_valid_url
from changedetectionio.favicon_utils import get_favicon_mime_type
from . import auth
from changedetectionio import queuedWatchMetaData, strtobool
@@ -69,17 +68,13 @@ class Watch(Resource):
import time
from copy import deepcopy
watch = None
# Retry up to 20 times if dict is being modified
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
for attempt in range(20):
for _ in range(20):
try:
watch = deepcopy(self.datastore.data['watching'].get(uuid))
break
except RuntimeError:
# Dict changed during deepcopy, retry after yielding to scheduler
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
if attempt < 19: # Don't yield on last attempt
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
# Incase dict changed, try again
time.sleep(0.01)
if not watch:
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
@@ -131,86 +126,32 @@ class Watch(Resource):
if request.json.get('proxy'):
plist = self.datastore.proxy_list
if not plist or request.json.get('proxy') not in plist:
proxy_list_str = ', '.join(plist) if plist else 'none configured'
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
if not request.json.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(request.json)
if validation_error:
return validation_error, 400
# XSS etc protection - validate URL if it's being updated
if 'url' in request.json:
new_url = request.json.get('url')
# URL must be a non-empty string
if new_url is None:
return "URL cannot be null", 400
if not isinstance(new_url, str):
return "URL must be a string", 400
if not new_url.strip():
return "URL cannot be empty or whitespace only", 400
if not is_safe_valid_url(new_url.strip()):
return "Invalid or unsupported URL format. URL must use http://, https://, or ftp:// protocol", 400
# XSS etc protection
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
return "Invalid URL", 400
# Handle processor-config-* fields separately (save to JSON, not datastore)
from changedetectionio import processors
processor_config_data = {}
regular_data = {}
for key, value in request.json.items():
if key.startswith('processor_config_'):
config_key = key.replace('processor_config_', '')
if value: # Only save non-empty values
processor_config_data[config_key] = value
else:
regular_data[key] = value
# Make a mutable copy of request.json for modification
json_data = dict(request.json)
# Extract and remove processor config fields from json_data
processor_config_data = processors.extract_processor_config_from_form_data(json_data)
# Update watch with regular (non-processor-config) fields
watch.update(regular_data)
watch.update(json_data)
# Save processor config to JSON file if any config data exists
if processor_config_data:
try:
processor_name = request.json.get('processor', watch.get('processor'))
if processor_name:
# Create a processor instance to access config methods
from changedetectionio.processors import difference_detection_processor
processor_instance = difference_detection_processor(self.datastore, uuid)
# Use processor name as filename so each processor keeps its own config
config_filename = f'{processor_name}.json'
processor_instance.update_extra_watch_config(config_filename, processor_config_data)
logger.debug(f"API: Saved processor config to {config_filename}: {processor_config_data}")
# Call optional edit_hook if processor has one
try:
import importlib
edit_hook_module_name = f'changedetectionio.processors.{processor_name}.edit_hook'
try:
edit_hook = importlib.import_module(edit_hook_module_name)
logger.debug(f"API: Found edit_hook module for {processor_name}")
if hasattr(edit_hook, 'on_config_save'):
logger.info(f"API: Calling edit_hook.on_config_save for {processor_name}")
# Call hook and get updated config
updated_config = edit_hook.on_config_save(watch, processor_config_data, self.datastore)
# Save updated config back to file
processor_instance.update_extra_watch_config(config_filename, updated_config)
logger.info(f"API: Edit hook updated config: {updated_config}")
else:
logger.debug(f"API: Edit hook module found but no on_config_save function")
except ModuleNotFoundError:
logger.debug(f"API: No edit_hook module for processor {processor_name} (this is normal)")
except Exception as hook_error:
logger.error(f"API: Edit hook error (non-fatal): {hook_error}", exc_info=True)
except Exception as e:
logger.error(f"API: Failed to save processor config: {e}")
# Save processor config to JSON file
processors.save_processor_config(self.datastore, uuid, processor_config_data)
return "OK", 200
@@ -251,10 +192,6 @@ class WatchSingleHistory(Resource):
if timestamp == 'latest':
timestamp = list(watch.history.keys())[-1]
# Validate that the timestamp exists in history
if timestamp not in watch.history:
abort(404, message=f"No history snapshot found for timestamp '{timestamp}'")
if request.args.get('html'):
content = watch.get_fetched_html(timestamp)
if content:
@@ -403,9 +340,16 @@ class WatchFavicon(Resource):
favicon_filename = watch.get_favicon_filename()
if favicon_filename:
# Use cached MIME type detection
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
mime = get_favicon_mime_type(filepath)
try:
import magic
mime = magic.from_file(
os.path.join(watch.watch_data_dir, favicon_filename),
mime=True
)
except ImportError:
# Fallback, no python-magic
import mimetypes
mime, encoding = mimetypes.guess_type(favicon_filename)
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
response.headers['Content-type'] = mime
@@ -435,9 +379,8 @@ class CreateWatch(Resource):
if json_data.get('proxy'):
plist = self.datastore.proxy_list
if not plist or json_data.get('proxy') not in plist:
proxy_list_str = ', '.join(plist) if plist else 'none configured'
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
if not json_data.get('proxy') in plist:
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
# Validate time_between_check when not using defaults
validation_error = validate_time_between_check_required(json_data)

View File

@@ -30,23 +30,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
app: Flask application instance
datastore: Application datastore
executor: ThreadPoolExecutor for queue operations (optional)
Returns:
"restart" if worker should restart, "shutdown" for clean exit
"""
# Set a descriptive name for this task
task = asyncio.current_task()
if task:
task.set_name(f"async-worker-{worker_id}")
# Read restart policy from environment
max_jobs = int(os.getenv("WORKER_MAX_JOBS", "10"))
max_runtime_seconds = int(os.getenv("WORKER_MAX_RUNTIME", "3600")) # 1 hour default
jobs_processed = 0
start_time = time.time()
logger.info(f"Starting async worker {worker_id} (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
logger.info(f"Starting async worker {worker_id}")
while not app.config.exit.is_set():
update_handler = None
@@ -61,11 +51,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
)
except asyncio.TimeoutError:
# No jobs available - check if we should restart based on time while idle
runtime = time.time() - start_time
if runtime >= max_runtime_seconds:
logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
return "restart"
# No jobs available, continue loop
continue
except Exception as e:
# Handle expected Empty exception from queue timeout
@@ -131,11 +117,14 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
processor = watch.get('processor', 'text_json_diff')
# Init a new 'difference_detection_processor'
try:
processor_module = importlib.import_module(f"changedetectionio.processors.{processor}.processor")
except ModuleNotFoundError as e:
print(f"Processor module '{processor}' not found.")
raise e
# Use get_processor_module() to support both built-in and plugin processors
from changedetectionio.processors import get_processor_module
processor_module = get_processor_module(processor)
if not processor_module:
error_msg = f"Processor module '{processor}' not found."
logger.error(error_msg)
raise ModuleNotFoundError(error_msg)
update_handler = processor_module.perform_site_check(datastore=datastore,
watch_uuid=uuid)
@@ -502,19 +491,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
# Small yield for normal completion
await asyncio.sleep(0.01)
# Job completed - increment counter and check restart conditions
jobs_processed += 1
runtime = time.time() - start_time
# Check if we should restart (only when idle, between jobs)
should_restart_jobs = jobs_processed >= max_jobs
should_restart_time = runtime >= max_runtime_seconds
if should_restart_jobs or should_restart_time:
reason = f"{jobs_processed} jobs" if should_restart_jobs else f"{runtime:.0f}s runtime"
logger.info(f"Worker {worker_id} restarting after {reason} ({jobs_processed} jobs, {runtime:.0f}s runtime)")
return "restart"
# Check if we should exit
if app.config.exit.is_set():
break
@@ -522,12 +498,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
# Check if we're in pytest environment - if so, be more gentle with logging
import sys
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info(f"Worker {worker_id} shutting down")
return "shutdown"
def cleanup_error_artifacts(uuid, datastore):
"""Helper function to clean up error artifacts"""

View File

@@ -101,23 +101,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's difference module
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
# Try to get the processor's difference module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'difference')
# Call the processor's render() function
if hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have a difference module, falling back to text_json_diff: {e}")
# Call the processor's render() function
if processor_module and hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
# Fallback: if processor doesn't have difference module, use text_json_diff as default
from changedetectionio.processors.text_json_diff.difference import render as default_render
@@ -157,23 +155,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's extract module
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.extract')
# Try to get the processor's extract module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'extract')
# Call the processor's render_form() function
if hasattr(processor_module, 'render_form'):
return processor_module.render_form(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have an extract module, falling back to base extractor: {e}")
# Call the processor's render_form() function
if processor_module and hasattr(processor_module, 'render_form'):
return processor_module.render_form(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
# Fallback: if processor doesn't have extract module, use base processors.extract as default
from changedetectionio.processors.extract import render_form as default_render_form
@@ -213,24 +209,22 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's extract module
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.extract')
# Try to get the processor's extract module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'extract')
# Call the processor's process_extraction() function
if hasattr(processor_module, 'process_extraction'):
return processor_module.process_extraction(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
make_response=make_response,
send_from_directory=send_from_directory,
flash=flash,
redirect=redirect
)
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have an extract module, falling back to base extractor: {e}")
# Call the processor's process_extraction() function
if processor_module and hasattr(processor_module, 'process_extraction'):
return processor_module.process_extraction(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
make_response=make_response,
send_from_directory=send_from_directory,
flash=flash,
redirect=redirect
)
# Fallback: if processor doesn't have extract module, use base processors.extract as default
from changedetectionio.processors.extract import process_extraction as default_process_extraction
@@ -280,38 +274,33 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's difference module
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
# Try to get the processor's difference module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'difference')
# Call the processor's get_asset() function
if hasattr(processor_module, 'get_asset'):
result = processor_module.get_asset(
asset_name=asset_name,
watch=watch,
datastore=datastore,
request=request
)
# Call the processor's get_asset() function
if processor_module and hasattr(processor_module, 'get_asset'):
result = processor_module.get_asset(
asset_name=asset_name,
watch=watch,
datastore=datastore,
request=request
)
if result is None:
from flask import abort
abort(404, description=f"Asset '{asset_name}' not found")
binary_data, content_type, cache_control = result
response = make_response(binary_data)
response.headers['Content-Type'] = content_type
if cache_control:
response.headers['Cache-Control'] = cache_control
return response
else:
logger.warning(f"Processor {processor_name} does not implement get_asset()")
if result is None:
from flask import abort
abort(404, description=f"Processor '{processor_name}' does not support assets")
abort(404, description=f"Asset '{asset_name}' not found")
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have a difference module: {e}")
binary_data, content_type, cache_control = result
response = make_response(binary_data)
response.headers['Content-Type'] = content_type
if cache_control:
response.headers['Cache-Control'] = cache_control
return response
else:
logger.warning(f"Processor {processor_name} does not implement get_asset()")
from flask import abort
abort(404, description=f"Processor '{processor_name}' not found")
abort(404, description=f"Processor '{processor_name}' does not support assets")
return diff_blueprint

View File

@@ -72,8 +72,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
processor_name = datastore.data['watching'][uuid].get('processor', '')
processor_classes = next((tpl for tpl in processors.find_processors() if tpl[1] == processor_name), None)
if not processor_classes:
flash(gettext("Cannot load the edit form for processor/plugin '{}', plugin missing?").format(processor_classes[1]), 'error')
return redirect(url_for('watchlist.index'))
flash(gettext("Could not load '{}' processor, processor plugin might be missing. Please select a different processor.").format(processor_name), 'error')
# Fall back to default processor so user can still edit and change processor
processor_classes = next((tpl for tpl in processors.find_processors() if tpl[1] == 'text_json_diff'), None)
if not processor_classes:
# If even text_json_diff is missing, something is very wrong
flash(gettext("Could not load '{}' processor, processor plugin might be missing.").format(processor_name), 'error')
return redirect(url_for('watchlist.index'))
parent_module = processors.get_parent_module(processor_classes[0])
@@ -150,58 +155,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
extra_update_obj['time_between_check'] = form.time_between_check.data
# Handle processor-config-* fields separately (save to JSON, not datastore)
processor_config_data = {}
fields_to_remove = []
for field_name, field_value in form.data.items():
if field_name.startswith('processor_config_'):
config_key = field_name.replace('processor_config_', '')
if field_value: # Only save non-empty values
processor_config_data[config_key] = field_value
fields_to_remove.append(field_name)
# Save processor config to JSON file if any config data exists
if processor_config_data:
try:
processor_name = form.data.get('processor')
# Create a processor instance to access config methods
processor_instance = processors.difference_detection_processor(datastore, uuid)
# Use processor name as filename so each processor keeps its own config
config_filename = f'{processor_name}.json'
processor_instance.update_extra_watch_config(config_filename, processor_config_data)
logger.debug(f"Saved processor config to {config_filename}: {processor_config_data}")
# Call optional edit_hook if processor has one
try:
# Try to import the edit_hook module from the processor package
import importlib
edit_hook_module_name = f'changedetectionio.processors.{processor_name}.edit_hook'
try:
edit_hook = importlib.import_module(edit_hook_module_name)
logger.debug(f"Found edit_hook module for {processor_name}")
if hasattr(edit_hook, 'on_config_save'):
logger.info(f"Calling edit_hook.on_config_save for {processor_name}")
watch_obj = datastore.data['watching'][uuid]
# Call hook and get updated config
updated_config = edit_hook.on_config_save(watch_obj, processor_config_data, datastore)
# Save updated config back to file
processor_instance.update_extra_watch_config(config_filename, updated_config)
logger.info(f"Edit hook updated config: {updated_config}")
else:
logger.debug(f"Edit hook module found but no on_config_save function")
except ModuleNotFoundError:
logger.debug(f"No edit_hook module for processor {processor_name} (this is normal)")
except Exception as hook_error:
logger.error(f"Edit hook error (non-fatal): {hook_error}", exc_info=True)
except Exception as e:
logger.error(f"Failed to save processor config: {e}")
# Remove processor-config-* fields from form.data before updating datastore
for field_name in fields_to_remove:
form.data.pop(field_name, None)
# Handle processor-config-* fields separately (save to JSON, not datastore)
# IMPORTANT: These must NOT be saved to url-watches.json, only to the processor-specific JSON file
processor_config_data = processors.extract_processor_config_from_form_data(form.data)
processors.save_processor_config(datastore, uuid, processor_config_data)
# Ignore text
form_ignore_text = form.ignore_text.data

View File

@@ -39,24 +39,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's preview module
import importlib
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.preview')
# Try to get the processor's preview module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'preview')
# Call the processor's render() function
if hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
except (ImportError, ModuleNotFoundError) as e:
logger.debug(f"Processor {processor_name} does not have a preview module, using default preview: {e}")
# Call the processor's render() function
if processor_module and hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
# Fallback: if processor doesn't have preview module, use default text preview
content = []
@@ -163,39 +160,33 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's preview module
import importlib
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.preview')
# Try to get the processor's preview module (works for both built-in and plugin processors)
from changedetectionio.processors import get_processor_submodule
processor_module = get_processor_submodule(processor_name, 'preview')
# Call the processor's get_asset() function
if hasattr(processor_module, 'get_asset'):
result = processor_module.get_asset(
asset_name=asset_name,
watch=watch,
datastore=datastore,
request=request
)
# Call the processor's get_asset() function
if processor_module and hasattr(processor_module, 'get_asset'):
result = processor_module.get_asset(
asset_name=asset_name,
watch=watch,
datastore=datastore,
request=request
)
if result is None:
from flask import abort
abort(404, description=f"Asset '{asset_name}' not found")
binary_data, content_type, cache_control = result
response = make_response(binary_data)
response.headers['Content-Type'] = content_type
if cache_control:
response.headers['Cache-Control'] = cache_control
return response
else:
logger.warning(f"Processor {processor_name} does not implement get_asset()")
if result is None:
from flask import abort
abort(404, description=f"Processor '{processor_name}' does not support assets")
abort(404, description=f"Asset '{asset_name}' not found")
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have a preview module: {e}")
binary_data, content_type, cache_control = result
response = make_response(binary_data)
response.headers['Content-Type'] = content_type
if cache_control:
response.headers['Cache-Control'] = cache_control
return response
else:
logger.warning(f"Processor {processor_name} does not implement get_asset()")
from flask import abort
abort(404, description=f"Processor '{processor_name}' not found")
abort(404, description=f"Processor '{processor_name}' does not support assets")
return preview_blueprint

View File

@@ -205,23 +205,24 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
</div>
{% endif %}
<div>
<span class="watch-title">
{% if system_use_url_watchlist or watch.get('use_page_title_in_list') %}
{{ watch.label }}
{% else %}
{{ watch.get('title') or watch.link }}
{% endif %}
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}">&nbsp;</a>
</span>
{%- if watch['processor'] and watch['processor'] in processor_badge_texts -%}
<span class="processor-badge processor-badge-{{ watch['processor'] }}" title="{{ processor_descriptions.get(watch['processor'], watch['processor']) }}">{{ processor_badge_texts[watch['processor']] }}</span>
{%- endif -%}
<span class="watch-title">
{% if system_use_url_watchlist or watch.get('use_page_title_in_list') %}
{{ watch.label }}
{% else %}
{{ watch.get('title') or watch.link }}
{% endif %}
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}">&nbsp;</a>
</span>
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list)|safe }}</div>
{%- if watch['processor'] == 'text_json_diff' -%}
{%- if watch['has_ldjson_price_data'] and not watch['track_ldjson_price_data'] -%}
<div class="ldjson-price-track-offer">Switch to Restock & Price watch mode? <a href="{{url_for('price_data_follower.accept', uuid=watch.uuid)}}" class="pure-button button-xsmall">Yes</a> <a href="{{url_for('price_data_follower.reject', uuid=watch.uuid)}}" class="">No</a></div>
{%- endif -%}
{%- endif -%}
{%- if watch['processor'] and watch['processor'] in processor_badge_texts -%}
<span class="processor-badge processor-badge-{{ watch['processor'] }}" title="{{ processor_descriptions.get(watch['processor'], watch['processor']) }}">{{ processor_badge_texts[watch['processor']] }}</span>
{%- endif -%}
{%- for watch_tag_uuid, watch_tag in datastore.get_all_tags_for_watch(watch['uuid']).items() -%}
<span class="watch-tag-list tag-{{ watch_tag.title|sanitize_tag_class }}">{{ watch_tag.title }}</span>
{%- endfor -%}

View File

@@ -1,43 +0,0 @@
"""
Favicon utilities for changedetection.io
Handles favicon MIME type detection with caching
"""
from functools import lru_cache
@lru_cache(maxsize=1000)
def get_favicon_mime_type(filepath):
"""
Detect MIME type of favicon by reading file content using puremagic.
Results are cached to avoid repeatedly reading the same files.
Args:
filepath: Full path to the favicon file
Returns:
MIME type string (e.g., 'image/png')
"""
mime = None
try:
import puremagic
with open(filepath, 'rb') as f:
content_bytes = f.read(200) # Read first 200 bytes
detections = puremagic.magic_string(content_bytes)
if detections:
mime = detections[0].mime_type
except Exception:
pass
# Fallback to mimetypes if puremagic fails
if not mime:
import mimetypes
mime, _ = mimetypes.guess_type(filepath)
# Final fallback based on extension
if not mime:
mime = 'image/x-icon' if filepath.endswith('.ico') else 'image/png'
return mime

View File

@@ -44,8 +44,6 @@ from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, Watch
from changedetectionio.api.Search import Search
from .time_handler import is_within_schedule
from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale
from changedetectionio.favicon_utils import get_favicon_mime_type
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
datastore = None
@@ -71,13 +69,9 @@ socketio_server = None
CORS(app)
# Super handy for compressing large BrowserSteps responses and others
# Flask-Compress handles HTTP compression, Socket.IO compression disabled to prevent memory leak
compress = FlaskCompress()
app.config['COMPRESS_MIN_SIZE'] = 2096
FlaskCompress(app)
app.config['COMPRESS_MIN_SIZE'] = 4096
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml']
# Use gzip only - smaller memory footprint than zstd/brotli (4-8KB vs 200-500KB contexts)
app.config['COMPRESS_ALGORITHM'] = ['gzip']
compress.init_app(app)
app.config['TEMPLATES_AUTO_RELOAD'] = False
@@ -406,27 +400,11 @@ def changedetection_app(config=None, datastore_o=None):
language_codes = get_language_codes()
def get_locale():
# Locale aliases: map browser language codes to translation directory names
# This handles cases where browsers send standard codes (e.g., zh-TW)
# but our translations use more specific codes (e.g., zh_Hant_TW)
locale_aliases = {
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
}
# 1. Try to get locale from session (user explicitly selected)
if 'locale' in session:
return session['locale']
# 2. Fall back to Accept-Language header
# Get the best match from browser's Accept-Language header
browser_locale = request.accept_languages.best_match(language_codes + list(locale_aliases.keys()))
# 3. Check if we need to map the browser locale to our internal locale
if browser_locale in locale_aliases:
return locale_aliases[browser_locale]
return browser_locale
return request.accept_languages.best_match(language_codes)
# Initialize Babel with locale selector
babel = Babel(app, locale_selector=get_locale)
@@ -688,9 +666,16 @@ def changedetection_app(config=None, datastore_o=None):
favicon_filename = watch.get_favicon_filename()
if favicon_filename:
# Use cached MIME type detection
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
mime = get_favicon_mime_type(filepath)
try:
import magic
mime = magic.from_file(
os.path.join(watch.watch_data_dir, favicon_filename),
mime=True
)
except ImportError:
# Fallback, no python-magic
import mimetypes
mime, encoding = mimetypes.guess_type(favicon_filename)
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
response.headers['Content-type'] = mime

View File

@@ -105,6 +105,30 @@ class ChangeDetectionSpec:
"""
pass
@hookspec
def register_processor(self):
"""Register an external processor plugin.
External packages can implement this hook to register custom processors
that will be discovered alongside built-in processors.
Returns:
dict or None: Dictionary with processor information:
{
'processor_name': str, # Machine name (e.g., 'osint_recon')
'processor_module': module, # Module containing processor.py
'processor_class': class, # The perform_site_check class
'metadata': { # Optional metadata
'name': str, # Display name
'description': str, # Description
'processor_weight': int,# Sort weight (lower = higher priority)
'list_badge_text': str, # Badge text for UI
}
}
Return None if this plugin doesn't provide a processor
"""
pass
# Set up Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)

View File

@@ -17,9 +17,11 @@ def find_sub_packages(package_name):
return [name for _, name, is_pkg in pkgutil.iter_modules(package.__path__) if is_pkg]
@lru_cache(maxsize=1)
def find_processors():
"""
Find all subclasses of DifferenceDetectionProcessor in the specified package.
Results are cached to avoid repeated discovery.
:param package_name: The name of the package to scan for processor modules.
:return: A list of (module, class) tuples.
@@ -46,6 +48,22 @@ def find_processors():
except (ModuleNotFoundError, ImportError) as e:
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
# Discover plugin processors via pluggy
try:
from changedetectionio.pluggy_interface import plugin_manager
plugin_results = plugin_manager.hook.register_processor()
for result in plugin_results:
if result and isinstance(result, dict):
processor_module = result.get('processor_module')
processor_name = result.get('processor_name')
if processor_module and processor_name:
processors.append((processor_module, processor_name))
logger.info(f"Registered plugin processor: {processor_name}")
except Exception as e:
logger.warning(f"Error loading plugin processors: {e}")
return processors
@@ -97,54 +115,137 @@ def find_processor_module(processor_name):
return None
def get_processor_module(processor_name):
"""
Get the actual processor module (with perform_site_check class) by name.
Works for both built-in and plugin processors.
Args:
processor_name: Processor machine name (e.g., 'text_json_diff', 'osint_recon')
Returns:
module: The processor module containing perform_site_check, or None if not found
"""
processor_classes = find_processors()
processor_tuple = next((tpl for tpl in processor_classes if tpl[1] == processor_name), None)
if processor_tuple:
# Return the actual processor module (first element of tuple)
return processor_tuple[0]
return None
def get_processor_submodule(processor_name, submodule_name):
"""
Get an optional submodule from a processor (e.g., 'difference', 'extract', 'preview').
Works for both built-in and plugin processors.
Args:
processor_name: Processor machine name (e.g., 'text_json_diff', 'osint_recon')
submodule_name: Name of the submodule (e.g., 'difference', 'extract', 'preview')
Returns:
module: The submodule if it exists, or None if not found
"""
processor_classes = find_processors()
processor_tuple = next((tpl for tpl in processor_classes if tpl[1] == processor_name), None)
if not processor_tuple:
return None
processor_module = processor_tuple[0]
parent_module = get_parent_module(processor_module)
if not parent_module:
return None
# Try to import the submodule
try:
# For built-in processors: changedetectionio.processors.text_json_diff.difference
# For plugin processors: changedetectionio_osint.difference
parent_module_name = parent_module.__name__
submodule_full_name = f"{parent_module_name}.{submodule_name}"
return importlib.import_module(submodule_full_name)
except (ModuleNotFoundError, ImportError):
return None
@lru_cache(maxsize=1)
def get_plugin_processor_metadata():
"""Get metadata from plugin processors."""
metadata = {}
try:
from changedetectionio.pluggy_interface import plugin_manager
plugin_results = plugin_manager.hook.register_processor()
for result in plugin_results:
if result and isinstance(result, dict):
processor_name = result.get('processor_name')
meta = result.get('metadata', {})
if processor_name:
metadata[processor_name] = meta
except Exception as e:
logger.warning(f"Error getting plugin processor metadata: {e}")
return metadata
def available_processors():
"""
Get a list of processors by name and description for the UI elements.
Can be filtered via ALLOWED_PROCESSORS environment variable (comma-separated list).
Can be filtered via DISABLED_PROCESSORS environment variable (comma-separated list).
:return: A list :)
"""
processor_classes = find_processors()
# Check if ALLOWED_PROCESSORS env var is set
# For now we disable it, need to make a deploy with lots of new code and this will be an overload
allowed_processors_env = os.getenv('ALLOWED_PROCESSORS', 'text_json_diff, restock_diff').strip()
allowed_processors = None
if allowed_processors_env:
# Check if DISABLED_PROCESSORS env var is set
disabled_processors_env = os.getenv('DISABLED_PROCESSORS', 'image_ssim_diff').strip()
disabled_processors = []
if disabled_processors_env:
# Parse comma-separated list and strip whitespace
allowed_processors = [p.strip() for p in allowed_processors_env.split(',') if p.strip()]
logger.info(f"ALLOWED_PROCESSORS set, filtering to: {allowed_processors}")
disabled_processors = [p.strip() for p in disabled_processors_env.split(',') if p.strip()]
logger.info(f"DISABLED_PROCESSORS set, disabling: {disabled_processors}")
available = []
plugin_metadata = get_plugin_processor_metadata()
for module, sub_package_name in processor_classes:
# Filter by allowed processors if set
if allowed_processors and sub_package_name not in allowed_processors:
logger.debug(f"Skipping processor '{sub_package_name}' (not in ALLOWED_PROCESSORS)")
# Skip disabled processors
if sub_package_name in disabled_processors:
logger.debug(f"Skipping processor '{sub_package_name}' (in DISABLED_PROCESSORS)")
continue
# Try to get the 'name' attribute from the processor module first
if hasattr(module, 'name'):
description = gettext(module.name)
# Check if this is a plugin processor
if sub_package_name in plugin_metadata:
meta = plugin_metadata[sub_package_name]
description = gettext(meta.get('name', sub_package_name))
# Plugin processors start from weight 10 to separate them from built-in processors
weight = 100 + meta.get('processor_weight', 0)
else:
# Fall back to processor_description from parent module's __init__.py
parent_module = get_parent_module(module)
if parent_module and hasattr(parent_module, 'processor_description'):
description = gettext(parent_module.processor_description)
# Try to get the 'name' attribute from the processor module first
if hasattr(module, 'name'):
description = gettext(module.name)
else:
# Final fallback to a readable name
description = sub_package_name.replace('_', ' ').title()
# Fall back to processor_description from parent module's __init__.py
parent_module = get_parent_module(module)
if parent_module and hasattr(parent_module, 'processor_description'):
description = gettext(parent_module.processor_description)
else:
# Final fallback to a readable name
description = sub_package_name.replace('_', ' ').title()
# Get weight for sorting (lower weight = higher in list)
weight = 0 # Default weight for processors without explicit weight
# Get weight for sorting (lower weight = higher in list)
weight = 0 # Default weight for processors without explicit weight
# Check processor module itself first
if hasattr(module, 'processor_weight'):
weight = module.processor_weight
else:
# Fall back to parent module (package __init__.py)
parent_module = get_parent_module(module)
if parent_module and hasattr(parent_module, 'processor_weight'):
weight = parent_module.processor_weight
# Check processor module itself first
if hasattr(module, 'processor_weight'):
weight = module.processor_weight
else:
# Fall back to parent module (package __init__.py)
parent_module = get_parent_module(module)
if parent_module and hasattr(parent_module, 'processor_weight'):
weight = parent_module.processor_weight
available.append((sub_package_name, description, weight))
@@ -279,3 +380,76 @@ def get_processor_badge_css():
return '\n\n'.join(css_rules)
def save_processor_config(datastore, watch_uuid, config_data):
"""
Save processor-specific configuration to JSON file.
This is a shared helper function used by both the UI edit form and API endpoints
to consistently handle processor configuration storage.
Args:
datastore: The application datastore instance
watch_uuid: UUID of the watch
config_data: Dictionary of configuration data to save (with processor_config_* prefix removed)
Returns:
bool: True if saved successfully, False otherwise
"""
if not config_data:
return True
try:
from changedetectionio.processors.base import difference_detection_processor
# Get processor name from watch
watch = datastore.data['watching'].get(watch_uuid)
if not watch:
logger.error(f"Cannot save processor config: watch {watch_uuid} not found")
return False
processor_name = watch.get('processor', 'text_json_diff')
# Create a processor instance to access config methods
processor_instance = difference_detection_processor(datastore, watch_uuid)
# Use processor name as filename so each processor keeps its own config
config_filename = f'{processor_name}.json'
processor_instance.update_extra_watch_config(config_filename, config_data)
logger.debug(f"Saved processor config to {config_filename}: {config_data}")
return True
except Exception as e:
logger.error(f"Failed to save processor config: {e}")
return False
def extract_processor_config_from_form_data(form_data):
"""
Extract processor_config_* fields from form data and return separate dicts.
This is a shared helper function used by both the UI edit form and API endpoints
to consistently handle processor configuration extraction.
IMPORTANT: This function modifies form_data in-place by removing processor_config_* fields.
Args:
form_data: Dictionary of form data (will be modified in-place)
Returns:
dict: Dictionary of processor config data (with processor_config_* prefix removed)
"""
processor_config_data = {}
# Use list() to create a copy of keys since we're modifying the dict
for field_name in list(form_data.keys()):
if field_name.startswith('processor_config_'):
config_key = field_name.replace('processor_config_', '')
# Save all values (including empty strings) to allow explicit clearing of settings
processor_config_data[config_key] = form_data[field_name]
# Remove from form_data to prevent it from reaching datastore
del form_data[field_name]
return processor_config_data

View File

@@ -69,7 +69,7 @@ class RecheckPriorityQueue:
# Emit signals
self._emit_put_signals(item)
logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
return True
except Exception as e:

View File

@@ -240,10 +240,7 @@ def init_socketio(app, datastore):
async_mode=async_mode,
cors_allowed_origins=cors_origins, # None means same-origin only
logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
# Disable WebSocket compression to prevent memory accumulation
# Flask-Compress already handles HTTP response compression
engineio_options={'http_compression': False, 'compression_threshold': 0})
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')))
# Set up event handlers
logger.info("Socket.IO: Registering connect event handler")

View File

@@ -91,8 +91,8 @@ REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notif
# And again with brotli+screenshot attachment
SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5 REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --dist=load tests/test_backend.py tests/test_rss.py tests/test_unique_lines.py tests/test_notification.py tests/test_access_control.py
# Try high concurrency with aggressive worker restarts
FETCH_WORKERS=50 WORKER_MAX_RUNTIME=2 WORKER_MAX_JOBS=1 pytest tests/test_history_consistency.py -vv -l -s
# Try high concurrency
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
# Check file:// will pickup a file when enabled
echo "Hello world" > /tmp/test-file.txt

View File

@@ -69,19 +69,6 @@
}
});
// Handle Enter key in search input
if (searchInput) {
searchInput.addEventListener('keydown', function(e) {
if (e.key === 'Enter') {
e.preventDefault();
if (searchForm) {
// Trigger form submission programmatically
searchForm.dispatchEvent(new Event('submit'));
}
}
});
}
// Handle form submission
if (searchForm) {
searchForm.addEventListener('submit', function(e) {
@@ -101,8 +88,8 @@
params.append('tags', tags);
}
// Navigate to search results (always redirect to watchlist home)
window.location.href = '/?' + params.toString();
// Navigate to search results
window.location.href = '?' + params.toString();
});
}
});

View File

@@ -1,4 +1,3 @@
import gc
import shutil
from changedetectionio.strtobool import strtobool
@@ -126,10 +125,6 @@ class ChangeDetectionStore:
if 'application' in from_disk['settings']:
self.__data['settings']['application'].update(from_disk['settings']['application'])
# from_disk no longer needed - free memory immediately
del from_disk
gc.collect()
# Convert each existing watch back to the Watch.model object
for uuid, watch in self.__data['watching'].items():
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
@@ -353,8 +348,7 @@ class ChangeDetectionStore:
r = requests.request(method="GET",
url=url,
# So we know to return the JSON instead of the human-friendly "help" page
headers={'App-Guid': self.__data['app_guid']},
timeout=5.0) # 5 second timeout to prevent blocking
headers={'App-Guid': self.__data['app_guid']})
res = r.json()
# List of permissible attributes we accept from the wild internet
@@ -455,7 +449,7 @@ class ChangeDetectionStore:
data = deepcopy(self.__data)
except RuntimeError as e:
# Try again in 15 seconds
time.sleep(1)
time.sleep(15)
logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
self.sync_to_json()
return

View File

@@ -58,7 +58,7 @@ def is_valid_uuid(val):
def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
@@ -506,7 +506,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Enable password check and diff page access bypass
@@ -548,172 +548,3 @@ def test_api_conflict_UI_password(client, live_server, measure_memory_usage, dat
assert len(res.json)
def test_api_url_validation(client, live_server, measure_memory_usage, datastore_path):
"""
Test URL validation for edge cases in both CREATE and UPDATE endpoints.
Addresses security issues where empty/null/invalid URLs could bypass validation.
This test ensures that:
- CREATE endpoint rejects null, empty, and invalid URLs
- UPDATE endpoint rejects attempts to change URL to null, empty, or invalid
- UPDATE endpoint allows updating other fields without touching URL
- URL validation properly checks protocol, format, and safety
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: CREATE with null URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": None}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with null URL should fail"
# Test 2: CREATE with empty string URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": ""}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with empty string URL should fail"
assert b'Invalid or unsupported URL' in res.data or b'required' in res.data.lower()
# Test 3: CREATE with whitespace-only URL should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": " "}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with whitespace-only URL should fail"
# Test 4: CREATE with invalid protocol should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "javascript:alert(1)"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch with javascript: protocol should fail"
# Test 5: CREATE with missing protocol should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "example.com"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 400, "Creating watch without protocol should fail"
# Test 6: CREATE with valid URL should succeed (baseline)
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Valid URL test"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
assert res.status_code == 201, "Creating watch with valid URL should succeed"
assert is_valid_uuid(res.json.get('uuid'))
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 7: UPDATE to null URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": None}),
)
assert res.status_code == 400, "Updating watch URL to null should fail"
# Accept either OpenAPI validation error or our custom validation error
assert b'URL cannot be null' in res.data or b'OpenAPI validation failed' in res.data or b'validation error' in res.data.lower()
# Test 8: UPDATE to empty string URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": ""}),
)
assert res.status_code == 400, "Updating watch URL to empty string should fail"
# Accept either our custom validation error or OpenAPI/schema validation error
assert b'URL cannot be empty' in res.data or b'OpenAPI validation' in res.data or b'Invalid or unsupported URL' in res.data
# Test 9: UPDATE to whitespace-only URL should fail
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": " \t\n "}),
)
assert res.status_code == 400, "Updating watch URL to whitespace should fail"
# Accept either our custom validation error or generic validation error
assert b'URL cannot be empty' in res.data or b'Invalid or unsupported URL' in res.data or b'validation' in res.data.lower()
# Test 10: UPDATE to invalid protocol should fail (javascript:)
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": "javascript:alert(document.domain)"}),
)
assert res.status_code == 400, "Updating watch URL to XSS attempt should fail"
assert b'Invalid or unsupported URL' in res.data or b'protocol' in res.data.lower()
# Test 11: UPDATE to file:// protocol should fail (unless ALLOW_FILE_URI is set)
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": "file:///etc/passwd"}),
)
assert res.status_code == 400, "Updating watch URL to file:// should fail by default"
# Test 12: UPDATE other fields without URL should succeed
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"title": "Updated title without URL change"}),
)
assert res.status_code == 200, "Updating other fields without URL should succeed"
# Test 13: Verify URL is still valid after non-URL update
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.json.get('url') == test_url, "URL should remain unchanged"
assert res.json.get('title') == "Updated title without URL change"
# Test 14: UPDATE to valid different URL should succeed
new_valid_url = test_url + "?new=param"
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({"url": new_valid_url}),
)
assert res.status_code == 200, "Updating to valid different URL should succeed"
# Test 15: Verify URL was actually updated
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.json.get('url') == new_valid_url, "URL should be updated to new valid URL"
# Test 16: CREATE with XSS in URL parameters should fail
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": "http://example.com?xss=<script>alert(1)</script>"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
follow_redirects=True
)
# This should fail because of suspicious characters check
assert res.status_code == 400, "Creating watch with XSS in URL params should fail"
# Cleanup
client.delete(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key},
)
delete_all_watches(client)

View File

@@ -1,805 +0,0 @@
#!/usr/bin/env python3
"""
Comprehensive security and edge case tests for the API.
Tests critical areas that were identified as gaps in the existing test suite.
"""
import time
import json
import threading
import uuid as uuid_module
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
import os
def set_original_response(datastore_path):
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def is_valid_uuid(val):
try:
uuid_module.UUID(str(val))
return True
except ValueError:
return False
# ============================================================================
# TIER 1: CRITICAL SECURITY TESTS
# ============================================================================
def test_api_path_traversal_in_uuids(client, live_server, measure_memory_usage, datastore_path):
"""
Test that path traversal attacks via UUID parameter are blocked.
Addresses CVE-like vulnerabilities where ../../../ in UUID could access arbitrary files.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create a valid watch first
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Valid watch"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
valid_uuid = res.json.get('uuid')
# Test 1: Path traversal with ../../../
res = client.get(
f"/api/v1/watch/../../etc/passwd",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Path traversal should be rejected"
# Test 2: Encoded path traversal
res = client.get(
"/api/v1/watch/..%2F..%2F..%2Fetc%2Fpasswd",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Encoded path traversal should be rejected"
# Test 3: Double-encoded path traversal
res = client.get(
"/api/v1/watch/%2e%2e%2f%2e%2e%2f%2e%2e%2f",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Double-encoded traversal should be rejected"
# Test 4: Try to access datastore file
res = client.get(
"/api/v1/watch/../url-watches.json",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404], "Access to datastore should be blocked"
# Test 5: Null byte injection
res = client.get(
f"/api/v1/watch/{valid_uuid}%00.json",
headers={'x-api-key': api_key}
)
# Should either work (ignoring null byte) or reject - but not crash
assert res.status_code in [200, 400, 404]
# Test 6: DELETE with path traversal
res = client.delete(
"/api/v1/watch/../../datastore/url-watches.json",
headers={'x-api-key': api_key}
)
assert res.status_code in [400, 404, 405], "DELETE with traversal should be blocked (405=method not allowed is also acceptable)"
# Cleanup
client.delete(url_for("watch", uuid=valid_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_injection_via_headers_and_proxy(client, live_server, measure_memory_usage, datastore_path):
"""
Test that injection attacks via headers and proxy fields are properly sanitized.
Addresses XSS and injection vulnerabilities.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: XSS in headers
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": {
"User-Agent": "<script>alert(1)</script>",
"X-Custom": "'; DROP TABLE watches; --"
}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Headers are metadata used for HTTP requests, not HTML rendering
# Storing them as-is is expected behavior
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
# Verify headers are stored (API returns JSON, not HTML, so no XSS risk)
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
assert res.status_code == 200
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Null bytes in headers
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": {"X-Test": "value\x00null"}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle null bytes gracefully (reject or sanitize)
assert res.status_code in [201, 400]
# Test 3: Malformed proxy string
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"proxy": "http://evil.com:8080@victim.com"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject invalid proxy format
assert res.status_code == 400
# Test 4: Control characters in notification title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_title": "Test\r\nInjected-Header: value"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept but sanitize control characters
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_large_payload_dos(client, live_server, measure_memory_usage, datastore_path):
"""
Test that excessively large payloads are rejected to prevent DoS.
Addresses memory leak issues found in changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Huge ignore_text array
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"ignore_text": ["a" * 10000] * 100 # 1MB of data
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should either accept (with limits) or reject
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Massive headers object
huge_headers = {f"X-Header-{i}": "x" * 1000 for i in range(100)}
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"headers": huge_headers
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject or truncate
assert res.status_code in [201, 400, 413]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Huge browser_steps array
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click", "selector": "#test" * 1000, "optional_value": ""}
] * 100
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject or limit
assert res.status_code in [201, 400, 413]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 4: Extremely long title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "x" * 100000 # 100KB title
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should reject (exceeds maxLength: 5000)
assert res.status_code == 400
delete_all_watches(client)
def test_api_utf8_encoding_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test UTF-8 encoding edge cases that have caused bugs on Windows.
Addresses 18+ encoding bugs from changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Unicode in title (should work)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "Test 中文 Ελληνικά 日本語 🔥"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
# Verify it round-trips correctly
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
assert res.status_code == 200
assert "中文" in res.json.get('title')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Unicode in URL query parameters
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url + "?search=日本語"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle URL encoding properly
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Null byte in title (should be rejected or sanitized)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "Test\x00Title"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should handle gracefully
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 4: BOM (Byte Order Mark) in title
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"title": "\ufeffTest with BOM"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_concurrency_race_conditions(client, live_server, measure_memory_usage, datastore_path):
"""
Test concurrent API requests to detect race conditions.
Addresses 20+ concurrency bugs from changelog.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create a watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": "Concurrency test"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 1: Concurrent updates to same watch
# Note: Flask test client is not thread-safe, so we test sequential updates instead
# Real concurrency issues would be caught in integration tests with actual HTTP requests
results = []
for i in range(10):
try:
r = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"title": f"Title {i}"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
results.append(r.status_code)
except Exception as e:
results.append(str(e))
# All updates should succeed (200) without crashes
assert all(r == 200 for r in results), f"Some updates failed: {results}"
# Test 2: Update while watch is being checked
# Queue a recheck
client.get(
url_for("watch", uuid=watch_uuid, recheck=True),
headers={'x-api-key': api_key}
)
# Immediately update it
res = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"title": "Updated during check"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should succeed without error
assert res.status_code == 200
# Test 3: Delete watch that's being processed
# Create another watch
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
watch_uuid2 = res.json.get('uuid')
# Queue it for checking
client.get(url_for("watch", uuid=watch_uuid2, recheck=True), headers={'x-api-key': api_key})
# Immediately delete it
res = client.delete(url_for("watch", uuid=watch_uuid2), headers={'x-api-key': api_key})
# Should succeed or return appropriate error
assert res.status_code in [204, 404, 400]
# Cleanup
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
# ============================================================================
# TIER 2: IMPORTANT FUNCTIONALITY TESTS
# ============================================================================
def test_api_time_validation_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test time_between_check validation edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Zero interval
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": 0}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "Zero interval should be rejected"
# Test 2: Negative interval
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": -100}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "Negative interval should be rejected"
# Test 3: All fields null with use_default=false
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"weeks": None, "days": None, "hours": None, "minutes": None, "seconds": None}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 400, "All null intervals should be rejected when not using default"
# Test 4: Extremely large interval (overflow risk)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"weeks": 999999999}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should either accept (with limits) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 5: Valid minimal interval (should work)
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"time_between_check_use_default": False,
"time_between_check": {"seconds": 60}
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_browser_steps_validation(client, live_server, measure_memory_usage, datastore_path):
"""
Test browser_steps validation for invalid operations and structures.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Empty browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "", "selector": "", "optional_value": ""}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (empty is valid as null)
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Invalid operation type
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "invalid_operation", "selector": "#test", "optional_value": ""}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (validation happens at runtime) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 3: Missing required fields in browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click"} # Missing selector and optional_value
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected due to schema validation
assert res.status_code == 400
# Test 4: Extra fields in browser step
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"browser_steps": [
{"operation": "click", "selector": "#test", "optional_value": "", "extra_field": "value"}
]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected due to additionalProperties: false
assert res.status_code == 400
delete_all_watches(client)
def test_api_queue_manipulation(client, live_server, measure_memory_usage, datastore_path):
"""
Test queue behavior under stress and edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Create many watches rapidly
watch_uuids = []
for i in range(20):
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url, "title": f"Watch {i}"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
if res.status_code == 201:
watch_uuids.append(res.json.get('uuid'))
assert len(watch_uuids) == 20, "Should be able to create 20 watches"
# Test 2: Recheck all when watches exist
res = client.get(
url_for("createwatch", recheck_all='1'),
headers={'x-api-key': api_key},
)
# Should return success (200 or 202 for background processing)
assert res.status_code in [200, 202]
# Test 3: Verify queue doesn't overflow with moderate load
# The app has MAX_QUEUE_SIZE = 5000, we're well below that
wait_for_all_checks(client)
# Cleanup
for uuid in watch_uuids:
client.delete(url_for("watch", uuid=uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
# ============================================================================
# TIER 3: EDGE CASES & POLISH
# ============================================================================
def test_api_history_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test history API with invalid timestamps and edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Create watch and generate history
res = client.post(
url_for("createwatch"),
data=json.dumps({"url": test_url}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
watch_uuid = res.json.get('uuid')
wait_for_all_checks(client)
# Test 1: Get history with invalid timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="invalid"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Invalid timestamp should return 404"
# Test 2: Future timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="9999999999"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Future timestamp should return 404"
# Test 3: Negative timestamp
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="-1"),
headers={'x-api-key': api_key}
)
assert res.status_code == 404, "Negative timestamp should return 404"
# Test 4: Diff with reversed timestamps (from > to)
# First get actual timestamps
res = client.get(
url_for("watchhistory", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
if len(res.json) >= 2:
timestamps = sorted(res.json.keys())
# Try reversed order
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp=timestamps[-1], to_timestamp=timestamps[0]),
headers={'x-api-key': api_key}
)
# Should either work (show reverse diff) or return error
assert res.status_code in [200, 400]
# Cleanup
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_notification_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test notification configuration edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Invalid notification URL
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_urls": ["invalid://url", "ftp://test.com"]
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (apprise validates at runtime) or reject
assert res.status_code in [201, 400]
if res.status_code == 201:
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
# Test 2: Invalid notification format
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_format": "invalid_format"
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected by schema
assert res.status_code == 400
# Test 3: Empty notification arrays
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": test_url,
"notification_urls": []
}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (empty is valid)
assert res.status_code == 201
watch_uuid = res.json.get('uuid')
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
delete_all_watches(client)
def test_api_tag_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test tag/group API edge cases including XSS and path traversal.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Test 1: Empty tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": ""}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected (empty title)
assert res.status_code == 400
# Test 2: XSS in tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "<script>alert(1)</script>"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept but sanitize
if res.status_code == 201:
tag_uuid = res.json.get('uuid')
# Verify title is stored safely
res = client.get(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Should be escaped or sanitized
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Test 3: Path traversal in tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "../../etc/passwd"}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should accept (it's just a string, not a path)
if res.status_code == 201:
tag_uuid = res.json.get('uuid')
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
# Test 4: Very long tag title
res = client.post(
url_for("tag"),
data=json.dumps({"title": "x" * 10000}),
headers={'content-type': 'application/json', 'x-api-key': api_key},
)
# Should be rejected (exceeds maxLength)
assert res.status_code == 400
def test_api_authentication_edge_cases(client, live_server, measure_memory_usage, datastore_path):
"""
Test API authentication edge cases.
"""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
# Test 1: Missing API key
res = client.get(url_for("createwatch"))
assert res.status_code == 403, "Missing API key should be forbidden"
# Test 2: Invalid API key
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "invalid_key_12345"}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 3: API key with special characters
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "key<script>alert(1)</script>"}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 4: Very long API key
res = client.get(
url_for("createwatch"),
headers={'x-api-key': "x" * 10000}
)
assert res.status_code == 403, "Invalid API key should be forbidden"
# Test 5: Case sensitivity of API key
wrong_case_key = api_key.upper() if api_key.islower() else api_key.lower()
res = client.get(
url_for("createwatch"),
headers={'x-api-key': wrong_case_key}
)
# Should be forbidden (keys are case-sensitive)
assert res.status_code == 403, "Wrong case API key should be forbidden"
# Test 6: Valid API key should work
res = client.get(
url_for("createwatch"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200, "Valid API key should work"

View File

@@ -325,274 +325,3 @@ def test_time_unit_translations(client, live_server, measure_memory_usage, datas
assert b"Time Between Check" not in res.data, "Should not have English 'Time Between Check'"
assert "Chrome 請求".encode() not in res.data, "Should not have incorrect 'Chrome 請求' (Chrome requests)"
assert "使用預設通知".encode() not in res.data, "Should not have incorrect '使用預設通知' (Use default notification)"
def test_accept_language_header_zh_tw(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending zh-TW in Accept-Language header get Traditional Chinese.
This tests the locale alias mapping for issue #3779.
"""
from flask import url_for
# Clear any session data to simulate a fresh visitor
with client.session_transaction() as sess:
sess.clear()
# Request the index page with zh-TW in Accept-Language header (what browsers send)
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Traditional Chinese content, not Simplified Chinese
# Traditional: 選擇語言, Simplified: 选择语言
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' (Select Language)"
assert '选择语言'.encode() not in res.data, "Should not get Simplified Chinese '选择语言'"
# Check HTML lang attribute uses BCP 47 format
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected BCP 47 language tag zh-Hant-TW in HTML"
# Check that the correct flag icon is shown (Taiwan flag for Traditional Chinese)
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
"Expected Taiwan flag 'fi fi-tw' for Traditional Chinese"
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' not in res.data, \
"Should not show China flag 'fi fi-cn' for Traditional Chinese"
# Verify we're getting Traditional Chinese text throughout the page
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Check Traditional Chinese translations (not English)
assert "小時".encode() in res.data, "Expected Traditional Chinese '小時' for Hours"
assert "分鐘".encode() in res.data, "Expected Traditional Chinese '分鐘' for Minutes"
assert b"Hours" not in res.data or "小時".encode() in res.data, "Should have Traditional Chinese, not English"
def test_accept_language_header_en_variants(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending en-GB and en-US in Accept-Language header get the correct English variant.
This ensures the locale selector works properly for English variants.
"""
from flask import url_for
# Test 1: British English (en-GB)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en-GB,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
# Check HTML lang attribute uses BCP 47 format with hyphen
assert b'<html lang="en-GB"' in res.data, "Expected BCP 47 language tag en-GB in HTML"
# Check that the correct flag icon is shown (UK flag for en-GB)
assert b'<span class="fi fi-gb fis" id="language-selector-flag">' in res.data, \
"Expected UK flag 'fi fi-gb' for British English"
# Test 2: American English (en-US)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en-US,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
# Check HTML lang attribute uses BCP 47 format with hyphen
assert b'<html lang="en-US"' in res.data, "Expected BCP 47 language tag en-US in HTML"
# Check that the correct flag icon is shown (US flag for en-US)
assert b'<span class="fi fi-us fis" id="language-selector-flag">' in res.data, \
"Expected US flag 'fi fi-us' for American English"
# Test 3: Generic 'en' should fall back to one of the English variants
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'en'},
follow_redirects=True
)
assert res.status_code == 200
# Should get English content (either variant is fine)
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
def test_accept_language_header_zh_simplified(client, live_server, measure_memory_usage, datastore_path):
"""
Test that browsers sending zh or zh-CN in Accept-Language header get Simplified Chinese.
This ensures Simplified Chinese still works correctly and doesn't get confused with Traditional.
"""
from flask import url_for
# Test 1: Generic 'zh' should get Simplified Chinese
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Simplified Chinese content, not Traditional Chinese
# Simplified: 选择语言, Traditional: 選擇語言
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' (Select Language)"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese '選擇語言'"
# Check HTML lang attribute
assert b'<html lang="zh"' in res.data, "Expected language tag zh in HTML"
# Check that the correct flag icon is shown (China flag for Simplified Chinese)
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
"Expected China flag 'fi fi-cn' for Simplified Chinese"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
"Should not show Taiwan flag 'fi fi-tw' for Simplified Chinese"
# Test 2: 'zh-CN' should also get Simplified Chinese
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should get Simplified Chinese content
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' with zh-CN header"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese with zh-CN header"
# Check that the correct flag icon is shown (China flag for zh-CN)
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
"Expected China flag 'fi fi-cn' for zh-CN header"
# Verify Simplified Chinese in settings page
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh,en;q=0.9'},
follow_redirects=True
)
assert res.status_code == 200
# Check Simplified Chinese translations (not Traditional or English)
# Simplified: 小时, Traditional: 小時
assert "小时".encode() in res.data, "Expected Simplified Chinese '小时' for Hours"
assert "分钟".encode() in res.data, "Expected Simplified Chinese '分钟' for Minutes"
assert "".encode() in res.data, "Expected Simplified Chinese '' for Seconds"
# Make sure it's not Traditional Chinese
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時'"
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘'"
def test_session_locale_overrides_accept_language(client, live_server, measure_memory_usage, datastore_path):
"""
Test that session locale preference overrides browser Accept-Language header.
Scenario:
1. Browser auto-detects zh-TW (Traditional Chinese) from Accept-Language header
2. User explicitly selects Korean language
3. On subsequent page loads, Korean should be shown (not Traditional Chinese)
even though the Accept-Language header still says zh-TW
This tests the session override behavior for issue #3779.
"""
from flask import url_for
# Step 1: Clear session and make first request with zh-TW header (auto-detect)
with client.session_transaction() as sess:
sess.clear()
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
follow_redirects=True
)
assert res.status_code == 200
# Should initially get Traditional Chinese from auto-detect
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' from auto-detect"
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected zh-Hant-TW language tag"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
"Expected Taiwan flag 'fi fi-tw' from auto-detect"
# Step 2: User explicitly selects Korean language
res = client.get(
url_for("set_language", locale="ko"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Browser still sends zh-TW
follow_redirects=True
)
assert res.status_code == 200
# Step 3: Make another request with same zh-TW header
# Session should override the Accept-Language header
res = client.get(
url_for("watchlist.index"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still sending zh-TW!
follow_redirects=True
)
assert res.status_code == 200
# Should now get Korean (session overrides auto-detect)
# Korean: 언어 선택, Traditional Chinese: 選擇語言
assert '언어 선택'.encode() in res.data, "Expected Korean '언어 선택' (Select Language) from session"
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese when Korean is set in session"
# Check HTML lang attribute is Korean
assert b'<html lang="ko"' in res.data, "Expected Korean language tag 'ko' in HTML"
# Check that Korean flag is shown (not Taiwan flag)
assert b'<span class="fi fi-kr fis" id="language-selector-flag">' in res.data, \
"Expected Korean flag 'fi fi-kr' from session preference"
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
"Should not show Taiwan flag when Korean is set in session"
# Verify Korean text on settings page as well
res = client.get(
url_for("settings.settings_page"),
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still zh-TW!
follow_redirects=True
)
assert res.status_code == 200
# Check Korean translations (not Traditional Chinese or English)
# Korean: 시간 (Hours), 분 (Minutes), 초 (Seconds)
# Traditional Chinese: 小時, 分鐘, 秒
assert "시간".encode() in res.data, "Expected Korean '시간' for Hours"
assert "".encode() in res.data, "Expected Korean '' for Minutes"
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時' when Korean is set"
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘' when Korean is set"

File diff suppressed because it is too large Load Diff

View File

@@ -64,19 +64,6 @@ def is_safe_valid_url(test_url):
import re
import validators
# Validate input type first - must be a non-empty string
if test_url is None:
logger.warning('URL validation failed: URL is None')
return False
if not isinstance(test_url, str):
logger.warning(f'URL validation failed: URL must be a string, got {type(test_url).__name__}')
return False
if not test_url.strip():
logger.warning('URL validation failed: URL is empty or whitespace only')
return False
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'

View File

@@ -132,19 +132,11 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
while not app.config.exit.is_set():
try:
result = await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
if result == "restart":
# Worker requested restart - immediately loop back and restart
if not in_pytest:
logger.debug(f"Async worker {worker_id} restarting")
continue
else:
# Worker exited cleanly (shutdown)
if not in_pytest:
logger.info(f"Async worker {worker_id} exited cleanly")
break
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
# If we reach here, worker exited cleanly
if not in_pytest:
logger.info(f"Async worker {worker_id} exited cleanly")
break
except asyncio.CancelledError:
# Task was cancelled (normal shutdown)
if not in_pytest:
@@ -155,7 +147,7 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
if not in_pytest:
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
await asyncio.sleep(5)
if not in_pytest:
logger.info(f"Async worker {worker_id} shutdown complete")
@@ -169,11 +161,7 @@ def add_worker(update_q, notification_q, app, datastore):
"""Add a new async worker (for dynamic scaling)"""
global worker_threads
# Reuse lowest available ID to prevent unbounded growth over time
used_ids = {w.worker_id for w in worker_threads}
worker_id = 0
while worker_id in used_ids:
worker_id += 1
worker_id = len(worker_threads)
logger.info(f"Adding async worker {worker_id}")
try:
@@ -263,7 +251,7 @@ def queue_item_async_safe(update_q, item, silent=False):
return False
if not silent:
logger.trace(f"Successfully queued item: {item_uuid}")
logger.debug(f"Successfully queued item: {item_uuid}")
return True
except Exception as e:

View File

@@ -16,6 +16,13 @@ services:
# Log output levels: TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL
# - LOGGER_LEVEL=TRACE
#
# Install additional Python packages (processor plugins, etc.)
# Packages are installed at container startup and cached to avoid reinstalling on every restart
# Example: Install the OSINT reconnaissance processor plugin
# - EXTRA_PACKAGES=changedetection-osint-processor
# Multiple packages can be installed by separating with spaces:
# - EXTRA_PACKAGES=changedetection-osint-processor another-plugin
#
#
# Uncomment below and the "sockpuppetbrowser" to use a real Chrome browser (It uses the "playwright" protocol)
# - PLAYWRIGHT_DRIVER_URL=ws://browser-sockpuppet-chrome:3000

28
docker-entrypoint.sh Executable file
View File

@@ -0,0 +1,28 @@
#!/bin/bash
set -e
# Install additional packages from EXTRA_PACKAGES env var
# Uses a marker file to avoid reinstalling on every container restart
INSTALLED_MARKER="/datastore/.extra_packages_installed"
CURRENT_PACKAGES="$EXTRA_PACKAGES"
if [ -n "$EXTRA_PACKAGES" ]; then
# Check if we need to install/update packages
if [ ! -f "$INSTALLED_MARKER" ] || [ "$(cat $INSTALLED_MARKER 2>/dev/null)" != "$CURRENT_PACKAGES" ]; then
echo "Installing extra packages: $EXTRA_PACKAGES"
pip3 install --no-cache-dir $EXTRA_PACKAGES
if [ $? -eq 0 ]; then
echo "$CURRENT_PACKAGES" > "$INSTALLED_MARKER"
echo "Extra packages installed successfully"
else
echo "ERROR: Failed to install extra packages"
exit 1
fi
else
echo "Extra packages already installed: $EXTRA_PACKAGES"
fi
fi
# Execute the main command
exec "$@"

View File

@@ -183,30 +183,15 @@ components:
properties:
weeks:
type: integer
minimum: 0
maximum: 52000
nullable: true
days:
type: integer
minimum: 0
maximum: 365000
nullable: true
hours:
type: integer
minimum: 0
maximum: 8760000
nullable: true
minutes:
type: integer
minimum: 0
maximum: 525600000
nullable: true
seconds:
type: integer
minimum: 0
maximum: 31536000000
nullable: true
description: Time intervals between checks. All fields must be non-negative. At least one non-zero value required when not using default settings.
description: Time intervals between checks
time_between_check_use_default:
type: boolean
default: true
@@ -215,9 +200,7 @@ components:
type: array
items:
type: string
maxLength: 1000
maxItems: 100
description: Notification URLs for this web page change monitor (watch). Maximum 100 URLs.
description: Notification URLs for this web page change monitor (watch)
notification_title:
type: string
description: Custom notification title
@@ -241,19 +224,14 @@ components:
operation:
type: string
maxLength: 5000
nullable: true
selector:
type: string
maxLength: 5000
nullable: true
optional_value:
type: string
maxLength: 5000
nullable: true
required: [operation, selector, optional_value]
additionalProperties: false
maxItems: 100
description: Browser automation steps. Maximum 100 steps allowed.
description: Browser automation steps
processor:
type: string
enum: [restock_diff, text_json_diff]

View File

@@ -51,9 +51,9 @@ linkify-it-py
# - Needed for apprise/spush, and maybe others? hopefully doesnt trigger a rust compile.
# - Requires extra wheel for rPi, adds build time for arm/v8 which is not in piwheels
# Pinned to 43.0.1 for ARM compatibility (45.x may not have pre-built ARM wheels)
# Pinned to 44.x for ARM compatibility and sslyze compatibility (sslyze requires <45)
# Also pinned because dependabot wants specific versions
cryptography==46.0.3
cryptography==44.0.0
# apprise mqtt https://github.com/dgtlmoon/changedetection.io/issues/315
# use any version other than 2.0.x due to https://github.com/eclipse/paho.mqtt.python/issues/814