Compare commits

...

29 Commits

Author SHA1 Message Date
dgtlmoon c7dc25bdfc tweaks 2026-02-02 22:27:31 +01:00
dgtlmoon ca85310fb0 WIP 2026-02-02 22:15:43 +01:00
dgtlmoon 6907bfab1e tweaks 2026-02-02 22:01:52 +01:00
dgtlmoon 65e6b461cf Tweaks 2026-02-02 20:24:33 +01:00
dgtlmoon d96ddc0f23 Tweaks 2026-02-02 20:20:07 +01:00
dgtlmoon e09c2813b1 Lower workers for testing 2026-02-02 20:04:17 +01:00
dgtlmoon 1f3c0995e5 test speeupds 2026-02-02 19:51:04 +01:00
dgtlmoon d420bda7e4 tweaks 2026-02-02 18:55:59 +01:00
dgtlmoon f6a1b6d808 Timing tune 2026-02-02 18:49:54 +01:00
dgtlmoon 6f12412396 Queue changes 2026-02-02 18:37:07 +01:00
dgtlmoon ff2ead88dd test tweak 2026-02-02 18:19:27 +01:00
dgtlmoon c38e3df4ee Bump ignore 2026-02-02 18:10:55 +01:00
dgtlmoon 899e21a018 Queue timing fixes 2026-02-02 18:10:47 +01:00
dgtlmoon aea7fc6f0a test cleanup 2026-02-02 15:09:05 +01:00
dgtlmoon d6d4960762 test tweak 2026-02-02 15:08:22 +01:00
dgtlmoon 72073bfc5e include cleanup 2026-02-02 15:04:10 +01:00
dgtlmoon 8c809872e8 GitHub build - attempt to cache container build better 2026-02-02 14:58:19 +01:00
dgtlmoon 081d803977 test fix 2026-02-02 14:56:34 +01:00
dgtlmoon 61826bbf94 WIP 2026-02-02 14:47:14 +01:00
dgtlmoon 5fc920db5d WIP 2026-02-02 13:02:20 +01:00
dgtlmoon 68fb5cf898 test tweak 2026-02-02 12:59:21 +01:00
dgtlmoon 5b153ca25d Revert test changes 2026-02-02 12:51:39 +01:00
dgtlmoon f166e96466 Test fix 2026-02-02 11:16:16 +01:00
dgtlmoon b7eaeb4ae4 Test fixes 2026-02-02 11:06:11 +01:00
dgtlmoon ef310e4a67 test tweaks 2026-02-01 18:26:33 +01:00
dgtlmoon cf32bf5f47 test improvements 2026-02-01 18:18:22 +01:00
dgtlmoon 424e4ec1aa Add test for worker active count 2026-02-01 12:27:19 +01:00
dgtlmoon c1dca306ad Refactor queue handling, add tests 2026-02-01 12:21:39 +01:00
dgtlmoon e219e8cada Janus queue worker not needed, improves multiple workers 2026-02-01 10:57:04 +01:00
48 changed files with 849 additions and 375 deletions
@@ -37,10 +37,29 @@ jobs:
           ${{ runner.os }}-pip-py${{ env.PYTHON_VERSION }}-
           ${{ runner.os }}-pip-
+    - name: Get current date for cache key
+      id: date
+      run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
+    - name: Set up Docker Buildx
+      uses: docker/setup-buildx-action@v3
     - name: Build changedetection.io container for testing under Python ${{ env.PYTHON_VERSION }}
+      uses: docker/build-push-action@v6
+      with:
+        context: ./
+        file: ./Dockerfile
+        build-args: |
+          PYTHON_VERSION=${{ env.PYTHON_VERSION }}
+          LOGGER_LEVEL=TRACE
+        tags: test-changedetectionio
+        load: true
+        cache-from: type=gha,scope=build-${{ github.ref_name }}-py${{ env.PYTHON_VERSION }}-${{ hashFiles('requirements.txt', 'Dockerfile') }}-${{ steps.date.outputs.date }}
+        cache-to: type=gha,mode=max,scope=build-${{ github.ref_name }}-py${{ env.PYTHON_VERSION }}-${{ hashFiles('requirements.txt', 'Dockerfile') }}-${{ steps.date.outputs.date }}
+    - name: Verify build
       run: |
-        echo "---- Building for Python ${{ env.PYTHON_VERSION }} -----"
-        docker build --build-arg PYTHON_VERSION=${{ env.PYTHON_VERSION }} --build-arg LOGGER_LEVEL=TRACE -t test-changedetectionio .
+        echo "---- Built for Python ${{ env.PYTHON_VERSION }} -----"
         docker run test-changedetectionio bash -c 'pip list'
     - name: We should be Python ${{ env.PYTHON_VERSION }} ...
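The hunk above moves the Docker build into `docker/build-push-action` with a GitHub Actions layer cache whose scope combines the branch name, Python version, a hash of the dependency files, and the current date. As an illustrative sketch (not part of the diff, function name hypothetical), the same scope string can be composed like this, showing how the cache is invalidated daily and whenever `requirements.txt` or the `Dockerfile` change:

```python
from datetime import date
import hashlib

def build_cache_scope(ref_name: str, python_version: str, *file_paths: str) -> str:
    """Compose a cache scope string mirroring the workflow expression:
    branch + Python version + hash of dependency files + today's date."""
    h = hashlib.sha256()
    for path in file_paths:
        with open(path, "rb") as f:
            h.update(f.read())
    today = date.today().isoformat()  # matches date +'%Y-%m-%d' in the workflow
    return f"build-{ref_name}-py{python_version}-{h.hexdigest()[:8]}-{today}"
```

Because the date is part of the scope, a stale layer cache can live at most one day before a fresh build repopulates it.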
+1
View File
@@ -29,3 +29,4 @@ test-datastore/
 # Memory consumption log
 test-memory.log
+tests/logs/
+8 -8
View File
@@ -102,8 +102,8 @@ def sigshutdown_handler(_signo, _stack_frame):
     # Shutdown workers and queues immediately
     try:
-        from changedetectionio import worker_handler
-        worker_handler.shutdown_workers()
+        from changedetectionio import worker_pool
+        worker_pool.shutdown_workers()
     except Exception as e:
         logger.error(f"Error shutting down workers: {str(e)}")
@@ -415,12 +415,12 @@ def main():
     # This must happen AFTER app initialization so update_q is available
     if batch_mode and added_watch_uuids:
         from changedetectionio.flask_app import update_q
-        from changedetectionio import queuedWatchMetaData, worker_handler
+        from changedetectionio import queuedWatchMetaData, worker_pool
         logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
         for watch_uuid in added_watch_uuids:
             try:
-                worker_handler.queue_item_async_safe(
+                worker_pool.queue_item_async_safe(
                     update_q,
                     queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                 )
@@ -432,7 +432,7 @@ def main():
     # This must happen AFTER app initialization so update_q is available
     if recheck_watches is not None:
         from changedetectionio.flask_app import update_q
-        from changedetectionio import queuedWatchMetaData, worker_handler
+        from changedetectionio import queuedWatchMetaData, worker_pool
         watches_to_queue = []
         if recheck_watches == 'all':
@@ -454,7 +454,7 @@ def main():
         for watch_uuid in watches_to_queue:
             if watch_uuid in datastore.data['watching']:
                 try:
-                    worker_handler.queue_item_async_safe(
+                    worker_pool.queue_item_async_safe(
                         update_q,
                         queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                     )
@@ -516,7 +516,7 @@ def main():
         for watch_uuid in watches_to_queue:
             if watch_uuid in datastore.data['watching']:
                 try:
-                    worker_handler.queue_item_async_safe(
+                    worker_pool.queue_item_async_safe(
                         update_q,
                         queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                     )
@@ -549,7 +549,7 @@ def main():
         logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")
         # Use the shared wait_for_all_checks function
-        completed = worker_handler.wait_for_all_checks(update_q, timeout=300)
+        completed = worker_pool.wait_for_all_checks(update_q, timeout=300)
         if not completed:
             logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")
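Batch mode above waits on `wait_for_all_checks(update_q, timeout=300)` before starting the next iteration. A minimal sketch of what such a helper could look like (the real signature in `worker_pool` may differ; `running_uuids_fn` is a hypothetical stand-in for the running-worker check) is a poll-until-drained loop with a deadline:

```python
import time

def wait_for_all_checks(update_q, running_uuids_fn, timeout=300, poll=0.5):
    """Poll until the update queue is drained AND no worker is busy,
    or give up after `timeout` seconds. Returns True on completion,
    False on timeout."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if update_q.qsize() == 0 and not running_uuids_fn():
            return True
        time.sleep(poll)
    return False
```

The caller logs a warning and moves on when this returns `False`, matching the 300-second timeout handling in the hunk.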
+3 -3
View File
@@ -1,5 +1,5 @@
 from changedetectionio import queuedWatchMetaData
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 from flask_expects_json import expects_json
 from flask_restful import abort, Resource
 from loguru import logger
@@ -42,7 +42,7 @@ class Tag(Resource):
         # If less than 20 watches, queue synchronously for immediate feedback
         if len(watches_to_queue) < 20:
             for watch_uuid in watches_to_queue:
-                worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
             return {'status': f'OK, queued {len(watches_to_queue)} watches for rechecking'}, 200
         else:
             # 20+ watches - queue in background thread to avoid blocking API response
@@ -50,7 +50,7 @@ class Tag(Resource):
                 """Background thread to queue watches - discarded after completion."""
                 try:
                     for watch_uuid in watches_to_queue:
-                        worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
+                        worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
                     logger.info(f"Background queueing complete for tag {tag['uuid']}: {len(watches_to_queue)} watches queued")
                 except Exception as e:
                     logger.error(f"Error in background queueing for tag {tag['uuid']}: {e}")
+7 -7
View File
@@ -6,7 +6,7 @@ from changedetectionio.favicon_utils import get_favicon_mime_type
 from . import auth
 from changedetectionio import queuedWatchMetaData, strtobool
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 from flask import request, make_response, send_from_directory
 from flask_expects_json import expects_json
 from flask_restful import abort, Resource
@@ -85,7 +85,7 @@ class Watch(Resource):
             abort(404, message='No watch exists with the UUID of {}'.format(uuid))
         if request.args.get('recheck'):
-            worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             return "OK", 200
         if request.args.get('paused', '') == 'paused':
             self.datastore.data['watching'].get(uuid).pause()
@@ -477,7 +477,7 @@ class CreateWatch(Resource):
         new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
         if new_uuid:
             # Dont queue because the scheduler will check that it hasnt been checked before anyway
-            # worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
+            # worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
             return {'uuid': new_uuid}, 201
         else:
             return "Invalid or unsupported URL", 400
@@ -514,7 +514,7 @@ class CreateWatch(Resource):
         if len(watches_to_queue) < 20:
             # Get already queued/running UUIDs once (efficient)
             queued_uuids = set(self.update_q.get_queued_uuids())
-            running_uuids = set(worker_handler.get_running_uuids())
+            running_uuids = set(worker_pool.get_running_uuids())
             # Filter out watches that are already queued or running
             watches_to_queue_filtered = [
@@ -524,7 +524,7 @@ class CreateWatch(Resource):
             # Queue only the filtered watches
             for uuid in watches_to_queue_filtered:
-                worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             # Provide feedback about skipped watches
             skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -536,7 +536,7 @@ class CreateWatch(Resource):
             # 20+ watches - queue in background thread to avoid blocking API response
             # Capture queued/running state before background thread
             queued_uuids = set(self.update_q.get_queued_uuids())
-            running_uuids = set(worker_handler.get_running_uuids())
+            running_uuids = set(worker_pool.get_running_uuids())
             def queue_all_watches_background():
                 """Background thread to queue all watches - discarded after completion."""
@@ -546,7 +546,7 @@ class CreateWatch(Resource):
                 for uuid in watches_to_queue:
                     # Check if already queued or running (state captured at start)
                     if uuid not in queued_uuids and uuid not in running_uuids:
-                        worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                        worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
                         queued_count += 1
                     else:
                         skipped_count += 1
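The hunks above repeat one pattern in several endpoints: fewer than 20 watches are queued inline so the HTTP response can report an exact count, while 20 or more are handed to a throwaway daemon thread so the response isn't blocked; watches already queued or running are skipped. A condensed, self-contained sketch of that pattern (function and argument names are illustrative, not the project's API) looks like this:

```python
import threading

def queue_recheck(update_q, watches, queued, running, threshold=20):
    """Queue watches for recheck, skipping any already queued or running.
    Small batches are queued synchronously; large batches are queued
    from a short-lived daemon thread so the caller can return at once.
    Returns the number of watches that will be queued."""
    to_queue = [u for u in watches if u not in queued and u not in running]
    if len(to_queue) < threshold:
        for uuid in to_queue:
            update_q.put((1, uuid))  # priority 1, like PrioritizedItem in the diff
        return len(to_queue)

    def background():
        # Membership state was captured before the thread started,
        # mirroring the "state captured at start" comment in the hunk.
        for uuid in to_queue:
            update_q.put((1, uuid))

    threading.Thread(target=background, daemon=True).start()
    return len(to_queue)
```

The capture-before-thread detail matters: the background worker dedupes against the queued/running sets as they were when the request arrived, not as they change afterwards.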
@@ -14,7 +14,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
     from changedetectionio import forms
     #
     if request.method == 'POST':
-        # from changedetectionio import worker_handler
+        # from changedetectionio import worker_pool
         from changedetectionio.blueprint.imports.importer import (
             import_url_list,
@@ -31,7 +31,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs")
             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             if len(importer_handler.remaining_data) == 0:
                 return redirect(url_for('watchlist.index'))
@@ -45,7 +45,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             # XLSX importer
@@ -70,7 +70,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             # Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
             # for uuid in importer_handler.new_uuids:
-            #     worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+            #     worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
             # Could be some remaining, or we could be on GET
@@ -4,7 +4,7 @@ from flask import Blueprint, flash, redirect, url_for
 from flask_login import login_required
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio import queuedWatchMetaData
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 from queue import PriorityQueue
 PRICE_DATA_TRACK_ACCEPT = 'accepted'
@@ -20,7 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q: PriorityQueue
     datastore.data['watching'][uuid]['track_ldjson_price_data'] = PRICE_DATA_TRACK_ACCEPT
     datastore.data['watching'][uuid]['processor'] = 'restock_diff'
     datastore.data['watching'][uuid].clear_watch()
-    worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+    worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
     return redirect(url_for("watchlist.index"))
 @login_required
@@ -37,6 +37,8 @@ def construct_single_watch_routes(rss_blueprint, datastore):
     rss_content_format = datastore.data['settings']['application'].get('rss_content_format')
+    if uuid == 'first':
+        uuid = list(datastore.data['watching'].keys()).pop()
     # Get the watch by UUID
     watch = datastore.data['watching'].get(uuid)
     if not watch:
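Several hunks in this compare add or move the same test convenience: passing `uuid='first'` resolves to whichever watch exists, via `.pop()` on the key list (the last-inserted key, since dicts preserve insertion order). As a standalone sketch (the helper name is hypothetical; in the diff this is inlined in each route):

```python
def resolve_uuid(datastore, uuid):
    """Resolve the 'first' test convenience: tests can address a watch
    without knowing its real UUID. `datastore` is assumed to be the
    usual dict-of-watches structure."""
    if uuid == 'first':
        # list.pop() with no index returns the LAST element,
        # i.e. the most recently inserted watch key
        uuid = list(datastore['watching'].keys()).pop()
    return uuid
```

Moving this resolution above the existence check (as `form_delete` does in this diff) matters: otherwise `uuid='first'` fails the "does not exist" guard before it is ever translated.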
@@ -83,7 +83,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
     # Adjust worker count if it changed
     if new_worker_count != old_worker_count:
-        from changedetectionio import worker_handler
+        from changedetectionio import worker_pool
         from changedetectionio.flask_app import update_q, notification_q, app, datastore as ds
         # Check CPU core availability and warn if worker count is high
@@ -92,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
             flash(gettext("Warning: Worker count ({}) is close to or exceeds available CPU cores ({})").format(
                 new_worker_count, cpu_count), 'warning')
-        result = worker_handler.adjust_async_worker_count(
+        result = worker_pool.adjust_async_worker_count(
             new_count=new_worker_count,
             update_q=update_q,
             notification_q=notification_q,
+15 -18
View File
@@ -10,7 +10,7 @@ from changedetectionio.blueprint.ui.notification import construct_blueprint as c
from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint from changedetectionio.blueprint.ui.views import construct_blueprint as construct_views_blueprint
from changedetectionio.blueprint.ui import diff, preview from changedetectionio.blueprint.ui import diff, preview
def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True): def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchMetaData, watch_check_update, extra_data=None, emit_flash=True):
from flask import request, flash from flask import request, flash
if op == 'delete': if op == 'delete':
@@ -63,7 +63,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids: for uuid in uuids:
if datastore.data['watching'].get(uuid): if datastore.data['watching'].get(uuid):
# Recheck and require a full reprocessing # Recheck and require a full reprocessing
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if emit_flash: if emit_flash:
flash(gettext("{} watches queued for rechecking").format(len(uuids))) flash(gettext("{} watches queued for rechecking").format(len(uuids)))
@@ -114,7 +114,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids: for uuid in uuids:
watch_check_update.send(watch_uuid=uuid) watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update): def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates") ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint # Register the edit blueprint
@@ -222,14 +222,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
@login_optionally_required @login_optionally_required
def form_delete(): def form_delete():
uuid = request.args.get('uuid') uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
if uuid != 'all' and not uuid in datastore.data['watching'].keys(): if uuid != 'all' and not uuid in datastore.data['watching'].keys():
flash(gettext('The watch by UUID {} does not exist.').format(uuid), 'error') flash(gettext('The watch by UUID {} does not exist.').format(uuid), 'error')
return redirect(url_for('watchlist.index')) return redirect(url_for('watchlist.index'))
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
datastore.delete(uuid) datastore.delete(uuid)
flash(gettext('Deleted.')) flash(gettext('Deleted.'))
@@ -239,14 +239,14 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
@login_optionally_required @login_optionally_required
def form_clone(): def form_clone():
uuid = request.args.get('uuid') uuid = request.args.get('uuid')
# More for testing, possible to return the first/only
if uuid == 'first': if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop() uuid = list(datastore.data['watching'].keys()).pop()
new_uuid = datastore.clone(uuid) new_uuid = datastore.clone(uuid)
if not datastore.data['watching'].get(uuid).get('paused'): if not datastore.data['watching'].get(uuid).get('paused'):
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid})) worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=5, item={'uuid': new_uuid}))
flash(gettext('Cloned, you are editing the new watch.')) flash(gettext('Cloned, you are editing the new watch.'))
@@ -262,10 +262,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
if uuid: if uuid:
# Single watch - check if already queued or running # Single watch - check if already queued or running
if worker_handler.is_watch_running(uuid) or uuid in update_q.get_queued_uuids(): if worker_pool.is_watch_running(uuid) or uuid in update_q.get_queued_uuids():
flash(gettext("Watch is already queued or being checked.")) flash(gettext("Watch is already queued or being checked."))
else: else:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid})) worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
flash(gettext("Queued 1 watch for rechecking.")) flash(gettext("Queued 1 watch for rechecking."))
else: else:
# Multiple watches - first count how many need to be queued # Multiple watches - first count how many need to be queued
@@ -284,7 +284,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
if len(watches_to_queue) < 20: if len(watches_to_queue) < 20:
# Get already queued/running UUIDs once (efficient) # Get already queued/running UUIDs once (efficient)
queued_uuids = set(update_q.get_queued_uuids()) queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids()) running_uuids = set(worker_pool.get_running_uuids())
# Filter out watches that are already queued or running # Filter out watches that are already queued or running
watches_to_queue_filtered = [] watches_to_queue_filtered = []
@@ -294,7 +294,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
# Queue only the filtered watches # Queue only the filtered watches
for watch_uuid in watches_to_queue_filtered: for watch_uuid in watches_to_queue_filtered:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
# Provide feedback about skipped watches # Provide feedback about skipped watches
skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered) skipped_count = len(watches_to_queue) - len(watches_to_queue_filtered)
@@ -310,7 +310,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
# 20+ watches - queue in background thread to avoid blocking HTTP response # 20+ watches - queue in background thread to avoid blocking HTTP response
# Capture queued/running state before background thread # Capture queued/running state before background thread
queued_uuids = set(update_q.get_queued_uuids()) queued_uuids = set(update_q.get_queued_uuids())
running_uuids = set(worker_handler.get_running_uuids()) running_uuids = set(worker_pool.get_running_uuids())
def queue_watches_background(): def queue_watches_background():
"""Background thread to queue watches - discarded after completion.""" """Background thread to queue watches - discarded after completion."""
@@ -320,7 +320,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
for watch_uuid in watches_to_queue: for watch_uuid in watches_to_queue:
# Check if already queued or running (state captured at start) # Check if already queued or running (state captured at start)
if watch_uuid not in queued_uuids and watch_uuid not in running_uuids: if watch_uuid not in queued_uuids and watch_uuid not in running_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})) worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid}))
queued_count += 1 queued_count += 1
else: else:
skipped_count += 1 skipped_count += 1
@@ -349,7 +349,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
 extra_data=extra_data,
 queuedWatchMetaData=queuedWatchMetaData,
 uuids=uuids,
-worker_handler=worker_handler,
+worker_pool=worker_pool,
 update_q=update_q,
 watch_check_update=watch_check_update,
 op=op,
@@ -367,9 +367,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
 import json
 from copy import deepcopy
-# more for testing
-if uuid == 'first':
-    uuid = list(datastore.data['watching'].keys()).pop()
 # copy it to memory as trim off what we dont need (history)
 watch = deepcopy(datastore.data['watching'].get(uuid))
+4 -5
@@ -83,7 +83,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 If a processor doesn't have a difference module, falls back to text_json_diff.
 """
-# More for testing, possible to return the first/only
 if uuid == 'first':
     uuid = list(datastore.data['watching'].keys()).pop()
@@ -144,10 +143,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 Each processor implements processors/{type}/extract.py::render_form()
 If a processor doesn't have an extract module, falls back to text_json_diff.
 """
-# More for testing, possible to return the first/only
 if uuid == 'first':
     uuid = list(datastore.data['watching'].keys()).pop()
 try:
     watch = datastore.data['watching'][uuid]
 except KeyError:
@@ -200,7 +199,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 Each processor implements processors/{type}/extract.py::process_extraction()
 If a processor doesn't have an extract module, falls back to text_json_diff.
 """
-# More for testing, possible to return the first/only
 if uuid == 'first':
     uuid = list(datastore.data['watching'].keys()).pop()
@@ -267,7 +266,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 - /diff/{uuid}/processor-asset/after
 - /diff/{uuid}/processor-asset/rendered_diff
 """
-# More for testing, possible to return the first/only
 if uuid == 'first':
     uuid = list(datastore.data['watching'].keys()).pop()
+10 -6
@@ -9,7 +9,7 @@ from jinja2 import Environment, FileSystemLoader
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio.auth_decorator import login_optionally_required
 from changedetectionio.time_handler import is_within_schedule
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
 edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
@@ -30,14 +30,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 from changedetectionio import processors
 import importlib
-if uuid == 'first':
-    uuid = list(datastore.data['watching'].keys()).pop()
 # More for testing, possible to return the first/only
 if not datastore.data['watching'].keys():
     flash(gettext("No watches to edit"), "error")
     return redirect(url_for('watchlist.index'))
+if uuid == 'first':
+    uuid = list(datastore.data['watching'].keys()).pop()
 if not uuid in datastore.data['watching']:
     flash(gettext("No watch with the UUID {} found.").format(uuid), "error")
     return redirect(url_for('watchlist.index'))
@@ -283,7 +282,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 #############################
 if not datastore.data['watching'][uuid].get('paused') and is_in_schedule:
     # Queue the watch for immediate recheck, with a higher priority
-    worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+    worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
 # Diff page [edit] link should go back to diff page
 if request.args.get("next") and request.args.get("next") == 'diff':
@@ -314,7 +313,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 app_rss_token = datastore.data['settings']['application'].get('rss_access_token'),
 c = [f"processor-{watch.get('processor')}"]
-if worker_handler.is_watch_running(uuid):
+if worker_pool.is_watch_running(uuid):
     c.append('checking-now')
 template_args = {
@@ -371,6 +370,8 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 from flask import send_file
 import brotli
+if uuid == 'first':
+    uuid = list(datastore.data['watching'].keys()).pop()
 watch = datastore.data['watching'].get(uuid)
 if watch and watch.history.keys() and os.path.isdir(watch.watch_data_dir):
     latest_filename = list(watch.history.keys())[-1]
@@ -395,6 +396,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 def watch_get_preview_rendered(uuid):
     '''For when viewing the "preview" of the rendered text from inside of Edit'''
     from flask import jsonify
+    if uuid == 'first':
+        uuid = list(datastore.data['watching'].keys()).pop()
     from changedetectionio.processors.text_json_diff import prepare_filter_prevew
     result = prepare_filter_prevew(watch_uuid=uuid, form_data=request.form, datastore=datastore)
     return jsonify(result)
+1 -4
@@ -26,10 +26,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 Each processor implements processors/{type}/preview.py::render()
 If a processor doesn't have a preview module, falls back to default text preview.
 """
-# More for testing, possible to return the first/only
 if uuid == 'first':
     uuid = list(datastore.data['watching'].keys()).pop()
 try:
     watch = datastore.data['watching'][uuid]
 except KeyError:
@@ -150,10 +149,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
""" """
from flask import make_response from flask import make_response
# More for testing, possible to return the first/only
if uuid == 'first': if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop() uuid = list(datastore.data['watching'].keys()).pop()
try: try:
watch = datastore.data['watching'][uuid] watch = datastore.data['watching'][uuid]
except KeyError: except KeyError:
+2 -2
@@ -2,7 +2,7 @@ from flask import Blueprint, request, redirect, url_for, flash
 from flask_babel import gettext
 from changedetectionio.store import ChangeDetectionStore
 from changedetectionio.auth_decorator import login_optionally_required
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, watch_check_update):
@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
 return redirect(url_for('ui.ui_edit.edit_page', uuid=new_uuid, unpause_on_save=1, tag=request.args.get('tag')))
 else:
     # Straight into the queue.
-    worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
+    worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
 flash(gettext("Watch added."))
 return redirect(url_for('watchlist.index', tag=request.args.get('tag','')))
+23 -2
@@ -55,6 +55,26 @@ class fetcher(Fetcher):
 session = requests.Session()
+# Configure retry adapter for low-level network errors only
+# Retries connection timeouts, read timeouts, connection resets - not HTTP status codes
+# Especially helpful in parallel test execution when servers are slow/overloaded
+# Configurable via REQUESTS_RETRY_MAX_COUNT (default: 6 attempts)
+from requests.adapters import HTTPAdapter
+from urllib3.util.retry import Retry
+max_retries = int(os.getenv("REQUESTS_RETRY_MAX_COUNT", "6"))
+retry_strategy = Retry(
+    total=max_retries,
+    connect=max_retries,  # Retry connection timeouts
+    read=max_retries,  # Retry read timeouts
+    status=0,  # Don't retry on HTTP status codes
+    backoff_factor=0.5,  # Wait roughly 0.5s, 1s, 2s between retries
+    allowed_methods=["HEAD", "GET", "OPTIONS", "POST"],
+    raise_on_status=False
+)
+adapter = HTTPAdapter(max_retries=retry_strategy)
+session.mount("http://", adapter)
+session.mount("https://", adapter)
 if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
     from requests_file import FileAdapter
@@ -142,10 +162,11 @@ class fetcher(Fetcher):
 watch_uuid=None,
 ):
     """Async wrapper that runs the synchronous requests code in a thread pool"""
     loop = asyncio.get_event_loop()
     # Run the synchronous _run_sync in a thread pool to avoid blocking the event loop
+    # Retry logic is handled by requests' HTTPAdapter (see _run_sync for configuration)
     await loop.run_in_executor(
         None,  # Use default ThreadPoolExecutor
         lambda: self._run_sync(
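The hunk above mounts a urllib3 `Retry` on the session so that only transport-level failures are retried, while HTTP status codes pass through untouched. The same policy can be sketched standalone in plain Python; `do_request` and `TransportError` here are hypothetical stand-ins, not part of the diff:

```python
import time

class TransportError(Exception):
    """Stand-in for connection resets and timeouts (hypothetical)."""

def fetch_with_retries(do_request, max_retries=3, backoff_factor=0.5, sleep=time.sleep):
    """Retry transport-level failures only; any HTTP status returns normally."""
    for attempt in range(max_retries + 1):
        try:
            return do_request()  # even a 500 response returns here, no retry
        except TransportError:
            if attempt == max_retries:
                raise  # retries exhausted, surface the error
            # Exponential backoff as in urllib3: factor * 2**attempt -> 0.5s, 1s, 2s, ...
            sleep(backoff_factor * (2 ** attempt))
```

Note that exact sleep timing varies slightly between urllib3 versions (older 1.x skips the sleep before the first retry), which is why the comment in the diff hedges with "roughly".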
+26 -16
@@ -14,7 +14,7 @@ from pathlib import Path
 from changedetectionio.strtobool import strtobool
 from threading import Event
 from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
-from changedetectionio import worker_handler
+from changedetectionio import worker_pool
 from flask import (
     Flask,
@@ -195,7 +195,7 @@ def _jinja2_filter_format_number_locale(value: float) -> str:
 @app.template_global('is_checking_now')
 def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"):
-    return worker_handler.is_watch_running(watch_obj['uuid'])
+    return worker_pool.is_watch_running(watch_obj['uuid'])
 @app.template_global('get_watch_queue_position')
 def _get_watch_queue_position(watch_obj):
@@ -206,13 +206,13 @@ def _get_watch_queue_position(watch_obj):
 @app.template_global('get_current_worker_count')
 def _get_current_worker_count():
     """Get the current number of operational workers"""
-    return worker_handler.get_worker_count()
+    return worker_pool.get_worker_count()
 @app.template_global('get_worker_status_info')
 def _get_worker_status_info():
     """Get detailed worker status information for display"""
-    status = worker_handler.get_worker_status()
-    running_uuids = worker_handler.get_running_uuids()
+    status = worker_pool.get_worker_status()
+    running_uuids = worker_pool.get_running_uuids()
     return {
         'count': status['worker_count'],
@@ -801,7 +801,7 @@ def changedetection_app(config=None, datastore_o=None):
 # watchlist UI buttons etc
 import changedetectionio.blueprint.ui as ui
-app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update))
+app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
 import changedetectionio.blueprint.watchlist as watchlist
 app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
@@ -838,10 +838,10 @@ def changedetection_app(config=None, datastore_o=None):
 expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
 # Get basic status
-status = worker_handler.get_worker_status()
+status = worker_pool.get_worker_status()
 # Perform health check
-health_result = worker_handler.check_worker_health(
+health_result = worker_pool.check_worker_health(
     expected_count=expected_workers,
     update_q=update_q,
     notification_q=notification_q,
@@ -905,14 +905,24 @@ def changedetection_app(config=None, datastore_o=None):
 # Can be overridden by ENV or use the default settings
 n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
 logger.info(f"Starting {n_workers} workers during app initialization")
-worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
+worker_pool.start_workers(n_workers, update_q, notification_q, app, datastore)
 # Skip background threads in batch mode (just process queue and exit)
 batch_mode = app.config.get('batch_mode', False)
 if not batch_mode:
     # @todo handle ctrl break
     ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
-    threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
+    # Start configurable number of notification workers (default 1)
+    notification_workers = int(os.getenv("NOTIFICATION_WORKERS", "1"))
+    for i in range(notification_workers):
+        threading.Thread(
+            target=notification_runner,
+            args=(i,),
+            daemon=True,
+            name=f"NotificationRunner-{i}"
+        ).start()
+    logger.info(f"Started {notification_workers} notification worker(s)")
 in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
 # Check for new release version, but not when running in test/build or pytest
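The NOTIFICATION_WORKERS change above replaces the single runner with N identical daemon threads draining one shared queue. A minimal stdlib sketch of that fan-out pattern (function and parameter names are illustrative, not the app's API):

```python
import queue
import threading

def start_notification_workers(n_workers, work_q, handle, stop_event):
    """Spawn n daemon threads that drain work_q until stop_event is set."""
    def runner(worker_id):
        while not stop_event.is_set():
            try:
                # Short timeout so the thread re-checks stop_event regularly
                item = work_q.get(block=True, timeout=0.1)
            except queue.Empty:
                continue
            try:
                handle(worker_id, item)  # process one notification
            finally:
                work_q.task_done()       # lets work_q.join() observe completion
    threads = []
    for i in range(n_workers):
        t = threading.Thread(target=runner, args=(i,), daemon=True,
                             name=f"NotificationRunner-{i}")
        t.start()
        threads.append(t)
    return threads
```

Because `queue.Queue` is thread-safe, no extra locking is needed around `get()`; each item is delivered to exactly one worker.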
@@ -954,14 +964,14 @@ def check_for_new_version():
 app.config.exit.wait(86400)
-def notification_runner():
+def notification_runner(worker_id=0):
     global notification_debug_log
     from datetime import datetime
     import json
     with app.app_context():
         while not app.config.exit.is_set():
             try:
-                # At the moment only one thread runs (single runner)
+                # Multiple workers can run concurrently (configurable via NOTIFICATION_WORKERS)
                 n_object = notification_q.get(block=False)
             except queue.Empty:
                 app.config.exit.wait(1)
@@ -987,7 +997,7 @@ def notification_runner():
 sent_obj = process_notification(n_object, datastore)
 except Exception as e:
-    logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}")
+    logger.error(f"Notification worker {worker_id} - Watch URL: {n_object['watch_url']} Error {str(e)}")
 # UUID wont be present when we submit a 'test' from the global settings
 if 'uuid' in n_object:
@@ -1028,7 +1038,7 @@ def ticker_thread_check_time_launch_checks():
 now = time.time()
 if now - last_health_check > 60:
     expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
-    health_result = worker_handler.check_worker_health(
+    health_result = worker_pool.check_worker_health(
         expected_count=expected_workers,
         update_q=update_q,
         notification_q=notification_q,
@@ -1047,7 +1057,7 @@ def ticker_thread_check_time_launch_checks():
 continue
 # Get a list of watches by UUID that are currently fetching data
-running_uuids = worker_handler.get_running_uuids()
+running_uuids = worker_pool.get_running_uuids()
 # Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch
 queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}
@@ -1153,7 +1163,7 @@ def ticker_thread_check_time_launch_checks():
 priority = int(time.time())
 # Into the queue with you
-queued_successfully = worker_handler.queue_item_async_safe(update_q,
+queued_successfully = worker_pool.queue_item_async_safe(update_q,
     queuedWatchMetaData.PrioritizedItem(priority=priority,
         item={'uuid': uuid})
 )
+1 -1
@@ -29,7 +29,7 @@ class model(dict):
 'proxy': None,  # Preferred proxy connection
 'time_between_check': {'weeks': None, 'days': None, 'hours': 3, 'minutes': None, 'seconds': None},
 'timeout': int(getenv("DEFAULT_SETTINGS_REQUESTS_TIMEOUT", "45")),  # Default 45 seconds
-'workers': int(getenv("DEFAULT_SETTINGS_REQUESTS_WORKERS", "10")),  # Number of threads, lower is better for slow connections
+'workers': int(getenv("DEFAULT_SETTINGS_REQUESTS_WORKERS", "5")),  # Number of threads, lower is better for slow connections
 'default_ua': {
     'html_requests': getenv("DEFAULT_SETTINGS_HEADERS_USERAGENT", DEFAULT_SETTINGS_HEADERS_USERAGENT),
     'html_webdriver': None,
+218 -131
@@ -5,51 +5,57 @@ import heapq
 import queue
 import threading
-try:
-    import janus
-except ImportError:
-    logger.critical(f"CRITICAL: janus library is required. Install with: pip install janus")
-    raise
+# Janus is no longer required - we use pure threading.Queue for multi-loop support
+# try:
+#     import janus
+# except ImportError:
+#     pass  # Not needed anymore
 class RecheckPriorityQueue:
     """
-    Ultra-reliable priority queue using janus for async/sync bridging.
-
-    CRITICAL DESIGN NOTE: Both sync_q and async_q are required because:
-    - sync_q: Used by Flask routes, ticker threads, and other synchronous code
-    - async_q: Used by async workers (the actual fetchers/processors) and coroutines
-
-    DO NOT REMOVE EITHER INTERFACE - they bridge different execution contexts:
-    - Synchronous code (Flask, threads) cannot use async methods without blocking
-    - Async code cannot use sync methods without blocking the event loop
-    - janus provides the only safe bridge between these two worlds
-
-    Attempting to unify to async-only would require:
-    - Converting all Flask routes to async (major breaking change)
-    - Using asyncio.run() in sync contexts (causes deadlocks)
-    - Thread-pool wrapping (adds complexity and overhead)
-
-    Minimal implementation focused on reliability:
-    - Pure janus for sync/async bridge
-    - Thread-safe priority ordering
-    - Bulletproof error handling with critical logging
+    Thread-safe priority queue supporting multiple async event loops.
+
+    ARCHITECTURE:
+    - Multiple async workers, each with its own event loop in its own thread
+    - Hybrid sync/async design for maximum scalability
+    - Sync interface for ticker thread (threading.Queue)
+    - Async interface for workers (asyncio.Event - NO executor threads!)
+
+    SCALABILITY:
+    - Scales to 100-200+ workers without executor thread exhaustion
+    - Async workers wait on asyncio.Event (pure coroutines, no threads)
+    - Sync callers use threading.Queue (backward compatible)
+
+    WHY NOT JANUS:
+    - Janus binds to ONE event loop at creation time
+    - Our architecture has 15+ workers, each with separate event loops
+    - Workers in different threads/loops cannot share janus async interface
+
+    WHY NOT RUN_IN_EXECUTOR:
+    - With 200 workers, run_in_executor() would block 200 threads
+    - Exhausts ThreadPoolExecutor, starves Flask HTTP handlers
+    - Pure async approach uses 0 threads while waiting
     """
     def __init__(self, maxsize: int = 0):
         try:
-            self._janus_queue = janus.Queue(maxsize=maxsize)
-            # BOTH interfaces required - see class docstring for why
-            self.sync_q = self._janus_queue.sync_q  # Flask routes, ticker thread
-            self.async_q = self._janus_queue.async_q  # Async workers
+            import asyncio
+            # Sync interface: threading.Queue for ticker thread and Flask routes
+            self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
             # Priority storage - thread-safe
             self._priority_items = []
             self._lock = threading.RLock()
+            # No event signaling needed - pure polling approach
+            # Workers check queue every 50ms (latency acceptable: 0-500ms)
+            # Scales to 1000+ workers: each sleeping worker = ~4KB coroutine, not thread
             # Signals for UI updates
             self.queue_length_signal = signal('queue_length')
             logger.debug("RecheckPriorityQueue initialized successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to initialize RecheckPriorityQueue: {str(e)}")
@@ -58,38 +64,48 @@ class RecheckPriorityQueue:
     # SYNC INTERFACE (for ticker thread)
     def put(self, item, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync put with priority ordering"""
+        logger.trace(f"RecheckQueue.put() called for item: {self._get_item_uuid(item)}, block={block}, timeout={timeout}")
         try:
-            # Add to priority storage
+            # CRITICAL: Add to both priority storage AND notification queue atomically
+            # to prevent desynchronization where item exists but no notification
             with self._lock:
                 heapq.heappush(self._priority_items, item)
-                # Notify via janus sync queue
-                self.sync_q.put(True, block=block, timeout=timeout)
-            # Emit signals
-            self._emit_put_signals(item)
+                # Add notification - use blocking with timeout for safety
+                # Notification queue is unlimited size, so should never block in practice
+                # but timeout ensures we detect any unexpected issues (deadlock, etc)
+                try:
+                    self._notification_queue.put(True, block=True, timeout=5.0)
+                except Exception as notif_e:
+                    # Notification failed - MUST remove from priority_items to keep in sync
+                    # This prevents "Priority queue inconsistency" errors in get()
+                    logger.critical(f"CRITICAL: Notification queue put failed, removing from priority_items: {notif_e}")
+                    self._priority_items.remove(item)
+                    heapq.heapify(self._priority_items)
+                    raise  # Re-raise to be caught by outer exception handler
+            # Signal emission after successful queue - log but don't fail the operation
+            # Item is already safely queued, so signal failure shouldn't affect queue state
+            try:
+                self._emit_put_signals(item)
+            except Exception as signal_e:
+                logger.error(f"Failed to emit put signals but item queued successfully: {signal_e}")
             logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
             return True
         except Exception as e:
-            logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {str(e)}")
-            # Remove from priority storage if janus put failed
-            try:
-                with self._lock:
-                    if item in self._priority_items:
-                        self._priority_items.remove(item)
-                        heapq.heapify(self._priority_items)
-            except Exception as cleanup_e:
-                logger.critical(f"CRITICAL: Failed to cleanup after put failure: {str(e)}")
+            logger.critical(f"CRITICAL: Failed to put item {self._get_item_uuid(item)}: {type(e).__name__}: {str(e)}")
+            # Item should have been cleaned up in the inner try/except if notification failed
             return False
     def get(self, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync get with priority ordering"""
-        import queue
+        logger.trace(f"RecheckQueue.get() called, block={block}, timeout={timeout}")
+        import queue as queue_module
         try:
-            # Wait for notification
-            self.sync_q.get(block=block, timeout=timeout)
+            # Wait for notification (this doesn't return the actual item, just signals availability)
+            self._notification_queue.get(block=block, timeout=timeout)
             # Get highest priority item
             with self._lock:
@@ -98,69 +114,91 @@ class RecheckPriorityQueue:
                 raise Exception("Priority queue inconsistency")
             item = heapq.heappop(self._priority_items)
-            # Emit signals
-            self._emit_get_signals()
-            logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
+            # Signal emission after successful retrieval - log but don't lose the item
+            # Item is already retrieved, so signal failure shouldn't affect queue state
+            try:
+                self._emit_get_signals()
+            except Exception as signal_e:
+                logger.error(f"Failed to emit get signals but item retrieved successfully: {signal_e}")
+            logger.trace(f"RecheckQueue.get() successfully retrieved item: {self._get_item_uuid(item)}")
             return item
-        except queue.Empty:
-            # Queue is empty with timeout - expected behavior, re-raise without logging
-            raise
+        except queue_module.Empty:
+            # Queue is empty with timeout - expected behavior
+            logger.trace(f"RecheckQueue.get() timed out - queue is empty (timeout={timeout})")
+            raise  # noqa
         except Exception as e:
             # Re-raise without logging - caller (worker) will handle and log appropriately
+            logger.trace(f"RecheckQueue.get() failed with exception: {type(e).__name__}: {str(e)}")
             raise
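The `put()`/`get()` pair above keeps two structures in sync: a heap for priority order, and a plain `queue.Queue` of wake-up tokens that provides the blocking semantics. Stripped of logging and error handling, the pattern reduces to this toy sketch (not the project's class):

```python
import heapq
import queue
import threading

class MiniPriorityQueue:
    """Heap for ordering + a token queue for blocking/wakeup."""
    def __init__(self):
        self._items = []                 # heapq-managed priority storage
        self._lock = threading.RLock()   # guards the heap
        self._tokens = queue.Queue()     # exactly one token per queued item

    def put(self, item):
        with self._lock:
            heapq.heappush(self._items, item)
            self._tokens.put(True)       # wake one blocked getter

    def get(self, timeout=None):
        # Blocks until a token exists; raises queue.Empty on timeout
        self._tokens.get(timeout=timeout)
        with self._lock:
            return heapq.heappop(self._items)  # smallest item = highest priority
```

The invariant that makes this safe is "tokens in flight == items in the heap"; the real `put()` above goes to some length (removing the item and re-heapifying on notification failure) to preserve exactly that invariant.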
     # ASYNC INTERFACE (for workers)
-    async def async_put(self, item):
-        """Pure async put with priority ordering"""
+    async def async_put(self, item, executor=None):
+        """Async put with priority ordering - uses thread pool to avoid blocking
+
+        Args:
+            item: Item to add to queue
+            executor: Optional ThreadPoolExecutor. If None, uses default pool.
+        """
+        logger.trace(f"RecheckQueue.async_put() called for item: {self._get_item_uuid(item)}, executor={executor}")
+        import asyncio
         try:
-            # Add to priority storage
-            with self._lock:
-                heapq.heappush(self._priority_items, item)
-            # Notify via janus async queue
-            await self.async_q.put(True)
-            # Emit signals
-            self._emit_put_signals(item)
-            logger.debug(f"Successfully async queued item: {self._get_item_uuid(item)}")
-            return True
+            # Use run_in_executor to call sync put without blocking event loop
+            loop = asyncio.get_event_loop()
+            result = await loop.run_in_executor(
+                executor,  # Use provided executor or default
+                lambda: self.put(item, block=True, timeout=5.0)
+            )
+            logger.trace(f"RecheckQueue.async_put() successfully queued item: {self._get_item_uuid(item)}")
+            return result
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to async put item {self._get_item_uuid(item)}: {str(e)}")
-            # Remove from priority storage if janus put failed
-            try:
-                with self._lock:
-                    if item in self._priority_items:
-                        self._priority_items.remove(item)
-                        heapq.heapify(self._priority_items)
-            except Exception as cleanup_e:
-                logger.critical(f"CRITICAL: Failed to cleanup after async put failure: {str(e)}")
             return False
-    async def async_get(self):
-        """Pure async get with priority ordering"""
+    async def async_get(self, executor=None, timeout=1.0):
+        """
+        Efficient async get using executor for blocking call.
+
+        HYBRID APPROACH: Best of both worlds
+        - Uses run_in_executor for efficient blocking (no polling overhead)
+        - Single timeout (no double-timeout race condition)
+        - Scales well: executor sized to match worker count
+          With FETCH_WORKERS=10: 10 threads blocked max (acceptable)
+          With FETCH_WORKERS=200: Need executor with 200+ threads (see worker_pool.py)
+
+        Args:
+            executor: ThreadPoolExecutor (sized to match worker count)
+            timeout: Maximum time to wait in seconds
+
+        Returns:
+            Item from queue
+
+        Raises:
+            queue.Empty: If timeout expires with no item available
+        """
+        logger.trace(f"RecheckQueue.async_get() called, timeout={timeout}")
+        import asyncio
         try:
-            # Wait for notification
-            await self.async_q.get()
-            # Get highest priority item
-            with self._lock:
-                if not self._priority_items:
-                    logger.critical(f"CRITICAL: Async queue notification received but no priority items available")
-                    raise Exception("Priority queue inconsistency")
-                item = heapq.heappop(self._priority_items)
-            # Emit signals
-            self._emit_get_signals()
-            logger.debug(f"Successfully async retrieved item: {self._get_item_uuid(item)}")
+            # Use run_in_executor to call sync get efficiently
+            # No outer asyncio.wait_for wrapper = no double timeout issue!
+            loop = asyncio.get_event_loop()
+            item = await loop.run_in_executor(
+                executor,
+                lambda: self.get(block=True, timeout=timeout)
+            )
+            logger.trace(f"RecheckQueue.async_get() successfully retrieved item: {self._get_item_uuid(item)}")
             return item
+        except queue.Empty:
+            logger.trace(f"RecheckQueue.async_get() timed out - queue is empty")
+            raise
         except Exception as e:
-            logger.critical(f"CRITICAL: Failed to async get item from queue: {str(e)}")
+            logger.critical(f"CRITICAL: Failed to async get item from queue: {type(e).__name__}: {str(e)}")
             raise
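The bridge pattern used by both async methods above — sync producers feeding a plain `threading.Queue`, async consumers pulling through `loop.run_in_executor` so the blocking `get()` never stalls the event loop — can be sketched standalone. The names here (`producer`, `consumer`) are illustrative only, not part of changedetection.io:

```python
import asyncio
import queue
import threading
from concurrent.futures import ThreadPoolExecutor

q = queue.Queue()

def producer():
    # Runs in a plain thread, e.g. a Flask route queueing work
    for i in range(3):
        q.put(i)

async def consumer(executor):
    loop = asyncio.get_event_loop()
    items = []
    for _ in range(3):
        # The blocking get() runs in an executor thread, not the event loop
        item = await loop.run_in_executor(executor, lambda: q.get(block=True, timeout=5.0))
        items.append(item)
    return items

executor = ThreadPoolExecutor(max_workers=2)
t = threading.Thread(target=producer)
t.start()
result = asyncio.run(consumer(executor))
t.join()
print(result)  # [0, 1, 2]
```

Because the queue is a pure `threading.Queue`, it is never bound to any particular event loop — which is the motivation for dropping janus in this changeset.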
     # UTILITY METHODS
@@ -186,10 +224,35 @@ class RecheckPriorityQueue:
             logger.critical(f"CRITICAL: Failed to get queued UUIDs: {str(e)}")
             return []

+    def clear(self):
+        """Clear all items from both priority storage and notification queue"""
+        try:
+            with self._lock:
+                # Clear priority items
+                self._priority_items.clear()
+                # Drain all notifications to prevent stale notifications
+                # This is critical for test cleanup to prevent queue desynchronization
+                drained = 0
+                while not self._notification_queue.empty():
+                    try:
+                        self._notification_queue.get_nowait()
+                        drained += 1
+                    except queue.Empty:
+                        break
+                if drained > 0:
+                    logger.debug(f"Cleared queue: removed {drained} notifications")
+            return True
+        except Exception as e:
+            logger.critical(f"CRITICAL: Failed to clear queue: {str(e)}")
+            return False
+
     def close(self):
-        """Close the janus queue"""
+        """Close the queue"""
         try:
-            self._janus_queue.close()
+            # Nothing to close for threading.Queue
             logger.debug("RecheckPriorityQueue closed successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to close RecheckPriorityQueue: {str(e)}")
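The drain loop in `clear()` pairs `empty()` with `get_nowait()`: the `queue.Empty` guard covers the race where another consumer takes the last item between the `empty()` check and the get. A standalone sketch of that drain idiom:

```python
import queue

q = queue.Queue()
for i in range(5):
    q.put(i)

# Drain everything currently queued; get_nowait() raises queue.Empty
# if another consumer wins the race after the empty() check
drained = 0
while not q.empty():
    try:
        q.get_nowait()
        drained += 1
    except queue.Empty:
        break

print(drained)  # 5
```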
@@ -321,7 +384,7 @@ class RecheckPriorityQueue:
         except Exception:
             pass
         return 'unknown'

     def _emit_put_signals(self, item):
         """Emit signals when item is added"""
         try:
@@ -330,14 +393,14 @@ class RecheckPriorityQueue:
             watch_check_update = signal('watch_check_update')
             if watch_check_update:
                 watch_check_update.send(watch_uuid=item.item['uuid'])

             # Queue length signal
             if self.queue_length_signal:
                 self.queue_length_signal.send(length=self.qsize())
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to emit put signals: {str(e)}")

     def _emit_get_signals(self):
         """Emit signals when item is removed"""
         try:
@@ -363,12 +426,11 @@ class NotificationQueue:
     def __init__(self, maxsize: int = 0, datastore=None):
         try:
-            self._janus_queue = janus.Queue(maxsize=maxsize)
-            # BOTH interfaces required - see class docstring for why
-            self.sync_q = self._janus_queue.sync_q    # Flask routes, threads
-            self.async_q = self._janus_queue.async_q  # Async workers
+            # Use pure threading.Queue to avoid event loop binding issues
+            self._notification_queue = queue.Queue(maxsize=maxsize if maxsize > 0 else 0)
             self.notification_event_signal = signal('notification_event')
             self.datastore = datastore  # For checking all_muted setting
+            self._lock = threading.RLock()
             logger.debug("NotificationQueue initialized successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to initialize NotificationQueue: {str(e)}")
@@ -380,72 +442,97 @@ class NotificationQueue:
     def put(self, item: Dict[str, Any], block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync put with signal emission"""
+        logger.trace(f"NotificationQueue.put() called for item: {item.get('uuid', 'unknown')}, block={block}, timeout={timeout}")
         try:
             # Check if all notifications are muted
             if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
                 logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
                 return False
-            self.sync_q.put(item, block=block, timeout=timeout)
+            with self._lock:
+                self._notification_queue.put(item, block=block, timeout=timeout)
             self._emit_notification_signal(item)
-            logger.debug(f"Successfully queued notification: {item.get('uuid', 'unknown')}")
+            logger.trace(f"NotificationQueue.put() successfully queued notification: {item.get('uuid', 'unknown')}")
             return True
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to put notification {item.get('uuid', 'unknown')}: {str(e)}")
             return False
-    async def async_put(self, item: Dict[str, Any]):
-        """Pure async put with signal emission"""
+    async def async_put(self, item: Dict[str, Any], executor=None):
+        """Async put with signal emission - uses thread pool
+
+        Args:
+            item: Notification item to queue
+            executor: Optional ThreadPoolExecutor
+        """
+        logger.trace(f"NotificationQueue.async_put() called for item: {item.get('uuid', 'unknown')}, executor={executor}")
+        import asyncio
         try:
             # Check if all notifications are muted
             if self.datastore and self.datastore.data['settings']['application'].get('all_muted', False):
                 logger.debug(f"Notification blocked - all notifications are muted: {item.get('uuid', 'unknown')}")
                 return False
-            await self.async_q.put(item)
-            self._emit_notification_signal(item)
-            logger.debug(f"Successfully async queued notification: {item.get('uuid', 'unknown')}")
+            loop = asyncio.get_event_loop()
+            await loop.run_in_executor(executor, lambda: self.put(item, block=True, timeout=5.0))
+            logger.trace(f"NotificationQueue.async_put() successfully queued notification: {item.get('uuid', 'unknown')}")
             return True
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to async put notification {item.get('uuid', 'unknown')}: {str(e)}")
             return False
     def get(self, block: bool = True, timeout: Optional[float] = None):
         """Thread-safe sync get"""
+        logger.trace(f"NotificationQueue.get() called, block={block}, timeout={timeout}")
         try:
-            return self.sync_q.get(block=block, timeout=timeout)
+            with self._lock:
+                item = self._notification_queue.get(block=block, timeout=timeout)
+            logger.trace(f"NotificationQueue.get() retrieved item: {item.get('uuid', 'unknown') if isinstance(item, dict) else 'unknown'}")
+            return item
         except queue.Empty as e:
+            logger.trace(f"NotificationQueue.get() timed out - queue is empty (timeout={timeout})")
             raise e
         except Exception as e:
-            logger.critical(f"CRITICAL: Failed to get notification: {str(e)}")
+            logger.critical(f"CRITICAL: Failed to get notification: {type(e).__name__}: {str(e)}")
             raise e
-    async def async_get(self):
-        """Pure async get"""
+    async def async_get(self, executor=None):
+        """Async get - uses thread pool
+
+        Args:
+            executor: Optional ThreadPoolExecutor
+        """
+        logger.trace(f"NotificationQueue.async_get() called, executor={executor}")
+        import asyncio
         try:
-            return await self.async_q.get()
+            loop = asyncio.get_event_loop()
+            item = await loop.run_in_executor(executor, lambda: self.get(block=True, timeout=1.0))
+            logger.trace(f"NotificationQueue.async_get() retrieved item: {item.get('uuid', 'unknown') if isinstance(item, dict) else 'unknown'}")
+            return item
         except queue.Empty as e:
+            logger.trace(f"NotificationQueue.async_get() timed out - queue is empty")
             raise e
         except Exception as e:
-            logger.critical(f"CRITICAL: Failed to async get notification: {str(e)}")
+            logger.critical(f"CRITICAL: Failed to async get notification: {type(e).__name__}: {str(e)}")
             raise e
     def qsize(self) -> int:
         """Get current queue size"""
         try:
-            return self.sync_q.qsize()
+            with self._lock:
+                return self._notification_queue.qsize()
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to get notification queue size: {str(e)}")
             return 0

     def empty(self) -> bool:
         """Check if queue is empty"""
         return self.qsize() == 0

     def close(self):
-        """Close the janus queue"""
+        """Close the queue"""
         try:
-            self._janus_queue.close()
+            # Nothing to close for threading.Queue
             logger.debug("NotificationQueue closed successfully")
         except Exception as e:
             logger.critical(f"CRITICAL: Failed to close NotificationQueue: {str(e)}")
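The single-timeout design the docstrings refer to — keeping the only timeout inside the blocking `get()` rather than adding an outer `asyncio.wait_for` around the executor call — means `queue.Empty` raised in the worker thread propagates cleanly to the awaiting coroutine, with no second timer racing it. A minimal sketch with illustrative names:

```python
import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

q = queue.Queue()
executor = ThreadPoolExecutor(max_workers=1)

async def async_get(timeout=0.2):
    loop = asyncio.get_event_loop()
    # The only timeout lives inside the blocking call - no outer
    # asyncio.wait_for wrapper, so there is no second timer racing it
    return await loop.run_in_executor(executor, lambda: q.get(block=True, timeout=timeout))

async def main():
    q.put({'uuid': 'abc'})
    item = await async_get()
    try:
        await async_get()   # queue is now empty
    except queue.Empty:     # exception raised in the thread propagates here
        return item, 'empty'

print(asyncio.run(main()))  # ({'uuid': 'abc'}, 'empty')
```

With `wait_for`, by contrast, the outer cancellation cannot stop the already-running executor thread, so the caller would still sit out the inner timeout before the `TimeoutError` surfaced.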
+2 -2
@@ -37,9 +37,9 @@ def register_watch_operation_handlers(socketio, datastore):
                 # Import here to avoid circular imports
                 from changedetectionio.flask_app import update_q
                 from changedetectionio import queuedWatchMetaData
-                from changedetectionio import worker_handler
-                worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
+                from changedetectionio import worker_pool
+                worker_pool.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
                 logger.info(f"Socket.IO: Queued recheck for watch {uuid}")
             else:
                 emit('operation_result', {'success': False, 'error': f'Unknown operation: {op}'})
+4 -4
@@ -145,10 +145,10 @@ def handle_watch_update(socketio, **kwargs):
     # Emit the watch update to all connected clients
     from changedetectionio.flask_app import update_q
     from changedetectionio.flask_app import _jinja2_filter_datetime
-    from changedetectionio import worker_handler
+    from changedetectionio import worker_pool

     # Get list of watches that are currently running
-    running_uuids = worker_handler.get_running_uuids()
+    running_uuids = worker_pool.get_running_uuids()

     # Get list of watches in the queue (efficient single-lock method)
     queue_list = update_q.get_queued_uuids()
@@ -252,7 +252,7 @@ def init_socketio(app, datastore):
     def event_checkbox_operations(data):
         from changedetectionio.blueprint.ui import _handle_operations
         from changedetectionio import queuedWatchMetaData
-        from changedetectionio import worker_handler
+        from changedetectionio import worker_pool
         from changedetectionio.flask_app import update_q, watch_check_update
         import threading
@@ -268,7 +268,7 @@ def init_socketio(app, datastore):
             uuids=data.get('uuids'),
             datastore=datastore,
             extra_data=data.get('extra_data'),
-            worker_handler=worker_handler,
+            worker_pool=worker_pool,
             update_q=update_q,
             queuedWatchMetaData=queuedWatchMetaData,
             watch_check_update=watch_check_update,
+6 -2
@@ -10,6 +10,7 @@
 set -e

 SCRIPT_DIR=$( cd -- "$( dirname -- "${BASH_SOURCE[0]}" )" &> /dev/null && pwd )
+rm tests/logs/* -f

 # Since theres no curl installed lets roll with python3
 check_sanity() {
@@ -64,18 +65,21 @@ data_sanity_test
 echo "-------------------- Running rest of tests in parallel -------------------------------"

 # REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
-REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
+FETCH_WORKERS=2 REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
 pytest tests/test_*.py \
-  -n 30 \
+  -n 18 \
   --dist=load \
   -vvv \
   -s \
   --capture=no \
+  -k "not test_queue_system" \
   --log-cli-level=DEBUG \
   --log-cli-format="%(asctime)s [%(process)d] [%(levelname)s] %(name)s: %(message)s"
 echo "---------------------------- DONE parallel test ---------------------------------------"

+FETCH_WORKERS=20 pytest -vvv -s tests/test_queue_handler.py
+
 echo "RUNNING WITH BASE_URL SET"
 # Now re-run some tests with BASE_URL enabled
+3
@@ -166,6 +166,9 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
     """
     logger.info(f"Datastore path is '{datastore_path}'")

+    # CRITICAL: Update datastore_path (was using old path from __init__)
+    self.datastore_path = datastore_path
+
     # Initialize data structure
     self.__data = App.model()
     self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
+145 -3
@@ -9,6 +9,11 @@ from changedetectionio import store
 import os
 import sys

+# CRITICAL: Set short timeout for tests to prevent 45-second hangs
+# When test server is slow/unresponsive, workers fail fast instead of holding UUIDs for 45s
+# This prevents exponential priority growth from repeated deferrals (priority × 10 each defer)
+os.environ['DEFAULT_SETTINGS_REQUESTS_TIMEOUT'] = '5'
+
 from changedetectionio.flask_app import init_app_secret, changedetection_app
 from changedetectionio.tests.util import live_server_setup, new_live_server_setup
@@ -29,6 +34,93 @@ def reportlog(pytestconfig):
     logger.remove(handler_id)

+@pytest.fixture(autouse=True)
+def per_test_log_file(request):
+    """Create a separate log file for each test function with pytest output."""
+    import re
+
+    # Create logs directory if it doesn't exist
+    log_dir = os.path.join(os.path.dirname(__file__), "logs")
+    os.makedirs(log_dir, exist_ok=True)
+
+    # Generate log filename from test name and worker ID (for parallel runs)
+    test_name = request.node.name
+    # Sanitize test name - replace unsafe characters with underscores
+    # Keep only alphanumeric, dash, underscore, and period
+    safe_test_name = re.sub(r'[^\w\-.]', '_', test_name)
+    # Limit length to avoid filesystem issues (max 200 chars)
+    if len(safe_test_name) > 200:
+        # Keep first 150 chars + hash of full name + last 30 chars
+        import hashlib
+        name_hash = hashlib.md5(test_name.encode()).hexdigest()[:8]
+        safe_test_name = f"{safe_test_name[:150]}_{name_hash}_{safe_test_name[-30:]}"
+
+    worker_id = os.environ.get('PYTEST_XDIST_WORKER', 'master')
+    log_file = os.path.join(log_dir, f"{safe_test_name}_{worker_id}.log")
+
+    # Add file handler for this test with TRACE level
+    handler_id = logger.add(
+        log_file,
+        format="{time:YYYY-MM-DD HH:mm:ss.SSS} | {level: <8} | {process} | {name}:{function}:{line} - {message}",
+        level="TRACE",
+        mode="w",  # Overwrite if exists
+        enqueue=True  # Thread-safe
+    )
+
+    logger.info(f"=== Starting test: {test_name} (worker: {worker_id}) ===")
+    logger.info(f"Test location: {request.node.nodeid}")
+
+    yield
+
+    # Capture test outcome (PASSED/FAILED/SKIPPED/ERROR)
+    outcome = "UNKNOWN"
+    exc_info = None
+    stdout = None
+    stderr = None
+    if hasattr(request.node, 'rep_call'):
+        outcome = request.node.rep_call.outcome.upper()
+        if request.node.rep_call.failed:
+            exc_info = request.node.rep_call.longreprtext
+        # Capture stdout/stderr from call phase
+        if hasattr(request.node.rep_call, 'sections'):
+            for section_name, section_content in request.node.rep_call.sections:
+                if 'stdout' in section_name.lower():
+                    stdout = section_content
+                elif 'stderr' in section_name.lower():
+                    stderr = section_content
+    elif hasattr(request.node, 'rep_setup'):
+        if request.node.rep_setup.failed:
+            outcome = "SETUP_FAILED"
+            exc_info = request.node.rep_setup.longreprtext
+
+    logger.info(f"=== Test Result: {outcome} ===")
+    if exc_info:
+        logger.error(f"=== Test Failure Details ===\n{exc_info}")
+    if stdout:
+        logger.info(f"=== Captured stdout ===\n{stdout}")
+    if stderr:
+        logger.warning(f"=== Captured stderr ===\n{stderr}")
+    logger.info(f"=== Finished test: {test_name} ===")
+
+    logger.remove(handler_id)
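The sanitisation step in the fixture above maps anything outside word characters, dash, and period to an underscore, so parametrised test IDs (which may contain `/`, `:`, brackets, or spaces) cannot produce invalid or path-escaping filenames. The test name below is an invented example:

```python
import re

test_name = 'test_check[/etc/passwd:weird name]'
# Same pattern as the fixture: keep only alphanumeric, dash, underscore, period
safe_test_name = re.sub(r'[^\w\-.]', '_', test_name)
print(safe_test_name)  # test_check__etc_passwd_weird_name_
```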
+@pytest.hookimpl(tryfirst=True, hookwrapper=True)
+def pytest_runtest_makereport(item, call):
+    """Hook to capture test results and attach to the test node."""
+    outcome = yield
+    rep = outcome.get_result()
+    # Store report on the test node for access in fixtures
+    setattr(item, f"rep_{rep.when}", rep)
 @pytest.fixture
 def environment(mocker):
     """Mock arrow.now() to return a fixed datetime for testing jinja2 time extension."""
@@ -165,6 +257,57 @@ def prepare_test_function(live_server, datastore_path):
         except:
             break

+    # Add test helper methods to the app for worker management
+    def set_workers(count):
+        """Set the number of workers for testing - brutal shutdown, no delays"""
+        from changedetectionio import worker_pool
+        from changedetectionio.flask_app import update_q, notification_q
+
+        current_count = worker_pool.get_worker_count()
+
+        # Special case: Setting to 0 means shutdown all workers brutally
+        if count == 0:
+            logger.debug(f"Brutally shutting down all {current_count} workers")
+            worker_pool.shutdown_workers()
+            return {
+                'status': 'success',
+                'message': f'Shutdown all {current_count} workers',
+                'previous_count': current_count,
+                'current_count': 0
+            }
+
+        # Adjust worker count (no delays, no verification)
+        result = worker_pool.adjust_async_worker_count(
+            count,
+            update_q=update_q,
+            notification_q=notification_q,
+            app=live_server.app,
+            datastore=datastore
+        )
+        return result
+
+    def check_all_workers_alive(expected_count):
+        """Check that all expected workers are alive"""
+        from changedetectionio import worker_pool
+        from changedetectionio.flask_app import update_q, notification_q
+
+        result = worker_pool.check_worker_health(
+            expected_count,
+            update_q=update_q,
+            notification_q=notification_q,
+            app=live_server.app,
+            datastore=datastore
+        )
+        assert result['status'] == 'healthy', f"Workers not healthy: {result['message']}"
+        return result
+
+    # Attach helper methods to app for easy test access
+    live_server.app.set_workers = set_workers
+    live_server.app.check_all_workers_alive = check_all_workers_alive
+
     # Prevent background thread from writing during cleanup/reload
     datastore.needs_write = False
     datastore.needs_write_urgent = False
@@ -262,8 +405,8 @@ def app(request, datastore_path):
     # Shutdown workers gracefully before loguru cleanup
     try:
-        from changedetectionio import worker_handler
-        worker_handler.shutdown_workers()
+        from changedetectionio import worker_pool
+        worker_pool.shutdown_workers()
     except Exception:
         pass
@@ -311,4 +454,3 @@ def app(request, datastore_path):
     yield app
@@ -2,7 +2,7 @@
 import time
 from flask import url_for
-from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks
+from .util import live_server_setup, extract_UUID_from_client, wait_for_all_checks, delete_all_watches
 import os
@@ -116,7 +116,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
     # And not this cause its not the ld-json
     assert b"So let's see what happens" not in res.data

-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

     ##########################################################################################
     # And we shouldnt see the offer
@@ -131,7 +131,7 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
     assert b'ldjson-price-track-offer' not in res.data

     ##########################################################################################
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

 def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
@@ -147,7 +147,7 @@ def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_
     ##########################################################################################
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

 def test_bad_ldjson_is_correctly_ignored(client, live_server, measure_memory_usage, datastore_path):
+1 -1
@@ -414,4 +414,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
     assert b'Abonnementen bijwerken' in res.data
     assert b'&lt;foobar' not in res.data
-    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    res = delete_all_watches(client)
@@ -6,7 +6,7 @@ from .util import (
     set_original_response,
     set_modified_response,
     live_server_setup,
-    wait_for_all_checks
+    wait_for_all_checks, delete_all_watches
 )
 from loguru import logger
@@ -104,7 +104,7 @@ def run_socketio_watch_update_test(client, live_server, password_mode="", datast
         assert watch.has_unviewed, "The watch was not marked as unviewed after content change"

     # Clean up
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

 def test_everything(live_server, client, measure_memory_usage, datastore_path):
+5 -20
@@ -69,7 +69,7 @@ def test_conditions_with_text_and_number(client, live_server, measure_memory_usa
     # 1. The page filtered text must contain "5" (first digit of value)
     # 2. The extracted number should be >= 20 and <= 100
     res = client.post(
-        url_for("ui.ui_edit.edit_page", uuid="first"),
+        url_for("ui.ui_edit.edit_page", uuid=uuid),
         data={
             "url": test_url,
             "fetch_backend": "html_requests",
@@ -110,25 +110,20 @@ def test_conditions_with_text_and_number(client, live_server, measure_memory_usa
     wait_for_all_checks(client)
     client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
-    time.sleep(0.2)
+    time.sleep(1)
+    wait_for_all_checks(client)

     # Case 1
     set_number_in_range_response(datastore_path=datastore_path, number="70.5")

     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
     wait_for_all_checks(client)
-    time.sleep(2)

     # 75 is > 20 and < 100 and contains "5"
     res = client.get(url_for("watchlist.index"))
     assert b'has-unread-changes' in res.data

     # Case 2: Change with one condition violated
     # Number out of range (150) but contains '5'
     client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
-    time.sleep(0.2)

     set_number_out_of_range_response(datastore_path=datastore_path, number="150.5")
@@ -154,7 +149,6 @@ def test_condition_validate_rule_row(client, live_server, measure_memory_usage,
     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
     wait_for_all_checks(client)
-    uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))

     # the front end submits the current form state which should override the watch in a temporary copy
     res = client.post(
@@ -195,12 +189,8 @@ def test_condition_validate_rule_row(client, live_server, measure_memory_usage,
     )
     assert res.status_code == 200
     assert b'false' in res.data

-    # cleanup for the next
-    client.get(
-        url_for("ui.form_delete", uuid="all"),
-        follow_redirects=True
-    )
+    delete_all_watches(client)

 # If there was only a change in the whitespacing, then we shouldnt have a change detected
@@ -230,17 +220,12 @@ def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage,
     # Check it saved
     res = client.get(
-        url_for("ui.ui_edit.edit_page", uuid="first"),
+        url_for("ui.ui_edit.edit_page", uuid=uuid),
     )

     # Assert the word count is counted correctly
     assert b'<td>13</td>' in res.data

-    # cleanup for the next
-    client.get(
-        url_for("ui.form_delete", uuid="all"),
-        follow_redirects=True
-    )
+    delete_all_watches(client)

 # If there was only a change in the whitespacing, then we shouldnt have a change detected
 def test_lev_conditions_plugin(client, live_server, measure_memory_usage, datastore_path):
@@ -64,6 +64,7 @@ def test_DNS_errors(client, live_server, measure_memory_usage, datastore_path):
         follow_redirects=True
     )
     assert b"1 Imported" in res.data
+    res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)

     # Give the thread time to pick it up
     wait_for_all_checks(client)
@@ -79,7 +80,7 @@ def test_DNS_errors(client, live_server, measure_memory_usage, datastore_path):
     )
     assert found_name_resolution_error

     # Should always record that we tried
-    assert bytes("just now".encode('utf-8')) in res.data
+    assert "just now".encode('utf-8') in res.data or 'seconds ago'.encode('utf-8') in res.data
     delete_all_watches(client)

 # Re 1513
@@ -1,7 +1,7 @@
 import os
 import time
 from flask import url_for
-from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
+from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output, delete_all_watches
 from ..notification import valid_notification_formats
@@ -118,8 +118,10 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
res = client.get(url_for("watchlist.index")) res = client.get(url_for("watchlist.index"))
assert b'Warning, no filters were found' in res.data assert b'Warning, no filters were found' in res.data
assert not os.path.isfile(notification_file) assert not os.path.isfile(notification_file)
time.sleep(1) time.sleep(2)
wait_for_all_checks(client)
wait_for_all_checks(client)
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 5 assert live_server.app.config['DATASTORE'].data['watching'][uuid]['consecutive_filter_failures'] == 5
time.sleep(2) time.sleep(2)
@@ -178,6 +180,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
follow_redirects=True follow_redirects=True
) )
os.unlink(notification_file) os.unlink(notification_file)
delete_all_watches(client)
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage, datastore_path): def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage, datastore_path):
@@ -185,10 +188,12 @@ def test_check_include_filters_failure_notification(client, live_server, measure
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path) run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path)
# Check markup send conversion didnt affect plaintext preference # Check markup send conversion didnt affect plaintext preference
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text'), datastore_path=datastore_path) run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text'), datastore_path=datastore_path)
delete_all_watches(client)
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage, datastore_path): def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage, datastore_path):
# # live_server_setup(live_server) # Setup on conftest per function # # live_server_setup(live_server) # Setup on conftest per function
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path) run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor'), datastore_path=datastore_path)
delete_all_watches(client)
# Test that notification is never sent # Test that notification is never sent
@@ -197,3 +202,4 @@ def test_basic_markup_from_text(client, live_server, measure_memory_usage, datas
from ..notification.handler import markup_text_links_to_html from ..notification.handler import markup_text_links_to_html
x = markup_text_links_to_html("hello https://google.com") x = markup_text_links_to_html("hello https://google.com")
assert 'a href' in x assert 'a href' in x
delete_all_watches(client)
@@ -166,7 +166,8 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage, datastore_path
delete_all_watches(client)
def test_group_tag_notification(client, live_server, measure_memory_usage, datastore_path):
+delete_all_watches(client)
set_original_response(datastore_path=datastore_path)
test_url = url_for('test_endpoint', _external=True)
@@ -142,6 +142,8 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
+delete_all_watches(client)
def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path):
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
@@ -162,7 +164,7 @@ def test_check_text_history_view(client, live_server, measure_memory_usage, data
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
-res = client.get(url_for("ui.ui_diff.diff_history_page", uuid="first"))
+res = client.get(url_for("ui.ui_diff.diff_history_page", uuid=uuid))
assert b'test-one' in res.data
assert b'test-two' in res.data
@@ -40,10 +40,7 @@ def set_some_changed_response(datastore_path):
def test_normal_page_check_works_with_ignore_status_code(client, live_server, measure_memory_usage, datastore_path):
+from loguru import logger
-# Give the endpoint time to spin up
-time.sleep(1)
set_original_response(datastore_path=datastore_path)
@@ -62,20 +59,41 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
+logger.info(f"TEST: First check - queuing UUID {uuid}")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
-wait_for_all_checks(client)
+logger.info(f"TEST: Waiting for first check to complete")
+wait_result = wait_for_all_checks(client)
+logger.info(f"TEST: First check wait completed: {wait_result}")
+# Check history after first check
+watch = client.application.config.get('DATASTORE').data['watching'][uuid]
+logger.info(f"TEST: After first check - history count: {len(watch.history.keys())}")
set_some_changed_response(datastore_path=datastore_path)
+wait_for_all_checks(client)
# Trigger a check
+logger.info(f"TEST: Second check - queuing UUID {uuid}")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
-# Give the thread time to pick it up
-wait_for_all_checks(client)
+logger.info(f"TEST: Waiting for second check to complete")
+wait_result = wait_for_all_checks(client)
+logger.info(f"TEST: Second check wait completed: {wait_result}")
+# Check history after second check
+watch = client.application.config.get('DATASTORE').data['watching'][uuid]
+logger.info(f"TEST: After second check - history count: {len(watch.history.keys())}")
+logger.info(f"TEST: Watch history keys: {list(watch.history.keys())}")
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
+if b'has-unread-changes' not in res.data:
+    logger.error(f"TEST FAILED: has-unread-changes not found in response")
+    logger.error(f"TEST: Watch last_error: {watch.get('last_error')}")
+    logger.error(f"TEST: Watch last_checked: {watch.get('last_checked')}")
assert b'has-unread-changes' in res.data
assert b'/test-endpoint' in res.data
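The hunk above adds a "dump state before the failing assert" pattern: diagnostic context is logged only when the expected marker is missing, so passing runs stay quiet while failing CI logs explain themselves. A generic sketch of the same idea — the real test uses loguru and a `Watch` object, both replaced here by stdlib `logging` and a plain dict:

```python
import logging

logger = logging.getLogger("test-debug")

def assert_marker_present(res_data: bytes, marker: bytes, watch: dict):
    # Only emit the diagnostic context when the assertion is about to fail;
    # green runs stay silent, red runs get the watch state in the log.
    if marker not in res_data:
        logger.error("marker %r not found in response", marker)
        logger.error("watch last_error: %s", watch.get("last_error"))
        logger.error("watch last_checked: %s", watch.get("last_checked"))
    assert marker in res_data

watch = {"last_error": None, "last_checked": 1700000000}
assert_marker_present(b"<td class='has-unread-changes'>", b"has-unread-changes", watch)
```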
@@ -82,7 +82,7 @@ def test_import_distillio(client, live_server, measure_memory_usage, datastore_p
# Give the endpoint time to spin up
time.sleep(1)
-client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+delete_all_watches(client)
res = client.post(
url_for("imports.import_page"),
data={
@@ -224,6 +224,7 @@ def check_json_filter(json_filter, client, live_server, datastore_path):
set_original_response(datastore_path=datastore_path)
+delete_all_watches(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})
@@ -297,14 +298,17 @@ def check_json_filter_bool_val(json_filter, client, live_server, datastore_path)
def test_check_jsonpath_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
check_json_filter_bool_val("json:$['available']", client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
def test_check_jq_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_filter_bool_val("jq:.available", client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
def test_check_jqraw_filter_bool_val(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_filter_bool_val("jq:.available", client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
# Re #265 - Extended JSON selector test
# Stuff to consider here
@@ -452,14 +456,17 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage, datastore_path):
check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
def test_check_jq_ext_filter(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_ext_filter('jq:.[] | select(.status | contains("Sold"))', client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
def test_check_jqraw_ext_filter(client, live_server, measure_memory_usage, datastore_path):
if jq_support:
check_json_ext_filter('jq:.[] | select(.status | contains("Sold"))', client, live_server, datastore_path=datastore_path)
+delete_all_watches(client)
def test_jsonpath_BOM_utf8(client, live_server, measure_memory_usage, datastore_path):
from .. import html_tools
@@ -470,5 +477,6 @@ def test_jsonpath_BOM_utf8(client, live_server, measure_memory_usage, datastore_
# See that we can find the second <script> one, which is not broken, and matches our filter
text = html_tools.extract_json_as_string(json_str, "json:$.name")
assert text == '"José"'
+delete_all_watches(client)
@@ -313,14 +313,8 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
# Add a watch and trigger a HTTP POST
test_url = url_for('test_endpoint', _external=True)
-res = client.post(
-url_for("ui.ui_views.form_quick_watch_add"),
-data={"url": test_url, "tags": 'nice one'},
-follow_redirects=True
-)
-assert b"Watch added" in res.data
-watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
+watch_uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, tag="nice one")
+res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
@@ -1,7 +1,7 @@
import os
import time
from flask import url_for
-from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
+from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
import logging
def test_check_notification_error_handling(client, live_server, measure_memory_usage, datastore_path):
@@ -81,4 +81,4 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
os.unlink(os.path.join(datastore_path, "notification.txt"))
assert 'xxxxx' in notification_submission
-client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+delete_all_watches(client)
@@ -0,0 +1,52 @@
+import os
+import time
+from flask import url_for
+from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
+from ..notification import valid_notification_formats
+from loguru import logger
+def test_queue_system(client, live_server, measure_memory_usage, datastore_path):
+    """Test that multiple workers can process the queue concurrently without blocking each other"""
+    # (pytest) Werkzeug's threaded server uses ThreadPoolExecutor with a default limit of around 40 threads (or min(32, os.cpu_count() + 4)).
+    items = os.cpu_count() + 3
+    delay = 10
+    # Auto-queue is off here.
+    live_server.app.config['DATASTORE'].data['settings']['application']['all_paused'] = True
+    test_urls = [
+        f"{url_for('test_endpoint', _external=True)}?delay={delay}&id={i}&content=hello+test+content+{i}"
+        for i in range(0, items)
+    ]
+    # Import the URLs to queue
+    res = client.post(
+        url_for("imports.import_page"),
+        data={"urls": "\r\n".join(test_urls)},
+        follow_redirects=True
+    )
+    assert f"{items} Imported".encode('utf-8') in res.data
+    client.application.set_workers(items)
+    start = time.time()
+    res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
+    time.sleep(delay/2)
+    # Half-way through, at least some workers should be busy processing UUIDs
+    from changedetectionio import worker_pool
+    running_uuids = worker_pool.get_running_uuids()
+    logger.debug(f"Should be at least some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}")
+    assert len(running_uuids) != 0, f"Should be at least some workers running - {len(running_uuids)} UUIDs still being processed: {running_uuids}"
+    wait_for_all_checks(client)
+    # All workers should be done in roughly one job's delay (they take time to 'see' something is in the queue too)
+    total_time = (time.time() - start)
+    logger.debug(f"All workers finished {items} items in less than {delay} seconds per job. {total_time}s total")
+    # If the queue handler had a bug and did not run in parallel, this would blow out to items*delay seconds
+    assert total_time < delay + 10, f"All workers finished {items} items in less than {delay} seconds per job, total time {total_time}s"
+    # Verify all workers are idle (no UUIDs being processed)
+    running_uuids = worker_pool.get_running_uuids()
+    assert len(running_uuids) == 0, f"Expected all workers to be idle, but {len(running_uuids)} UUIDs still being processed: {running_uuids}"
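The timing argument in `test_queue_system` above is: N slow jobs spread over N parallel workers should finish in roughly one job's delay, not N × delay. That argument can be illustrated standalone — `worker_pool` and the live server are project specifics, replaced here by a plain thread pool and a scaled-down delay:

```python
import time
from concurrent.futures import ThreadPoolExecutor

# Standalone illustration of the parallelism check in test_queue_system:
# N jobs that each sleep `delay` should complete in ~delay total when run
# across N workers, versus N * delay if the queue were processed serially.
def slow_job(i, delay=0.2):
    time.sleep(delay)  # stand-in for one watch check taking `delay` seconds
    return i

items, delay = 8, 0.2
start = time.time()
with ThreadPoolExecutor(max_workers=items) as pool:
    results = list(pool.map(slow_job, range(items)))
total = time.time() - start

assert sorted(results) == list(range(items))
# Serial execution would need items * delay = 1.6s; parallel stays near 0.2s
assert total < items * delay / 2, f"queue did not run in parallel: {total:.2f}s"
```

The real test adds slack (`delay + 10`) because workers also spend time noticing new queue entries; the inequality only needs to separate the parallel case from the serial blow-up.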
@@ -17,12 +17,12 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
test_url = test_url.replace('localhost', 'changedet')
# Add the test URL twice, we will check
-uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
+uuidA = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
-uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
+uuidB = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -31,7 +31,7 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
# Add some headers to a request
res = client.post(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuidA),
data={
"url": test_url,
"tags": "",
@@ -42,13 +42,14 @@ def test_headers_in_request(client, live_server, measure_memory_usage, datastore
)
assert b"Updated watch." in res.data
+client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick up the first version
wait_for_all_checks(client)
# The service should echo back the request headers
res = client.get(
-url_for("ui.ui_preview.preview_page", uuid="first"),
+url_for("ui.ui_preview.preview_page", uuid=uuidA),
follow_redirects=True
)
@@ -92,7 +93,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# add the first 'version'
res = client.post(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
@@ -110,7 +111,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
body_value = 'Test Body Value {{ 1+1 }}'
body_value_formatted = 'Test Body Value 2'
res = client.post(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
@@ -126,7 +127,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# The service should echo back the body
res = client.get(
-url_for("ui.ui_preview.preview_page", uuid="first"),
+url_for("ui.ui_preview.preview_page", uuid=uuid),
follow_redirects=True
)
@@ -157,7 +158,7 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
# Attempt to add a body with a GET method
res = client.post(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
@@ -236,6 +236,7 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
}
_run_test_minmax_limit(client, extra_watch_edit_form=extras,datastore_path=datastore_path)
+delete_all_watches(client)
@@ -388,9 +389,10 @@ def test_change_with_notification_values(client, live_server, measure_memory_usa
os.unlink(os.path.join(datastore_path, "notification.txt"))
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
res = client.post(url_for("ui.ui_notification.ajax_callback_send_notification_test", watch_uuid=uuid), data={}, follow_redirects=True)
-time.sleep(5)
+wait_for_notification_endpoint_output(datastore_path=datastore_path)
assert os.path.isfile(os.path.join(datastore_path, "notification.txt")), "Notification received"
+delete_all_watches(client)
def test_data_sanity(client, live_server, measure_memory_usage, datastore_path):
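The hunk above swaps a blind `time.sleep(5)` for `wait_for_notification_endpoint_output(datastore_path=...)`. The real helper lives in `tests/util.py`; a minimal sketch of the underlying poll-with-deadline idea — the helper name, timeout values, and file-based simulation below are assumptions for illustration:

```python
import os
import tempfile
import threading
import time

# Sketch of the "poll with a deadline instead of a fixed sleep" pattern:
# return as soon as the expected file lands, fail loudly on timeout.
def wait_for_file(path, timeout=5.0, poll=0.05):
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if os.path.isfile(path):
            return True
        time.sleep(poll)
    raise TimeoutError(f"{path} never appeared within {timeout}s")

target = os.path.join(tempfile.mkdtemp(), "notification.txt")
# Simulate the notification endpoint writing its file shortly after the check
timer = threading.Timer(0.2, lambda: open(target, "w").close())
timer.start()
assert wait_for_file(target)
timer.cancel()
```

Polling makes the test both faster (it stops waiting the moment the notification arrives) and less flaky (a slow CI run gets the full timeout instead of a hard-coded five seconds).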
@@ -406,6 +408,7 @@ def test_data_sanity(client, live_server, measure_memory_usage, datastore_path):
follow_redirects=True
)
+client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
@@ -417,6 +420,7 @@ def test_data_sanity(client, live_server, measure_memory_usage, datastore_path):
data={"url": test_url2, "tags": 'restock tests', 'processor': 'restock_diff'},
follow_redirects=True
)
+client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert str(res.data.decode()).count("950.95") == 1, "Price should only show once (for the watch added, no other watches yet)"
@@ -462,3 +466,4 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
assert b'ception' not in res.data
assert b'155.55' in res.data
+delete_all_watches(client)
@@ -107,7 +107,7 @@ def test_rss_and_token(client, live_server, measure_memory_usage, datastore_path
assert b"Access denied, bad token" not in res.data
assert b"Random content" in res.data
-client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+delete_all_watches(client)
def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage, datastore_path):
@@ -23,6 +23,7 @@ def test_rss_feed_empty(client, live_server, measure_memory_usage, datastore_pat
)
assert res.status_code == 400
assert b'does not have enough history snapshots to show' in res.data
+delete_all_watches(client)
def test_rss_single_watch_order(client, live_server, measure_memory_usage, datastore_path):
"""
@@ -24,20 +24,20 @@ def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": include_filters, "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests", "time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(include_filters.encode('utf-8')) in res.data
# click share the link
res = client.get(
-url_for("ui.form_share_put_watch", uuid="first"),
+url_for("ui.form_share_put_watch", uuid=uuid),
follow_redirects=True
)
@@ -63,13 +63,16 @@ def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
# Now hit edit, we should see what we expect
# that the import fetched the meta-data
+uuids = list(client.application.config.get('DATASTORE').data['watching'])
+assert uuids, "It saved/imported and created a new URL from the share"
# Check it saved
res = client.get(
-url_for("ui.ui_edit.edit_page", uuid="first"),
+url_for("ui.ui_edit.edit_page", uuid=uuids[0]),
)
assert bytes(include_filters.encode('utf-8')) in res.data
# Check it saved the URL
res = client.get(url_for("watchlist.index"))
assert bytes(test_url.encode('utf-8')) in res.data
+delete_all_watches(client)
@@ -25,6 +25,7 @@ def test_recheck_time_field_validation_global_settings(client, live_server, meas
assert REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT.encode('utf-8') in res.data
+delete_all_watches(client)
def test_recheck_time_field_validation_single_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -94,6 +95,7 @@ def test_recheck_time_field_validation_single_watch(client, live_server, measure
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
+delete_all_watches(client)
def test_checkbox_open_diff_in_new_tab(client, live_server, measure_memory_usage, datastore_path):
@@ -242,6 +244,7 @@ def test_page_title_listing_behaviour(client, live_server, measure_memory_usage,
# No page title description, and 'use_page_title_in_list' is on, it should show the <title>
res = client.get(url_for("watchlist.index"))
assert b"head titlecustom html" in res.data
+delete_all_watches(client)
def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage, datastore_path):
@@ -283,4 +286,5 @@ def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage, datast
client.get(url_for("ui.mark_all_viewed"), follow_redirects=True)
time.sleep(0.2)
res = client.get(url_for("watchlist.index"))
assert b'<span id="unread-tab-counter">0</span>' in res.data
+delete_all_watches(client)
@@ -366,7 +366,7 @@ def test_check_with_prefix_include_filters(client, live_server, measure_memory_u
assert b"Some text thats the same" in res.data # in selector assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" not in res.data # not in selector assert b"Some text that will change" not in res.data # not in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) delete_all_watches(client)
def test_various_rules(client, live_server, measure_memory_usage, datastore_path): def test_various_rules(client, live_server, measure_memory_usage, datastore_path):
@@ -423,7 +423,7 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
test_url = url_for('test_endpoint', _external=True) test_url = url_for('test_endpoint', _external=True)
res = client.post( res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"), url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": "//*[contains(@class, 'sametext')]|//*[contains(@class, 'changetext')]", data={"include_filters": "//*[contains(@class, 'sametext')]|//*[contains(@class, 'changetext')]",
"url": test_url, "url": test_url,
"tags": "", "tags": "",
@@ -437,14 +437,14 @@ def test_xpath_20(client, live_server, measure_memory_usage, datastore_path):
wait_for_all_checks(client) wait_for_all_checks(client)
res = client.get( res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"), url_for("ui.ui_preview.preview_page", uuid=uuid),
follow_redirects=True follow_redirects=True
) )
assert b"Some text thats the same" in res.data # in selector assert b"Some text thats the same" in res.data # in selector
assert b"Some text that will change" in res.data # in selector assert b"Some text that will change" in res.data # in selector
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) delete_all_watches(client)
def test_xpath_20_function_count(client, live_server, measure_memory_usage, datastore_path): def test_xpath_20_function_count(client, live_server, measure_memory_usage, datastore_path):
@@ -477,7 +477,7 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage, data
    assert b"246913579975308642" in res.data  # in selector
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

def test_xpath_20_function_count2(client, live_server, measure_memory_usage, datastore_path):
@@ -501,6 +501,8 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
    )
    assert b"Updated watch." in res.data
+    client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
    wait_for_all_checks(client)
    res = client.get(
@@ -510,7 +512,7 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage, dat
    assert b"246913579975308642" in res.data  # in selector
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

def test_xpath_20_function_string_join_matches(client, live_server, measure_memory_usage, datastore_path):
@@ -544,7 +546,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
    assert b"Some text thats the samespecialconjunctionSome text that will change" in res.data  # in selector
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
@@ -582,7 +584,7 @@ def _subtest_xpath_rss(client, datastore_path, content_type='text/html'):
    assert b"Lets go discount" in res.data, f"When testing for Lets go discount called with content type '{content_type}'"
    assert b"Events and Announcements" not in res.data, f"When testing for Lets go discount called with content type '{content_type}'"  # It should not be here because thats not our selector target
-    client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
+    delete_all_watches(client)

# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server, measure_memory_usage, datastore_path):
+105 -29
@@ -6,6 +6,42 @@ from flask import url_for
import logging
import time
import os
+import threading

+# Thread-safe global storage for test endpoint content
+# Avoids filesystem cache issues in parallel tests
+_test_endpoint_content_lock = threading.Lock()
+_test_endpoint_content = {}

+def write_test_file_and_sync(filepath, content, mode='w'):
+    """
+    Write test data to file and ensure it's synced to disk.
+    Also stores in thread-safe global dict to bypass filesystem cache.
+
+    Critical for parallel tests where workers may read files immediately after write.
+    Without fsync(), data may still be in OS buffers when workers try to read,
+    causing race conditions where old data is seen.
+
+    Args:
+        filepath: Full path to file
+        content: Content to write (str or bytes)
+        mode: File mode ('w' for text, 'wb' for binary)
+    """
+    # Convert content to bytes if needed
+    if isinstance(content, str):
+        content_bytes = content.encode('utf-8')
+    else:
+        content_bytes = content
+
+    # Store in thread-safe global dict for instant access
+    with _test_endpoint_content_lock:
+        _test_endpoint_content[os.path.basename(filepath)] = content_bytes
+
+    # Also write to file for compatibility
+    with open(filepath, mode) as f:
+        f.write(content)
+        f.flush()  # Flush Python buffer to OS
+        os.fsync(f.fileno())  # Force OS to write to disk
def set_original_response(datastore_path, extra_title=''):
    test_return_data = f"""<html>
@@ -20,8 +56,7 @@ def set_original_response(datastore_path, extra_title=''):
    </html>
    """
-    with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
-        f.write(test_return_data)
+    write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
    return None

def set_modified_response(datastore_path):
@@ -36,9 +71,7 @@ def set_modified_response(datastore_path):
    </html>
    """
-    with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
-        f.write(test_return_data)
+    write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
    return None

def set_longer_modified_response(datastore_path):
    test_return_data = """<html>
@@ -55,9 +88,7 @@ def set_longer_modified_response(datastore_path):
    </html>
    """
-    with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
-        f.write(test_return_data)
+    write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
    return None

def set_more_modified_response(datastore_path):
@@ -73,17 +104,14 @@ def set_more_modified_response(datastore_path):
    </html>
    """
-    with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
-        f.write(test_return_data)
+    write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
    return None

def set_empty_text_response(datastore_path):
    test_return_data = """<html><body></body></html>"""
-    with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
-        f.write(test_return_data)
+    write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), test_return_data)
    return None
@@ -132,21 +160,40 @@ def extract_UUID_from_client(client):
    return uuid.strip()

def delete_all_watches(client=None):
+    # Change tracking
+    client.application.config.get('DATASTORE')._dirty_watches = set()  # Watch UUIDs that need saving
+    client.application.config.get('DATASTORE')._dirty_settings = False  # Settings changed
+    client.application.config.get('DATASTORE')._watch_hashes = {}  # UUID -> SHA256 hash for change detection
    uuids = list(client.application.config.get('DATASTORE').data['watching'])
    for uuid in uuids:
        client.application.config.get('DATASTORE').delete(uuid)
+    from changedetectionio.flask_app import update_q
+    # Clear the queue to prevent leakage to next test
+    # Use clear() method to ensure both priority_items and notification_queue are drained
+    if hasattr(update_q, 'clear'):
+        update_q.clear()
+    else:
+        # Fallback for old implementation
+        while not update_q.empty():
+            try:
+                update_q.get_nowait()
+            except:
+                break
+    time.sleep(0.2)
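The queue-draining added to `delete_all_watches()` can be sketched against a plain `queue.Queue`; `drain()` is a hypothetical stand-in for the project's logic, preferring a `clear()` method when the queue class offers one:

```python
import queue

def drain(q):
    """Empty a queue in place. Prefer a clear() method if the queue class has
    one (the project's queue does); otherwise pop until queue.Empty."""
    if hasattr(q, 'clear'):
        q.clear()
        return
    # Fallback: get_nowait() raises queue.Empty once the queue is drained
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            break

q = queue.Queue()
for i in range(5):
    q.put(i)
drain(q)
assert q.empty()
```

Draining between tests matters because a leftover item would be picked up by a worker during the *next* test and mutate its datastore.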
def wait_for_all_checks(client=None):
    """
    Waits until the queue is empty and workers are idle.
-    Delegates to worker_handler.wait_for_all_checks for shared logic.
+    Delegates to worker_pool.wait_for_all_checks for shared logic.
    """
    from changedetectionio.flask_app import update_q as global_update_q
-    from changedetectionio import worker_handler
-    time.sleep(0.05)
-    # Use the shared wait logic from worker_handler
-    return worker_handler.wait_for_all_checks(global_update_q, timeout=150)
+    from changedetectionio import worker_pool
+    return worker_pool.wait_for_all_checks(global_update_q, timeout=150)

def wait_for_watch_history(client, min_history_count=2, timeout=10):
    """
@@ -195,8 +242,11 @@ def new_live_server_setup(live_server):
    @live_server.app.route('/test-endpoint')
    def test_endpoint():
-        from loguru import logger
-        logger.debug(f"/test-endpoint hit {request}")
+        # REMOVED: logger.debug() causes file locking between test process and Flask server process
+        # Flask server runs in separate multiprocessing.Process and inherited loguru tries to
+        # write to same log files, causing request handlers to block on file locks
+        # from loguru import logger
+        # logger.debug(f"/test-endpoint hit {request}")
        ctype = request.args.get('content_type')
        status_code = request.args.get('status_code')
        content = request.args.get('content') or None
@@ -218,15 +268,35 @@ def new_live_server_setup(live_server):
                resp.headers['Content-Type'] = ctype if ctype else 'text/html'
                return resp

-            # Tried using a global var here but didn't seem to work, so reading from a file instead.
-            datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
-            with open(os.path.join(datastore_path, "endpoint-content.txt"), "rb") as f:
-                resp = make_response(f.read(), status_code)
-                if uppercase_headers:
-                    resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
-                else:
-                    resp.headers['Content-Type'] = ctype if ctype else 'text/html'
-                return resp
+            # Check thread-safe global dict first (instant, no cache issues)
+            # Fall back to file if not in dict (for tests that write directly)
+            with _test_endpoint_content_lock:
+                content_data = _test_endpoint_content.get("endpoint-content.txt")
+
+            if content_data is None:
+                # Not in global dict, read from file
+                datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
+                filepath = os.path.join(datastore_path, "endpoint-content.txt")
+                # REMOVED: os.sync() was blocking for many seconds during parallel tests
+                # With -n 6+ parallel tests, heavy I/O causes os.sync() to wait for ALL
+                # system writes to complete, causing "Read timed out" errors
+                # File writes from test code are already flushed by the time workers fetch
+                try:
+                    with open(filepath, "rb") as f:
+                        content_data = f.read()
+                except Exception as e:
+                    # REMOVED: logger.error() causes file locking in multiprocess context
+                    # Just raise the exception directly for debugging
+                    raise
+
+            resp = make_response(content_data, status_code)
+            if uppercase_headers:
+                resp.headers['CONTENT-TYPE'] = ctype if ctype else 'text/html'
+            else:
+                resp.headers['Content-Type'] = ctype if ctype else 'text/html'
+            return resp
        except FileNotFoundError:
            return make_response('', status_code)
@@ -301,6 +371,12 @@ def new_live_server_setup(live_server):
    def test_pdf_endpoint():
        datastore_path = current_app.config.get('TEST_DATASTORE_PATH', 'test-datastore')
+        # Force filesystem sync before reading to ensure fresh data
+        try:
+            os.sync()
+        except (AttributeError, PermissionError):
+            pass
        # Tried using a global var here but didn't seem to work, so reading from a file instead.
        with open(os.path.join(datastore_path, "endpoint-test.pdf"), "rb") as f:
            resp = make_response(f.read(), 200)
@@ -3,7 +3,9 @@ from .processors.exceptions import ProcessorException
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
from changedetectionio import html_tools
+from changedetectionio import worker_pool
from changedetectionio.flask_app import watch_check_update
+from changedetectionio.queuedWatchMetaData import PrioritizedItem

import asyncio
import importlib
@@ -46,19 +48,33 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
    jobs_processed = 0
    start_time = time.time()

-    logger.info(f"Starting async worker {worker_id} (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
+    # Log thread name for debugging
+    import threading
+    thread_name = threading.current_thread().name
+    logger.info(f"Starting async worker {worker_id} on thread '{thread_name}' (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")

    while not app.config.exit.is_set():
        update_handler = None
        watch = None

        try:
-            # Use sync interface via run_in_executor since each worker has its own event loop
-            loop = asyncio.get_event_loop()
-            queued_item_data = await asyncio.wait_for(
-                loop.run_in_executor(executor, q.get, True, 1.0),  # block=True, timeout=1.0
-                timeout=1.5
-            )
+            # Efficient blocking via run_in_executor (no polling overhead!)
+            # Worker blocks in threading.Queue.get() which uses Condition.wait()
+            # Executor must be sized to match worker count (see worker_pool.py: 50 threads default)
+            # Single timeout (no double-timeout wrapper) = no race condition
+            queued_item_data = await q.async_get(executor=executor, timeout=1.0)
+
+            # CRITICAL: Claim UUID immediately after getting from queue to prevent race condition
+            # in wait_for_all_checks() which checks qsize() and running_uuids separately
+            uuid = queued_item_data.item.get('uuid')
+            if not worker_pool.claim_uuid_for_processing(uuid, worker_id):
+                # Already being processed - re-queue and continue
+                logger.trace(f"Worker {worker_id} detected UUID {uuid} already processing during claim - deferring")
+                await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
+                deferred_priority = max(1000, queued_item_data.priority * 10)
+                deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
+                worker_pool.queue_item_async_safe(q, deferred_item, silent=True)
+                continue

        except asyncio.TimeoutError:
            # No jobs available - check if we should restart based on time while idle
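`q.async_get()` is the project's own method; the idea it replaces the old double-timeout wrapper with can be sketched with the stdlib alone. `async_get` below is a hypothetical reimplementation (not the real signature): a single timeout lives inside `queue.Queue.get()`, so there is no outer `asyncio.wait_for` racing against the inner one.

```python
import asyncio
import functools
import queue
from concurrent.futures import ThreadPoolExecutor

async def async_get(q, executor, timeout=1.0):
    """Block in threading.Queue.get() on an executor thread.
    One timeout (inside q.get), no asyncio.wait_for wrapper around it,
    so the item can never be consumed by a get() whose awaiter already timed out."""
    loop = asyncio.get_running_loop()
    try:
        # Queue.get(block, timeout) taken positionally; run_in_executor has no kwargs
        return await loop.run_in_executor(executor, functools.partial(q.get, True, timeout))
    except queue.Empty:
        raise asyncio.TimeoutError  # normalise for the caller's except clause

async def main():
    q = queue.Queue()
    with ThreadPoolExecutor(max_workers=1, thread_name_prefix="QueueGetter-") as ex:
        q.put("job-1")
        assert await async_get(q, ex, timeout=0.5) == "job-1"
        try:
            await async_get(q, ex, timeout=0.1)  # queue empty -> times out
            assert False, "expected timeout"
        except asyncio.TimeoutError:
            pass

asyncio.run(main())
```

With the old shape, the outer `wait_for(…, timeout=1.5)` could abandon the future while the executor thread's `q.get` still completed and silently swallowed an item; a single inner timeout removes that window.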
@@ -67,6 +83,17 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
                return "restart"
            continue
+        except RuntimeError as e:
+            # Handle executor shutdown gracefully - this is expected during shutdown
+            if "cannot schedule new futures after shutdown" in str(e):
+                # Executor shut down - exit gracefully without logging in pytest
+                if not IN_PYTEST:
+                    logger.debug(f"Worker {worker_id} detected executor shutdown, exiting")
+                break
+            # Other RuntimeError - log and continue
+            logger.error(f"Worker {worker_id} runtime error: {e}")
+            await asyncio.sleep(0.1)
+            continue
        except Exception as e:
            # Handle expected Empty exception from queue timeout
            import queue
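The new `RuntimeError` branch catches documented stdlib behaviour: submitting to a `ThreadPoolExecutor` after `shutdown()` raises `RuntimeError` with exactly that message, which is easy to verify in isolation:

```python
from concurrent.futures import ThreadPoolExecutor

ex = ThreadPoolExecutor(max_workers=1)
ex.shutdown(wait=True)

try:
    ex.submit(len, "x")  # any submit after shutdown() is rejected
    raised = False
except RuntimeError as e:
    # CPython raises RuntimeError('cannot schedule new futures after shutdown')
    raised = "cannot schedule new futures after shutdown" in str(e)

assert raised
```

Matching on the message string is brittle across interpreters, but it is the only way to distinguish this case from other `RuntimeError`s without a dedicated exception type.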
@@ -88,26 +115,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
            await asyncio.sleep(0.1)
            continue

-        uuid = queued_item_data.item.get('uuid')
-
-        # RACE CONDITION FIX: Atomically claim this UUID for processing
-        from changedetectionio import worker_handler
-        from changedetectionio.queuedWatchMetaData import PrioritizedItem
-
-        # Try to claim the UUID atomically - prevents duplicate processing
-        if not worker_handler.claim_uuid_for_processing(uuid, worker_id):
-            # Already being processed by another worker
-            logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed - deferring")
-            # Sleep to avoid tight loop and give the other worker time to finish
-            await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
-            # Re-queue with lower priority so it gets checked again after current processing finishes
-            deferred_priority = max(1000, queued_item_data.priority * 10)
-            deferred_item = PrioritizedItem(priority=deferred_priority, item=queued_item_data.item)
-            worker_handler.queue_item_async_safe(q, deferred_item, silent=True)
-            logger.debug(f"Worker {worker_id} re-queued UUID {uuid} for subsequent check")
-            continue
+        # UUID already claimed above immediately after getting from queue
+        # to prevent race condition with wait_for_all_checks()

        fetch_start_time = round(time.time())
@@ -224,6 +233,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
        except FilterNotFoundInResponse as e:
            if not datastore.data['watching'].get(uuid):
                continue
+            logger.debug(f"Received FilterNotFoundInResponse exception for {uuid}")
            err_text = "Warning, no filters were found, no change detection ran - Did the page change layout? update your Visual Filter if necessary."
            datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text})
@@ -243,17 +253,19 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                c += 1
                # Send notification if we reached the threshold?
                threshold = datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
-                logger.debug(f"Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
+                logger.debug(f"FilterNotFoundInResponse - Filter for {uuid} not found, consecutive_filter_failures: {c} of threshold {threshold}")
                if c >= threshold:
                    if not watch.get('notification_muted'):
-                        logger.debug(f"Sending filter failed notification for {uuid}")
+                        logger.debug(f"FilterNotFoundInResponse - Sending filter failed notification for {uuid}")
                        await send_filter_failure_notification(uuid, notification_q, datastore)
                    c = 0
-                    logger.debug(f"Reset filter failure count back to zero")
+                    logger.debug(f"FilterNotFoundInResponse - Reset filter failure count back to zero")
+                else:
+                    logger.debug(f"FilterNotFoundInResponse - {c} of threshold {threshold}..")
                datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
            else:
-                logger.trace(f"{uuid} - filter_failure_notification_send not enabled, skipping")
+                logger.trace(f"FilterNotFoundInResponse - {uuid} - filter_failure_notification_send not enabled, skipping")

            process_changedetection_results = False
@@ -490,7 +502,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
                logger.error(f"Exception while cleaning/quit after calling browser: {e}")

        try:
            # Release UUID from processing (thread-safe)
-            worker_handler.release_uuid_from_processing(uuid, worker_id=worker_id)
+            worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id)

            # Send completion signal
            if watch:
@@ -23,11 +23,14 @@ _uuid_processing_lock = threading.Lock()  # Protects currently_processing_uuids
USE_ASYNC_WORKERS = True

# Custom ThreadPoolExecutor for queue operations with named threads
-# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
-_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
+# Scale executor threads to match FETCH_WORKERS (no minimum, no maximum)
+# Thread naming: "QueueGetter-N" for easy debugging in thread dumps/traces
+# With FETCH_WORKERS=10: 10 workers + 10 executor threads = 20 threads total
+# With FETCH_WORKERS=500: 500 workers + 500 executor threads = 1000 threads total (acceptable on modern systems)
+_max_executor_workers = int(os.getenv("FETCH_WORKERS", "10"))
queue_executor = ThreadPoolExecutor(
    max_workers=_max_executor_workers,
-    thread_name_prefix="QueueGetter-"
+    thread_name_prefix="QueueGetter-"  # Shows in thread dumps as "QueueGetter-0", "QueueGetter-1", etc.
)
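Sizing the executor 1:1 with workers is just reading `FETCH_WORKERS` into `max_workers`. A sketch (the default of 10 matches the diff; note the stdlib appends `_N` to the prefix, so names come out as `QueueGetter-_0` etc.):

```python
import os
import threading
from concurrent.futures import ThreadPoolExecutor

# One executor thread per fetch worker, so every worker's blocking q.get()
# always has a thread available - no queueing behind a too-small pool
n_workers = int(os.getenv("FETCH_WORKERS", "10"))
executor = ThreadPoolExecutor(max_workers=n_workers, thread_name_prefix="QueueGetter-")

def whoami():
    # Runs on an executor thread; the name shows up in thread dumps
    return threading.current_thread().name

name = executor.submit(whoami).result()
assert name.startswith("QueueGetter-")
executor.shutdown(wait=True)
```

The earlier `max(50, …)` floor meant 50 idle threads even with 10 workers; dropping the floor keeps thread count proportional to configured concurrency.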
@@ -82,16 +85,17 @@ class WorkerThread:
        self.loop = None

    def start(self):
-        """Start the worker thread"""
+        """Start the worker thread with descriptive name for debugging"""
        self.thread = threading.Thread(
            target=self.run,
            daemon=True,
-            name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
+            name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"  # Shows in thread dumps with worker ID
        )
        self.thread.start()

    def stop(self):
-        """Stop the worker thread"""
-        # Try to stop the event loop if it exists
+        """Stop the worker thread brutally - no waiting"""
        if self.loop and self.running:
            try:
                # Signal the loop to stop
@@ -99,8 +103,7 @@ class WorkerThread:
            except RuntimeError:
                pass

-        if self.thread and self.thread.is_alive():
-            self.thread.join(timeout=2.0)
+        # Don't wait - thread is daemon and will die when needed

def start_async_workers(n_workers, update_q, notification_q, app, datastore):
@@ -125,7 +128,7 @@ def start_async_workers(n_workers, update_q, notification_q, app, datastore):
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
    """Start a single async worker with auto-restart capability"""
-    from changedetectionio.async_update_worker import async_update_worker
+    from changedetectionio.worker import async_update_worker

    # Check if we're in pytest environment - if so, be more gentle with logging
    import os
@@ -337,24 +340,36 @@ def queue_item_async_safe(update_q, item, silent=False):

def shutdown_workers():
-    """Shutdown all async workers fast and aggressively"""
-    global worker_threads
+    """Shutdown all async workers brutally - no delays, no waiting"""
+    global worker_threads, queue_executor

    # Check if we're in pytest environment - if so, be more gentle with logging
    import os
    in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ

    if not in_pytest:
-        logger.info("Fast shutdown of async workers initiated...")
+        logger.info("Brutal shutdown of async workers initiated...")

-    # Stop all worker threads
+    # Stop all worker event loops
    for worker in worker_threads:
        worker.stop()

+    # Clear immediately - threads are daemon and will die
    worker_threads.clear()

+    # Shutdown the queue executor to prevent "cannot schedule new futures after shutdown" errors
+    # This must happen AFTER workers are stopped to avoid race conditions
+    if queue_executor:
+        try:
+            queue_executor.shutdown(wait=False)
+            if not in_pytest:
+                logger.debug("Queue executor shut down")
+        except Exception as e:
+            if not in_pytest:
+                logger.warning(f"Error shutting down queue executor: {e}")

    if not in_pytest:
-        logger.info("Async workers fast shutdown complete")
+        logger.info("Async workers brutal shutdown complete")
@@ -469,12 +484,14 @@ def wait_for_all_checks(update_q, timeout=150):
            elif time.time() - empty_since >= 0.3:
                # Add small buffer for filesystem operations to complete
                time.sleep(0.2)
+                logger.trace("wait_for_all_checks: All checks complete (queue empty, workers idle)")
                return True
        else:
            empty_since = None
        attempt += 1

+    logger.warning(f"wait_for_all_checks: Timeout after {timeout} attempts")
    return False  # Timeout
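The `empty_since` debounce above generalises to a small helper; `wait_until_idle()` is a hypothetical sketch of the same logic, requiring the idle condition to hold continuously for a settle period rather than trusting a single sample (a worker may be between dequeue and claim at the instant you look):

```python
import time

def wait_until_idle(is_idle, timeout=150, settle=0.3, poll=0.05):
    """Return True once is_idle() has held continuously for `settle` seconds,
    False on timeout. One idle sample is not enough: a worker can have popped
    an item from the queue but not yet registered it as running."""
    deadline = time.time() + timeout
    empty_since = None
    while time.time() < deadline:
        if is_idle():
            if empty_since is None:
                empty_since = time.time()          # start the settle window
            elif time.time() - empty_since >= settle:
                return True                        # idle held long enough
        else:
            empty_since = None                     # activity seen, reset window
        time.sleep(poll)
    return False

# Usage: condition becomes true after ~0.2s and then stays true
t0 = time.time()
assert wait_until_idle(lambda: time.time() - t0 > 0.2, timeout=5) is True
assert wait_until_idle(lambda: False, timeout=0.3) is False
```

In the real code `is_idle` would combine `update_q.qsize() == 0` with an empty running-UUIDs set, which is exactly why the claim-on-dequeue change earlier in this diff matters.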
+1 -1
@@ -8,7 +8,7 @@ flask-paginate
flask_expects_json~=1.7
flask_restful
flask_cors  # For the Chrome extension to operate
-janus  # Thread-safe async/sync queue bridge
+# janus  # No longer needed - using pure threading.Queue for multi-loop support
flask_wtf~=1.2
flask~=3.1
flask-socketio~=5.6.0