Compare commits

...

10 Commits

Author SHA1 Message Date
dgtlmoon
9c7797d93d Increase max queue size and many CPU performance fixes
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2026-01-17 13:34:37 +01:00
dgtlmoon
9ae5423b8b Fixing queue limit size 2026-01-17 13:28:57 +01:00
dgtlmoon
8c9218863b WIP 2026-01-17 13:15:27 +01:00
dgtlmoon
f5466e16fc If they want to queue one thats already running, thats up to them. 2026-01-16 20:32:11 +01:00
dgtlmoon
b6b90f9f59 woops 2026-01-16 20:22:37 +01:00
dgtlmoon
1db234204b fix typo 2026-01-16 20:16:54 +01:00
dgtlmoon
6223104899 Bunch up some tests a little 2026-01-16 20:13:12 +01:00
dgtlmoon
1fe6bf4fcd More wait time under test 2026-01-16 20:07:09 +01:00
dgtlmoon
52ddce0876 Tests - Faster recheck/reschedule loop under pytest environment 2026-01-16 20:02:36 +01:00
dgtlmoon
0746e135b4 No need to add imported items to the check queue, the scheduler will do this #3762 2026-01-16 19:32:13 +01:00
9 changed files with 72 additions and 53 deletions

View File

@@ -438,7 +438,8 @@ class CreateWatch(Resource):
new_uuid = self.datastore.add_watch(url=url, extras=extras, tag=tags)
if new_uuid:
worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
# Dont queue because the scheduler will check that it hasnt been checked before anyway
# worker_handler.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': new_uuid}))
return {'uuid': new_uuid}, 201
else:
return "Invalid or unsupported URL", 400

View File

@@ -8,6 +8,7 @@ from changedetectionio.flask_app import watch_check_update
import asyncio
import importlib
import os
import sys
import time
from loguru import logger
@@ -15,6 +16,9 @@ from loguru import logger
# Async version of update_worker
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
DEFER_SLEEP_TIME_ALREADY_QUEUED = 0.3 if IN_PYTEST else 10.0
async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
"""
Async worker function that processes watch check jobs from the queue.
@@ -78,7 +82,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
if worker_handler.is_watch_running_by_another_worker(uuid, worker_id):
logger.trace(f"Worker {worker_id} detected UUID {uuid} already being processed by another worker - deferring")
# Sleep to avoid tight loop and give the other worker time to finish
await asyncio.sleep(10.0)
await asyncio.sleep(DEFER_SLEEP_TIME_ALREADY_QUEUED)
# Re-queue with lower priority so it gets checked again after current processing finishes
deferred_priority = max(1000, queued_item_data.priority * 10)

View File

@@ -1,13 +1,8 @@
from flask import Blueprint, request, redirect, url_for, flash, render_template
from loguru import logger
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio import worker_handler
from changedetectionio.blueprint.imports.importer import (
import_url_list,
import_distill_io_json,
import_xlsx_wachete,
import_xlsx_custom
)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
import_blueprint = Blueprint('imports', __name__, template_folder="templates")
@@ -17,15 +12,26 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
def import_page():
remaining_urls = []
from changedetectionio import forms
#
if request.method == 'POST':
# from changedetectionio import worker_handler
from changedetectionio.blueprint.imports.importer import (
import_url_list,
import_distill_io_json,
import_xlsx_wachete,
import_xlsx_custom
)
# URL List import
if request.values.get('urls') and len(request.values.get('urls').strip()):
# Import and push into the queue for immediate update check
importer_handler = import_url_list()
importer_handler.run(data=request.values.get('urls'), flash=flash, datastore=datastore, processor=request.values.get('processor', 'text_json_diff'))
for uuid in importer_handler.new_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
logger.debug(f"Imported {len(importer_handler.new_uuids)} new UUIDs")
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
if len(importer_handler.remaining_data) == 0:
return redirect(url_for('watchlist.index'))
@@ -37,8 +43,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Import and push into the queue for immediate update check
d_importer = import_distill_io_json()
d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
for uuid in d_importer.new_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# XLSX importer
if request.files and request.files.get('xlsx_file'):
@@ -60,8 +68,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
w_importer.import_profile = map
w_importer.run(data=file, flash=flash, datastore=datastore)
for uuid in w_importer.new_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Dont' add to queue because scheduler can see that they haven't been checked and will add them to the queue
# for uuid in importer_handler.new_uuids:
# worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
# Could be some remaining, or we could be on GET
form = forms.importForm(formdata=request.form if request.method == 'POST' else None)

View File

@@ -227,12 +227,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
i = 0
running_uuids = worker_handler.get_running_uuids()
if uuid:
if uuid not in running_uuids:
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
i += 1
worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
i += 1
else:
# Recheck all, including muted
@@ -241,7 +238,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
watch_uuid = k[0]
watch = k[1]
if not watch['paused']:
if watch_uuid not in running_uuids:
if watch_uuid:
if with_errors and not watch.get('last_error'):
continue

View File

@@ -84,7 +84,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
datastore=datastore,
errored_count=errored_count,
extra_classes='has-queue' if len(update_q.queue) else '',
extra_classes='has-queue' if not update_q.empty() else '',
form=form,
generate_tag_colors=processors.generate_processor_badge_colors,
guid=datastore.data['app_guid'],
@@ -95,8 +95,8 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
processor_badge_css=processors.get_processor_badge_css(),
processor_badge_texts=processors.get_processor_badge_texts(),
processor_descriptions=processors.get_processor_descriptions(),
queue_size=len(update_q.queue),
queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue],
queue_size=update_q.qsize(),
queued_uuids=update_q.get_queued_uuids(),
search_q=request.args.get('q', '').strip(),
sort_attribute=request.args.get('sort') if request.args.get('sort') else request.cookies.get('sort'),
sort_order=request.args.get('order') if request.args.get('order') else request.cookies.get('order'),

View File

@@ -27,9 +27,7 @@ from flask import (
session,
url_for,
)
from urllib.parse import urlparse
from flask_compress import Compress as FlaskCompress
from flask_login import current_user
from flask_restful import abort, Api
from flask_cors import CORS
@@ -46,6 +44,7 @@ from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, Watch
from changedetectionio.api.Search import Search
from .time_handler import is_within_schedule
from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
datastore = None
@@ -56,7 +55,7 @@ extra_stylesheets = []
# Use bulletproof janus-based queues for sync/async reliability
update_q = RecheckPriorityQueue()
notification_q = NotificationQueue()
MAX_QUEUE_SIZE = 2000
MAX_QUEUE_SIZE = 5000
app = Flask(__name__,
static_url_path="",
@@ -979,6 +978,10 @@ def ticker_thread_check_time_launch_checks():
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
# Workers are now started during app initialization, not here
WAIT_TIME_BETWEEN_LOOP = 1.0 if not IN_PYTEST else 0.01
if IN_PYTEST:
# The time between loops should be less than the first .sleep/wait in def wait_for_all_checks() of tests/util.py
logger.warning(f"Looks like we're in PYTEST! Setting time between searching for items to add to the queue to {WAIT_TIME_BETWEEN_LOOP}s")
while not app.config.exit.is_set():
@@ -1002,6 +1005,9 @@ def ticker_thread_check_time_launch_checks():
# Get a list of watches by UUID that are currently fetching data
running_uuids = worker_handler.get_running_uuids()
# Build set of queued UUIDs once for O(1) lookup instead of O(n) per watch
queued_uuids = {q_item.item['uuid'] for q_item in update_q.queue}
# Re #232 - Deepcopy the data incase it changes while we're iterating through it all
watch_uuid_list = []
while True:
@@ -1018,16 +1024,17 @@ def ticker_thread_check_time_launch_checks():
else:
break
# Re #438 - Don't place more watches in the queue to be checked if the queue is already large
while update_q.qsize() >= 2000:
logger.warning(f"Recheck watches queue size limit reached ({MAX_QUEUE_SIZE}), skipping adding more items")
app.config.exit.wait(10.0)
recheck_time_system_seconds = int(datastore.threshold_seconds)
# Check for watches outside of the time threshold to put in the thread queue.
for uuid in watch_uuid_list:
for watch_index, uuid in enumerate(watch_uuid_list):
# Re #438 - Check queue size every 100 watches for CPU efficiency (not every watch)
if watch_index % 100 == 0:
current_queue_size = update_q.qsize()
if current_queue_size >= MAX_QUEUE_SIZE:
logger.debug(f"Queue size limit reached ({current_queue_size}/{MAX_QUEUE_SIZE}), stopping scheduler this iteration.")
break
now = time.time()
watch = datastore.data['watching'].get(uuid)
if not watch:
@@ -1077,7 +1084,7 @@ def ticker_thread_check_time_launch_checks():
seconds_since_last_recheck = now - watch['last_checked']
if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
if not uuid in running_uuids and uuid not in [q_uuid.item['uuid'] for q_uuid in update_q.queue]:
if not uuid in running_uuids and uuid not in queued_uuids:
# Proxies can be set to have a limit on seconds between which they can be called
watch_proxy = datastore.get_preferred_proxy_for_watch(uuid=uuid)
@@ -1120,4 +1127,4 @@ def ticker_thread_check_time_launch_checks():
watch.jitter_seconds = 0
# Should be low so we can break this out in testing
app.config.exit.wait(1)
app.config.exit.wait(WAIT_TIME_BETWEEN_LOOP)

View File

@@ -176,7 +176,16 @@ class RecheckPriorityQueue:
def empty(self) -> bool:
"""Check if queue is empty"""
return self.qsize() == 0
def get_queued_uuids(self) -> list:
"""Get list of all queued UUIDs efficiently with single lock"""
try:
with self._lock:
return [item.item['uuid'] for item in self._priority_items if hasattr(item, 'item') and 'uuid' in item.item]
except Exception as e:
logger.critical(f"CRITICAL: Failed to get queued UUIDs: {str(e)}")
return []
def close(self):
"""Close the janus queue"""
try:

View File

@@ -150,11 +150,8 @@ def handle_watch_update(socketio, **kwargs):
# Get list of watches that are currently running
running_uuids = worker_handler.get_running_uuids()
# Get list of watches in the queue
queue_list = []
for q_item in update_q.queue:
if hasattr(q_item, 'item') and 'uuid' in q_item.item:
queue_list.append(q_item.item['uuid'])
# Get list of watches in the queue (efficient single-lock method)
queue_list = update_q.get_queued_uuids()
# Get the error texts from the watch
error_texts = watch.compile_error_texts()

View File

@@ -82,20 +82,14 @@ echo "RUNNING WITH BASE_URL SET"
# Re #65 - Ability to include a link back to the installation, in the notification.
export BASE_URL="https://really-unique-domain.io"
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notification.py
# Re-run with HIDE_REFERER set - could affect login
export HIDE_REFERER=True
pytest -vv -s --maxfail=1 tests/test_access_control.py
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notification.py tests/test_access_control.py
# Re-run a few tests that will trigger brotli based storage
export SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5
pytest -vv -s --maxfail=1 tests/test_access_control.py
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest tests/test_notification.py
pytest -vv -s --maxfail=1 tests/test_backend.py
pytest -vv -s --maxfail=1 tests/test_rss.py
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
# And again with brotli+screenshot attachment
SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5 REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --dist=load tests/test_backend.py tests/test_rss.py tests/test_unique_lines.py tests/test_notification.py tests/test_access_control.py
# Try high concurrency
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s