Compare commits

..

32 Commits

Author SHA1 Message Date
dgtlmoon
fe5beac2b2 Small fix for 3.14 setup
2026-01-16 09:40:07 +01:00
dgtlmoon
5a1d44dc62 Merge branch 'master' into python-314 2026-01-16 09:23:23 +01:00
dependabot[bot]
6db1085337 Bump elementpath from 5.0.4 to 5.1.0 (#3754) 2026-01-16 09:22:10 +01:00
吾爱分享
66553e106d Update zh translations with improved, consistent Simplified Chinese UI copy. (#3752) 2026-01-16 09:21:29 +01:00
dependabot[bot]
5b01dbd9f8 Bump apprise from 1.9.5 to 1.9.6 (#3753) 2026-01-16 09:09:02 +01:00
dgtlmoon
c86f214fc3 0.52.6
2026-01-15 22:28:58 +01:00
dgtlmoon
32149640d9 Selenium fetcher - Small fix for #3748 RGB error on transparent screenshots or similar (#3749) 2026-01-15 20:56:53 +01:00
dgtlmoon
15f16455fc UI - Show queue size above watch table in realtime
2026-01-15 17:28:09 +01:00
dgtlmoon
15cdfac9d9 0.52.5 2026-01-15 14:07:09 +01:00
dgtlmoon
04de397916 Revert sub-process brotli saving because it could fork-bomb/use up too many system resources (#3747) 2026-01-15 13:56:08 +01:00
dgtlmoon
4643082c5b i18n: Recompile zh_Hant_TW/LC_MESSAGES/messages.mo 2026-01-15 13:21:49 +01:00
滅ü
3b2b74e62d i18n: Update zh_Hant_TW translations (#3745) 2026-01-15 13:12:25 +01:00
dependabot[bot]
68354cf53d Update jsonschema requirement from ~=4.25 to ~=4.26 (#3743) 2026-01-15 13:03:16 +01:00
dgtlmoon
3e364e0eba Translations - ZH_Hant_TW - Fixing timeago string handling #3737
2026-01-15 12:24:53 +01:00
dgtlmoon
06ea29bfc7 Translations - Fixing zh_TW to zh_Hant_TW, adding tests #3737 (#3744) 2026-01-15 12:01:12 +01:00
dependabot[bot]
f4e178955c Bump pyppeteer-ng from 2.0.0rc10 to 2.0.0rc11 (#3742) 2026-01-15 10:31:42 +01:00
dgtlmoon
51d531d732 0.52.4
2026-01-14 13:26:23 +01:00
dgtlmoon
e40c4ca97d Fixing Traditional Chinese locale mapping #3737 (#3738) 2026-01-14 13:26:07 +01:00
dgtlmoon
b8ede70f3a Languages - Pypi/pip package was missing translations 2026-01-14 13:09:23 +01:00
dgtlmoon
50b349b464 0.52.3
2026-01-14 12:00:54 +01:00
dgtlmoon
67d097cca7 UI - Groups - Adding 'Recheck' button from groups overview page 2026-01-14 11:59:42 +01:00
dgtlmoon
494385a379 Minor playwright memory cleanup improvements (#3736) 2026-01-14 11:54:53 +01:00
dgtlmoon
c2ee84b753 Browser Steps UI async_loop bug, refactored startup of BrowserSteps, increased test coverage. Re #3734 (#3735) 2026-01-14 11:27:01 +01:00
dgtlmoon
c1e0296cda 0.52.2
2026-01-13 16:36:16 +01:00
dgtlmoon
f041223c38 Page fetchers - Were not truly running independently and could have been blocking each other; this commit speeds up page fetches where there is more than 1 worker. 2026-01-13 16:32:50 +01:00
dgtlmoon
d36738d7ef RSS - Bugfix - possible edge case of wrong feed info could be rendered (#3733) 2026-01-13 16:31:58 +01:00
dgtlmoon
e51ff34c89 UI - Language modal - flag icons should be round
2026-01-12 18:01:42 +01:00
dgtlmoon
ba4ed9cf27 0.52.1 2026-01-12 17:52:52 +01:00
dgtlmoon
33b7f1684d Merge branch 'master' of github.com:dgtlmoon/changedetection.io 2026-01-12 17:51:17 +01:00
dgtlmoon
3d14df6a11 Development branch merge into release/master
Multi-language / Translations Support (#3696)
  - Complete internationalization system implemented
  - Support for 7 languages: Czech (cs), German (de), French (fr), Italian (it), Korean (ko), Chinese Simplified (zh), Chinese Traditional (zh_TW)
  - Language selector with localized flags and theming
  - Flash message translations
  - Multiple translation fixes and improvements across all languages
  - Language setting preserved across redirects

  Pluggable Content Fetchers (#3653)
  - New architecture for extensible content fetcher system
  - Allows custom fetcher implementations

  Image / Screenshot Comparison Processor (#3680)
  - New processor for visual change detection (disabled for this release)
  - Supporting CSS/JS infrastructure added

  UI Improvements

  Design & Layout
  - Auto-generated tag color schemes
  - Simplified login form styling
  - Removed hard-coded CSS, moved to SCSS variables
  - Tag UI cleanup and improvements
  - Automatic tab wrapper functionality
  - Menu refactoring for better organization
  - Cleanup of offset settings
  - Hide sticky tabs on narrow viewports
  - Improved responsive layout (#3702)

  User Experience
  - Modal alerts/confirmations on delete/clear operations (#3693, #3598, #3382)
  - Auto-add https:// to URLs in quickwatch form if not present
  - Better redirect handling on login (#3699)
  - 'Recheck all' now returns to correct group/tag (#3673)
  - Language set redirect keeps hash fragment
  - More friendly human-readable text throughout UI

  Performance & Reliability

  Scheduler & Processing
  - Soft delays instead of blocking time.sleep() calls (#3710)
  - More resilient handling of same UUID being processed (#3700)
  - Better Puppeteer timeout handling
  - Improved Puppeteer shutdown/cleanup (#3692)
  - Requests cleanup now properly async

  History & Rendering
  - Faster server-side "difference" rendering on History page (#3442)
  - Show ignored/triggered rows in history
  - API: Retry watch data if watch dict changed (more reliable)

  API Improvements

  - Watch get endpoint: retry mechanism for changed watch data
  - WatchHistoryDiff API endpoint includes extra format args (#3703)

  Testing Improvements

  - Replace time.sleep with wait_for_notification_endpoint_output (#3716)
  - Test for mode switching (#3701)
  - Test for #3720 added (#3725)
  - Extract-text difference test fixes
  - Improved dev workflow

  Bug Fixes

  - Notification error text output (#3672, #3669, #3280)
  - HTML validation fixes (#3704)
  - Template discovery path fixes
  - Notification debug log now uses system locale for dates/times
  - Puppeteer spelling mistake in log output
  - Recalculation on anchor change
  - Queue bubble update disabled temporarily

  Dependency Updates

  - beautifulsoup4 updated (#3724)
  - psutil 7.1.0 → 7.2.1 (#3723)
  - python-engineio ~=4.12.3 → ~=4.13.0 (#3707)
  - python-socketio ~=5.14.3 → ~=5.16.0 (#3706)
  - flask-socketio ~=5.5.1 → ~=5.6.0 (#3691)
  - brotli ~=1.1 → ~=1.2 (#3687)
  - lxml updated (#3590)
  - pytest ~=7.2 → ~=9.0 (#3676)
  - jsonschema ~=4.0 → ~=4.25 (#3618)
  - pluggy ~=1.5 → ~=1.6 (#3616)
  - cryptography 44.0.1 → 46.0.3 (security) (#3589)

  Documentation

  - README updated with viewport size setup information

  Development Infrastructure

  - Dev container only built on dev branch
  - Improved dev workflow tooling
2026-01-12 17:50:53 +01:00
dgtlmoon
08ce1e28ce Adding test for #3720 2026-01-12 11:40:31 +01:00
dgtlmoon
9421f7e279 Python 3.14 test #3662 2025-11-30 18:13:24 +01:00
46 changed files with 1850 additions and 2407 deletions

View File

@@ -52,4 +52,13 @@ jobs:
     uses: ./.github/workflows/test-stack-reusable-workflow.yml
     with:
       python-version: '3.13'
       skip-pypuppeteer: true
+
+  test-application-3-14:
+    #if: github.event_name == 'push' && github.ref == 'refs/heads/master'
+    needs: lint-code
+    uses: ./.github/workflows/test-stack-reusable-workflow.yml
+    with:
+      python-version: '3.14'
+      skip-pypuppeteer: false

View File

@@ -11,6 +11,7 @@ recursive-include changedetectionio/realtime *
 recursive-include changedetectionio/static *
 recursive-include changedetectionio/templates *
 recursive-include changedetectionio/tests *
+recursive-include changedetectionio/translations *
 recursive-include changedetectionio/widgets *
 prune changedetectionio/static/package-lock.json
 prune changedetectionio/static/styles/node_modules

View File

@@ -2,7 +2,7 @@
 # Read more https://github.com/dgtlmoon/changedetection.io/wiki
 # Semver means never use .01, or 00. Should be .1.
-__version__ = '0.51.4'
+__version__ = '0.52.6'

 from changedetectionio.strtobool import strtobool
 from json.decoder import JSONDecodeError
@@ -41,9 +41,10 @@ from loguru import logger
 #
 # IMPLEMENTATION:
 # 1. Explicit contexts everywhere (primary protection):
-#    - Watch.py: ctx = multiprocessing.get_context('spawn')
 #    - playwright.py: ctx = multiprocessing.get_context('spawn')
 #    - puppeteer.py: ctx = multiprocessing.get_context('spawn')
+#    - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
+#    - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
 #
 # 2. Global default (defense-in-depth, below):
 #    - Safety net if future code forgets explicit context

View File

@@ -1,5 +1,4 @@
-from blinker import signal
 from .processors.exceptions import ProcessorException
 import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
 from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
@@ -9,7 +8,6 @@ from changedetectionio.flask_app import watch_check_update
 import asyncio
 import importlib
 import os
-import queue
 import time
 from loguru import logger
@@ -17,36 +15,48 @@ from loguru import logger
 # Async version of update_worker
 # Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
-async def async_update_worker(worker_id, q, notification_q, app, datastore):
+async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
     """
     Async worker function that processes watch check jobs from the queue.

     Args:
         worker_id: Unique identifier for this worker
         q: AsyncSignalPriorityQueue containing jobs to process
         notification_q: Standard queue for notifications
         app: Flask application instance
         datastore: Application datastore
+        executor: ThreadPoolExecutor for queue operations (optional)
     """
     # Set a descriptive name for this task
     task = asyncio.current_task()
     if task:
         task.set_name(f"async-worker-{worker_id}")

     logger.info(f"Starting async worker {worker_id}")

     while not app.config.exit.is_set():
         update_handler = None
         watch = None

         try:
-            # Use native janus async interface - no threads needed!
-            queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
+            # Use sync interface via run_in_executor since each worker has its own event loop
+            loop = asyncio.get_event_loop()
+            queued_item_data = await asyncio.wait_for(
+                loop.run_in_executor(executor, q.get, True, 1.0),  # block=True, timeout=1.0
+                timeout=1.5
+            )
         except asyncio.TimeoutError:
             # No jobs available, continue loop
             continue
         except Exception as e:
+            # Handle expected Empty exception from queue timeout
+            import queue
+            if isinstance(e, queue.Empty):
+                # Queue is empty, normal behavior - just continue
+                continue
+            # Unexpected exception - log as critical
             logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
             # Log queue health for debugging
@@ -414,14 +424,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
                 datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})

         finally:
-            try:
-                await update_handler.fetcher.quit(watch=watch)
-            except Exception as e:
-                logger.error(f"Exception while cleaning/quit after calling browser: {e}")
-
             # Always cleanup - this runs whether there was an exception or not
             if uuid:
+                try:
+                    if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
+                        await update_handler.fetcher.quit(watch=watch)
+                except Exception as e:
+                    logger.error(f"Exception while cleaning/quit after calling browser: {e}")
                 try:
                     # Mark UUID as no longer being processed by this worker
                     worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
@@ -460,7 +469,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
                     logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
                 except Exception as cleanup_error:
                     logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
+                del(uuid)

         # Brief pause before continuing to avoid tight error loops (only on error)
         if 'e' in locals():
             await asyncio.sleep(1.0)

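The change above swaps the janus-style q.async_get() for a blocking queue.get() pushed onto a thread pool, so each worker's own event loop is never blocked while waiting for work. A minimal, self-contained sketch of that pattern (illustrative names only, not the project's worker or queue classes):

import asyncio
import queue
from concurrent.futures import ThreadPoolExecutor

async def drain(q, executor):
    # Consume a thread-safe queue from asyncio without blocking the event loop
    loop = asyncio.get_running_loop()
    while True:
        try:
            # The blocking q.get(block=True, timeout=1.0) runs in an executor thread;
            # the outer wait_for is a safety net slightly longer than the queue timeout
            item = await asyncio.wait_for(
                loop.run_in_executor(executor, q.get, True, 1.0),
                timeout=1.5,
            )
        except asyncio.TimeoutError:
            continue
        except Exception as e:
            if isinstance(e, queue.Empty):  # an empty queue is normal, keep polling
                continue
            raise
        if item is None:  # sentinel ends the worker
            return
        print("processed", item)

work_queue = queue.Queue()
for n in (1, 2, 3, None):
    work_queue.put(n)
asyncio.run(drain(work_queue, ThreadPoolExecutor(max_workers=1)))

Keeping the outer timeout slightly longer than the queue's own timeout means a normal empty poll surfaces as queue.Empty rather than as a cancelled executor call.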
View File

@@ -92,7 +92,12 @@ def construct_blueprint(datastore: ChangeDetectionStore):
         # Be sure we're written fresh
         datastore.sync_to_json()

-        zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
+        zip_thread = threading.Thread(
+            target=create_backup,
+            args=(datastore.datastore_path, datastore.data.get("watching")),
+            daemon=True,
+            name="BackupCreator"
+        )
         zip_thread.start()
         backup_threads.append(zip_thread)

         flash(gettext("Backup building in background, check back in a few minutes."))

View File

@@ -21,31 +21,154 @@ from changedetectionio.flask_app import login_optionally_required
 from loguru import logger

 browsersteps_sessions = {}
+browsersteps_watch_to_session = {}  # Maps watch_uuid -> browsersteps_session_id
 io_interface_context = None
 import json
 import hashlib
 from flask import Response
 import asyncio
 import threading
+import time

-def run_async_in_browser_loop(coro):
-    """Run async coroutine using the existing async worker event loop"""
-    from changedetectionio import worker_handler
-
-    # Use the existing async worker event loop instead of creating a new one
-    if worker_handler.USE_ASYNC_WORKERS and worker_handler.async_loop and not worker_handler.async_loop.is_closed():
-        logger.debug("Browser steps using existing async worker event loop")
-        future = asyncio.run_coroutine_threadsafe(coro, worker_handler.async_loop)
-        return future.result()
-    else:
-        # Fallback: create a new event loop (for sync workers or if async loop not available)
-        logger.debug("Browser steps creating temporary event loop")
-        loop = asyncio.new_event_loop()
-        asyncio.set_event_loop(loop)
-        try:
-            return loop.run_until_complete(coro)
-        finally:
-            loop.close()
+# Dedicated event loop for ALL browser steps sessions
+_browser_steps_loop = None
+_browser_steps_thread = None
+_browser_steps_loop_lock = threading.Lock()
+
+def _start_browser_steps_loop():
+    """Start a dedicated event loop for browser steps in its own thread"""
+    global _browser_steps_loop
+
+    # Create and set the event loop for this thread
+    loop = asyncio.new_event_loop()
+    asyncio.set_event_loop(loop)
+    _browser_steps_loop = loop
+
+    logger.debug("Browser steps event loop started")
+
+    try:
+        # Run the loop forever - handles all browsersteps sessions
+        loop.run_forever()
+    except Exception as e:
+        logger.error(f"Browser steps event loop error: {e}")
+    finally:
+        try:
+            # Cancel all remaining tasks
+            pending = asyncio.all_tasks(loop)
+            for task in pending:
+                task.cancel()
+            # Wait for tasks to finish cancellation
+            if pending:
+                loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
+        except Exception as e:
+            logger.debug(f"Error during browser steps loop cleanup: {e}")
+        finally:
+            loop.close()
+            logger.debug("Browser steps event loop closed")
+
+def _ensure_browser_steps_loop():
+    """Ensure the browser steps event loop is running"""
+    global _browser_steps_loop, _browser_steps_thread
+
+    with _browser_steps_loop_lock:
+        if _browser_steps_thread is None or not _browser_steps_thread.is_alive():
+            logger.debug("Starting browser steps event loop thread")
+            _browser_steps_thread = threading.Thread(
+                target=_start_browser_steps_loop,
+                daemon=True,
+                name="BrowserStepsEventLoop"
+            )
+            _browser_steps_thread.start()
+
+            # Wait for the loop to be ready
+            timeout = 5.0
+            start_time = time.time()
+            while _browser_steps_loop is None:
+                if time.time() - start_time > timeout:
+                    raise RuntimeError("Browser steps event loop failed to start")
+                time.sleep(0.01)
+
+            logger.debug("Browser steps event loop thread started and ready")
+
+def run_async_in_browser_loop(coro):
+    """Run async coroutine using the dedicated browser steps event loop"""
+    _ensure_browser_steps_loop()
+
+    if _browser_steps_loop and not _browser_steps_loop.is_closed():
+        logger.debug("Browser steps using dedicated event loop")
+        future = asyncio.run_coroutine_threadsafe(coro, _browser_steps_loop)
+        return future.result()
+    else:
+        raise RuntimeError("Browser steps event loop is not available")
+
+def cleanup_expired_sessions():
+    """Remove expired browsersteps sessions and cleanup their resources"""
+    global browsersteps_sessions, browsersteps_watch_to_session
+
+    expired_session_ids = []
+
+    # Find expired sessions
+    for session_id, session_data in browsersteps_sessions.items():
+        browserstepper = session_data.get('browserstepper')
+        if browserstepper and browserstepper.has_expired:
+            expired_session_ids.append(session_id)
+
+    # Cleanup expired sessions
+    for session_id in expired_session_ids:
+        logger.debug(f"Cleaning up expired browsersteps session {session_id}")
+        session_data = browsersteps_sessions[session_id]
+
+        # Cleanup playwright resources asynchronously
+        browserstepper = session_data.get('browserstepper')
+        if browserstepper:
+            try:
+                run_async_in_browser_loop(browserstepper.cleanup())
+            except Exception as e:
+                logger.error(f"Error cleaning up session {session_id}: {e}")
+
+        # Remove from sessions dict
+        del browsersteps_sessions[session_id]
+
+        # Remove from watch mapping
+        for watch_uuid, mapped_session_id in list(browsersteps_watch_to_session.items()):
+            if mapped_session_id == session_id:
+                del browsersteps_watch_to_session[watch_uuid]
+                break
+
+    if expired_session_ids:
+        logger.info(f"Cleaned up {len(expired_session_ids)} expired browsersteps session(s)")
+
+def cleanup_session_for_watch(watch_uuid):
+    """Cleanup a specific browsersteps session for a watch UUID"""
+    global browsersteps_sessions, browsersteps_watch_to_session
+
+    session_id = browsersteps_watch_to_session.get(watch_uuid)
+    if not session_id:
+        logger.debug(f"No browsersteps session found for watch {watch_uuid}")
+        return
+
+    logger.debug(f"Cleaning up browsersteps session {session_id} for watch {watch_uuid}")
+    session_data = browsersteps_sessions.get(session_id)
+    if session_data:
+        browserstepper = session_data.get('browserstepper')
+        if browserstepper:
+            try:
+                run_async_in_browser_loop(browserstepper.cleanup())
+            except Exception as e:
+                logger.error(f"Error cleaning up session {session_id} for watch {watch_uuid}: {e}")
+
+        # Remove from sessions dict
+        del browsersteps_sessions[session_id]
+
+    # Remove from watch mapping
+    del browsersteps_watch_to_session[watch_uuid]
+    logger.debug(f"Cleaned up session for watch {watch_uuid}")
+
+    # Opportunistically cleanup any other expired sessions
+    cleanup_expired_sessions()
+
 def construct_blueprint(datastore: ChangeDetectionStore):

     browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
@@ -123,6 +246,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
         if not watch_uuid:
             return make_response('No Watch UUID specified', 500)

+        # Cleanup any existing session for this watch
+        cleanup_session_for_watch(watch_uuid)
+
         logger.debug("Starting connection with playwright")
         logger.debug("browser_steps.py connecting")
@@ -131,6 +257,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
             browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
                 start_browsersteps_session(watch_uuid)
             )
+            # Store the mapping of watch_uuid -> browsersteps_session_id
+            browsersteps_watch_to_session[watch_uuid] = browsersteps_session_id
         except Exception as e:
             if 'ECONNREFUSED' in str(e):
                 return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)

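The refactor above replaces "borrow the async worker's event loop" with one dedicated loop that lives in a daemon thread and serves every browser-steps session via run_coroutine_threadsafe. A stripped-down sketch of the same pattern, with generic names rather than the blueprint's actual API:

import asyncio
import threading
import time

_loop = None
_thread = None
_lock = threading.Lock()

def _loop_main():
    global _loop
    loop = asyncio.new_event_loop()
    asyncio.set_event_loop(loop)
    _loop = loop
    loop.run_forever()  # serves every coroutine submitted from other threads

def _ensure_loop():
    global _thread
    with _lock:
        if _thread is None or not _thread.is_alive():
            _thread = threading.Thread(target=_loop_main, daemon=True, name="dedicated-loop")
            _thread.start()
        while _loop is None:  # wait until the loop object has been published
            time.sleep(0.01)
    return _loop

def run_sync(coro):
    # Run a coroutine on the dedicated loop and block until it finishes
    future = asyncio.run_coroutine_threadsafe(coro, _ensure_loop())
    return future.result()

async def add(a, b):
    await asyncio.sleep(0.01)
    return a + b

print(run_sync(add(2, 3)))  # -> 5

The lock plus the is_alive() check is what lets the loop thread be restarted lazily if it ever dies, which is the same design choice the blueprint makes.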
View File

@@ -47,9 +47,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
     if len(dates) < 2:
         return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400

-    # Add uuid to watch for proper functioning
-    watch['uuid'] = uuid
-
     # Get the number of diffs to include (default: 5)
     rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
@@ -101,7 +98,7 @@ def construct_single_watch_routes(rss_blueprint, datastore):
                                           date_index_from, date_index_to)

         # Create and populate feed entry
-        guid = f"{watch['uuid']}/{timestamp_to}"
+        guid = f"{uuid}/{timestamp_to}"
         fe = fg.add_entry()
         title_suffix = f"Change @ {res['original_context']['change_datetime']}"
         populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,

View File

@@ -63,11 +63,8 @@ def construct_tag_routes(rss_blueprint, datastore):
             # Only include unviewed watches
             if not watch.viewed:
-                # Add uuid to watch for proper functioning
-                watch['uuid'] = uuid
-
-                # Include a link to the diff page
-                diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=watch['uuid'], _external=True)}
+                # Include a link to the diff page (use uuid from loop, don't modify watch dict)
+                diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=uuid, _external=True)}

                 # Get watch label
                 watch_label = get_watch_label(datastore, watch)

View File

@@ -50,7 +50,8 @@
                     <td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
                     <td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
                     <td>
-                        <a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>&nbsp;
+                        <a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
+                        <a href="{{ url_for('ui.form_watch_checknow', tag=uuid) }}" class="pure-button pure-button-primary" >{{ _('Recheck') }}</a>
                         <a class="pure-button button-error"
                            href="{{ url_for('tags.delete', uuid=uuid) }}"
                            data-requires-confirm

View File

@@ -238,6 +238,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
             flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))

+            # Cleanup any browsersteps session for this watch
+            try:
+                from changedetectionio.blueprint.browser_steps import cleanup_session_for_watch
+                cleanup_session_for_watch(uuid)
+            except Exception as e:
+                logger.debug(f"Error cleaning up browsersteps session: {e}")
+
             # Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
             # But in the case something is added we should save straight away
             datastore.needs_write_urgent = True
@@ -325,8 +332,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
             'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
         },
         'settings_application': datastore.data['settings']['application'],
-        'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
-        'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
         'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
         'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
         'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),

View File

@@ -206,7 +206,7 @@ Math: {{ 1 + 1 }}") }}
                     <div class="tab-pane-inner" id="browser-steps">
                         {% if capabilities.supports_browser_steps %}
-                        {% if visual_selector_data_ready %}
+                        {% if true %}
                         <img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
                         <fieldset>
                             <div class="pure-control-group">

View File

@@ -2,7 +2,6 @@ import os
 import time

 from flask import Blueprint, request, make_response, render_template, redirect, url_for, flash, session
-from flask_login import current_user
 from flask_paginate import Pagination, get_page_parameter

 from changedetectionio import forms
@@ -85,6 +84,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
                                 app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
                                 datastore=datastore,
                                 errored_count=errored_count,
+                                extra_classes='has-queue' if len(update_q.queue) else '',
                                 form=form,
                                 generate_tag_colors=processors.generate_processor_badge_colors,
                                 guid=datastore.data['app_guid'],
@@ -92,9 +92,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
                                 hosted_sticky=os.getenv("SALTED_PASS", False) == False,
                                 now_time_server=round(time.time()),
                                 pagination=pagination,
+                                processor_badge_css=processors.get_processor_badge_css(),
                                 processor_badge_texts=processors.get_processor_badge_texts(),
                                 processor_descriptions=processors.get_processor_descriptions(),
-                                processor_badge_css=processors.get_processor_badge_css(),
+                                queue_size=len(update_q.queue),
                                 queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue],
                                 search_q=request.args.get('q', '').strip(),
                                 sort_attribute=request.args.get('sort') if request.args.get('sort') else request.cookies.get('sort'),

View File

@@ -99,9 +99,14 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
                 data-confirm-message="{{ _('<p>Are you sure you want to delete the selected watches?</strong></p><p>This action cannot be undone.</p>') }}"
                 data-confirm-button="{{ _('Delete') }}"><i data-feather="trash" style="width: 14px; height: 14px; stroke: white; margin-right: 4px;"></i>{{ _('Delete') }}</button>
         </div>
-        {%- if watches|length >= pagination.per_page -%}
-        {{ pagination.info }}
-        {%- endif -%}
+        <div id="stats_row">
+            <div class="left">{%- if watches|length >= pagination.per_page -%}{{ pagination.info }}{%- endif -%}</div>
+            <div class="right" >{{ _('Queued size') }}: <span id="queue-size-int">{{ queue_size }}</span></div>
+        </div>
         {%- if search_q -%}<div id="search-result-info">{{ _('Searching') }} "<strong><i>{{search_q}}</i></strong>"</div>{%- endif -%}
         <div>
             <a href="{{url_for('watchlist.index')}}" class="pure-button button-tag {{'active' if not active_tag_uuid }}">{{ _('All') }}</a>

View File

@@ -1,3 +1,4 @@
+import gc
 import json
 import os
 from urllib.parse import urlparse
@@ -185,20 +186,33 @@ class fetcher(Fetcher):
         super().screenshot_step(step_n=step_n)
         screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)

+        # Request GC immediately after screenshot to free memory
+        # Screenshots can be large and browser steps take many of them
+        await self.page.request_gc()
+
         if self.browser_steps_screenshot_path is not None:
             destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
             logger.debug(f"Saving step screenshot to {destination}")
             with open(destination, 'wb') as f:
                 f.write(screenshot)

+        # Clear local reference to allow screenshot bytes to be collected
+        del screenshot
+        gc.collect()
+
     async def save_step_html(self, step_n):
         super().save_step_html(step_n=step_n)
         content = await self.page.content()

+        # Request GC after getting page content
+        await self.page.request_gc()
+
         destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
         logger.debug(f"Saving step HTML to {destination}")
         with open(destination, 'w', encoding='utf-8') as f:
             f.write(content)

+        # Clear local reference
+        del content
+        gc.collect()
+
     async def run(self,
                   fetch_favicon=True,
@@ -305,6 +319,12 @@ class fetcher(Fetcher):
             if self.status_code != 200 and not ignore_status_codes:
                 screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)

+                # Cleanup before raising to prevent memory leak
+                await self.page.close()
+                await context.close()
+                await browser.close()
+                # Force garbage collection to release Playwright resources immediately
+                gc.collect()
+
                 raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)

             if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
@@ -313,48 +333,52 @@ class fetcher(Fetcher):
                 await browser.close()
                 raise EmptyReply(url=url, status_code=response.status)

-            # Run Browser Steps here
-            if self.browser_steps_get_valid_steps():
-                await self.iterate_browser_steps(start_url=url)
-
-            await self.page.wait_for_timeout(extra_wait * 1000)
-
-            now = time.time()
-            # So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
-            if current_include_filters is not None:
-                await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
-            else:
-                await self.page.evaluate("var include_filters=''")
-            await self.page.request_gc()
-
-            # request_gc before and after evaluate to free up memory
-            # @todo browsersteps etc
-            MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
-            self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
-                "visualselector_xpath_selectors": visualselector_xpath_selectors,
-                "max_height": MAX_TOTAL_HEIGHT
-            })
-            await self.page.request_gc()
-
-            self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
-            await self.page.request_gc()
-
-            self.content = await self.page.content()
-            await self.page.request_gc()
-
-            logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
-
-            # Bug 3 in Playwright screenshot handling
-            # Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
-            # JPEG is better here because the screenshots can be very very large
-            # Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
-            # which will significantly increase the IO size between the server and client, it's recommended to use the lowest
-            # acceptable screenshot quality here
+            # Wrap remaining operations in try/finally to ensure cleanup
             try:
+                # Run Browser Steps here
+                if self.browser_steps_get_valid_steps():
+                    await self.iterate_browser_steps(start_url=url)
+
+                await self.page.wait_for_timeout(extra_wait * 1000)
+
+                now = time.time()
+                # So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
+                if current_include_filters is not None:
+                    await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
+                else:
+                    await self.page.evaluate("var include_filters=''")
+                await self.page.request_gc()
+
+                # request_gc before and after evaluate to free up memory
+                # @todo browsersteps etc
+                MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
+                self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
+                    "visualselector_xpath_selectors": visualselector_xpath_selectors,
+                    "max_height": MAX_TOTAL_HEIGHT
+                })
+                await self.page.request_gc()
+
+                self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
+                await self.page.request_gc()
+
+                self.content = await self.page.content()
+                await self.page.request_gc()
+
+                logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
+
+                # Bug 3 in Playwright screenshot handling
+                # Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
+                # JPEG is better here because the screenshots can be very very large
+                # Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
+                # which will significantly increase the IO size between the server and client, it's recommended to use the lowest
+                # acceptable screenshot quality here
+
                 # The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
                 self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
+            except ScreenshotUnavailable:
+                # Re-raise screenshot unavailable exceptions
+                raise
             except Exception as e:
                 # It's likely the screenshot was too long/big and something crashed
                 raise ScreenshotUnavailable(url=url, status_code=self.status_code)
@@ -389,6 +413,10 @@ class fetcher(Fetcher):
                 pass

             browser = None

+        # Force Python GC to release Playwright resources immediately
+        # Playwright objects can have circular references that delay cleanup
+        gc.collect()
+
 # Plugin registration for built-in fetcher
 class PlaywrightFetcherPlugin:

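The diff is cut off right at the plugin-registration stub (PlaywrightFetcherPlugin). The "Pluggable Content Fetchers" feature appears to build on pluggy, which is in the dependency list above; purely as an illustration, a pluggy-style fetcher registration could look like the sketch below. The hook name, namespace and plugin class here are invented, not the project's real ones:

import pluggy

# "content_fetchers" is an assumed project name for the hook namespace
hookspec = pluggy.HookspecMarker("content_fetchers")
hookimpl = pluggy.HookimplMarker("content_fetchers")

class FetcherSpec:
    @hookspec
    def get_fetcher(self):
        """Return a (name, fetcher_class) tuple for this plugin."""

class ExampleFetcherPlugin:
    @hookimpl
    def get_fetcher(self):
        # a real plugin would return its Fetcher subclass here
        return ("example_fetcher", object)

pm = pluggy.PluginManager("content_fetchers")
pm.add_hookspecs(FetcherSpec)
pm.register(ExampleFetcherPlugin())
print(pm.hook.get_fetcher())  # -> [('example_fetcher', <class 'object'>)]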
View File

@@ -15,7 +15,7 @@ class fetcher(Fetcher):
     proxy_url = None

     # Capability flags
-    supports_browser_steps = True
+    supports_browser_steps = False
     supports_screenshots = True
     supports_xpath_element_data = True
@@ -156,6 +156,9 @@ class fetcher(Fetcher):
             from PIL import Image
             import io
             img = Image.open(io.BytesIO(screenshot_png))
+            # Convert to RGB if needed (JPEG doesn't support transparency)
+            if img.mode != 'RGB':
+                img = img.convert('RGB')
             jpeg_buffer = io.BytesIO()
             img.save(jpeg_buffer, format='JPEG', quality=int(os.getenv("SCREENSHOT_QUALITY", 72)))
             self.screenshot = jpeg_buffer.getvalue()

View File

@@ -57,14 +57,15 @@ class SignalPriorityQueue(queue.PriorityQueue):
     def put(self, item, block=True, timeout=None):
         # Call the parent's put method first
         super().put(item, block, timeout)

         # After putting the item in the queue, check if it has a UUID and emit signal
         if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
             uuid = item.item['uuid']
             # Get the signal and send it if it exists
             watch_check_update = signal('watch_check_update')
             if watch_check_update:
-                # Send the watch_uuid parameter
+                # NOTE: This would block other workers from .put/.get while this signal sends
+                # Signal handlers may iterate the queue/datastore while holding locks
                 watch_check_update.send(watch_uuid=uuid)

         # Send queue_length signal with current queue size
@@ -312,14 +313,15 @@ class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
     async def put(self, item):
         # Call the parent's put method first
         await super().put(item)

         # After putting the item in the queue, check if it has a UUID and emit signal
         if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
             uuid = item.item['uuid']
             # Get the signal and send it if it exists
             watch_check_update = signal('watch_check_update')
             if watch_check_update:
-                # Send the watch_uuid parameter
+                # NOTE: This would block other workers from .put/.get while this signal sends
+                # Signal handlers may iterate the queue/datastore while holding locks
                 watch_check_update.send(watch_uuid=uuid)

         # Send queue_length signal with current queue size

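The NOTE added in both put() methods is about blinker delivering signals synchronously: the producer that calls put() also runs every connected handler before put() returns. A tiny standalone illustration of that behaviour (not the project's queue classes):

import queue
from blinker import signal

watch_check_update = signal('watch_check_update')

def on_update(sender, **kwargs):
    # blinker calls this synchronously: put() does not return until the handler does
    print("handler saw", kwargs.get('watch_uuid'))

watch_check_update.connect(on_update)

class NotifyingQueue(queue.PriorityQueue):
    def put(self, item, block=True, timeout=None):
        super().put(item, block, timeout)
        # emitted inside put(), so a slow handler slows every producer
        watch_check_update.send(self, watch_uuid=item[1])

q = NotifyingQueue()
q.put((5, "uuid-1234"))  # prints: handler saw uuid-1234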
View File

@@ -9,6 +9,7 @@ import threading
 import time
 import timeago
 from blinker import signal
+from pathlib import Path

 from changedetectionio.strtobool import strtobool
 from threading import Event
@@ -84,6 +85,10 @@ app.config['NEW_VERSION_AVAILABLE'] = False
 if os.getenv('FLASK_SERVER_NAME'):
     app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME')

+# Babel/i18n configuration
+app.config['BABEL_TRANSLATION_DIRECTORIES'] = str(Path(__file__).parent / 'translations')
+app.config['BABEL_DEFAULT_LOCALE'] = 'en_GB'
+
 #app.config["EXPLAIN_TEMPLATE_LOADING"] = True
@@ -395,13 +400,9 @@ def changedetection_app(config=None, datastore_o=None):
     def get_locale():
         # 1. Try to get locale from session (user explicitly selected)
         if 'locale' in session:
-            locale = session['locale']
-            logger.trace(f"DEBUG: get_locale() returning from session: {locale}")
-            return locale
+            return session['locale']

         # 2. Fall back to Accept-Language header
-        locale = request.accept_languages.best_match(language_codes)
-        logger.trace(f"DEBUG: get_locale() returning from Accept-Language: {locale}")
-        return locale
+        return request.accept_languages.best_match(language_codes)

     # Initialize Babel with locale selector
     babel = Babel(app, locale_selector=get_locale)
@@ -518,9 +519,20 @@ def changedetection_app(config=None, datastore_o=None):
     @app.route('/set-language/<locale>')
     def set_language(locale):
         """Set the user's preferred language in the session"""
+        if not request.cookies:
+            logger.error("Cannot set language without session cookie")
+            flash("Cannot set language without session cookie", 'error')
+            return redirect(url_for('watchlist.index'))
+
         # Validate the locale against available languages
         if locale in language_codes:
             session['locale'] = locale
+            # CRITICAL: Flask-Babel caches the locale in the request context (ctx.babel_locale)
+            # We must refresh to clear this cache so the new locale takes effect immediately
+            # This is especially important for tests where multiple requests happen rapidly
+            from flask_babel import refresh
+            refresh()
         else:
             logger.error(f"Invalid locale {locale}, available: {language_codes}")
@@ -863,13 +875,13 @@ def changedetection_app(config=None, datastore_o=None):
     worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)

     # @todo handle ctrl break
-    ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
-    threading.Thread(target=notification_runner).start()
+    ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
+    threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()

     in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
     # Check for new release version, but not when running in test/build or pytest
     if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
-        threading.Thread(target=check_for_new_version).start()
+        threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()

     # Return the Flask app - the Socket.IO will be attached to it but initialized separately
     # This avoids circular dependencies
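The only functional change in this hunk is that the background threads are now named and marked daemon=True, so they are identifiable in thread dumps and cannot block interpreter shutdown. The same idea in miniature (worker() is just a placeholder target):

```python
# Sketch: named daemon threads are easy to spot in debuggers/py-spy output and
# never prevent the process from exiting. worker() is a placeholder.
import threading
import time

def worker():
    while True:
        time.sleep(1)

t = threading.Thread(target=worker, daemon=True, name="TickerThread-ScheduleChecker")
t.start()
print(t.name, t.daemon)
```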


@@ -29,6 +29,9 @@ def get_timeago_locale(flask_locale):
     """
     locale_map = {
         'zh': 'zh_CN',  # Chinese Simplified
+        # The timeago library predates the BCP 47 / RFC 5646 naming convention and still uses underscore-style locale names.
+        'zh_TW': 'zh_TW',  # Chinese Traditional (timeago uses zh_TW)
+        'zh_Hant_TW': 'zh_TW',  # Flask-Babel normalizes zh_TW to zh_Hant_TW, map back to timeago's zh_TW
         'pt': 'pt_PT',  # Portuguese (Portugal)
         'sv': 'sv_SE',  # Swedish
         'no': 'nb_NO',  # Norwegian Bokmål
@@ -53,7 +56,7 @@ LANGUAGE_DATA = {
     'it': {'flag': 'fi fi-it fis', 'name': 'Italiano'},
     'ja': {'flag': 'fi fi-jp fis', 'name': '日本語'},
     'zh': {'flag': 'fi fi-cn fis', 'name': '中文 (简体)'},
-    'zh_TW': {'flag': 'fi fi-tw fis', 'name': '繁體中文'},
+    'zh_Hant_TW': {'flag': 'fi fi-tw fis', 'name': '繁體中文'},
     'ru': {'flag': 'fi fi-ru fis', 'name': 'Русский'},
     'pl': {'flag': 'fi fi-pl fis', 'name': 'Polski'},
     'nl': {'flag': 'fi fi-nl fis', 'name': 'Nederlands'},
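A rough sketch of the mapping idea, reduced to a few entries (illustrative values only, not the project's full table):

```python
# Sketch: translate Flask-Babel style locale identifiers into the names the
# Python timeago library (https://github.com/hustcc/timeago) understands.
import timeago
from datetime import datetime, timedelta

TIMEAGO_LOCALE_MAP = {
    'zh': 'zh_CN',          # Chinese Simplified
    'zh_TW': 'zh_TW',       # Chinese Traditional
    'zh_Hant_TW': 'zh_TW',  # Flask-Babel's normalized form, mapped back
}

def to_timeago_locale(flask_locale):
    # Anything without a special mapping is passed through unchanged
    return TIMEAGO_LOCALE_MAP.get(flask_locale, flask_locale)

now = datetime.now()
print(timeago.format(now - timedelta(minutes=5), now, to_timeago_locale('zh_Hant_TW')))
# expected to contain 分鐘前 (Traditional Chinese "minutes ago")
```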


@@ -10,54 +10,23 @@ from pathlib import Path
from loguru import logger from loguru import logger
from .. import jinja2_custom as safe_jinja from .. import jinja2_custom as safe_jinja
from ..diff import ADDED_PLACEMARKER_OPEN
from ..html_tools import TRANSLATE_WHITESPACE_TABLE from ..html_tools import TRANSLATE_WHITESPACE_TABLE
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
def _brotli_compress_worker(conn, filepath, mode=None): minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
""" """
Worker function to compress data with brotli in a separate process. Save compressed data using native brotli.
This isolates memory - when process exits, OS reclaims all memory. Testing shows no memory leak when using gc.collect() after compression.
Args:
conn: multiprocessing.Pipe connection to receive data
filepath: destination file path
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
"""
import brotli
try:
# Receive data from parent process via pipe (avoids pickle overhead)
contents = conn.recv()
if mode is not None:
compressed_data = brotli.compress(contents, mode=mode)
else:
compressed_data = brotli.compress(contents)
with open(filepath, 'wb') as f:
f.write(compressed_data)
# Send success status back
conn.send(True)
# No need for explicit cleanup - process exit frees all memory
except Exception as e:
logger.error(f"Brotli compression worker failed: {e}")
conn.send(False)
finally:
conn.close()
def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_uncompressed=False):
"""
Save compressed data using subprocess to isolate memory.
Uses Pipe to avoid pickle overhead for large data.
Args: Args:
contents: data to compress (str or bytes) contents: data to compress (str or bytes)
filepath: destination file path filepath: destination file path
mode: brotli compression mode (e.g., brotli.MODE_TEXT) mode: brotli compression mode (e.g., brotli.MODE_TEXT)
timeout: subprocess timeout in seconds
fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
Returns: Returns:
@@ -67,88 +36,43 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
Exception: if compression fails and fallback_uncompressed is False Exception: if compression fails and fallback_uncompressed is False
""" """
import brotli import brotli
import multiprocessing import gc
import sys
# Ensure contents are bytes # Ensure contents are bytes
if isinstance(contents, str): if isinstance(contents, str):
contents = contents.encode('utf-8') contents = contents.encode('utf-8')
# Use explicit spawn context for thread safety (avoids fork() with multi-threaded parent)
# Always use spawn - consistent behavior in tests and production
ctx = multiprocessing.get_context('spawn')
parent_conn, child_conn = ctx.Pipe()
# Run compression in subprocess using spawn (not fork)
proc = ctx.Process(target=_brotli_compress_worker, args=(child_conn, filepath, mode))
# Windows-safe: Set daemon=False explicitly to avoid issues with process cleanup
proc.daemon = False
proc.start()
try: try:
# Send data to subprocess via pipe (avoids pickle) logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
parent_conn.send(contents)
# Wait for result with timeout if mode is not None:
if parent_conn.poll(timeout): compressed_data = brotli.compress(contents, mode=mode)
success = parent_conn.recv()
else: else:
success = False compressed_data = brotli.compress(contents)
logger.warning(f"Brotli compression subprocess timed out after {timeout}s")
# Graceful termination with platform-aware cleanup
try:
proc.terminate()
except Exception as term_error:
logger.debug(f"Process termination issue (may be normal on Windows): {term_error}")
parent_conn.close() with open(filepath, 'wb') as f:
proc.join(timeout=5) f.write(compressed_data)
# Force kill if still alive after graceful termination logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
if proc.is_alive():
try:
if sys.platform == 'win32':
# Windows: use kill() which is more forceful
proc.kill()
else:
# Unix: terminate() already sent SIGTERM, now try SIGKILL
proc.kill()
proc.join(timeout=2)
except Exception as kill_error:
logger.warning(f"Failed to kill brotli compression process: {kill_error}")
# Check if file was created successfully # Force garbage collection to prevent memory buildup
if success and os.path.exists(filepath): gc.collect()
return filepath
return filepath
except Exception as e: except Exception as e:
logger.error(f"Brotli compression error: {e}") logger.error(f"Brotli compression error: {e}")
try:
parent_conn.close()
except:
pass
try:
proc.terminate()
proc.join(timeout=2)
except:
pass
# Compression failed # Compression failed
if fallback_uncompressed: if fallback_uncompressed:
logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed") logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
fallback_path = filepath.replace('.br', '') fallback_path = filepath.replace('.br', '')
with open(fallback_path, 'wb') as f: with open(fallback_path, 'wb') as f:
f.write(contents) f.write(contents)
return fallback_path return fallback_path
else: else:
raise Exception(f"Brotli compression subprocess failed for {filepath}") raise Exception(f"Brotli compression failed for {filepath}: {e}")
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
class model(watch_base): class model(watch_base):
__newest_history_key = None __newest_history_key = None
@@ -492,7 +416,6 @@ class model(watch_base):
self.ensure_data_dir_exists() self.ensure_data_dir_exists()
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False')) skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
# Binary data - detect file type and save without compression # Binary data - detect file type and save without compression
@@ -516,7 +439,7 @@ class model(watch_base):
# Text data - use brotli compression if enabled and above threshold # Text data - use brotli compression if enabled and above threshold
else: else:
if not skip_brotli and len(contents) > threshold: if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
# Compressed text # Compressed text
import brotli import brotli
snapshot_fname = f"{snapshot_id}.txt.br" snapshot_fname = f"{snapshot_id}.txt.br"
@@ -524,7 +447,7 @@ class model(watch_base):
if not os.path.exists(dest): if not os.path.exists(dest):
try: try:
actual_dest = _brotli_subprocess_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True) actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
if actual_dest != dest: if actual_dest != dest:
snapshot_fname = os.path.basename(actual_dest) snapshot_fname = os.path.basename(actual_dest)
except Exception as e: except Exception as e:
@@ -950,13 +873,13 @@ class model(watch_base):
def save_last_text_fetched_before_filters(self, contents): def save_last_text_fetched_before_filters(self, contents):
import brotli import brotli
filepath = os.path.join(self.watch_data_dir, 'last-fetched.br') filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
_brotli_subprocess_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False) _brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
def save_last_fetched_html(self, timestamp, contents): def save_last_fetched_html(self, timestamp, contents):
self.ensure_data_dir_exists() self.ensure_data_dir_exists()
snapshot_fname = f"{timestamp}.html.br" snapshot_fname = f"{timestamp}.html.br"
filepath = os.path.join(self.watch_data_dir, snapshot_fname) filepath = os.path.join(self.watch_data_dir, snapshot_fname)
_brotli_subprocess_save(contents, filepath, mode=None, fallback_uncompressed=True) _brotli_save(contents, filepath, mode=None, fallback_uncompressed=True)
self._prune_last_fetched_html_snapshots() self._prune_last_fetched_html_snapshots()
def get_fetched_html(self, timestamp): def get_fetched_html(self, timestamp):
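The replacement helper compresses in-process and relies on gc.collect() instead of a subprocess for memory hygiene. Stripped of the surrounding model code, the approach is roughly the following (a sketch only, with a hypothetical save_brotli name and fallback behaviour mirroring the diff):

```python
# Sketch: in-process brotli compression with an optional uncompressed fallback,
# plus a size threshold so tiny snapshots are not worth compressing.
import gc
import os
import brotli
from loguru import logger

BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))

def save_brotli(contents, filepath, mode=None, fallback_uncompressed=False):
    if isinstance(contents, str):
        contents = contents.encode('utf-8')
    try:
        compressed = brotli.compress(contents, mode=mode) if mode is not None else brotli.compress(contents)
        with open(filepath, 'wb') as f:
            f.write(compressed)
        gc.collect()  # release the large intermediate buffers promptly
        return filepath
    except Exception as e:
        logger.error(f"Brotli compression error: {e}")
        if fallback_uncompressed:
            fallback_path = filepath.replace('.br', '')
            with open(fallback_path, 'wb') as f:
                f.write(contents)
            return fallback_path
        raise

if __name__ == '__main__':
    data = "hello world " * 200
    if len(data) > BROTLI_COMPRESS_SIZE_THRESHOLD:
        save_brotli(data, '/tmp/snapshot.txt.br', mode=brotli.MODE_TEXT, fallback_uncompressed=True)
```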


@@ -13,14 +13,9 @@ Research: https://github.com/libvips/pyvips/issues/234
 import multiprocessing

-# CRITICAL: Use 'spawn' instead of 'fork' to avoid inheriting parent's
+# CRITICAL: Use 'spawn' context instead of 'fork' to avoid inheriting parent's
 # LibVIPS threading state which can cause hangs in gaussblur operations
 # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
-try:
-    multiprocessing.set_start_method('spawn', force=False)
-except RuntimeError:
-    # Already set, ignore
-    pass

 def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
@@ -95,9 +90,10 @@ def generate_diff_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
Returns: Returns:
bytes: JPEG diff image or None on failure bytes: JPEG diff image or None on failure
""" """
parent_conn, child_conn = multiprocessing.Pipe() ctx = multiprocessing.get_context('spawn')
parent_conn, child_conn = ctx.Pipe()
p = multiprocessing.Process( p = ctx.Process(
target=_worker_generate_diff, target=_worker_generate_diff,
args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height) args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
) )
@@ -140,7 +136,8 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
Returns: Returns:
float: Change percentage float: Change percentage
""" """
parent_conn, child_conn = multiprocessing.Pipe() ctx = multiprocessing.get_context('spawn')
parent_conn, child_conn = ctx.Pipe()
def _worker_calculate(conn): def _worker_calculate(conn):
try: try:
@@ -185,7 +182,7 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
finally: finally:
conn.close() conn.close()
p = multiprocessing.Process(target=_worker_calculate, args=(child_conn,)) p = ctx.Process(target=_worker_calculate, args=(child_conn,))
p.start() p.start()
result = 0.0 result = 0.0
@@ -233,7 +230,8 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
tuple: (changed_detected, change_percentage) tuple: (changed_detected, change_percentage)
""" """
print(f"[Parent] Starting compare_images_isolated subprocess", flush=True) print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
parent_conn, child_conn = multiprocessing.Pipe() ctx = multiprocessing.get_context('spawn')
parent_conn, child_conn = ctx.Pipe()
def _worker_compare(conn): def _worker_compare(conn):
try: try:
@@ -301,7 +299,7 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
finally: finally:
conn.close() conn.close()
p = multiprocessing.Process(target=_worker_compare, args=(child_conn,)) p = ctx.Process(target=_worker_compare, args=(child_conn,))
print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True) print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
p.start() p.start()
print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True) print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)


@@ -86,6 +86,7 @@ class RecheckPriorityQueue:
def get(self, block: bool = True, timeout: Optional[float] = None): def get(self, block: bool = True, timeout: Optional[float] = None):
"""Thread-safe sync get with priority ordering""" """Thread-safe sync get with priority ordering"""
import queue
try: try:
# Wait for notification # Wait for notification
self.sync_q.get(block=block, timeout=timeout) self.sync_q.get(block=block, timeout=timeout)
@@ -103,8 +104,11 @@ class RecheckPriorityQueue:
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}") logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
return item return item
except queue.Empty:
# Queue is empty with timeout - expected behavior, re-raise without logging
raise
except Exception as e: except Exception as e:
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}") # Re-raise without logging - caller (worker) will handle and log appropriately
raise raise
# ASYNC INTERFACE (for workers) # ASYNC INTERFACE (for workers)
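The intent of this change is that queue.Empty (a timed-out get on an empty queue) is expected and propagates silently, while anything else remains an error for the caller to handle. The same pattern on a plain PriorityQueue:

```python
# Sketch: treat queue.Empty as normal control flow, everything else as an error.
import queue
from loguru import logger

q = queue.PriorityQueue()

def get_next(block=True, timeout=None):
    try:
        return q.get(block=block, timeout=timeout)
    except queue.Empty:
        # Expected when the queue is empty and a timeout was given - no logging
        raise
    except Exception as e:
        logger.critical(f"Failed to get item from queue: {e}")
        raise

try:
    get_next(timeout=0.1)
except queue.Empty:
    pass  # caller decides what an empty queue means
```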


@@ -98,11 +98,12 @@ pytest -vv -s --maxfail=1 tests/test_rss.py
 pytest -vv -s --maxfail=1 tests/test_unique_lines.py

 # Try high concurrency
-FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
+FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s

 # Check file:// will pickup a file when enabled
 echo "Hello world" > /tmp/test-file.txt
 ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
+
+# Run it again so that brotli kicks in
+TEST_WITH_BROTLI=1 SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=100 FETCH_WORKERS=20 pytest tests/test_history_consistency.py -vv -l -s


@@ -76,7 +76,7 @@ $(document).ready(function () {
// Cache DOM elements for performance // Cache DOM elements for performance
const queueBubble = document.getElementById('queue-bubble'); const queueBubble = document.getElementById('queue-bubble');
const queueSizePagerInfoText = document.getElementById('queue-size-int');
// Only try to connect if authentication isn't required or user is authenticated // Only try to connect if authentication isn't required or user is authenticated
// The 'is_authenticated' variable will be set in the template // The 'is_authenticated' variable will be set in the template
if (typeof is_authenticated !== 'undefined' ? is_authenticated : true) { if (typeof is_authenticated !== 'undefined' ? is_authenticated : true) {
@@ -118,6 +118,10 @@ $(document).ready(function () {
socket.on('queue_size', function (data) { socket.on('queue_size', function (data) {
console.log(`${data.event_timestamp} - Queue size update: ${data.q_length}`); console.log(`${data.event_timestamp} - Queue size update: ${data.q_length}`);
if(queueSizePagerInfoText) {
queueSizePagerInfoText.textContent = parseInt(data.q_length).toLocaleString() || 'None';
}
document.body.classList.toggle('has-queue', parseInt(data.q_length) > 0);
// Update queue bubble in action sidebar // Update queue bubble in action sidebar
//if (queueBubble) { //if (queueBubble) {


@@ -53,7 +53,7 @@
   }
 }

-.language-modal {
+#language-modal {
   .language-list {
     .lang-option {
       display: inline-block;


@@ -1,6 +1,4 @@
.pagination-page-info { .pagination-page-info {
color: #fff;
font-size: 0.85rem;
text-transform: capitalize; text-transform: capitalize;
} }


@@ -1,4 +1,32 @@
/* table related */ /* table related */
#stats_row {
display: flex;
align-items: center;
width: 100%;
color: #fff;
font-size: 0.85rem;
>* {
padding-bottom: 0.5rem;
}
.left {
text-align: left;
}
.right {
opacity: 0.5;
transition: opacity 0.6s ease;
margin-left: auto; /* pushes it to the far right */
text-align: right;
}
}
body.has-queue {
#stats_row {
.right {
opacity: 1.0;
}
}
}
.watch-table { .watch-table {
width: 100%; width: 100%;
font-size: 80%; font-size: 80%;

File diff suppressed because one or more lines are too long


@@ -186,7 +186,7 @@ class ChangeDetectionStore:
         # Finally start the thread that will manage periodic data saves to JSON
         # Only start if thread is not already running (reload_state might be called multiple times)
         if not self.save_data_thread or not self.save_data_thread.is_alive():
-            self.save_data_thread = threading.Thread(target=self.save_datastore)
+            self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
             self.save_data_thread.start()

     def rehydrate_entity(self, uuid, entity, processor_override=None):


@@ -17,6 +17,8 @@ _MAP = {
def strtobool(value): def strtobool(value):
if not value:
return False
try: try:
return _MAP[str(value).lower()] return _MAP[str(value).lower()]
except KeyError: except KeyError:
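With the new guard, falsy values (None, empty string) short-circuit to False instead of raising. A self-contained sketch of that behaviour (the _MAP values follow the usual distutils-style table; the exact error message is an assumption):

```python
# Sketch: strtobool that treats None / "" as False rather than raising on lookup.
_MAP = {'y': True, 'yes': True, 't': True, 'true': True, 'on': True, '1': True,
        'n': False, 'no': False, 'f': False, 'false': False, 'off': False, '0': False}

def strtobool(value):
    if not value:
        return False
    try:
        return _MAP[str(value).lower()]
    except KeyError:
        raise ValueError(f'"{value}" is not a valid bool value')  # message is illustrative

assert strtobool(None) is False
assert strtobool('') is False
assert strtobool('yes') is True
```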


@@ -1,5 +1,5 @@
 <!DOCTYPE html>
-<html lang="{{ get_locale() }}" data-darkmode="{{ get_darkmode_state() }}">
+<html lang="{{ get_locale()|replace('_', '-') }}" data-darkmode="{{ get_darkmode_state() }}">
 <head>
     <meta charset="utf-8" >


@@ -2,6 +2,7 @@
import psutil import psutil
import time import time
from threading import Thread from threading import Thread
import multiprocessing
import pytest import pytest
import arrow import arrow
@@ -97,6 +98,34 @@ def cleanup(datastore_path):
if os.path.isfile(f): if os.path.isfile(f):
os.unlink(f) os.unlink(f)
def pytest_configure(config):
"""Configure pytest environment before tests run.
CRITICAL: Set multiprocessing start method to 'fork' for Python 3.14+ compatibility.
Python 3.14 changed the default start method from 'fork' to 'forkserver' on Linux.
The forkserver method requires all objects to be picklable, but pytest-flask's
LiveServer uses nested functions that can't be pickled.
Setting 'fork' explicitly:
- Maintains compatibility with Python 3.10-3.13 (where 'fork' was already default)
- Fixes Python 3.14 pickling errors
- Only affects Unix-like systems (Windows uses 'spawn' regardless)
See: https://github.com/python/cpython/issues/126831
See: https://docs.python.org/3/whatsnew/3.14.html
"""
# Only set if not already set (respects existing configuration)
if multiprocessing.get_start_method(allow_none=True) is None:
try:
# 'fork' is available on Unix-like systems (Linux, macOS)
# On Windows, this will have no effect as 'spawn' is the only option
multiprocessing.set_start_method('fork', force=False)
logger.debug("Set multiprocessing start method to 'fork' for Python 3.14+ compatibility")
except (ValueError, RuntimeError):
# Already set, not available on this platform, or context already created
pass
def pytest_addoption(parser): def pytest_addoption(parser):
"""Add custom command-line options for pytest. """Add custom command-line options for pytest.
@@ -270,3 +299,6 @@ def app(request, datastore_path):
request.addfinalizer(teardown) request.addfinalizer(teardown)
yield app yield app


@@ -206,11 +206,10 @@ def test_regex_error_handling(client, live_server, measure_memory_usage, datasto
# Add our URL to the import page # Add our URL to the import page
test_url = url_for('test_endpoint', _external=True) test_url = url_for('test_endpoint', _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) time.sleep(0.2)
### test regex error handling ### test regex error handling
res = client.post( res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"), url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"extract_text": '/something bad\d{3/XYZ', data={"extract_text": '/something bad\d{3/XYZ',
"url": test_url, "url": test_url,
"fetch_backend": "html_requests", "fetch_backend": "html_requests",


@@ -4,25 +4,47 @@ import time
import os import os
import json import json
from flask import url_for from flask import url_for
from loguru import logger
from .. import strtobool
from .util import wait_for_all_checks, delete_all_watches from .util import wait_for_all_checks, delete_all_watches
from urllib.parse import urlparse, parse_qs import brotli
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path): def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
# live_server_setup(live_server) # Setup on conftest per function
workers = int(os.getenv("FETCH_WORKERS", 10))
r = range(1, 10+workers)
for one in r: uuids = set()
test_url = url_for('test_endpoint', content_type="text/html", content=str(one), _external=True) sys_fetch_workers = int(os.getenv("FETCH_WORKERS", 10))
res = client.post( workers = range(1, sys_fetch_workers)
url_for("imports.import_page"), now = time.time()
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data for one in workers:
if strtobool(os.getenv("TEST_WITH_BROTLI")):
# A very long string that WILL trigger Brotli compression of the snapshot
# BROTLI_COMPRESS_SIZE_THRESHOLD should be set low for this (e.g. 200) via SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD
from ..model.Watch import BROTLI_COMPRESS_SIZE_THRESHOLD
content = str(one) + "x" + str(one) * (BROTLI_COMPRESS_SIZE_THRESHOLD + 10)
else:
# Just enough to test datastore
content = str(one)+'x'
test_url = url_for('test_endpoint', content_type="text/html", content=content, _external=True)
uuids.add(client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'title': str(one)}))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
duration = time.time() - now
per_worker = duration/sys_fetch_workers
if sys_fetch_workers < 20:
per_worker_threshold=0.6
elif sys_fetch_workers < 50:
per_worker_threshold = 0.8
else:
per_worker_threshold = 1.5
logger.debug(f"All fetched in {duration:.2f}s, {per_worker}s per worker")
# Problematic on GitHub CI runners, so left disabled
#assert per_worker < per_worker_threshold, f"If concurrency is working well (no blocking async problems), each of the {sys_fetch_workers} workers should have finished its job in under {per_worker_threshold}s, got {per_worker:.2f}s per worker, total duration was {duration:.2f}s"
# Essentially just triggers the DB write/update # Essentially just triggers the DB write/update
res = client.post( res = client.post(
@@ -34,7 +56,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
) )
assert b"Settings updated." in res.data assert b"Settings updated." in res.data
# Wait for the sync DB save to happen
time.sleep(2) time.sleep(2)
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json') json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
@@ -44,14 +66,18 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
json_obj = json.load(f) json_obj = json.load(f)
# assert the right amount of watches was found in the JSON # assert the right amount of watches was found in the JSON
assert len(json_obj['watching']) == len(r), "Correct number of watches was found in the JSON" assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
i=0
i = 0
# each one should have a history.txt containing just one line # each one should have a history.txt containing just one line
for w in json_obj['watching'].keys(): for w in json_obj['watching'].keys():
i+=1 i += 1
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt') history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}" assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
# There should be no errors recorded (e.g. from brotli compression)
assert not live_server.app.config['DATASTORE'].data['watching'][w].get('last_error')
# Same like in model.Watch # Same like in model.Watch
with open(history_txt_index_file, "r") as f: with open(history_txt_index_file, "r") as f:
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines()) tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
@@ -63,15 +89,21 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
# Find the snapshot one # Find the snapshot one
for fname in files_in_watch_dir: for fname in files_in_watch_dir:
if fname != 'history.txt' and 'html' not in fname: if fname != 'history.txt' and 'html' not in fname:
if strtobool(os.getenv("TEST_WITH_BROTLI")):
assert fname.endswith('.br'), "With TEST_WITH_BROTLI forced, the snapshot should be a .br file"
full_snapshot_history_path = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname)
# contents should match what we requested as content returned from the test url # contents should match what we requested as content returned from the test url
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f: if fname.endswith('.br'):
contents = snapshot_f.read() with open(full_snapshot_history_path, 'rb') as f:
watch_url = json_obj['watching'][w]['url'] contents = brotli.decompress(f.read()).decode('utf-8')
u = urlparse(watch_url) else:
q = parse_qs(u[4]) with open(full_snapshot_history_path, 'r') as snapshot_f:
assert q['content'][0] == contents.strip(), f"Snapshot file {fname} should contain {q['content'][0]}" contents = snapshot_f.read()
watch_title = json_obj['watching'][w]['title']
assert json_obj['watching'][w]['title'], "Watch should have a title set"
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot" assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"


@@ -1,7 +1,71 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
from flask import url_for from flask import url_for
from .util import live_server_setup from .util import live_server_setup, wait_for_all_checks
def test_zh_TW(client, live_server, measure_memory_usage, datastore_path):
import time
test_url = url_for('test_endpoint', _external=True)
# Be sure we got a session cookie
res = client.get(url_for("watchlist.index"), follow_redirects=True)
res = client.get(
url_for("set_language", locale="zh_Hant_TW"), # Traditional
follow_redirects=True
)
# HTML follows BCP 47 language tag rules, not underscore-based locale formats.
assert b'<html lang="zh-Hant-TW"' in res.data
assert b'Cannot set language without session cookie' not in res.data
assert '選擇語言'.encode() in res.data
# Check second set works
res = client.get(
url_for("set_language", locale="en_GB"),
follow_redirects=True
)
assert b'Cannot set language without session cookie' not in res.data
res = client.get(url_for("watchlist.index"), follow_redirects=True)
assert b"Select Language" in res.data, "Second set of language worked"
# Check arbitration between zh_Hant_TW<->zh
res = client.get(
url_for("set_language", locale="zh"), # Simplified chinese
follow_redirects=True
)
res = client.get(url_for("watchlist.index"), follow_redirects=True)
assert "选择语言".encode() in res.data, "Simplified chinese worked and it means the flask-babel cache worked"
# The timeago library predates the BCP 47 / RFC 5646 naming convention and still uses underscore-style locale names.
# The Python timeago library (https://github.com/hustcc/timeago) supports 48 locales but uses different naming conventions than Flask-Babel.
def test_zh_Hant_TW_timeago_integration():
"""Test that zh_Hant_TW mapping works and timeago renders Traditional Chinese correctly"""
import timeago
from datetime import datetime, timedelta
from changedetectionio.languages import get_timeago_locale
# 1. Test the mapping
mapped_locale = get_timeago_locale('zh_Hant_TW')
assert mapped_locale == 'zh_TW', "zh_Hant_TW should map to timeago's zh_TW"
assert get_timeago_locale('zh_TW') == 'zh_TW', "zh_TW should also map to zh_TW"
# 2. Test timeago library renders Traditional Chinese with the mapped locale
now = datetime.now()
# Test various time periods with Traditional Chinese strings
result_15s = timeago.format(now - timedelta(seconds=15), now, mapped_locale)
assert '秒前' in result_15s, f"Expected '秒前' in '{result_15s}'"
result_5m = timeago.format(now - timedelta(minutes=5), now, mapped_locale)
assert '分鐘前' in result_5m, f"Expected '分鐘前' in '{result_5m}'"
result_2h = timeago.format(now - timedelta(hours=2), now, mapped_locale)
assert '小時前' in result_2h, f"Expected '小時前' in '{result_2h}'"
result_3d = timeago.format(now - timedelta(days=3), now, mapped_locale)
assert '天前' in result_3d, f"Expected '天前' in '{result_3d}'"
def test_language_switching(client, live_server, measure_memory_usage, datastore_path): def test_language_switching(client, live_server, measure_memory_usage, datastore_path):
@@ -13,6 +77,9 @@ def test_language_switching(client, live_server, measure_memory_usage, datastore
3. Switch back to English and verify English text appears 3. Switch back to English and verify English text appears
""" """
# Establish session cookie
client.get(url_for("watchlist.index"), follow_redirects=True)
# Step 1: Set the language to Italian using the /set-language endpoint # Step 1: Set the language to Italian using the /set-language endpoint
res = client.get( res = client.get(
url_for("set_language", locale="it"), url_for("set_language", locale="it"),
@@ -61,6 +128,9 @@ def test_invalid_locale(client, live_server, measure_memory_usage, datastore_pat
The app should ignore invalid locales and continue working. The app should ignore invalid locales and continue working.
""" """
# Establish session cookie
client.get(url_for("watchlist.index"), follow_redirects=True)
# First set to English # First set to English
res = client.get( res = client.get(
url_for("set_language", locale="en"), url_for("set_language", locale="en"),
@@ -93,6 +163,9 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
within the same session. within the same session.
""" """
# Establish session cookie
client.get(url_for("watchlist.index"), follow_redirects=True)
# Set language to Italian # Set language to Italian
res = client.get( res = client.get(
url_for("set_language", locale="it"), url_for("set_language", locale="it"),
@@ -119,6 +192,9 @@ def test_set_language_with_redirect(client, live_server, measure_memory_usage, d
""" """
from flask import url_for from flask import url_for
# Establish session cookie
client.get(url_for("watchlist.index"), follow_redirects=True)
# Set language with a redirect parameter (simulating language change from /settings) # Set language with a redirect parameter (simulating language change from /settings)
res = client.get( res = client.get(
url_for("set_language", locale="de", redirect="/settings"), url_for("set_language", locale="de", redirect="/settings"),


@@ -25,12 +25,13 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage,
test_url = url_for('test_endpoint', _external=True) test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"), uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
data={"url": test_url, "tags": ''}, res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
follow_redirects=True assert b'Queued 1 watch for rechecking.' in res.data
)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching'])) wait_for_all_checks(client)
res = client.post( res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid), url_for("ui.ui_edit.edit_page", uuid=uuid),
data={ data={


@@ -2,7 +2,7 @@
import time import time
from flask import url_for from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches from .util import live_server_setup, wait_for_all_checks, wait_for_watch_history, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
import os import os
@@ -87,6 +87,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
# Wait for initial checks to complete # Wait for initial checks to complete
wait_for_all_checks(client) wait_for_all_checks(client)
# Ensure initial snapshots are saved
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Watches did not save initial snapshots"
# Trigger a change # Trigger a change
set_modified_response(datastore_path=datastore_path) set_modified_response(datastore_path=datastore_path)
@@ -94,6 +97,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
# Ensure all watches have sufficient history for RSS generation
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "Watches did not accumulate sufficient history"
# Get RSS token # Get RSS token
rss_token = extract_rss_token_from_UI(client) rss_token = extract_rss_token_from_UI(client)
assert rss_token is not None assert rss_token is not None
@@ -216,11 +222,13 @@ def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, data
assert b"Watch added" in res.data assert b"Watch added" in res.data
wait_for_all_checks(client) wait_for_all_checks(client)
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Initial snapshots not saved"
# Trigger changes # Trigger changes
set_modified_response(datastore_path=datastore_path) set_modified_response(datastore_path=datastore_path)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client) wait_for_all_checks(client)
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "History not accumulated"
# Get RSS token # Get RSS token
rss_token = extract_rss_token_from_UI(client) rss_token = extract_rss_token_from_UI(client)


@@ -1,8 +1,10 @@
import sys import sys
import os import os
import pytest import pytest
from changedetectionio import html_tools
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
import html_tools
# test generation guide. # test generation guide.
# 1. Do not include encoding in the xml declaration if the test object is a str type. # 1. Do not include encoding in the xml declaration if the test object is a str type.


@@ -164,14 +164,45 @@ def wait_for_all_checks(client=None):
if q_length == 0 and not any_workers_busy: if q_length == 0 and not any_workers_busy:
if empty_since is None: if empty_since is None:
empty_since = time.time() empty_since = time.time()
elif time.time() - empty_since >= 0.15: # Shorter wait # Brief stabilization period for async workers
elif time.time() - empty_since >= 0.3:
break break
else: else:
empty_since = None empty_since = None
attempt += 1 attempt += 1
time.sleep(0.3) time.sleep(0.3)
def wait_for_watch_history(client, min_history_count=2, timeout=10):
"""
Wait for watches to have sufficient history entries.
Useful after wait_for_all_checks() when you need to ensure history is populated.
Args:
client: Test client with access to datastore
min_history_count: Minimum number of history entries required
timeout: Maximum time to wait in seconds
"""
datastore = client.application.config.get('DATASTORE')
start_time = time.time()
while time.time() - start_time < timeout:
all_have_history = True
for uuid, watch in datastore.data['watching'].items():
history_count = len(watch.history.keys())
if history_count < min_history_count:
all_have_history = False
break
if all_have_history:
return True
time.sleep(0.2)
# Timeout - return False
return False
# Replaced by new_live_server_setup and calling per function scope in conftest.py # Replaced by new_live_server_setup and calling per function scope in conftest.py
def live_server_setup(live_server): def live_server_setup(live_server):
return True return True
@@ -189,6 +220,8 @@ def new_live_server_setup(live_server):
@live_server.app.route('/test-endpoint') @live_server.app.route('/test-endpoint')
def test_endpoint(): def test_endpoint():
from loguru import logger
logger.debug(f"/test-endpoint hit {request}")
ctype = request.args.get('content_type') ctype = request.args.get('content_type')
status_code = request.args.get('status_code') status_code = request.args.get('status_code')
content = request.args.get('content') or None content = request.args.get('content') or None


@@ -144,7 +144,6 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path): def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True) four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio') four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
four_o_four_url = four_o_four_url.replace('localhost', 'cdio') four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
@@ -186,3 +185,65 @@ def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_
url_for("ui.form_delete", uuid="all"), url_for("ui.form_delete", uuid="all"),
follow_redirects=True follow_redirects=True
) )
def test_browsersteps_edit_UI_startsession(client, live_server, measure_memory_usage, datastore_path):
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
# Add a watch first
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
test_url = test_url.replace('localhost', 'cdio')
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'fetch_backend': 'html_webdriver', 'paused': True})
# Test starting a browsersteps session
res = client.get(
url_for("browser_steps.browsersteps_start_session", uuid=uuid),
follow_redirects=True
)
assert res.status_code == 200
assert res.is_json
json_data = res.get_json()
assert 'browsersteps_session_id' in json_data
assert json_data['browsersteps_session_id'] # Not empty
browsersteps_session_id = json_data['browsersteps_session_id']
# Verify the session exists in browsersteps_sessions
from changedetectionio.blueprint.browser_steps import browsersteps_sessions, browsersteps_watch_to_session
assert browsersteps_session_id in browsersteps_sessions
assert uuid in browsersteps_watch_to_session
assert browsersteps_watch_to_session[uuid] == browsersteps_session_id
# Verify browsersteps UI shows up on edit page
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
assert b'browsersteps-click-start' in res.data, "Browsersteps manual UI shows up"
# Session should still exist after GET (not cleaned up yet)
assert browsersteps_session_id in browsersteps_sessions
assert uuid in browsersteps_watch_to_session
# Test cleanup happens on save (POST)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"url": test_url,
"tags": "",
'fetch_backend': "html_webdriver",
"time_between_check_use_default": "y",
},
follow_redirects=True
)
assert b"Updated watch" in res.data
# NOW verify the session was cleaned up after save
assert browsersteps_session_id not in browsersteps_sessions
assert uuid not in browsersteps_watch_to_session
# Cleanup
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)

File diff suppressed because it is too large


@@ -2,19 +2,18 @@
Worker management module for changedetection.io Worker management module for changedetection.io
Handles asynchronous workers for dynamic worker scaling. Handles asynchronous workers for dynamic worker scaling.
Sync worker support has been removed in favor of async-only architecture. Each worker runs in its own thread with its own event loop for isolation.
""" """
import asyncio import asyncio
import os import os
import threading import threading
import time import time
from concurrent.futures import ThreadPoolExecutor
from loguru import logger from loguru import logger
# Global worker state # Global worker state - each worker has its own thread and event loop
running_async_tasks = [] worker_threads = [] # List of WorkerThread objects
async_loop = None
async_loop_thread = None
# Track currently processing UUIDs for async workers - maps {uuid: worker_id} # Track currently processing UUIDs for async workers - maps {uuid: worker_id}
currently_processing_uuids = {} currently_processing_uuids = {}
@@ -22,89 +21,118 @@ currently_processing_uuids = {}
# Configuration - async workers only # Configuration - async workers only
USE_ASYNC_WORKERS = True USE_ASYNC_WORKERS = True
# Custom ThreadPoolExecutor for queue operations with named threads
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
queue_executor = ThreadPoolExecutor(
max_workers=_max_executor_workers,
thread_name_prefix="QueueGetter-"
)
def start_async_event_loop():
"""Start a dedicated event loop for async workers in a separate thread""" class WorkerThread:
global async_loop """Container for a worker thread with its own event loop"""
logger.info("Starting async event loop for workers") def __init__(self, worker_id, update_q, notification_q, app, datastore):
self.worker_id = worker_id
try: self.update_q = update_q
# Create a new event loop for this thread self.notification_q = notification_q
async_loop = asyncio.new_event_loop() self.app = app
# Set it as the event loop for this thread self.datastore = datastore
asyncio.set_event_loop(async_loop) self.thread = None
self.loop = None
logger.debug(f"Event loop created and set: {async_loop}") self.running = False
# Run the event loop forever def run(self):
async_loop.run_forever() """Run the worker in its own event loop"""
except Exception as e: try:
logger.error(f"Async event loop error: {e}") # Create a new event loop for this thread
finally: self.loop = asyncio.new_event_loop()
# Clean up asyncio.set_event_loop(self.loop)
if async_loop and not async_loop.is_closed(): self.running = True
async_loop.close()
async_loop = None # Run the worker coroutine
logger.info("Async event loop stopped") self.loop.run_until_complete(
start_single_async_worker(
self.worker_id,
self.update_q,
self.notification_q,
self.app,
self.datastore,
queue_executor
)
)
except asyncio.CancelledError:
# Normal shutdown - worker was cancelled
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info(f"Worker {self.worker_id} shutting down gracefully")
except RuntimeError as e:
# Ignore expected shutdown errors
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
logger.error(f"Worker {self.worker_id} runtime error: {e}")
except Exception as e:
logger.error(f"Worker {self.worker_id} thread error: {e}")
finally:
# Clean up
if self.loop and not self.loop.is_closed():
self.loop.close()
self.running = False
self.loop = None
def start(self):
"""Start the worker thread"""
self.thread = threading.Thread(
target=self.run,
daemon=True,
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
)
self.thread.start()
def stop(self):
"""Stop the worker thread"""
if self.loop and self.running:
try:
# Signal the loop to stop
self.loop.call_soon_threadsafe(self.loop.stop)
except RuntimeError:
pass
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
def start_async_workers(n_workers, update_q, notification_q, app, datastore): def start_async_workers(n_workers, update_q, notification_q, app, datastore):
"""Start the async worker management system""" """Start async workers, each with its own thread and event loop for isolation"""
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids global worker_threads, currently_processing_uuids
# Clear any stale UUID tracking state # Clear any stale state
currently_processing_uuids.clear() currently_processing_uuids.clear()
# Start the event loop in a separate thread # Start each worker in its own thread with its own event loop
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True) logger.info(f"Starting {n_workers} async workers (isolated threads)")
async_loop_thread.start()
# Wait for the loop to be available (with timeout for safety)
max_wait_time = 5.0
wait_start = time.time()
while async_loop is None and (time.time() - wait_start) < max_wait_time:
time.sleep(0.1)
if async_loop is None:
logger.error("Failed to start async event loop within timeout")
return
# Additional brief wait to ensure loop is running
time.sleep(0.2)
# Start async workers
logger.info(f"Starting {n_workers} async workers")
for i in range(n_workers): for i in range(n_workers):
try: try:
# Use a factory function to create named worker coroutines worker = WorkerThread(i, update_q, notification_q, app, datastore)
def create_named_worker(worker_id): worker.start()
async def named_worker(): worker_threads.append(worker)
task = asyncio.current_task() # No sleep needed - threads start independently and asynchronously
if task: except Exception as e:
task.set_name(f"async-worker-{worker_id}")
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
return named_worker()
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
running_async_tasks.append(task_future)
except RuntimeError as e:
logger.error(f"Failed to start async worker {i}: {e}") logger.error(f"Failed to start async worker {i}: {e}")
continue continue
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore): async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
"""Start a single async worker with auto-restart capability""" """Start a single async worker with auto-restart capability"""
from changedetectionio.async_update_worker import async_update_worker from changedetectionio.async_update_worker import async_update_worker
# Check if we're in pytest environment - if so, be more gentle with logging # Check if we're in pytest environment - if so, be more gentle with logging
import os import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
while not app.config.exit.is_set(): while not app.config.exit.is_set():
try: try:
if not in_pytest: await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
logger.info(f"Starting async worker {worker_id}")
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
# If we reach here, worker exited cleanly # If we reach here, worker exited cleanly
if not in_pytest: if not in_pytest:
logger.info(f"Async worker {worker_id} exited cleanly") logger.info(f"Async worker {worker_id} exited cleanly")
@@ -131,39 +159,38 @@ def start_workers(n_workers, update_q, notification_q, app, datastore):
def add_worker(update_q, notification_q, app, datastore): def add_worker(update_q, notification_q, app, datastore):
"""Add a new async worker (for dynamic scaling)""" """Add a new async worker (for dynamic scaling)"""
global running_async_tasks global worker_threads
if not async_loop: worker_id = len(worker_threads)
logger.error("Async loop not running, cannot add worker")
return False
worker_id = len(running_async_tasks)
logger.info(f"Adding async worker {worker_id}") logger.info(f"Adding async worker {worker_id}")
task_future = asyncio.run_coroutine_threadsafe( try:
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
) worker.start()
running_async_tasks.append(task_future) worker_threads.append(worker)
return True return True
except Exception as e:
logger.error(f"Failed to add worker {worker_id}: {e}")
return False
def remove_worker(): def remove_worker():
"""Remove an async worker (for dynamic scaling)""" """Remove an async worker (for dynamic scaling)"""
global running_async_tasks global worker_threads
if not running_async_tasks: if not worker_threads:
return False return False
# Cancel the last worker # Stop the last worker
task_future = running_async_tasks.pop() worker = worker_threads.pop()
task_future.cancel() worker.stop()
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining") logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
return True return True
def get_worker_count(): def get_worker_count():
"""Get current number of async workers""" """Get current number of async workers"""
return len(running_async_tasks) return len(worker_threads)
def get_running_uuids(): def get_running_uuids():
@@ -249,38 +276,21 @@ def queue_item_async_safe(update_q, item, silent=False):
def shutdown_workers(): def shutdown_workers():
"""Shutdown all async workers fast and aggressively""" """Shutdown all async workers fast and aggressively"""
global async_loop, async_loop_thread, running_async_tasks global worker_threads
# Check if we're in pytest environment - if so, be more gentle with logging # Check if we're in pytest environment - if so, be more gentle with logging
import os import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest: if not in_pytest:
logger.info("Fast shutdown of async workers initiated...") logger.info("Fast shutdown of async workers initiated...")
# Cancel all async tasks immediately # Stop all worker threads
for task_future in running_async_tasks: for worker in worker_threads:
if not task_future.done(): worker.stop()
task_future.cancel()
worker_threads.clear()
# Stop the async event loop immediately
if async_loop and not async_loop.is_closed():
try:
async_loop.call_soon_threadsafe(async_loop.stop)
except RuntimeError:
# Loop might already be stopped
pass
running_async_tasks.clear()
async_loop = None
# Give async thread minimal time to finish, then continue
if async_loop_thread and async_loop_thread.is_alive():
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
if async_loop_thread.is_alive() and not in_pytest:
logger.info("Async thread still running after timeout - continuing with shutdown")
async_loop_thread = None
if not in_pytest: if not in_pytest:
logger.info("Async workers fast shutdown complete") logger.info("Async workers fast shutdown complete")
@@ -290,69 +300,57 @@ def shutdown_workers():
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None): def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
""" """
Dynamically adjust the number of async workers. Dynamically adjust the number of async workers.
Args: Args:
new_count: Target number of workers new_count: Target number of workers
update_q, notification_q, app, datastore: Required for adding new workers update_q, notification_q, app, datastore: Required for adding new workers
Returns: Returns:
dict: Status of the adjustment operation dict: Status of the adjustment operation
""" """
global running_async_tasks global worker_threads
current_count = get_worker_count() current_count = get_worker_count()
if new_count == current_count: if new_count == current_count:
return { return {
'status': 'no_change', 'status': 'no_change',
'message': f'Worker count already at {current_count}', 'message': f'Worker count already at {current_count}',
'current_count': current_count 'current_count': current_count
} }
if new_count > current_count: if new_count > current_count:
# Add workers # Add workers
workers_to_add = new_count - current_count workers_to_add = new_count - current_count
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})") logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
if not all([update_q, notification_q, app, datastore]): if not all([update_q, notification_q, app, datastore]):
return { return {
'status': 'error', 'status': 'error',
'message': 'Missing required parameters to add workers', 'message': 'Missing required parameters to add workers',
'current_count': current_count 'current_count': current_count
} }
for i in range(workers_to_add): for i in range(workers_to_add):
worker_id = len(running_async_tasks) add_worker(update_q, notification_q, app, datastore)
task_future = asyncio.run_coroutine_threadsafe(
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
async_loop
)
running_async_tasks.append(task_future)
return { return {
'status': 'success', 'status': 'success',
'message': f'Added {workers_to_add} workers', 'message': f'Added {workers_to_add} workers',
'previous_count': current_count, 'previous_count': current_count,
'current_count': new_count 'current_count': len(worker_threads)
} }
else: else:
# Remove workers # Remove workers
workers_to_remove = current_count - new_count workers_to_remove = current_count - new_count
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})") logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
removed_count = 0 removed_count = 0
for _ in range(workers_to_remove): for _ in range(workers_to_remove):
if running_async_tasks: if remove_worker():
task_future = running_async_tasks.pop()
task_future.cancel()
# Wait for the task to actually stop
try:
task_future.result(timeout=5) # 5 second timeout
except Exception:
pass # Task was cancelled, which is expected
removed_count += 1 removed_count += 1
return { return {
'status': 'success', 'status': 'success',
'message': f'Removed {removed_count} workers', 'message': f'Removed {removed_count} workers',
@@ -367,72 +365,58 @@ def get_worker_status():
         'worker_type': 'async',
         'worker_count': get_worker_count(),
         'running_uuids': get_running_uuids(),
-        'async_loop_running': async_loop is not None,
+        'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
     }

 def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
     """
     Check if the expected number of async workers are running and restart any missing ones.

     Args:
         expected_count: Expected number of workers
         update_q, notification_q, app, datastore: Required for restarting workers

     Returns:
         dict: Health check results
     """
-    global running_async_tasks
+    global worker_threads

     current_count = get_worker_count()

-    if current_count == expected_count:
+    # Check which workers are actually alive
+    alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
+
+    if alive_count == expected_count:
         return {
             'status': 'healthy',
             'expected_count': expected_count,
-            'actual_count': current_count,
+            'actual_count': alive_count,
             'message': f'All {expected_count} async workers running'
         }

-    # Check for crashed async workers
+    # Find dead workers
     dead_workers = []
-    alive_count = 0
-    for i, task_future in enumerate(running_async_tasks[:]):
-        if task_future.done():
-            try:
-                result = task_future.result()
-                dead_workers.append(i)
-                logger.warning(f"Async worker {i} completed unexpectedly")
-            except Exception as e:
-                dead_workers.append(i)
-                logger.error(f"Async worker {i} crashed: {e}")
-        else:
-            alive_count += 1
+    for i, worker in enumerate(worker_threads[:]):
+        if not worker.thread or not worker.thread.is_alive():
+            dead_workers.append(i)
+            logger.warning(f"Async worker {worker.worker_id} thread is dead")

     # Remove dead workers from tracking
     for i in reversed(dead_workers):
-        if i < len(running_async_tasks):
-            running_async_tasks.pop(i)
+        if i < len(worker_threads):
+            worker_threads.pop(i)

     missing_workers = expected_count - alive_count
     restarted_count = 0

     if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
         logger.info(f"Restarting {missing_workers} crashed async workers")

         for i in range(missing_workers):
-            worker_id = alive_count + i
-            try:
-                task_future = asyncio.run_coroutine_threadsafe(
-                    start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
-                    async_loop
-                )
-                running_async_tasks.append(task_future)
+            if add_worker(update_q, notification_q, app, datastore):
                 restarted_count += 1
-            except Exception as e:
-                logger.error(f"Failed to restart worker {worker_id}: {e}")

     return {
         'status': 'repaired' if restarted_count > 0 else 'degraded',
         'expected_count': expected_count,
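Reviewer note: check_worker_health() is the kind of function a periodic housekeeping task would call. A hedged usage sketch follows; the watchdog wiring below is an assumption for illustration and is not part of this changeset.

# Illustrative only - one way a housekeeping loop might call check_worker_health().
import time

def worker_watchdog(expected_count, update_q, notification_q, app, datastore, interval=30):
    """Hypothetical: periodically verify the worker pool and log any repairs."""
    while True:
        result = check_worker_health(expected_count, update_q, notification_q, app, datastore)
        if result['status'] != 'healthy':
            logger.warning(f"Worker health check result: {result}")
        time.sleep(interval)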
@@ -42,7 +42,7 @@ orjson~=3.11
 # jq not available on Windows so must be installed manually
 # Notification library
-apprise==1.9.5
+apprise==1.9.6
 diff_match_patch
@@ -70,7 +70,7 @@ lxml >=4.8.0,!=5.2.0,!=5.2.1,<7
 # XPath 2.0-3.1 support - 4.2.0 had issues, 4.1.5 stable
 # Consider updating to latest stable version periodically
-elementpath==5.0.4
+elementpath==5.1.0
 # For fast image comparison in screenshot change detection
 # opencv-python-headless is OPTIONAL (excluded from requirements.txt)
@@ -91,7 +91,7 @@ jq~=1.3; python_version >= "3.8" and sys_platform == "linux"
 # playwright is installed at Dockerfile build time because it's not available on all platforms
-pyppeteer-ng==2.0.0rc10
+pyppeteer-ng==2.0.0rc11
 pyppeteerstealth>=0.0.4
 # Include pytest, so if theres a support issue we can ask them to run these tests on their setup
@@ -100,7 +100,7 @@ pytest-flask ~=1.3
 pytest-mock ~=3.15
 # Anything 4.0 and up but not 5.0
-jsonschema ~= 4.25
+jsonschema ~= 4.26
 # OpenAPI validation support
 openapi-core[flask] >= 0.19.0