Compare commits


32 Commits

Author SHA1 Message Date
dgtlmoon
65e6b461cf Tweaks 2026-02-02 20:24:33 +01:00
dgtlmoon
d96ddc0f23 Tweaks 2026-02-02 20:20:07 +01:00
dgtlmoon
e09c2813b1 Lower workers for testing 2026-02-02 20:04:17 +01:00
dgtlmoon
1f3c0995e5 test speeupds 2026-02-02 19:51:04 +01:00
dgtlmoon
d420bda7e4 tweaks 2026-02-02 18:55:59 +01:00
dgtlmoon
f6a1b6d808 Timing tune 2026-02-02 18:49:54 +01:00
dgtlmoon
6f12412396 Queue changes 2026-02-02 18:37:07 +01:00
dgtlmoon
ff2ead88dd test tweak 2026-02-02 18:19:27 +01:00
dgtlmoon
c38e3df4ee Bump ignore 2026-02-02 18:10:55 +01:00
dgtlmoon
899e21a018 Queue timing fixes 2026-02-02 18:10:47 +01:00
dgtlmoon
aea7fc6f0a test cleanup 2026-02-02 15:09:05 +01:00
dgtlmoon
d6d4960762 test tweak 2026-02-02 15:08:22 +01:00
dgtlmoon
72073bfc5e include cleanup 2026-02-02 15:04:10 +01:00
dgtlmoon
8c809872e8 GitHub build - attempt to cache container build better 2026-02-02 14:58:19 +01:00
dgtlmoon
081d803977 test fix 2026-02-02 14:56:34 +01:00
dgtlmoon
61826bbf94 WIP 2026-02-02 14:47:14 +01:00
dgtlmoon
5fc920db5d WIP 2026-02-02 13:02:20 +01:00
dgtlmoon
68fb5cf898 test tweak 2026-02-02 12:59:21 +01:00
dgtlmoon
5b153ca25d Revert test changes 2026-02-02 12:51:39 +01:00
dgtlmoon
f166e96466 Test fix 2026-02-02 11:16:16 +01:00
dgtlmoon
b7eaeb4ae4 Test fixes 2026-02-02 11:06:11 +01:00
dgtlmoon
ef310e4a67 test tweaks 2026-02-01 18:26:33 +01:00
dgtlmoon
cf32bf5f47 test improvements 2026-02-01 18:18:22 +01:00
dgtlmoon
424e4ec1aa Add test for worker active count 2026-02-01 12:27:19 +01:00
dgtlmoon
c1dca306ad Refactor queue handling, add tests 2026-02-01 12:21:39 +01:00
dgtlmoon
e219e8cada Janus queue worker not needed, improves multiple workers 2026-02-01 10:57:04 +01:00
dgtlmoon
674d863a21 UI - Move Default Proxy selection back to "General" tab 2026-01-28 18:15:32 +01:00
dgtlmoon
0b9cfcdf09 API - Notification URLs werent always being validated (#3812) 2026-01-28 17:30:58 +01:00
dgtlmoon
fd820c9330 Remove deprecated call to strtobool 2026-01-28 17:27:20 +01:00
Robbert Langezaal
e02a1824c5 UI - Make watch tags link elements (#3813) 2026-01-28 17:18:23 +01:00
dgtlmoon
5911b7fe7a test tweak 2026-01-28 17:17:37 +01:00
dgtlmoon
a239480272 DB data migration upgrade fixes (#3811) 2026-01-28 12:02:19 +01:00
54 changed files with 1192 additions and 504 deletions

View File

@@ -37,10 +37,29 @@ jobs:
${{ runner.os }}-pip-py${{ env.PYTHON_VERSION }}-
${{ runner.os }}-pip-
- name: Get current date for cache key
id: date
run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Build changedetection.io container for testing under Python ${{ env.PYTHON_VERSION }}
uses: docker/build-push-action@v6
with:
context: ./
file: ./Dockerfile
build-args: |
PYTHON_VERSION=${{ env.PYTHON_VERSION }}
LOGGER_LEVEL=TRACE
tags: test-changedetectionio
load: true
cache-from: type=gha,scope=build-${{ github.ref_name }}-py${{ env.PYTHON_VERSION }}-${{ hashFiles('requirements.txt', 'Dockerfile') }}-${{ steps.date.outputs.date }}
cache-to: type=gha,mode=max,scope=build-${{ github.ref_name }}-py${{ env.PYTHON_VERSION }}-${{ hashFiles('requirements.txt', 'Dockerfile') }}-${{ steps.date.outputs.date }}
- name: Verify build
run: |
echo "---- Building for Python ${{ env.PYTHON_VERSION }} -----"
docker build --build-arg PYTHON_VERSION=${{ env.PYTHON_VERSION }} --build-arg LOGGER_LEVEL=TRACE -t test-changedetectionio .
echo "---- Built for Python ${{ env.PYTHON_VERSION }} -----"
docker run test-changedetectionio bash -c 'pip list'
- name: We should be Python ${{ env.PYTHON_VERSION }} ...
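The "Get current date" step exists to rotate the registry cache daily: the date is baked into the buildx cache scope together with the branch, the Python version, and a hash of the build inputs. A rough Python illustration of how that composite scope string is assembled (all values are placeholders):

from datetime import date

# Placeholders standing in for github.ref_name, env.PYTHON_VERSION and hashFiles()
ref_name, python_version, files_hash = "master", "3.11", "abc123"
scope = f"build-{ref_name}-py{python_version}-{files_hash}-{date.today().isoformat()}"
print(scope)  # e.g. build-master-py3.11-abc123-2026-02-02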

.gitignore
View File

@@ -29,3 +29,4 @@ test-datastore/
# Memory consumption log
test-memory.log
tests/logs/

View File

@@ -102,8 +102,8 @@ def sigshutdown_handler(_signo, _stack_frame):
# Shutdown workers and queues immediately
try:
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
from changedetectionio import worker_pool
worker_pool.shutdown_workers()
except Exception as e:
logger.error(f"Error shutting down workers: {str(e)}")
@@ -133,43 +133,48 @@ def sigshutdown_handler(_signo, _stack_frame):
sys.exit()
def print_help():
"""Print help text for command line options"""
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print(' -P true/false Set all watches paused (true) or active (false)')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print(' -r all N Queue all watches, wait for completion, repeat N times')
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
print(' Use -p PORT to specify a different port if needed')
print('')
def main():
global datastore
global app
# Early help/version check before any initialization
if '--help' in sys.argv or '-help' in sys.argv:
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print(' -r all N Queue all watches, wait for completion, repeat N times')
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
print(' Use -p PORT to specify a different port if needed')
print('')
print_help()
sys.exit(0)
if '--version' in sys.argv or '-v' in sys.argv:
@@ -185,6 +190,7 @@ def main():
# Set a default logger level
logger_level = 'DEBUG'
include_default_watches = True
all_paused = None # None means don't change, True/False to set
host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
port = int(os.environ.get('PORT', 5000))
@@ -263,39 +269,9 @@ def main():
i += 1
try:
opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:", "port")
opts, args = getopt.getopt(cleaned_argv[1:], "6Ccsd:h:p:l:P:", "port")
except getopt.GetoptError as e:
print('Usage: changedetection.py [options]')
print('')
print('Standard options:')
print(' -s SSL enable')
print(' -h HOST Listen host (default: 0.0.0.0)')
print(' -p PORT Listen port (default: 5000)')
print(' -d PATH Datastore path')
print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
print(' -c Cleanup unused snapshots')
print(' -C Create datastore directory if it doesn\'t exist')
print('')
print('Add URLs on startup:')
print(' -u URL Add URL to watch (can be used multiple times)')
print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
print(' -u2 \'JSON\' Set options for third -u URL, etc.')
print(' Available options: processor, fetch_backend, headers, method, etc.')
print(' See model/Watch.py for all available options')
print('')
print('Recheck on startup:')
print(' -r all Queue all watches for recheck on startup')
print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
print(' -r all N Queue all watches, wait for completion, repeat N times')
print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
print('')
print('Batch mode:')
print(' -b Run in batch mode (process queue then exit)')
print(' Useful for CI/CD, cron jobs, or one-time checks')
print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
print(' Use -p PORT to specify a different port if needed')
print('')
print_help()
print(f'Error: {e}')
sys.exit(2)
@@ -332,6 +308,14 @@ def main():
if opt == '-l':
logger_level = int(arg) if arg.isdigit() else arg.upper()
if opt == '-P':
try:
all_paused = bool(strtobool(arg))
except ValueError:
print(f'Error: Invalid value for -P option: {arg}')
print('Expected: true, false, yes, no, 1, or 0')
sys.exit(2)
# If URLs are provided, don't include default watches
if urls_to_add:
include_default_watches = False
@@ -398,6 +382,11 @@ def main():
logger.critical(str(e))
return
# Apply all_paused setting if specified via CLI
if all_paused is not None:
datastore.data['settings']['application']['all_paused'] = all_paused
logger.info(f"Setting all watches paused: {all_paused}")
# Inject datastore into plugins that need access to settings
from changedetectionio.pluggy_interface import inject_datastore_into_plugins
inject_datastore_into_plugins(datastore)
@@ -426,12 +415,12 @@ def main():
# This must happen AFTER app initialization so update_q is available
if batch_mode and added_watch_uuids:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
from changedetectionio import queuedWatchMetaData, worker_pool
logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
for watch_uuid in added_watch_uuids:
try:
worker_handler.queue_item_async_safe(
worker_pool.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -443,7 +432,7 @@ def main():
# This must happen AFTER app initialization so update_q is available
if recheck_watches is not None:
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData, worker_handler
from changedetectionio import queuedWatchMetaData, worker_pool
watches_to_queue = []
if recheck_watches == 'all':
@@ -465,7 +454,7 @@ def main():
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_handler.queue_item_async_safe(
worker_pool.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -527,7 +516,7 @@ def main():
for watch_uuid in watches_to_queue:
if watch_uuid in datastore.data['watching']:
try:
worker_handler.queue_item_async_safe(
worker_pool.queue_item_async_safe(
update_q,
queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
)
@@ -560,7 +549,7 @@ def main():
logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
# Use the shared wait_for_all_checks function
completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
completed = worker_pool.wait_for_all_checks(update_q, timeout=300)
if not completed:
logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")

View File

@@ -1,5 +1,5 @@
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from flask_expects_json import expects_json
from flask_restful import abort, Resource
from loguru import logger
@@ -42,7 +42,7 @@ class Tag(Resource):
# If less than 20 watches, queue synchronously for immediate feedback
if len(watches_to_queue) < 20:
for watch_uuid in watches_to_queue:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200
else:
# 20+ watches - queue in background thread to avoid blocking API response
@@ -50,7 +50,7 @@ class Tag(Resource):
"""Background thread to queue watches - discarded after completion."""
try:
for watch_uuid in watches_to_queue:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
logger.info(f"Background queueing complete for tag {tag['uuid']}: {len(watches_to_queue)} watches queued")
except Exception as e:
logger.error(f"Error in background queueing for tag {tag['uuid']}: {e}")
@@ -96,6 +96,16 @@ class Tag(Resource):
if not tag:
abort(404, message='No tag exists with the UUID of {}'.format(uuid))
# Validate notification_urls if provided
if 'notification_urls' in request.json:
from wtforms import ValidationError
from changedetectionio.api.Notifications import validate_notification_urls
try:
notification_urls = request.json.get('notification_urls', [])
validate_notification_urls(notification_urls)
except ValidationError as e:
return str(e), 400
tag.update(request.json)
self.datastore.needs_write_urgent = True
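A hedged client-side example of the new validation path; the host, tag UUID, API key, and even the exact /api/v1 path are placeholders to adapt for a real instance:

import requests

resp = requests.put(
    "http://localhost:5000/api/v1/tag/<tag-uuid>",
    headers={"x-api-key": "<api-key>"},
    json={"notification_urls": ["not a url"]},
)
# Invalid entries should now be rejected instead of silently stored.
print(resp.status_code, resp.text)  # expect 400 plus the validation message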

View File

@@ -6,7 +6,7 @@ from changedetectionio.favicon_utils import get_favicon_mime_type
from . import auth
from changedetectionio import queuedWatchMetaData, strtobool
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from flask import request, make_response, send_from_directory
from flask_expects_json import expects_json
from flask_restful import abort, Resource
@@ -85,7 +85,7 @@ class Watch(Resource):
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
if request.args.get('recheck'):
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
return "OK", 200
if request.args.get('paused', '') == 'paused':
self.datastore.data['watching'].get(uuid).pause()
@@ -140,6 +140,16 @@ class Watch(Resource):
if validation_error:
return validation_error, 400
# Validate notification_urls if provided
if 'notification_urls' in request.json:
from wtforms import ValidationError
from changedetectionio.api.Notifications import validate_notification_urls
try:
notification_urls = request.json.get('notification_urls', [])
validate_notification_urls(notification_urls)
except ValidationError as e:
return str(e), 400
# XSS etc protection - validate URL if it's being updated
if 'url' in request.json:
new_url = request.json.get('url')
@@ -444,6 +454,16 @@ class CreateWatch(Resource):
if validation_error:
return validation_error, 400
# Validate notification_urls if provided
if 'notification_urls' in json_data:
from wtforms import ValidationError
from changedetectionio.api.Notifications import validate_notification_urls
try:
notification_urls = json_data.get('notification_urls', [])
validate_notification_urls(notification_urls)
except ValidationError as e:
return str(e), 400
extras = copy.deepcopy(json_data)
# Because we renamed 'tag' to 'tags' but don't want to change the API (can do this in v2 of the API)
@@ -457,7 +477,7 @@ class CreateWatch(Resource):
new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
if new_uuid:
# Don't queue because the scheduler will check that it hasn't been checked before anyway
# worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
# worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
return {'uuid': new_uuid}, 201
else:
return "Invalid or unsupported URL", 400
@@ -494,7 +514,7 @@ class CreateWatch(Resource):
if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
running_uuids = set(worker_pool.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = [
@@ -504,7 +524,7 @@ class CreateWatch(Resource):
# Queue only the filtered watches
for uuid in watches_to_queue_filtered:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -516,7 +536,7 @@ class CreateWatch(Resource):
# 20+ watches - queue in background thread to avoid blocking API response
# Capture queued/running state before background thread
queued_uuids = set(self.update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
running_uuids = set(worker_pool.get_running_uuids())
def queue_all_watches_background():
"""Background thread to queue all watches - discarded after completion."""
@@ -526,7 +546,7 @@ class CreateWatch(Resource):
for uuid in watches_to_queue:
# Check if already queued or running (state captured at start)
if uuid not in queued_uuids and uuid not in running_uuids:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
queued_count += 1
else:
skipped_count += 1
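The same sub-20-inline / 20-plus-background split appears in both the Tag recheck and the recheck-all endpoints above; a generic sketch of the pattern (names are illustrative):

import threading

INLINE_THRESHOLD = 20  # threshold used in the diff

def queue_watches(uuids, enqueue, already_active=frozenset()):
    to_queue = [u for u in uuids if u not in already_active]
    if len(to_queue) < INLINE_THRESHOLD:
        # Small batch: queue synchronously so the response reports an exact count.
        for u in to_queue:
            enqueue(u)
        return f"OK, queued {len(to_queue)} watches"
    # Large batch: hand off to a throwaway thread so the HTTP response isn't blocked.
    def background():
        for u in to_queue:
            enqueue(u)
    threading.Thread(target=background, daemon=True).start()
    return f"OK, queuing {len(to_queue)} watches in the background"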

View File

@@ -14,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from changedetectionio import forms
#
if request.method == 'POST':
# from changedetectionio import worker_handler
# from changedetectionio import worker_pool
from changedetectionio.blueprint.imports.importer import (
import_url_list,
@@ -31,7 +31,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs")
# Don't add to queue because the scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if len(importer_handler.remaining_data) == 0:
return redirect(url_for('watchlist.index'))
@@ -45,7 +45,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
# Don't add to queue because the scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# XLSX importer
@@ -70,7 +70,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Don't add to queue because the scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Could be some remaining, or we could be on GET

View File

@@ -4,7 +4,7 @@ from flask import Blueprint, flash, redirect, url_for
from flask_login import login_required
from changedetectionio.store import ChangeDetectionStore
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from queue import PriorityQueue
PRICE_DATA_TRACK_ACCEPT = 'accepted'
@@ -20,7 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
datastore.data['watching'][uuid]['processor'] = 'restock_diff'
datastore.data['watching'][uuid].clear_watch()
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
return redirect(url_for("watchlist.index"))
@login_required

View File

@@ -37,6 +37,8 @@ def construct_single_watch_routes(rss_blueprint, datastore):
rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# Get the watch by UUID
watch = datastore.data['watching'].get(uuid)
if not watch:

View File

@@ -83,7 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Adjust worker count if it changed
if new_worker_count != old_worker_count:
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds
# Check CPU core availability and warn if worker count is high
@@ -92,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
flash(gettext("Warning: Worker count ({}) is close to or exceeds available CPU cores ({})").format(
new_worker_count, cpu_count), 'warning')
result = worker_handler.adjust_async_worker_count(
result = worker_pool.adjust_async_worker_count(
new_count=new_worker_count,
update_q=update_q,
notification_q=notification_q,
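A minimal sketch of that CPU-core warning; reading "close to or exceeds" as a simple >= comparison is an assumption:

import os

def maybe_warn_worker_count(new_worker_count: int, warn=print):
    cpu_count = os.cpu_count() or 1
    # Assumption: warn once the async worker count reaches the core count.
    if new_worker_count >= cpu_count:
        warn(f"Warning: Worker count ({new_worker_count}) is close to or "
             f"exceeds available CPU cores ({cpu_count})")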

View File

@@ -80,6 +80,16 @@
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">{{ _('When a request returns no content, or the HTML does not contain any text, is this considered a change?') }}</span>
</div>
{% if form.requests.proxy %}
<div>
<br>
<div class="inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">{{ _('Choose a default proxy for all watches') }}</span>
</div>
</div>
{% endif %}
</fieldset>
</div>
@@ -340,15 +350,6 @@ nav
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }}
<span class="pure-form-message-inline">{{ _('"Name" will be used for selecting the proxy in the Watch Edit settings') }}</span><br>
<span class="pure-form-message-inline">{{ _('SOCKS5 proxies with authentication are only supported with \'plain requests\' fetcher, for other fetchers you should whitelist the IP access instead') }}</span>
{% if form.requests.proxy %}
<div>
<br>
<div class="inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">{{ _('Choose a default proxy for all watches') }}</span>
</div>
</div>
{% endif %}
</div>
<div class="pure-control-group" id="extra-browsers-setting">
<p>

View File

@@ -10,7 +10,7 @@ from changedetectionio.blueprint.ui.notification import construct_blueprint as c
from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint
from changedetectionio.blueprint.ui import diff, preview
def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
from flask import request, flash
if op == 'delete':
@@ -63,7 +63,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
if datastore.data['watching'].get(uuid):
# Recheck and require a full reprocessing
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if emit_flash:
flash(gettext("{} watches queued for rechecking").format(len(uuids)))
@@ -114,7 +114,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint
@@ -222,14 +222,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
@login_optionally_required
def form_delete():
uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if uuid != 'all' and not uuid in datastore.data['watching'].keys():
flash(gettext('The watch by UUID {} does not exist.').format(uuid), 'error')
return redirect(url_for('watchlist.index'))
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
datastore.delete(uuid)
flash(gettext('Deleted.'))
@@ -239,14 +239,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
@login_optionally_required
def form_clone():
uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid)
if not datastore.data['watching'].get(uuid).get('paused'):
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
flash(gettext('Cloned, you are editing the new watch.'))
@@ -262,10 +262,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
if uuid:
# Single watch - check if already queued or running
if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
if worker_pool.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
flash(gettext("Watch is already queued or being checked."))
else:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
flash(gettext("Queued 1 watch for rechecking."))
else:
# Multiple watches - first count how many need to be queued
@@ -284,7 +284,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient)
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
running_uuids = set(worker_pool.get_running_uuids())
# Filter out watches that are already queued or running
watches_to_queue_filtered = []
@@ -294,7 +294,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
# Queue only the filtered watches
for watch_uuid in watches_to_queue_filtered:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
# Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -310,7 +310,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
# 20+ watches - queue in background thread to avoid blocking HTTP response
# Capture queued/running state before background thread
queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids())
running_uuids = set(worker_pool.get_running_uuids())
def queue_watches_background():
"""Background thread to queue watches - discarded after completion."""
@@ -320,7 +320,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
for watch_uuid in watches_to_queue:
# Check if already queued or running (state captured at start)
if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
queued_count += 1
else:
skipped_count += 1
@@ -349,7 +349,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
extra_data=extra_data,
queuedWatchMetaData=queuedWatchMetaData,
uuids=uuids,
worker_handler=worker_handler,
worker_pool=worker_pool,
update_q=update_q,
watch_check_update=watch_check_update,
op=op,
@@ -367,9 +367,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
import json
from copy import deepcopy
# more for testing
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# copy it to memory as trim off what we dont need (history)
watch = deepcopy(datastore.data['watching'].get(uuid))

View File

@@ -83,7 +83,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
If a processor doesn't have a difference module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
@@ -144,10 +143,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/extract.py::render_form()
If a processor doesn't have an extract module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
@@ -200,7 +199,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/extract.py::process_extraction()
If a processor doesn't have an extract module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
@@ -267,7 +266,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
- /diff/{uuid}/processor-asset/after
- /diff/{uuid}/processor-asset/rendered_diff
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
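The uuid == 'first' branches repeated above all resolve the same test-only sentinel; distilled into one hypothetical helper:

def resolve_uuid(uuid: str, watching: dict) -> str:
    # 'first' is a testing convenience; .pop() on the keys list returns
    # the last (most recently inserted) watch UUID.
    if uuid == 'first' and watching:
        return list(watching.keys()).pop()
    return uuid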

View File

@@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio.time_handler import is_within_schedule
from changedetectionio import worker_handler
from changedetectionio import worker_pool
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
@@ -30,14 +30,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from changedetectionio import processors
import importlib
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
# More for testing, possible to return the first/only
if not datastore.data['watching'].keys():
flash(gettext("No watches to edit"), "error")
return redirect(url_for('watchlist.index'))
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if not uuid in datastore.data['watching']:
flash(gettext("No watch with the UUID {} found.").format(uuid), "error")
return redirect(url_for('watchlist.index'))
@@ -283,7 +282,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
#############################
if not datastore.data['watching'][uuid].get('paused') and is_in_schedule:
# Queue the watch for immediate recheck, with a higher priority
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Diff page [edit] link should go back to diff page
if request.args.get("next") and request.args.get("next") == 'diff':
@@ -314,7 +313,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
c = [f"processor-{watch.get('processor')}"]
if worker_handler.is_watch_running(uuid):
if worker_pool.is_watch_running(uuid):
c.append('checking-now')
template_args = {
@@ -371,6 +370,8 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from flask import send_file
import brotli
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
watch = datastore.data['watching'].get(uuid)
if watch and watch.history.keys() and os.path.isdir(watch.watch_data_dir):
latest_filename = list(watch.history.keys())[-1]
@@ -395,6 +396,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
def watch_get_preview_rendered(uuid):
'''For when viewing the "preview" of the rendered text from inside of Edit'''
from flask import jsonify
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
from changedetectionio.processors.text_json_diff import prepare_filter_prevew
result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore)
return jsonify(result)

View File

@@ -26,10 +26,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
Each processor implements processors/{type}/preview.py::render()
If a processor doesn't have a preview module, falls back to default text preview.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
@@ -150,10 +149,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
"""
from flask import make_response
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:

View File

@@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, url_for, flash
from flask_babel import gettext
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio import worker_handler
from changedetectionio import worker_pool
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, watch_check_update):
@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
return redirect(url_for('ui.ui_edit.edit_page', uuid=new_uuid, unpause_on_save=1, tag=request.args.get('tag')))
else:
# Straight into the queue.
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
flash(gettext("Watch added."))
return redirect(url_for('watchlist.index', tag=request.args.get('tag','')))

View File

@@ -223,7 +223,7 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
<span class="processor-badge processor-badge-{{ watch['processor'] }}" title="{{ processor_descriptions.get(watch['processor'], watch['processor']) }}">{{ processor_badge_texts[watch['processor']] }}</span>
{%- endif -%}
{%- for watch_tag_uuid, watch_tag in datastore.get_all_tags_for_watch(watch['uuid']).items() -%}
<span class="watch-tag-list tag-{{ watch_tag.title|sanitize_tag_class }}">{{ watch_tag.title }}</span>
<a href="{{url_for('watchlist.index', tag=watch_tag_uuid) }}" class="watch-tag-list tag-{{ watch_tag.title|sanitize_tag_class }}">{{ watch_tag.title }}</a>
{%- endfor -%}
</div>
<div class="status-icons">

View File

@@ -55,6 +55,26 @@ class fetcher(Fetcher):
session = requests.Session()
# Configure retry adapter for low-level network errors only
# Retries connection timeouts, read timeouts, connection resets - not HTTP status codes
# Especially helpful in parallel test execution when servers are slow/overloaded
# Configurable via REQUESTS_RETRY_MAX_COUNT (default: 3 attempts)
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
max_retries = int(os.getenv("REQUESTS_RETRY_MAX_COUNT", "3"))
retry_strategy = Retry(
total=max_retries,
connect=max_retries, # Retry connection timeouts
read=max_retries, # Retry read timeouts
status=0, # Don't retry on HTTP status codes
backoff_factor=0.3, # Wait 0.3s, 0.6s, 1.2s between retries
allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
raise_on_status=False
)
adapter = HTTPAdapter(max_retries=retry_strategy)
session.mount("http://", adapter)
session.mount("https://", adapter)
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
from requests_file import FileAdapter
@@ -142,10 +162,11 @@ class fetcher(Fetcher):
watch_uuid=None,
):
"""Async wrapper that runs the synchronous requests code in a thread pool"""
loop = asyncio.get_event_loop()
# Run the synchronous _run_sync in a thread pool to avoid blocking the event loop
# Retry logic is handled by requests' HTTPAdapter (see _run_sync for configuration)
await loop.run_in_executor(
None, # Use default ThreadPoolExecutor
lambda: self._run_sync(
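The retry adapter added above, reproduced as a stand-alone snippet (the URL and timeout are illustrative); with urllib3 v2 the sleeps between attempts roughly follow the 0.3s/0.6s/1.2s progression noted in the comments:

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

retry = Retry(
    total=3, connect=3, read=3,
    status=0,                  # never retry based on HTTP status codes
    backoff_factor=0.3,
    allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
    raise_on_status=False,
)
session = requests.Session()
session.mount("http://", HTTPAdapter(max_retries=retry))
session.mount("https://", HTTPAdapter(max_retries=retry))

# Connection timeouts, read timeouts and resets are retried transparently:
resp = session.get("https://example.com", timeout=10)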

View File

@@ -14,7 +14,7 @@ from pathlib import Path
from changedetectionio.strtobool import strtobool
from threading import Event
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from flask import (
Flask,
@@ -195,7 +195,7 @@ def _jinja2_filter_format_number_locale(value: float) -> str:
@app.template_global('is_checking_now')
def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"):
return worker_handler.is_watch_running(watch_obj['uuid'])
return worker_pool.is_watch_running(watch_obj['uuid'])
@app.template_global('get_watch_queue_position')
def _get_watch_queue_position(watch_obj):
@@ -206,13 +206,13 @@ def _get_watch_queue_position(watch_obj):
@app.template_global('get_current_worker_count')
def _get_current_worker_count():
"""Get the current number of operational workers"""
return worker_handler.get_worker_count()
return worker_pool.get_worker_count()
@app.template_global('get_worker_status_info')
def _get_worker_status_info():
"""Get detailed worker status information for display"""
status = worker_handler.get_worker_status()
running_uuids = worker_handler.get_running_uuids()
status = worker_pool.get_worker_status()
running_uuids = worker_pool.get_running_uuids()
return {
'count': status['worker_count'],
@@ -801,7 +801,7 @@ def changedetection_app(config=None, datastore_o=None):
# watchlist UI buttons etc
import changedetectionio.blueprint.ui as ui
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update))
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
import changedetectionio.blueprint.watchlist as watchlist
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
@@ -838,10 +838,10 @@ def changedetection_app(config=None, datastore_o=None):
expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
# Get basic status
status = worker_handler.get_worker_status()
status = worker_pool.get_worker_status()
# Perform health check
health_result = worker_handler.check_worker_health(
health_result = worker_pool.check_worker_health(
expected_count=expected_workers,
update_q=update_q,
notification_q=notification_q,
@@ -905,14 +905,24 @@ def changedetection_app(config=None, datastore_o=None):
# Can be overridden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
logger.info(f"Starting {n_workers} workers during app initialization")
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
worker_pool.start_workers(n_workers, update_q, notification_q, app, datastore)
# Skip background threads in batch mode (just process queue and exit)
batch_mode = app.config.get('batch_mode', False)
if not batch_mode:
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
# Start configurable number of notification workers (default 1)
notification_workers = int(os.getenv("NOTIFICATION_WORKERS", "1"))
for i in range(notification_workers):
threading.Thread(
target=notification_runner,
args=(i,),
daemon=True,
name=f"NotificationRunner-{i}"
).start()
logger.info(f"Started {notification_workers} notification worker(s)")
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
@@ -954,14 +964,14 @@ def check_for_new_version():
app.config.exit.wait(86400)
def notification_runner():
def notification_runner(worker_id=0):
global notification_debug_log
from datetime import datetime
import json
with app.app_context():
while not app.config.exit.is_set():
try:
# At the moment only one thread runs (single runner)
# Multiple workers can run concurrently (configurable via NOTIFICATION_WORKERS)
n_object = notification_q.get(block=False)
except queue.Empty:
app.config.exit.wait(1)
@@ -987,7 +997,7 @@ def notification_runner():
sent_obj = process_notification(n_object, datastore)
except Exception as e:
logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}")
logger.error(f"Notification worker {worker_id} - Watch URL: {n_object['watch_url']} Error {str(e)}")
# UUID won't be present when we submit a 'test' from the global settings
if 'uuid' in n_object:
@@ -1028,7 +1038,7 @@ def ticker_thread_check_time_launch_checks():
now = time.time()
if now - last_health_check > 60:
expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
health_result = worker_handler.check_worker_health(
health_result = worker_pool.check_worker_health(
expected_count=expected_workers,
update_q=update_q,
notification_q=notification_q,
@@ -1047,7 +1057,7 @@ def ticker_thread_check_time_launch_checks():
continue
# Get a list of watches by UUID that are currently fetching data
running_uuids = worker_handler.get_running_uuids()
running_uuids = worker_pool.get_running_uuids()
# Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch
queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}
@@ -1153,7 +1163,7 @@ def ticker_thread_check_time_launch_checks():
priority = int(time.time())
# Into the queue with you
queued_successfully = worker_handler.queue_item_async_safe(update_q,
queued_successfully = worker_pool.queue_item_async_safe(update_q,
queuedWatchMetaData.PrioritizedItem(priority=priority,
item={'uuid': uuid})
)
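A stripped-down model of the NOTIFICATION_WORKERS change: several daemon threads draining one shared queue (the payload and handler body are illustrative):

import os
import queue
import threading
import time

notification_q: queue.Queue = queue.Queue()

def notification_runner(worker_id: int = 0) -> None:
    while True:
        try:
            n_object = notification_q.get(block=False)
        except queue.Empty:
            time.sleep(1)  # idle wait, mirroring app.config.exit.wait(1)
            continue
        print(f"notification worker {worker_id}: sending {n_object}")

for i in range(int(os.getenv("NOTIFICATION_WORKERS", "1"))):
    threading.Thread(target=notification_runner, args=(i,),
                     daemon=True, name=f"NotificationRunner-{i}").start()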

View File

@@ -29,7 +29,7 @@ class model(dict):
'proxy': None, # Preferred proxy connection
'time_between_check': {'weeks': None, 'days': None, 'hours': 3, 'minutes': None, 'seconds': None},
'timeout': int(getenv("DEFAULT_SETTINGS_REQUESTS_TIMEOUT", "45")), # Default 45 seconds
'workers': int(getenv("DEFAULT_SETTINGS_REQUESTS_WORKERS", "10")), # Number of threads, lower is better for slow connections
'workers': int(getenv("DEFAULT_SETTINGS_REQUESTS_WORKERS", "5")), # Number of threads, lower is better for slow connections
'default_ua': {
'html_requests': getenv("DEFAULT_SETTINGS_HEADERS_USERAGENT", DEFAULT_SETTINGS_HEADERS_USERAGENT),
'html_webdriver': None,

View File

@@ -5,51 +5,57 @@ import heapq
import queue
import threading
try:
import janus
except ImportError:
logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
raise
# Janus is no longer required - we use pure threading.Queue for multi-loop support
# try:
# import janus
# except ImportError:
# pass # Not needed anymore
class RecheckPriorityQueue:
"""
Ultra-reliable priority queue using janus for async/sync bridging.
CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
- sync_q: Used by Flask routes, ticker threads, and other synchronous code
- async_q: Used by async workers (the actual fetchers/processors) and coroutines
DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
- Synchronous code (Flask, threads) cannot use async methods without blocking
- Async code cannot use sync methods without blocking the event loop
- janus provides the only safe bridge between these two worlds
Attempting to unify to async-only would require:
- Converting all Flask routes to async (major breaking change)
- Using asyncio.run() in sync contexts (causes deadlocks)
- Thread-pool wrapping (adds complexity and overhead)
Minimal implementation focused on reliability:
- Pure janus for sync/async bridge
- Thread-safe priority ordering
- Bulletproof error handling with critical logging
Thread-safe priority queue supporting multiple async event loops.
ARCHITECTURE:
- Multiple async workers, each with its own event loop in its own thread
- Hybrid sync/async design for maximum scalability
- Sync interface for ticker thread (threading.Queue)
- Async interface for workers (asyncio.Event - NO executor threads!)
SCALABILITY:
- Scales to 100-200+ workers without executor thread exhaustion
- Async workers wait on asyncio.Event (pure coroutines, no threads)
- Sync callers use threading.Queue (backward compatible)
WHY NOT JANUS:
- Janus binds to ONE event loop at creation time
- Our architecture has 15+ workers, each with separate event loops
- Workers in different threads/loops cannot share janus async interface
WHY NOT RUN_IN_EXECUTOR:
- With 200 workers, run_in_executor() would block 200 threads
- Exhausts ThreadPoolExecutor, starves Flask HTTP handlers
- Pure async approach uses 0 threads while waiting
"""
def __init__(self, maxsize: int = 0):
try:
self._janus_queue = janus.Queue(maxsize=maxsize)
# BOTH interfaces required - see class docstring for why
self.sync_q = self._janus_queue.sync_q # Flask routes, ticker thread
self.async_q = self._janus_queue.async_q # Async workers
import asyncio
# Sync interface: threading.Queue for ticker thread and Flask routes
self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
# Priority storage - thread-safe
self._priority_items = []
self._lock = threading.RLock()
# No event signaling needed - pure polling approach
# Workers check queue every 50ms (latency acceptable: 0-500ms)
# Scales to 1000+ workers: each sleeping worker = ~4KB coroutine, not thread
# Signals for UI updates
self.queue_length_signal = signal('queue_length')
logger.debug("RecheckPriorityQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
@@ -58,38 +64,48 @@ class RecheckPriorityQueue:
# SYNC INTERFACE (for ticker thread)
def put(self, item, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync put with priority ordering"""
logger.trace(f"RecheckQueue.put() called for item: {self._get_item_uuid(item)}, block={block}, timeout={timeout}")
try:
# Add to priority storage
# CRITICAL: Add to both priority storage AND notification queue atomically
# to prevent desynchronization where item exists but no notification
with self._lock:
heapq.heappush(self._priority_items, item)
# Notify via janus sync queue
self.sync_q.put(True, block=block, timeout=timeout)
# Emit signals
self._emit_put_signals(item)
# Add notification - use blocking with timeout for safety
# Notification queue is unlimited size, so should never block in practice
# but timeout ensures we detect any unexpected issues (deadlock, etc)
try:
self._notification_queue.put(True, block=True, timeout=5.0)
except Exception as notif_e:
# Notification failed - MUST remove from priority_items to keep in sync
# This prevents "Priority queue inconsistency" errors in get()
logger.critical(f"CRITICAL: Notification queue put failed, removing from priority_items: {notif_e}")
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
raise # Re-raise to be caught by outer exception handler
# Signal emission after successful queue - log but don't fail the operation
# Item is already safely queued, so signal failure shouldn't affect queue state
try:
self._emit_put_signals(item)
except Exception as signal_e:
logger.error(f"Failed to emit put signals but item queued successfully: {signal_e}")
logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed
try:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {type(e).__name__}: {str(e)}")
# Item should have been cleaned up in the inner try/except if notification failed
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get with priority ordering"""
import queue
logger.trace(f"RecheckQueue.get() called, block={block}, timeout={timeout}")
import queue as queue_module
try:
# Wait for notification
self.sync_q.get(block=block, timeout=timeout)
# Wait for notification (this doesn't return the actual item, just signals availability)
self._notification_queue.get(block=block, timeout=timeout)
# Get highest priority item
with self._lock:
@@ -98,69 +114,91 @@ class RecheckPriorityQueue:
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
# Emit signals
self._emit_get_signals()
# Signal emission after successful retrieval - log but don't lose the item
# Item is already retrieved, so signal failure shouldn't affect queue state
try:
self._emit_get_signals()
except Exception as signal_e:
logger.error(f"Failed to emit get signals but item retrieved successfully: {signal_e}")
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
logger.trace(f"RecheckQueue.get() successfully retrieved item: {self._get_item_uuid(item)}")
return item
except queue.Empty:
# Queue is empty with timeout - expected behavior, re-raise without logging
raise
except queue_module.Empty:
# Queue is empty with timeout - expected behavior
logger.trace(f"RecheckQueue.get() timed out - queue is empty (timeout={timeout})")
raise # noqa
except Exception as e:
# Re-raise without logging - caller (worker) will handle and log appropriately
logger.trace(f"RecheckQueue.get() failed with exception: {type(e).__name__}: {str(e)}")
raise
# ASYNC INTERFACE (for workers)
async def async_put(self, item):
"""Pure async put with priority ordering"""
async def async_put(self, item, executor=None):
"""Async put with priority ordering - uses thread pool to avoid blocking
Args:
item: Item to add to queue
executor: Optional ThreadPoolExecutor. If None, uses default pool.
"""
logger.trace(f"RecheckQueue.async_put() called for item: {self._get_item_uuid(item)}, executor={executor}")
import asyncio
try:
# Add to priority storage
with self._lock:
heapq.heappush(self._priority_items, item)
# Notify via janus async queue
await self.async_q.put(True)
# Emit signals
self._emit_put_signals(item)
logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
return True
# Use run_in_executor to call sync put without blocking event loop
loop = asyncio.get_event_loop()
result = await loop.run_in_executor(
executor, # Use provided executor or default
lambda: self.put(item, block=True, timeout=5.0)
)
logger.trace(f"RecheckQueue.async_put() successfully queued item: {self._get_item_uuid(item)}")
return result
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
# Remove from priority storage if janus put failed
try:
with self._lock:
if item in self._priority_items:
self._priority_items.remove(item)
heapq.heapify(self._priority_items)
except Exception as cleanup_e:
logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
return False
async def async_get(self):
"""Pure async get with priority ordering"""
async def async_get(self, executor=None, timeout=1.0):
"""
Efficient async get using executor for blocking call.
HYBRID APPROACH: Best of both worlds
- Uses run_in_executor for efficient blocking (no polling overhead)
- Single timeout (no double-timeout race condition)
- Scales well: executor sized to match worker count
With FETCH_WORKERS=10: 10 threads blocked max (acceptable)
With FETCH_WORKERS=200: Need executor with 200+ threads (see worker_pool.py)
Args:
executor: ThreadPoolExecutor (sized to match worker count)
timeout: Maximum time to wait in seconds
Returns:
Item from queue
Raises:
queue.Empty: If timeout expires with no item available
"""
logger.trace(f"RecheckQueue.async_get() called, timeout={timeout}")
import asyncio
try:
# Wait for notification
await self.async_q.get()
# Use run_in_executor to call sync get efficiently
# No outer asyncio.wait_for wrapper = no double timeout issue!
loop = asyncio.get_event_loop()
item = await loop.run_in_executor(
executor,
lambda: self.get(block=True, timeout=timeout)
)
# Get highest priority item
with self._lock:
if not self._priority_items:
logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
raise Exception("Priority queue inconsistency")
item = heapq.heappop(self._priority_items)
# Emit signals
self._emit_get_signals()
logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
logger.trace(f"RecheckQueue.async_get() successfully retrieved item: {self._get_item_uuid(item)}")
return item
except queue.Empty:
logger.trace(f"RecheckQueue.async_get() timed out - queue is empty")
raise
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
logger.critical(f"CRITICAL: Failed to async get item from queue: {type(e).__name__}: {str(e)}")
raise
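A sketch of the consumption pattern the docstring describes, with the executor sized to the worker count; FETCH_WORKERS here and handle_item() are illustrative assumptions, not names from the codebase:

import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

FETCH_WORKERS = 10  # assumption: mirrors the FETCH_WORKERS env var
executor = ThreadPoolExecutor(max_workers=FETCH_WORKERS)

async def async_worker(update_q):
    while True:
        try:
            # Blocks one executor thread, never the event loop
            item = await update_q.async_get(executor=executor, timeout=1.0)
        except queue.Empty:
            continue  # idle; poll again
        await handle_item(item)  # hypothetical coroutine doing the real work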
# UTILITY METHODS
@@ -186,10 +224,35 @@ class RecheckPriorityQueue:
logger.critical(f"CRITICAL: Failed to get queued UUIDs: {str(e)}")
return []
def close(self):
"""Close the janus queue"""
def clear(self):
"""Clear all items from both priority storage and notification queue"""
try:
self._janus_queue.close()
with self._lock:
# Clear priority items
self._priority_items.clear()
# Drain all notifications to prevent stale notifications
# This is critical for test cleanup to prevent queue desynchronization
drained = 0
while not self._notification_queue.empty():
try:
self._notification_queue.get_nowait()
drained += 1
except queue.Empty:
break
if drained > 0:
logger.debug(f"Cleared queue: removed {drained} notifications")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to clear queue: {str(e)}")
return False
def close(self):
"""Close the queue"""
try:
# Nothing to close for threading.Queue
logger.debug("RecheckPriorityQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
@@ -321,7 +384,7 @@ class RecheckPriorityQueue:
except Exception:
pass
return 'unknown'
def _emit_put_signals(self, item):
"""Emit signals when item is added"""
try:
@@ -330,14 +393,14 @@ class RecheckPriorityQueue:
watch_check_update = signal('watch_check_update')
if watch_check_update:
watch_check_update.send(watch_uuid=item.item['uuid'])
# Queue length signal
if self.queue_length_signal:
self.queue_length_signal.send(length=self.qsize())
except Exception as e:
logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")
def _emit_get_signals(self):
"""Emit signals when item is removed"""
try:
@@ -363,12 +426,11 @@ class NotificationQueue:
def __init__(self, maxsize: int = 0, datastore=None):
try:
self._janus_queue = janus.Queue(maxsize=maxsize)
# BOTH interfaces required - see class docstring for why
self.sync_q = self._janus_queue.sync_q # Flask routes, threads
self.async_q = self._janus_queue.async_q # Async workers
# Use pure threading.Queue to avoid event loop binding issues
self._notification_queue = queue.Queue(maxsize=maxsize)  # maxsize <= 0 means unbounded
self.notification_event_signal = signal('notification_event')
self.datastore = datastore # For checking all_muted setting
self._lock = threading.RLock()
logger.debug("NotificationQueue initialized successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
@@ -380,72 +442,97 @@ class NotificationQueue:
def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync put with signal emission"""
logger.trace(f"NotificationQueue.put() called for item: {item.get('uuid', 'unknown')}, block={block}, timeout={timeout}")
try:
# Check if all notifications are muted
if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
return False
self.sync_q.put(item, block=block, timeout=timeout)
with self._lock:
self._notification_queue.put(item, block=block, timeout=timeout)
self._emit_notification_signal(item)
logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
logger.trace(f"NotificationQueue.put() successfully queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
async def async_put(self, item: Dict[str, Any]):
"""Pure async put with signal emission"""
async def async_put(self, item: Dict[str, Any], executor=None):
"""Async put with signal emission - uses thread pool
Args:
item: Notification item to queue
executor: Optional ThreadPoolExecutor
"""
logger.trace(f"NotificationQueue.async_put() called for item: {item.get('uuid', 'unknown')}, executor={executor}")
import asyncio
try:
# Check if all notifications are muted
if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
return False
await self.async_q.put(item)
self._emit_notification_signal(item)
logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
loop = asyncio.get_event_loop()
await loop.run_in_executor(executor, lambda: self.put(item, block=True, timeout=5.0))
logger.trace(f"NotificationQueue.async_put() successfully queued notification: {item.get('uuid', 'unknown')}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
return False
def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get"""
logger.trace(f"NotificationQueue.get() called, block={block}, timeout={timeout}")
try:
return self.sync_q.get(block=block, timeout=timeout)
with self._lock:
item = self._notification_queue.get(block=block, timeout=timeout)
logger.trace(f"NotificationQueue.get() retrieved item: {item.get('uuid', 'unknown') if isinstance(item, dict) else 'unknown'}")
return item
except queue.Empty as e:
logger.trace(f"NotificationQueue.get() timed out - queue is empty (timeout={timeout})")
raise e
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
logger.critical(f"CRITICAL: Failed to get notification: {type(e).__name__}: {str(e)}")
raise e
async def async_get(self):
"""Pure async get"""
async def async_get(self, executor=None):
"""Async get - uses thread pool
Args:
executor: Optional ThreadPoolExecutor
"""
logger.trace(f"NotificationQueue.async_get() called, executor={executor}")
import asyncio
try:
return await self.async_q.get()
loop = asyncio.get_event_loop()
item = await loop.run_in_executor(executor, lambda: self.get(block=True, timeout=1.0))
logger.trace(f"NotificationQueue.async_get() retrieved item: {item.get('uuid', 'unknown') if isinstance(item, dict) else 'unknown'}")
return item
except queue.Empty as e:
logger.trace(f"NotificationQueue.async_get() timed out - queue is empty")
raise e
except Exception as e:
logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
logger.critical(f"CRITICAL: Failed to async get notification: {type(e).__name__}: {str(e)}")
raise e
def qsize(self) -> int:
"""Get current queue size"""
try:
return self.sync_q.qsize()
with self._lock:
return self._notification_queue.qsize()
except Exception as e:
logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
return 0
def empty(self) -> bool:
"""Check if queue is empty"""
return self.qsize() == 0
def close(self):
"""Close the janus queue"""
"""Close the queue"""
try:
self._janus_queue.close()
# Nothing to close for threading.Queue
logger.debug("NotificationQueue closed successfully")
except Exception as e:
logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")

View File

@@ -37,9 +37,9 @@ def register_watch_operation_handlers(socketio, datastore):
# Import here to avoid circular imports
from changedetectionio.flask_app import update_q
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from changedetectionio import worker_pool
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
logger.info(f"Socket.IO: Queued recheck for watch {uuid}")
else:
emit('operation_result', {'success': False, 'error': f'Unknown operation: {op}'})

View File

@@ -145,10 +145,10 @@ def handle_watch_update(socketio, **kwargs):
# Emit the watch update to all connected clients
from changedetectionio.flask_app import update_q
from changedetectionio.flask_app import _jinja2_filter_datetime
from changedetectionio import worker_handler
from changedetectionio import worker_pool
# Get list of watches that are currently running
running_uuids = worker_handler.get_running_uuids()
running_uuids = worker_pool.get_running_uuids()
# Get list of watches in the queue (efficient single-lock method)
queue_list = update_q.get_queued_uuids()
@@ -252,7 +252,7 @@ def init_socketio(app, datastore):
def event_checkbox_operations(data):
from changedetectionio.blueprint.ui import _handle_operations
from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, watch_check_update
import threading
@@ -268,7 +268,7 @@ def init_socketio(app, datastore):
uuids=data.get('uuids'),
datastore=datastore,
extra_data=data.get('extra_data'),
worker_handler=worker_handler,
worker_pool=worker_pool,
update_q=update_q,
queuedWatchMetaData=queuedWatchMetaData,
watch_check_update=watch_check_update,

View File

@@ -10,6 +10,7 @@
set -e
SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
rm tests/logs/* -f
# Since there's no curl installed, let's roll with python3
check_sanity() {
@@ -66,16 +67,19 @@ echo "-------------------- Running rest of tests in parallel -------------------
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
pytest tests/test_*.py \
-n 30 \
-n 12 \
--dist=load \
-vvv \
-s \
--capture=no \
--ignore=tests/test_queue_handler.py \
--log-cli-level=DEBUG \
--log-cli-format="%(asctime)s [%(process)d] [%(levelname)s] %(name)s: %(message)s"
echo "---------------------------- DONE parallel test ---------------------------------------"
FETCH_WORKERS=20 pytest -vvv -s tests/test_queue_handler.py
echo "RUNNING WITH BASE_URL SET"
# Now re-run some tests with BASE_URL enabled

View File

@@ -222,6 +222,19 @@ code {
color: var(--color-white);
background: var(--color-text-watch-tag-list);
@extend .inline-tag;
/* Remove default anchor styling when used as links */
text-decoration: none;
&:hover {
text-decoration: none;
opacity: 0.8;
cursor: pointer;
}
&:visited {
color: var(--color-white);
}
}
@media (min-width: 768px) {

View File

@@ -166,6 +166,9 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
"""
logger.info(f"Datastore path is '{datastore_path}'")
# CRITICAL: Update datastore_path (was using old path from __init__)
self.datastore_path = datastore_path
# Initialize data structure
self.__data = App.model()
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
@@ -218,21 +221,16 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
# Load the legacy datastore to get its schema_version
from .legacy_loader import load_legacy_format
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
legacy_data = load_legacy_format(legacy_path)
with open(legacy_path) as f:
self.__data = json.load(f)
if not legacy_data:
if not self.__data:
raise Exception("Failed to load legacy datastore from url-watches.json")
# Get the schema version from legacy datastore (defaults to 0 if not present)
legacy_schema_version = legacy_data.get('settings', {}).get('application', {}).get('schema_version', 0)
logger.info(f"Legacy datastore schema version: {legacy_schema_version}")
# Set our schema version to match the legacy one
self.__data['settings']['application']['schema_version'] = legacy_schema_version
# update_26 will load the legacy data again and migrate to new format
# Only run updates AFTER the legacy schema version (e.g., if legacy is at 25, only run 26+)
self.run_updates(current_schema_version=legacy_schema_version)
self.run_updates()
else:
# Fresh install - create new datastore
@@ -307,7 +305,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
else:
watch_class = get_custom_watch_obj_for_processor(entity.get('processor'))
if entity.get('uuid') != 'text_json_diff':
if entity.get('processor') != 'text_json_diff':
logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")
entity = watch_class(datastore_path=self.datastore_path, default=entity)
@@ -373,6 +371,13 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
self.__data['watching'] = watching
self._watch_hashes = watch_hashes
# Verify all watches have hashes
missing_hashes = [uuid for uuid in watching.keys() if uuid not in watch_hashes]
if missing_hashes:
logger.error(f"WARNING: {len(missing_hashes)} watches missing hashes after load: {missing_hashes[:5]}")
else:
logger.debug(f"All {len(watching)} watches have valid hashes")
def _delete_watch(self, uuid):
"""
Delete a watch from storage.

View File

@@ -16,11 +16,11 @@ import os
import tempfile
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from distutils.util import strtobool
from threading import Thread
from loguru import logger
from .base import DataStore
from .. import strtobool
# Try to import orjson for faster JSON serialization
try:
@@ -322,8 +322,9 @@ def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
if watch and raw_data:
watching[uuid_dir] = watch
# Compute hash from raw data BEFORE rehydration to match saved hash
watch_hashes[uuid_dir] = compute_hash_func(raw_data)
# Compute hash from rehydrated Watch object (as dict) to match how we compute on save
# This ensures hash matches what audit will compute from dict(watch)
watch_hashes[uuid_dir] = compute_hash_func(dict(watch))
loaded += 1
if loaded % 100 == 0:
@@ -743,7 +744,7 @@ class FileSavingDataStore(DataStore):
self._dirty_watches.add(uuid)
changes_found += 1
logger.warning(
f"Audit detected unmarked change in watch {uuid[:8]}... "
f"Audit detected unmarked change in watch {uuid[:8]}... current {current_hash:8} stored hash {stored_hash[:8]}"
f"(hash changed but not marked dirty)"
)
self.needs_write = True

View File

@@ -534,7 +534,7 @@ class DatastoreUpdatesMixin:
logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...")
shutil.move(old_history_txt, new_history_txt)
def update_26(self):
def migrate_legacy_db_format(self):
"""
Migration: Individual watch persistence (COPY-based, safe rollback).
@@ -578,25 +578,6 @@ class DatastoreUpdatesMixin:
# Populate settings from legacy data
logger.info("Populating settings from legacy data...")
if 'settings' in legacy_data:
self.data['settings'] = legacy_data['settings']
if 'app_guid' in legacy_data:
self.data['app_guid'] = legacy_data['app_guid']
if 'build_sha' in legacy_data:
self.data['build_sha'] = legacy_data['build_sha']
if 'version_tag' in legacy_data:
self.data['version_tag'] = legacy_data['version_tag']
# Rehydrate watches from legacy data
logger.info("Rehydrating watches from legacy data...")
self.data['watching'] = {}
for uuid, watch_data in legacy_data.get('watching', {}).items():
try:
self.data['watching'][uuid] = self.rehydrate_entity(uuid, watch_data)
except Exception as e:
logger.error(f"Failed to rehydrate watch {uuid}: {e}")
raise Exception(f"Migration failed: Could not rehydrate watch {uuid}. Error: {e}")
watch_count = len(self.data['watching'])
logger.success(f"Loaded {watch_count} watches from legacy format")
@@ -609,12 +590,10 @@ class DatastoreUpdatesMixin:
watch_dict = dict(watch)
watch_dir = os.path.join(self.datastore_path, uuid)
save_watch_atomic(watch_dir, uuid, watch_dict)
# Initialize hash
self._watch_hashes[uuid] = self._compute_hash(watch_dict)
saved_count += 1
if saved_count % 100 == 0:
logger.info(f" Progress: {saved_count}/{watch_count} watches saved...")
logger.info(f" Progress: {saved_count}/{watch_count} watches migrated...")
except Exception as e:
logger.error(f"Failed to save watch {uuid}: {e}")
@@ -667,9 +646,25 @@ class DatastoreUpdatesMixin:
# Success! Now reload from new format
logger.critical("Reloading datastore from new format...")
self._load_state()
self._load_state() # Includes load_watches
logger.success("Datastore reloaded from new format successfully")
# Verify all watches have hashes after migration
missing_hashes = [uuid for uuid in self.data['watching'].keys() if uuid not in self._watch_hashes]
if missing_hashes:
logger.error(f"WARNING: {len(missing_hashes)} watches missing hashes after migration: {missing_hashes[:5]}")
else:
logger.success(f"All {len(self.data['watching'])} watches have valid hashes after migration")
# Set schema version to latest available update
# This prevents re-running updates and re-marking all watches as dirty
updates_available = self.get_updates_available()
latest_schema = updates_available[-1] if updates_available else 26
self.data['settings']['application']['schema_version'] = latest_schema
self.mark_settings_dirty()
logger.info(f"Set schema_version to {latest_schema} (migration complete, all watches already saved)")
logger.critical("=" * 80)
logger.critical("MIGRATION COMPLETED SUCCESSFULLY!")
logger.critical("=" * 80)
@@ -687,4 +682,5 @@ class DatastoreUpdatesMixin:
logger.info(f" - rm {os.path.join(self.datastore_path, 'url-watches.json')}")
logger.info("")
# Schema version will be updated by run_updates()
def update_26(self):
self.migrate_legacy_db_format()
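For context, the schema gating that makes setting schema_version sufficient is the usual update_NN pattern; a hedged sketch of the mechanism, with method discovery by name as an assumption based on this diff:

def get_updates_available(self):
    # Collect NN from every update_NN method defined on the class
    return sorted(
        int(name.split('_')[1])
        for name in dir(self)
        if name.startswith('update_') and name.split('_')[1].isdigit()
    )

def run_updates(self):
    current = self.data['settings']['application'].get('schema_version', 0)
    for n in self.get_updates_available():
        if n > current:
            getattr(self, f'update_{n}')()
            self.data['settings']['application']['schema_version'] = n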

View File

@@ -70,8 +70,8 @@ test_single_url() {
local test_id=$1
local dir="/tmp/cli-test-single-${test_id}-$$"
timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 1 ]
# Count watch directories (UUID directories containing watch.json)
[ "$(find "$dir" -mindepth 2 -maxdepth 2 -name 'watch.json' | wc -l)" -eq 1 ]
}
test_multiple_urls() {
@@ -82,8 +82,8 @@ test_multiple_urls() {
-u https://github.com \
-u https://httpbin.org \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 3 ]
# Count watch directories (UUID directories containing watch.json)
[ "$(find "$dir" -mindepth 2 -maxdepth 2 -name 'watch.json' | wc -l)" -eq 3 ]
}
test_url_with_options() {
@@ -93,8 +93,17 @@ test_url_with_options() {
-u https://example.com \
-u0 '{"title":"Test Site","processor":"text_json_diff"}' \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); exit(0 if any(w.get('title')=='Test Site' for w in watches.values()) else 1)"
# Check that at least one watch.json contains the title "Test Site"
python3 -c "
import json, glob, sys
watch_files = glob.glob('$dir/*/watch.json')
for wf in watch_files:
with open(wf) as f:
data = json.load(f)
if data.get('title') == 'Test Site':
sys.exit(0)
sys.exit(1)
"
}
test_multiple_urls_with_options() {
@@ -106,9 +115,19 @@ test_multiple_urls_with_options() {
-u https://github.com \
-u1 '{"title":"Site Two"}' \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ] && \
python3 -c "import json; data=json.load(open('$dir/url-watches.json')); watches=data.get('watching', {}); titles=[w.get('title') for w in watches.values()]; exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)"
# Check that we have 2 watches and both titles are present
python3 -c "
import json, glob, sys
watch_files = glob.glob('$dir/*/watch.json')
if len(watch_files) != 2:
sys.exit(1)
titles = []
for wf in watch_files:
with open(wf) as f:
data = json.load(f)
titles.append(data.get('title'))
sys.exit(0 if 'Site One' in titles and 'Site Two' in titles else 1)
"
}
test_batch_mode_exit() {
@@ -126,21 +145,24 @@ test_batch_mode_exit() {
test_recheck_all() {
local test_id=$1
local dir="/tmp/cli-test-recheck-all-${test_id}-$$"
mkdir -p "$dir"
cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"test-uuid":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"test-uuid"}},"settings":{"application":{"password":false}}}
EOF
timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing all"
# Create a watch using CLI, then recheck it
timeout 10 python3 changedetection.py -d "$dir" -C -u https://example.com -b &>/dev/null
# Now recheck all watches
timeout 10 python3 changedetection.py -d "$dir" -r all -b 2>&1 | grep -q "Queuing"
}
test_recheck_specific() {
local test_id=$1
local dir="/tmp/cli-test-recheck-uuid-${test_id}-$$"
mkdir -p "$dir"
cat > "$dir/url-watches.json" << 'EOF'
{"watching":{"uuid-1":{"url":"https://example.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-1"},"uuid-2":{"url":"https://github.com","last_checked":0,"processor":"text_json_diff","uuid":"uuid-2"}},"settings":{"application":{"password":false}}}
EOF
timeout 10 python3 changedetection.py -d "$dir" -r uuid-1,uuid-2 -b 2>&1 | grep -q "Queuing 2 specific watches"
# Create 2 watches using CLI
timeout 12 python3 changedetection.py -d "$dir" -C \
-u https://example.com \
-u https://github.com \
-b &>/dev/null
# Get the UUIDs that were created
local uuids=$(find "$dir" -mindepth 2 -maxdepth 2 -name 'watch.json' -exec dirname {} \; | xargs -n1 basename | tr '\n' ',' | sed 's/,$//')
# Now recheck specific UUIDs
timeout 10 python3 changedetection.py -d "$dir" -r "$uuids" -b 2>&1 | grep -q "Queuing"
}
test_combined_operations() {
@@ -151,8 +173,8 @@ test_combined_operations() {
-u https://github.com \
-r all \
-b &>/dev/null
[ -f "$dir/url-watches.json" ] && \
[ "$(python3 -c "import json; print(len(json.load(open('$dir/url-watches.json')).get('watching', {})))")" -eq 2 ]
# Count watch directories (UUID directories containing watch.json)
[ "$(find "$dir" -mindepth 2 -maxdepth 2 -name 'watch.json' | wc -l)" -eq 2 ]
}
test_invalid_json() {

View File

@@ -9,6 +9,11 @@ from changedetectionio import store
import os
import sys
# CRITICAL: Set short timeout for tests to prevent 45-second hangs
# When test server is slow/unresponsive, workers fail fast instead of holding UUIDs for 45s
# This prevents exponential priority growth from repeated deferrals (priority × 10 each defer)
os.environ['DEFAULT_SETTINGS_REQUESTS_TIMEOUT'] = '5'
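To make the priority-growth comment concrete (assuming a ×10 multiplier per defer, as stated above):

priority = 1
for defer in range(4):
    priority *= 10  # one multiplication per deferral
print(priority)     # 10000 after only four deferrals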
from changedetectionio.flask_app import init_app_secret, changedetection_app
from changedetectionio.tests.util import live_server_setup, new_live_server_setup
@@ -29,6 +34,93 @@ def reportlog(pytestconfig):
logger.remove(handler_id)
@pytest.fixture(autouse=True)
def per_test_log_file(request):
"""Create a separate log file for each test function with pytest output."""
import re
# Create logs directory if it doesn't exist
log_dir = os.path.join(os.path.dirname(__file__), "logs")
os.makedirs(log_dir, exist_ok=True)
# Generate log filename from test name and worker ID (for parallel runs)
test_name = request.node.name
# Sanitize test name - replace unsafe characters with underscores
# Keep only alphanumeric, dash, underscore, and period
safe_test_name = re.sub(r'[^\w\-.]', '_', test_name)
# Limit length to avoid filesystem issues (max 200 chars)
if len(safe_test_name) > 200:
# Keep first 150 chars + hash of full name + last 30 chars
import hashlib
name_hash = hashlib.md5(test_name.encode()).hexdigest()[:8]
safe_test_name = f"{safe_test_name[:150]}_{name_hash}_{safe_test_name[-30:]}"
worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
log_file = os.path.join(log_dir, f"{safe_test_name}_{worker_id}.log")
# Add file handler for this test with TRACE level
handler_id = logger.add(
log_file,
format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {process} | {name}:{function}:{line} - {message}",
level="TRACE",
mode="w", # Overwrite if exists
enqueue=True # Thread-safe
)
logger.info(f"=== Starting test: {test_name} (worker: {worker_id}) ===")
logger.info(f"Test location: {request.node.nodeid}")
yield
# Capture test outcome (PASSED/FAILED/SKIPPED/ERROR)
outcome = "UNKNOWN"
exc_info = None
stdout = None
stderr = None
if hasattr(request.node, 'rep_call'):
outcome = request.node.rep_call.outcome.upper()
if request.node.rep_call.failed:
exc_info = request.node.rep_call.longreprtext
# Capture stdout/stderr from call phase
if hasattr(request.node.rep_call, 'sections'):
for section_name, section_content in request.node.rep_call.sections:
if 'stdout' in section_name.lower():
stdout = section_content
elif 'stderr' in section_name.lower():
stderr = section_content
elif hasattr(request.node, 'rep_setup'):
if request.node.rep_setup.failed:
outcome = "SETUP_FAILED"
exc_info = request.node.rep_setup.longreprtext
logger.info(f"=== Test Result: {outcome} ===")
if exc_info:
logger.error(f"=== Test Failure Details ===\n{exc_info}")
if stdout:
logger.info(f"=== Captured stdout ===\n{stdout}")
if stderr:
logger.warning(f"=== Captured stderr ===\n{stderr}")
logger.info(f"=== Finished test: {test_name} ===")
logger.remove(handler_id)
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""Hook to capture test results and attach to the test node."""
outcome = yield
rep = outcome.get_result()
# Store report on the test node for access in fixtures
setattr(item, f"rep_{rep.when}", rep)
@pytest.fixture
def environment(mocker):
"""Mock arrow.now() to return a fixed datetime for testing jinja2 time extension."""
@@ -165,6 +257,57 @@ def prepare_test_function(live_server, datastore_path):
except:
break
# Add test helper methods to the app for worker management
def set_workers(count):
"""Set the number of workers for testing - brutal shutdown, no delays"""
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, notification_q
current_count = worker_pool.get_worker_count()
# Special case: Setting to 0 means shutdown all workers brutally
if count == 0:
logger.debug(f"Brutally shutting down all {current_count} workers")
worker_pool.shutdown_workers()
return {
'status': 'success',
'message': f'Shutdown all {current_count} workers',
'previous_count': current_count,
'current_count': 0
}
# Adjust worker count (no delays, no verification)
result = worker_pool.adjust_async_worker_count(
count,
update_q=update_q,
notification_q=notification_q,
app=live_server.app,
datastore=datastore
)
return result
def check_all_workers_alive(expected_count):
"""Check that all expected workers are alive"""
from changedetectionio import worker_pool
from changedetectionio.flask_app import update_q, notification_q
result = worker_pool.check_worker_health(
expected_count,
update_q=update_q,
notification_q=notification_q,
app=live_server.app,
datastore=datastore
)
assert result['status'] == 'healthy', f"Workers not healthy: {result['message']}"
return result
# Attach helper methods to app for easy test access
live_server.app.set_workers = set_workers
live_server.app.check_all_workers_alive = check_all_workers_alive
# Prevent background thread from writing during cleanup/reload
datastore.needs_write = False
datastore.needs_write_urgent = False
@@ -262,8 +405,8 @@ def app(request, datastore_path):
# Shutdown workers gracefully before loguru cleanup
try:
from changedetectionio import worker_handler
worker_handler.shutdown_workers()
from changedetectionio import worker_pool
worker_pool.shutdown_workers()
except Exception:
pass
@@ -311,4 +454,3 @@ def app(request, datastore_path):
yield app

View File

@@ -0,0 +1,166 @@
#!/usr/bin/env python3
"""
Test notification_urls validation in Watch and Tag API endpoints.
Ensures that invalid AppRise URLs are rejected when setting notification_urls.
Valid AppRise notification URLs use specific protocols like:
- posts://example.com - POST to HTTP endpoint
- gets://example.com - GET to HTTP endpoint
- mailto://user@example.com - Email
- slack://token/channel - Slack
- discord://webhook_id/webhook_token - Discord
- etc.
Invalid notification URLs:
- https://example.com - Plain HTTPS is NOT a valid AppRise notification protocol
- ftp://example.com - FTP is NOT a valid AppRise notification protocol
- Plain URLs without proper AppRise protocol prefix
"""
from flask import url_for
import json
def test_watch_notification_urls_validation(client, live_server, measure_memory_usage, datastore_path):
"""Test that Watch PUT/POST endpoints validate notification_urls."""
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Test 1: Create a watch with valid notification URLs
valid_urls = ["posts://example.com/notify1", "posts://example.com/notify2"]
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": "https://example.com",
"notification_urls": valid_urls
}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 201, "Should accept valid notification URLs on watch creation"
watch_uuid = res.json['uuid']
# Verify the notification URLs were saved
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert set(res.json['notification_urls']) == set(valid_urls), "Valid notification URLs should be saved"
# Test 2: Try to create a watch with invalid notification URLs (https:// is not valid)
invalid_urls = ["https://example.com/webhook"]
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": "https://example.com",
"notification_urls": invalid_urls
}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject https:// notification URLs (not a valid AppRise protocol)"
assert b"is not a valid AppRise URL" in res.data, "Should provide AppRise validation error message"
# Test 2b: Also test other invalid protocols
invalid_urls_ftp = ["ftp://not-apprise-url"]
res = client.post(
url_for("createwatch"),
data=json.dumps({
"url": "https://example.com",
"notification_urls": invalid_urls_ftp
}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject ftp:// notification URLs"
assert b"is not a valid AppRise URL" in res.data, "Should provide AppRise validation error message"
# Test 3: Update watch with valid notification URLs
new_valid_urls = ["posts://newserver.com"]
res = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"notification_urls": new_valid_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 200, "Should accept valid notification URLs on watch update"
# Verify the notification URLs were updated
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json['notification_urls'] == new_valid_urls, "Valid notification URLs should be updated"
# Test 4: Try to update watch with invalid notification URLs (plain https:// not valid)
invalid_https_url = ["https://example.com/webhook"]
res = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"notification_urls": invalid_https_url}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject https:// notification URLs on watch update"
assert b"is not a valid AppRise URL" in res.data, "Should provide AppRise validation error message"
# Test 5: Update watch with non-list notification_urls (caught by OpenAPI schema validation)
res = client.put(
url_for("watch", uuid=watch_uuid),
data=json.dumps({"notification_urls": "not-a-list"}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject non-list notification_urls"
assert b"OpenAPI validation failed" in res.data or b"Request body validation error" in res.data
# Test 6: Verify original URLs are preserved after failed update
res = client.get(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json['notification_urls'] == new_valid_urls, "URLs should remain unchanged after validation failure"
def test_tag_notification_urls_validation(client, live_server, measure_memory_usage, datastore_path):
"""Test that Tag PUT endpoint validates notification_urls."""
from changedetectionio.model import Tag
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
datastore = live_server.app.config['DATASTORE']
# Create a tag
tag_uuid = datastore.add_tag(title="Test Tag")
assert tag_uuid is not None
# Test 1: Update tag with valid notification URLs
valid_urls = ["posts://example.com/tag-notify"]
res = client.put(
url_for("tag", uuid=tag_uuid),
data=json.dumps({"notification_urls": valid_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 200, "Should accept valid notification URLs on tag update"
# Verify the notification URLs were saved
tag = datastore.data['settings']['application']['tags'][tag_uuid]
assert tag['notification_urls'] == valid_urls, "Valid notification URLs should be saved to tag"
# Test 2: Try to update tag with invalid notification URLs (https:// not valid)
invalid_urls = ["https://example.com/webhook"]
res = client.put(
url_for("tag", uuid=tag_uuid),
data=json.dumps({"notification_urls": invalid_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject https:// notification URLs on tag update"
assert b"is not a valid AppRise URL" in res.data, "Should provide AppRise validation error message"
# Test 3: Update tag with non-list notification_urls (caught by OpenAPI schema validation)
res = client.put(
url_for("tag", uuid=tag_uuid),
data=json.dumps({"notification_urls": "not-a-list"}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400, "Should reject non-list notification_urls"
assert b"OpenAPI validation failed" in res.data or b"Request body validation error" in res.data
# Test 4: Verify original URLs are preserved after failed update
tag = datastore.data['settings']['application']['tags'][tag_uuid]
assert tag['notification_urls'] == valid_urls, "URLs should remain unchanged after validation failure"

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks, delete_all_watches
import os
@@ -116,7 +116,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# And not this cause its not the ld-json
assert b"So let's see what happens" not in res.data
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
##########################################################################################
# And we shouldn't see the offer
@@ -131,7 +131,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
assert b'ldjson-price-track-offer' not in res.data
##########################################################################################
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
@@ -147,7 +147,7 @@ def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_
##########################################################################################
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_bad_ldjson_is_correctly_ignored(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -414,4 +414,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
assert b'Abonnementen bijwerken' in res.data
assert b'&lt;foobar' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
res = delete_all_watches(client)

View File

@@ -6,7 +6,7 @@ from .util import (
set_original_response,
set_modified_response,
live_server_setup,
wait_for_all_checks
wait_for_all_checks, delete_all_watches
)
from loguru import logger
@@ -104,7 +104,7 @@ def run_socketio_watch_update_test(client, live_server, password_mode="", datast
assert watch.has_unviewed, "The watch was not marked as unviewed after content change"
# Clean up
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_everything(live_server, client, measure_memory_usage, datastore_path):

View File

@@ -69,7 +69,7 @@ def test_conditions_with_text_and_number(client, live_server, measure_memory_usa
# 1. The page filtered text must contain "5" (first digit of value)
# 2. The extracted number should be >= 20 and <= 100
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"fetch_backend": "html_requests",
@@ -110,25 +110,20 @@ def test_conditions_with_text_and_number(client, live_server, measure_memory_usa
wait_for_all_checks(client)
client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
time.sleep(0.2)
wait_for_all_checks(client)
time.sleep(1)
# Case 1
set_number_in_range_response(datastore_path=datastore_path, number="70.5")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(2)
# 70.5 is > 20 and < 100 and contains "5"
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
# Case 2: Change with one condition violated
# Number out of range (150) but contains '5'
client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
time.sleep(0.2)
set_number_out_of_range_response(datastore_path=datastore_path, number="150.5")
@@ -154,7 +149,6 @@ def test_condition_validate_rule_row(client, live_server, measure_memory_usage,
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
# the front end submits the current form state which should override the watch in a temporary copy
res = client.post(
@@ -195,12 +189,8 @@ def test_condition_validate_rule_row(client, live_server, measure_memory_usage,
)
assert res.status_code == 200
assert b'false' in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)
delete_all_watches(client)
# If there was only a change in the whitespace, then we shouldn't have a change detected
@@ -230,17 +220,12 @@ def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage,
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
# Assert the word count is counted correctly
assert b'<td>13</td>' in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)
delete_all_watches(client)
# If there was only a change in the whitespace, then we shouldn't have a change detected
def test_lev_conditions_plugin(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -1,7 +1,7 @@
import os
import time
from flask import url_for
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output, delete_all_watches
from ..notification import valid_notification_formats
@@ -118,8 +118,10 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
res = client.get(url_for("watchlist.index"))
assert b'Warning, no filters were found' in res.data
assert not os.path.isfile(notification_file)
time.sleep(1)
time.sleep(2)
wait_for_all_checks(client)
wait_for_all_checks(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 5
time.sleep(2)
@@ -178,6 +180,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
follow_redirects=True
)
os.unlink(notification_file)
delete_all_watches(client)
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage, datastore_path):
@@ -185,10 +188,12 @@ def test_check_include_filters_failure_notification(client, live_server, measure
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path)
# Check markup send conversion didnt affect plaintext preference
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text'), datastore_path=datastore_path)
delete_all_watches(client)
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage, datastore_path):
# # live_server_setup(live_server) # Setup on conftest per function
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path)
delete_all_watches(client)
# Test that notification is never sent
@@ -197,3 +202,4 @@ def test_basic_markup_from_text(client, live_server, measure_memory_usage, datas
from ..notification.handler import markup_text_links_to_html
x = markup_text_links_to_html("hello https://google.com")
assert 'a href' in x
delete_all_watches(client)

View File

@@ -166,7 +166,8 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage, datastore_path
delete_all_watches(client)
def test_group_tag_notification(client, live_server, measure_memory_usage, datastore_path):
delete_all_watches(client)
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)

View File

@@ -142,6 +142,8 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
delete_all_watches(client)
def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path):
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
@@ -162,7 +164,7 @@ def test_check_text_history_view(client, live_server, measure_memory_usage, data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("ui.ui_diff.diff_history_page", uuid="first"))
res = client.get(url_for("ui.ui_diff.diff_history_page", uuid=uuid))
assert b'test-one' in res.data
assert b'test-two' in res.data

View File

@@ -40,10 +40,7 @@ def set_some_changed_response(datastore_path):
def test_normal_page_check_works_with_ignore_status_code(client, live_server, measure_memory_usage, datastore_path):
# Give the endpoint time to spin up
time.sleep(1)
from loguru import logger
set_original_response(datastore_path=datastore_path)
@@ -62,20 +59,41 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
logger.info(f"TEST: First check - queuing UUID {uuid}")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
logger.info(f"TEST: Waiting for first check to complete")
wait_result = wait_for_all_checks(client)
logger.info(f"TEST: First check wait completed: {wait_result}")
# Check history after first check
watch = client.application.config.get('DATASTORE').data['watching'][uuid]
logger.info(f"TEST: After first check - history count: {len(watch.history.keys())}")
set_some_changed_response(datastore_path=datastore_path)
wait_for_all_checks(client)
# Trigger a check
logger.info(f"TEST: Second check - queuing UUID {uuid}")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
logger.info(f"TEST: Waiting for second check to complete")
wait_result = wait_for_all_checks(client)
logger.info(f"TEST: Second check wait completed: {wait_result}")
# Check history after second check
watch = client.application.config.get('DATASTORE').data['watching'][uuid]
logger.info(f"TEST: After second check - history count: {len(watch.history.keys())}")
logger.info(f"TEST: Watch history keys: {list(watch.history.keys())}")
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
if b'has-unread-changes' not in res.data:
logger.error(f"TEST FAILED: has-unread-changes not found in response")
logger.error(f"TEST: Watch last_error: {watch.get('last_error')}")
logger.error(f"TEST: Watch last_checked: {watch.get('last_checked')}")
assert b'has-unread-changes' in res.data
assert b'/test-endpoint' in res.data

View File

@@ -82,7 +82,7 @@ def test_import_distillio(client, live_server, measure_memory_usage, datastore_p
# Give the endpoint time to spin up
time.sleep(1)
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
res = client.post(
url_for("imports.import_page"),
data={

View File

@@ -224,6 +224,7 @@ def check_json_filter(json_filter, client, live_server, datastore_path):
set_original_response(datastore_path=datastore_path)
delete_all_watches(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})
@@ -297,14 +298,17 @@ def check_json_filter_bool_val(json_filter, client, live_server, datastore_path)
def test_check_jsonpath_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
check_json_filter_bool_val("json:$['available']", client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
def test_check_jq_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_filter_bool_val("jq:.available", client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
def test_check_jqraw_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_filter_bool_val("jq:.available", client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
# Re #265 - Extended JSON selector test
# Stuff to consider here
@@ -452,14 +456,17 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage, datastore_path):
check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
def test_check_jq_ext_filter(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_ext_filter('jq:.[] | select(.status | contains("Sold"))', client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
def test_check_jqraw_ext_filter(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_ext_filter('jq:.[] | select(.status | contains("Sold"))', client, live_server, datastore_path=datastore_path)
delete_all_watches(client)
def test_jsonpath_BOM_utf8(client, live_server, measure_memory_usage, datastore_path):
from .. import html_tools
@@ -470,5 +477,6 @@ def test_jsonpath_BOM_utf8(client, live_server, measure_memory_usage, datastore_
# See that we can find the second <script> one, which is not broken, and matches our filter
text = html_tools.extract_json_as_string(json_str, "json:$.name")
assert text == '"José"'
delete_all_watches(client)

View File

@@ -313,14 +313,8 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
# Add a watch and trigger a HTTP POST
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'nice one'},
follow_redirects=True
)
assert b"Watch added" in res.data
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
watch_uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, tag="nice one")
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)

View File

@@ -1,7 +1,7 @@
import os
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
import logging
def test_check_notification_error_handling(client, live_server, measure_memory_usage, datastore_path):
@@ -81,4 +81,4 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
os.unlink(os.path.join(datastore_path, "notification.txt"))
assert 'xxxxx' in notification_submission
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)

View File

@@ -0,0 +1,52 @@
import os
import time
from flask import url_for
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
from ..notification import valid_notification_formats
from loguru import logger
def test_queue_system(client, live_server, measure_memory_usage, datastore_path):
"""Test that multiple workers can process queue concurrently without blocking each other"""
# (pytest) Werkzeug's threaded server uses a ThreadPoolExecutor with a default limit of min(32, os.cpu_count() + 4) threads.
items = os.cpu_count() + 3
delay = 10
# Auto-queue is off here.
live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True
test_urls = [
f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}"
for i in range(0, items)
]
# Import the generated URLs to queue
res = client.post(
url_for("imports.import_page"),
data={"urls": "\r\n".join(test_urls)},
follow_redirects=True
)
assert f"{items} Imported".encode('utf-8') in res.data
client.application.set_workers(items)
start = time.time()
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(delay/2)
# Verify that workers are busy (some UUIDs should be mid-processing at this point)
from changedetectionio import worker_pool
running_uuids = worker_pool.get_running_uuids()
logger.debug( f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}")
assert len(running_uuids) != 0, f"Should be atleast some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}"
wait_for_all_checks(client)
# All workers should be done within delay + 10 seconds (they take time to 'see' something is in the queue too)
total_time = (time.time() - start)
logger.debug(f"All workers finished {items} items ({delay}s per job) in {total_time:.1f}s total")
# if there was a bug in the queue handler preventing parallel runs, this would blow out to items*delay seconds
assert total_time < delay + 10, f"Expected {items} items to finish in under {delay + 10}s, took {total_time:.1f}s"
# Verify all workers are idle (no UUIDs being processed)
running_uuids = worker_pool.get_running_uuids()
assert len(running_uuids) == 0, f"Expected all workers to be idle, but {len(running_uuids)} UUIDs still being processed: {running_uuids}"

View File

@@ -17,12 +17,12 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
test_url = test_url.replace('localhost', 'changedet')
# Add the test URL twice, we will check
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
uuidA = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
uuidB = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -31,7 +31,7 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
# Add some headers to a request
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuidA),
data={
"url": test_url,
"tags": "",
@@ -42,13 +42,14 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
)
assert b"Updated watch." in res.data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick up the first version
wait_for_all_checks(client)
# The service should echo back the request headers
res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"),
url_for("ui.ui_preview.preview_page", uuid=uuidA),
follow_redirects=True
)
@@ -92,7 +93,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# add the first 'version'
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
@@ -110,7 +111,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
body_value = 'Test Body Value {{ 1+1 }}'
body_value_formatted = 'Test Body Value 2'
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
@@ -126,7 +127,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# The service should echo back the body
res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"),
url_for("ui.ui_preview.preview_page", uuid=uuid),
follow_redirects=True
)
@@ -157,7 +158,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# Attempt to add a body with a GET method
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",

View File

@@ -236,6 +236,7 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
}
_run_test_minmax_limit(client, extra_watch_edit_form=extras,datastore_path=datastore_path)
delete_all_watches(client)
@@ -388,9 +389,10 @@ def test_change_with_notification_values(client, live_server, measure_memory_usa
os.unlink(os.path.join(datastore_path, "notification.txt"))
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
res = client.post(url_for("ui.ui_notification.ajax_callback_send_notification_test", watch_uuid=uuid), data={}, follow_redirects=True)
time.sleep(5)
wait_for_notification_endpoint_output(datastore_path=datastore_path)
assert os.path.isfile(os.path.join(datastore_path, "notification.txt")), "Notification received"
delete_all_watches(client)
def test_data_sanity(client, live_server, measure_memory_usage, datastore_path):
@@ -462,3 +464,4 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
assert b'ception' not in res.data
assert b'155.55' in res.data
delete_all_watches(client)

View File

@@ -107,7 +107,7 @@ def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path
assert b"Access denied, bad token" not in res.data
assert b"Random content" in res.data
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -23,6 +23,7 @@ def test_rss_feed_empty(client, live_server, measure_memory_usage, datastore_pat
)
assert res.status_code == 400
assert b'does not have enough history snapshots to show' in res.data
delete_all_watches(client)
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
"""

View File

@@ -24,20 +24,20 @@ def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
# Go to the edit page and set our include filters
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(include_filters.encode('utf-8')) in res.data
# click share the link
res = client.get(
url_for("ui.form_share_put_watch", uuid="first"),
url_for("ui.form_share_put_watch", uuid=uuid),
follow_redirects=True
)
@@ -63,13 +63,16 @@ def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
# Now hit edit, we should see what we expect
# that the import fetched the meta-data
uuids = list(client.application.config.get('DATASTORE').data['watching'])
assert uuids, "It saved/imported and created a new URL from the share"
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuids[0]),
)
assert bytes(include_filters.encode('utf-8')) in res.data
# Check it saved the URL
res = client.get(url_for("watchlist.index"))
assert bytes(test_url.encode('utf-8')) in res.data
delete_all_watches(client)

View File

@@ -25,6 +25,7 @@ def test_recheck_time_field_validation_global_settings(client, live_server, meas
assert REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT.encode('utf-8') in res.data
delete_all_watches(client)
def test_recheck_time_field_validation_single_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -94,6 +95,7 @@ def test_recheck_time_field_validation_single_watch(client, live_server, measure
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
delete_all_watches(client)
def test_checkbox_open_diff_in_new_tab(client, live_server, measure_memory_usage, datastore_path):
@@ -242,6 +244,7 @@ def test_page_title_listing_behaviour(client, live_server, measure_memory_usage,
# No page title description, and 'use_page_title_in_list' is on, it should show the <title>
res = client.get(url_for("watchlist.index"))
assert b"head titlecustom html" in res.data
delete_all_watches(client)
def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage, datastore_path):
@@ -283,4 +286,5 @@ def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage, datast
client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
time.sleep(0.2)
res = client.get(url_for("watchlist.index"))
assert b'<span id="unread-tab-counter">0</span>' in res.data
delete_all_watches(client)

View File

@@ -366,7 +366,7 @@ def test_check_with_prefix_include_filters(client, live_server, measure_memory_u
assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" not in res.data # not in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_various_rules(client, live_server, measure_memory_usage, datastore_path):
@@ -423,7 +423,7 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": "//*[contains(@class, 'sametext')]|//*[contains(@class, 'changetext')]",
"url": test_url,
"tags": "",
@@ -437,14 +437,14 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"),
url_for("ui.ui_preview.preview_page", uuid=uuid),
follow_redirects=True
)
assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" in res.data # in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_xpath_20_function_count(client, live_server, measure_memory_usage, datastore_path):
@@ -477,7 +477,7 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage, data
assert b"246913579975308642" in res.data # in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_xpath_20_function_count2(client, live_server, measure_memory_usage, datastore_path):
@@ -501,6 +501,8 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
)
assert b"Updated watch." in res.data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
@@ -510,7 +512,7 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
assert b"246913579975308642" in res.data # in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_xpath_20_function_string_join_matches(client, live_server, measure_memory_usage, datastore_path):
@@ -544,7 +546,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
assert b"Some text thats the samespecialconjunctionSome text that will change" in res.data # in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
@@ -582,7 +584,7 @@ def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
assert b"Lets go discount" in res.data, f"Expected 'Lets go discount' when called with content type '{content_type}'"
assert b"Events and Announcements" not in res.data, f"'Events and Announcements' should be absent with content type '{content_type}'"  # Not our selector target, so it should be filtered out
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server, measure_memory_usage, datastore_path):

View File

@@ -6,6 +6,42 @@ from flask import url_for
import logging
import time
import os
import threading
# Thread-safe global storage for test endpoint content
# Avoids filesystem cache issues in parallel tests
_test_endpoint_content_lock = threading.Lock()
_test_endpoint_content = {}
def write_test_file_and_sync(filepath, content, mode='w'):
"""
Write test data to file and ensure it's synced to disk.
Also stores in thread-safe global dict to bypass filesystem cache.
Critical for parallel tests where workers may read files immediately after write.
Without fsync(), data may still be in OS buffers when workers try to read,
causing race conditions where old data is seen.
Args:
filepath: Full path to file
content: Content to write (str or bytes)
mode: File mode ('w' for text, 'wb' for binary)
"""
# Convert content to bytes if needed
if isinstance(content, str):
content_bytes = content.encode('utf-8')
else:
content_bytes = content
# Store in thread-safe global dict for instant access
with _test_endpoint_content_lock:
_test_endpoint_content[os.path.basename(filepath)] = content_bytes
# Also write to file for compatibility
with open(filepath, mode) as f:
f.write(content)
f.flush() # Flush Python buffer to OS
os.fsync(f.fileno()) # Force OS to write to disk
def set_original_response(datastore_path, extra_title=''):
test_return_data = f"""<html>
@@ -20,8 +56,7 @@ def set_original_response(datastore_path, extra_title=''):
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
return None
def set_modified_response(datastore_path):
@@ -36,9 +71,7 @@ def set_modified_response(datastore_path):
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
return None
def set_longer_modified_response(datastore_path):
test_return_data = """<html>
@@ -55,9 +88,7 @@ def set_longer_modified_response(datastore_path):
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
return None
def set_more_modified_response(datastore_path):
@@ -73,17 +104,14 @@ def set_more_modified_response(datastore_path):
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
return None
def set_empty_text_response(datastore_path):
test_return_data = """<html><body></body></html>"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
return None
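A quick usage sketch of write_test_file_and_sync() as the set_*_response() helpers above call it; after the call returns, readers can take the in-memory fast path and skip the filesystem (and its caches) entirely. The path below is illustrative:

import os

datastore_path = "test-datastore"  # illustrative
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"),
                         "<html><body>fresh content</body></html>")

# The thread-safe dict now holds the bytes, no disk read (or cache race) needed
with _test_endpoint_content_lock:
    assert _test_endpoint_content["endpoint-content.txt"] == b"<html><body>fresh content</body></html>"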
@@ -132,21 +160,40 @@ def extract_UUID_from_client(client):
return uuid.strip()
def delete_all_watches(client=None):
# Change tracking
client.application.config.get('DATASTORE')._dirty_watches = set() # Watch UUIDs that need saving
client.application.config.get('DATASTORE')._dirty_settings = False # Settings changed
client.application.config.get('DATASTORE')._watch_hashes = {} # UUID -> SHA256 hash for change detection
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid)
from changedetectionio.flask_app import update_q
# Clear the queue to prevent leakage to next test
# Use clear() method to ensure both priority_items and notification_queue are drained
if hasattr(update_q, 'clear'):
update_q.clear()
else:
# Fallback for old implementation
while not update_q.empty():
try:
update_q.get_nowait()
except Exception:
break
time.sleep(0.2)
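The clear() branch above assumes the queue class exposes such a method; the fallback is the classic non-blocking drain. For reference, a minimal sketch of what a clear() on a queue wrapper could look like (hypothetical class name, not the project's actual implementation):

import queue

class ClearableQueue(queue.PriorityQueue):
    def clear(self):
        # Drain without blocking; queue.Empty tells us we are done
        while True:
            try:
                self.get_nowait()
            except queue.Empty:
                break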
def wait_for_all_checks(client=None):
"""
Waits until the queue is empty and workers are idle.
Delegates to worker_handler.wait_for_all_checks for shared logic.
Delegates to worker_pool.wait_for_all_checks for shared logic.
"""
from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_handler
from changedetectionio import worker_pool
return worker_pool.wait_for_all_checks(global_update_q, timeout=150)
# Use the shared wait logic from worker_handler
return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
def wait_for_watch_history(client, min_history_count=2, timeout=10):
"""
@@ -218,15 +265,35 @@ def new_live_server_setup(live_server):
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
# Tried using a global var here but didn't seem to work, so reading from a file instead.
datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
with open(os.path.join(datastore_path, "endpoint-content.txt"), "rb") as f:
resp = make_response(f.read(), status_code)
if uppercase_headers:
resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
else:
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
# Check thread-safe global dict first (instant, no cache issues)
# Fall back to file if not in dict (for tests that write directly)
with _test_endpoint_content_lock:
content_data = _test_endpoint_content.get("endpoint-content.txt")
if content_data is None:
# Not in global dict, read from file
datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
filepath = os.path.join(datastore_path, "endpoint-content.txt")
# Force filesystem sync before reading
try:
os.sync()
except (AttributeError, PermissionError):
pass
try:
with open(filepath, "rb") as f:
content_data = f.read()
except Exception as e:
logger.error(f"Error reading endpoint-content.txt: {e}")
raise
resp = make_response(content_data, status_code)
if uppercase_headers:
resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
else:
resp.headers['Content-Type'] = ctype if ctype else 'text/html'
return resp
except FileNotFoundError:
return make_response('', status_code)
@@ -301,6 +368,12 @@ def new_live_server_setup(live_server):
def test_pdf_endpoint():
datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
# Force filesystem sync before reading to ensure fresh data
try:
os.sync()
except (AttributeError, PermissionError):
pass
# Tried using a global var here but didn't seem to work, so reading from a file instead.
with open(os.path.join(datastore_path, "endpoint-test.pdf"), "rb") as f:
resp = make_response(f.read(), 200)

View File

@@ -3,7 +3,9 @@ from .processors.exceptions import ProcessorException
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
from changedetectionio import html_tools
from changedetectionio import worker_pool
from changedetectionio.flask_app import watch_check_update
from changedetectionio.queuedWatchMetaData import PrioritizedItem
import asyncio
import importlib
@@ -46,19 +48,33 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
jobs_processed = 0
start_time = time.time()
logger.info(f"Starting async worker {worker_id} (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
# Log thread name for debugging
import threading
thread_name = threading.current_thread().name
logger.info(f"Starting async worker {worker_id} on thread '{thread_name}' (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
while not app.config.exit.is_set():
update_handler = None
watch = None
try:
# Use sync interface via run_in_executor since each worker has its own event loop
loop = asyncio.get_event_loop()
queued_item_data = await asyncio.wait_for(
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
timeout=1.5
)
# Efficient blocking via run_in_executor (no polling overhead!)
# Worker blocks in threading.Queue.get() which uses Condition.wait()
# Executor must be sized to match worker count (see worker_pool.py: 50 threads default)
# A single timeout (no nested double-timeout wrapper) removes the old race condition
queued_item_data = await q.async_get(executor=executor, timeout=1.0)
# CRITICAL: Claim UUID immediately after getting from queue to prevent race condition
# in wait_for_all_checks() which checks qsize() and running_uuids separately
uuid = queued_item_data.item.get('uuid')
if not worker_pool.claim_uuid_for_processing(uuid, worker_id):
# Already being processed - re-queue and continue
logger.trace(f"Worker {worker_id} detected UUID {uuid} already processing during claim - deferring")
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
deferred_priority = max(1000, queued_item_data.priority * 10)
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
worker_pool.queue_item_async_safe(q, deferred_item, silent=True)
continue
except asyncio.TimeoutError:
# No jobs available - check if we should restart based on time while idle
@@ -67,6 +83,17 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
return "restart"
continue
except RuntimeError as e:
# Handle executor shutdown gracefully - this is expected during shutdown
if "cannot schedule new futures after shutdown" in str(e):
# Executor shut down - exit gracefully without logging in pytest
if not IN_PYTEST:
logger.debug(f"Worker {worker_id} detected executor shutdown, exiting")
break
# Other RuntimeError - log and continue
logger.error(f"Worker {worker_id} runtime error: {e}")
await asyncio.sleep(0.1)
continue
except Exception as e:
# Handle expected Empty exception from queue timeout
import queue
@@ -88,26 +115,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
await asyncio.sleep(0.1)
continue
uuid = queued_item_data.item.get('uuid')
# RACE CONDITION FIX: Atomically claim this UUID for processing
from changedetectionio import worker_handler
from changedetectionio.queuedWatchMetaData import PrioritizedItem
# Try to claim the UUID atomically - prevents duplicate processing
if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
# Already being processed by another worker
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
# Sleep to avoid tight loop and give the other worker time to finish
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
# Re-queue with lower priority so it gets checked again after current processing finishes
deferred_priority = max(1000, queued_item_data.priority * 10)
deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
continue
# UUID already claimed above immediately after getting from queue
# to prevent race condition with wait_for_all_checks()
fetch_start_time = round(time.time())
@@ -224,6 +233,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
except FilterNotFoundInResponse as e:
if not datastore.data['watching'].get(uuid):
continue
logger.debug(f"Received FilterNotFoundInResponse exception for {uuid}")
err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary."
datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
@@ -243,17 +253,19 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
c += 1
# Send notification if we reached the threshold?
threshold = datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
logger.debug(f"FilterNotFoundInResponse - Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
if c >= threshold:
if not watch.get('notification_muted'):
logger.debug(f"Sending filter failed notification for {uuid}")
logger.debug(f"FilterNotFoundInResponse - Sending filter failed notification for {uuid}")
await send_filter_failure_notification(uuid, notification_q, datastore)
c = 0
logger.debug(f"Reset filter failure count back to zero")
logger.debug(f"FilterNotFoundInResponse - Reset filter failure count back to zero")
else:
logger.debug(f"FilterNotFoundInResponse - {c} of threshold {threshold}..")
datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
else:
logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping")
logger.trace(f"FilterNotFoundInResponse - {uuid} - filter_failure_notification_send not enabled, skipping")
process_changedetection_results = False
@@ -490,7 +502,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
try:
# Release UUID from processing (thread-safe)
worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id)
# Send completion signal
if watch:

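The claim/release calls used throughout this worker imply a small lock-protected registry inside worker_pool. A plausible minimal sketch, assuming a dict keyed by UUID (only the three function names below actually appear in this diff; the internals are guesswork):

import threading

_lock = threading.Lock()
_processing = {}  # uuid -> worker_id (assumed layout)

def claim_uuid_for_processing(uuid, worker_id):
    # Atomic test-and-set: exactly one worker may own a UUID at a time
    with _lock:
        if uuid in _processing:
            return False
        _processing[uuid] = worker_id
        return True

def release_uuid_from_processing(uuid, worker_id=None):
    with _lock:
        _processing.pop(uuid, None)

def get_running_uuids():
    with _lock:
        return list(_processing)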
View File

@@ -23,11 +23,14 @@ _uuid_processing_lock = threading.Lock() # Protects currently_processing_uuids
USE_ASYNC_WORKERS = True
# Custom ThreadPoolExecutor for queue operations with named threads
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
# Scale executor threads to match FETCH_WORKERS (no minimum, no maximum)
# Thread naming: "QueueGetter-N" for easy debugging in thread dumps/traces
# With FETCH_WORKERS=10: 10 workers + 10 executor threads = 20 threads total
# With FETCH_WORKERS=500: 500 workers + 500 executor threads = 1000 threads total (acceptable on modern systems)
_max_executor_workers = int(os.getenv("FETCH_WORKERS", "10"))
queue_executor = ThreadPoolExecutor(
max_workers=_max_executor_workers,
thread_name_prefix="QueueGetter-"
thread_name_prefix="QueueGetter-" # Shows in thread dumps as "QueueGetter-0", "QueueGetter-1", etc.
)
@@ -82,16 +85,17 @@ class WorkerThread:
self.loop = None
def start(self):
"""Start the worker thread"""
"""Start the worker thread with descriptive name for debugging"""
self.thread = threading.Thread(
target=self.run,
daemon=True,
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}" # Shows in thread dumps with worker ID
)
self.thread.start()
def stop(self):
"""Stop the worker thread"""
"""Stop the worker thread brutally - no waiting"""
# Try to stop the event loop if it exists
if self.loop and self.running:
try:
# Signal the loop to stop
@@ -99,8 +103,7 @@ class WorkerThread:
except RuntimeError:
pass
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
# Don't wait - thread is daemon and will die when needed
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
@@ -125,7 +128,7 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
"""Start a single async worker with auto-restart capability"""
from changedetectionio.async_update_worker import async_update_worker
from changedetectionio.worker import async_update_worker
# Check if we're in pytest environment - if so, be more gentle with logging
import os
@@ -337,24 +340,36 @@ def queue_item_async_safe(update_q, item, silent=False):
def shutdown_workers():
"""Shutdown all async workers fast and aggressively"""
global worker_threads
"""Shutdown all async workers brutally - no delays, no waiting"""
global worker_threads, queue_executor
# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Fast shutdown of async workers initiated...")
logger.info("Brutal shutdown of async workers initiated...")
# Stop all worker threads
# Stop all worker event loops
for worker in worker_threads:
worker.stop()
# Clear immediately - threads are daemon and will die
worker_threads.clear()
# Shutdown the queue executor to prevent "cannot schedule new futures after shutdown" errors
# This must happen AFTER workers are stopped to avoid race conditions
if queue_executor:
try:
queue_executor.shutdown(wait=False)
if not in_pytest:
logger.debug("Queue executor shut down")
except Exception as e:
if not in_pytest:
logger.warning(f"Error shutting down queue executor: {e}")
if not in_pytest:
logger.info("Async workers fast shutdown complete")
logger.info("Async workers brutal shutdown complete")
@@ -469,12 +484,14 @@ def wait_for_all_checks(update_q, timeout=150):
elif time.time() - empty_since >= 0.3:
# Add small buffer for filesystem operations to complete
time.sleep(0.2)
logger.trace("wait_for_all_checks: All checks complete (queue empty, workers idle)")
return True
else:
empty_since = None
attempt += 1
logger.warning(f"wait_for_all_checks: Timeout after {timeout} attempts")
return False # Timeout
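The tail of wait_for_all_checks() above is a debounce loop: the queue has to stay empty and the workers idle for a continuous 0.3s window before the wait is declared complete, otherwise a worker that has popped an item but not yet claimed its UUID would make the check pass too early. A condensed sketch of that loop shape, assuming the qsize()/get_running_uuids() probes seen elsewhere in this diff:

import time

def wait_for_all_checks_sketch(update_q, get_running_uuids, timeout=150):
    empty_since = None
    for _ in range(timeout):
        busy = update_q.qsize() > 0 or len(get_running_uuids()) > 0
        if busy:
            empty_since = None            # activity seen, reset the quiet window
        elif empty_since is None:
            empty_since = time.time()     # start timing the quiet window
        elif time.time() - empty_since >= 0.3:
            time.sleep(0.2)               # small buffer for filesystem writes
            return True
        time.sleep(0.1)
    return False  # timed out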

View File

@@ -8,7 +8,7 @@ flask-paginate
flask_expects_json~=1.7
flask_restful
flask_cors # For the Chrome extension to operate
janus # Thread-safe async/sync queue bridge
# janus # No longer needed - using pure threading.Queue for multi-loop support
flask_wtf~=1.2
flask~=3.1
flask-socketio~=5.6.0
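Dropping janus works because a plain threading.Queue can be awaited from any event loop by running the blocking get() on an executor thread, which is what the q.async_get(executor=..., timeout=...) call in the worker relies on. A minimal sketch of such a bridge (method and parameter names are modelled on the calls shown above, not copied from the real class):

import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

class AsyncBridgePriorityQueue(queue.PriorityQueue):
    async def async_get(self, executor: ThreadPoolExecutor, timeout: float = 1.0):
        loop = asyncio.get_running_loop()
        try:
            # The blocking get() runs on an executor thread; this event loop stays free.
            # Sizing the executor to match the worker count avoids a hand-off bottleneck.
            return await loop.run_in_executor(executor, self.get, True, timeout)
        except queue.Empty:
            # Normalise to the timeout error the worker loop already handles
            raise asyncio.TimeoutError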