mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-01 03:46:00 +00:00
Compare commits
32 Commits
dev
...
python-314
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fe5beac2b2 | ||
|
|
5a1d44dc62 | ||
|
|
6db1085337 | ||
|
|
66553e106d | ||
|
|
5b01dbd9f8 | ||
|
|
c86f214fc3 | ||
|
|
32149640d9 | ||
|
|
15f16455fc | ||
|
|
15cdfac9d9 | ||
|
|
04de397916 | ||
|
|
4643082c5b | ||
|
|
3b2b74e62d | ||
|
|
68354cf53d | ||
|
|
3e364e0eba | ||
|
|
06ea29bfc7 | ||
|
|
f4e178955c | ||
|
|
51d531d732 | ||
|
|
e40c4ca97d | ||
|
|
b8ede70f3a | ||
|
|
50b349b464 | ||
|
|
67d097cca7 | ||
|
|
494385a379 | ||
|
|
c2ee84b753 | ||
|
|
c1e0296cda | ||
|
|
f041223c38 | ||
|
|
d36738d7ef | ||
|
|
e51ff34c89 | ||
|
|
ba4ed9cf27 | ||
|
|
33b7f1684d | ||
|
|
3d14df6a11 | ||
|
|
08ce1e28ce | ||
|
|
9421f7e279 |
11
.github/workflows/test-only.yml
vendored
11
.github/workflows/test-only.yml
vendored
@@ -52,4 +52,13 @@ jobs:
|
|||||||
uses: ./.github/workflows/test-stack-reusable-workflow.yml
|
uses: ./.github/workflows/test-stack-reusable-workflow.yml
|
||||||
with:
|
with:
|
||||||
python-version: '3.13'
|
python-version: '3.13'
|
||||||
skip-pypuppeteer: true
|
skip-pypuppeteer: true
|
||||||
|
|
||||||
|
|
||||||
|
test-application-3-14:
|
||||||
|
#if: github.event_name == 'push' && github.ref == 'refs/heads/master'
|
||||||
|
needs: lint-code
|
||||||
|
uses: ./.github/workflows/test-stack-reusable-workflow.yml
|
||||||
|
with:
|
||||||
|
python-version: '3.14'
|
||||||
|
skip-pypuppeteer: false
|
||||||
|
|||||||
@@ -11,6 +11,7 @@ recursive-include changedetectionio/realtime *
|
|||||||
recursive-include changedetectionio/static *
|
recursive-include changedetectionio/static *
|
||||||
recursive-include changedetectionio/templates *
|
recursive-include changedetectionio/templates *
|
||||||
recursive-include changedetectionio/tests *
|
recursive-include changedetectionio/tests *
|
||||||
|
recursive-include changedetectionio/translations *
|
||||||
recursive-include changedetectionio/widgets *
|
recursive-include changedetectionio/widgets *
|
||||||
prune changedetectionio/static/package-lock.json
|
prune changedetectionio/static/package-lock.json
|
||||||
prune changedetectionio/static/styles/node_modules
|
prune changedetectionio/static/styles/node_modules
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||||
# Semver means never use .01, or 00. Should be .1.
|
# Semver means never use .01, or 00. Should be .1.
|
||||||
__version__ = '0.51.4'
|
__version__ = '0.52.6'
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
@@ -41,9 +41,10 @@ from loguru import logger
|
|||||||
#
|
#
|
||||||
# IMPLEMENTATION:
|
# IMPLEMENTATION:
|
||||||
# 1. Explicit contexts everywhere (primary protection):
|
# 1. Explicit contexts everywhere (primary protection):
|
||||||
# - Watch.py: ctx = multiprocessing.get_context('spawn')
|
|
||||||
# - playwright.py: ctx = multiprocessing.get_context('spawn')
|
# - playwright.py: ctx = multiprocessing.get_context('spawn')
|
||||||
# - puppeteer.py: ctx = multiprocessing.get_context('spawn')
|
# - puppeteer.py: ctx = multiprocessing.get_context('spawn')
|
||||||
|
# - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
|
||||||
|
# - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
|
||||||
#
|
#
|
||||||
# 2. Global default (defense-in-depth, below):
|
# 2. Global default (defense-in-depth, below):
|
||||||
# - Safety net if future code forgets explicit context
|
# - Safety net if future code forgets explicit context
|
||||||
|
|||||||
@@ -1,5 +1,4 @@
|
|||||||
from blinker import signal
|
from blinker import signal
|
||||||
|
|
||||||
from .processors.exceptions import ProcessorException
|
from .processors.exceptions import ProcessorException
|
||||||
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
|
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
|
||||||
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
|
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
|
||||||
@@ -9,7 +8,6 @@ from changedetectionio.flask_app import watch_check_update
|
|||||||
import asyncio
|
import asyncio
|
||||||
import importlib
|
import importlib
|
||||||
import os
|
import os
|
||||||
import queue
|
|
||||||
import time
|
import time
|
||||||
|
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
@@ -17,36 +15,48 @@ from loguru import logger
|
|||||||
# Async version of update_worker
|
# Async version of update_worker
|
||||||
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
|
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
|
||||||
|
|
||||||
async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
|
||||||
"""
|
"""
|
||||||
Async worker function that processes watch check jobs from the queue.
|
Async worker function that processes watch check jobs from the queue.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
worker_id: Unique identifier for this worker
|
worker_id: Unique identifier for this worker
|
||||||
q: AsyncSignalPriorityQueue containing jobs to process
|
q: AsyncSignalPriorityQueue containing jobs to process
|
||||||
notification_q: Standard queue for notifications
|
notification_q: Standard queue for notifications
|
||||||
app: Flask application instance
|
app: Flask application instance
|
||||||
datastore: Application datastore
|
datastore: Application datastore
|
||||||
|
executor: ThreadPoolExecutor for queue operations (optional)
|
||||||
"""
|
"""
|
||||||
# Set a descriptive name for this task
|
# Set a descriptive name for this task
|
||||||
task = asyncio.current_task()
|
task = asyncio.current_task()
|
||||||
if task:
|
if task:
|
||||||
task.set_name(f"async-worker-{worker_id}")
|
task.set_name(f"async-worker-{worker_id}")
|
||||||
|
|
||||||
logger.info(f"Starting async worker {worker_id}")
|
logger.info(f"Starting async worker {worker_id}")
|
||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
update_handler = None
|
update_handler = None
|
||||||
watch = None
|
watch = None
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Use native janus async interface - no threads needed!
|
# Use sync interface via run_in_executor since each worker has its own event loop
|
||||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
loop = asyncio.get_event_loop()
|
||||||
|
queued_item_data = await asyncio.wait_for(
|
||||||
|
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
|
||||||
|
timeout=1.5
|
||||||
|
)
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
# No jobs available, continue loop
|
# No jobs available, continue loop
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
# Handle expected Empty exception from queue timeout
|
||||||
|
import queue
|
||||||
|
if isinstance(e, queue.Empty):
|
||||||
|
# Queue is empty, normal behavior - just continue
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Unexpected exception - log as critical
|
||||||
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
||||||
|
|
||||||
# Log queue health for debugging
|
# Log queue health for debugging
|
||||||
@@ -414,14 +424,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
||||||
|
|
||||||
finally:
|
finally:
|
||||||
|
|
||||||
try:
|
|
||||||
await update_handler.fetcher.quit(watch=watch)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
|
||||||
|
|
||||||
# Always cleanup - this runs whether there was an exception or not
|
# Always cleanup - this runs whether there was an exception or not
|
||||||
if uuid:
|
if uuid:
|
||||||
|
try:
|
||||||
|
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||||
|
await update_handler.fetcher.quit(watch=watch)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||||
try:
|
try:
|
||||||
# Mark UUID as no longer being processed by this worker
|
# Mark UUID as no longer being processed by this worker
|
||||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||||
@@ -460,7 +469,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||||
except Exception as cleanup_error:
|
except Exception as cleanup_error:
|
||||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||||
|
|
||||||
|
del(uuid)
|
||||||
|
|
||||||
# Brief pause before continuing to avoid tight error loops (only on error)
|
# Brief pause before continuing to avoid tight error loops (only on error)
|
||||||
if 'e' in locals():
|
if 'e' in locals():
|
||||||
await asyncio.sleep(1.0)
|
await asyncio.sleep(1.0)
|
||||||
|
|||||||
@@ -92,7 +92,12 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
|
|
||||||
# Be sure we're written fresh
|
# Be sure we're written fresh
|
||||||
datastore.sync_to_json()
|
datastore.sync_to_json()
|
||||||
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
|
zip_thread = threading.Thread(
|
||||||
|
target=create_backup,
|
||||||
|
args=(datastore.datastore_path, datastore.data.get("watching")),
|
||||||
|
daemon=True,
|
||||||
|
name="BackupCreator"
|
||||||
|
)
|
||||||
zip_thread.start()
|
zip_thread.start()
|
||||||
backup_threads.append(zip_thread)
|
backup_threads.append(zip_thread)
|
||||||
flash(gettext("Backup building in background, check back in a few minutes."))
|
flash(gettext("Backup building in background, check back in a few minutes."))
|
||||||
|
|||||||
@@ -21,31 +21,154 @@ from changedetectionio.flask_app import login_optionally_required
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
browsersteps_sessions = {}
|
browsersteps_sessions = {}
|
||||||
|
browsersteps_watch_to_session = {} # Maps watch_uuid -> browsersteps_session_id
|
||||||
io_interface_context = None
|
io_interface_context = None
|
||||||
import json
|
import json
|
||||||
import hashlib
|
import hashlib
|
||||||
from flask import Response
|
from flask import Response
|
||||||
import asyncio
|
import asyncio
|
||||||
import threading
|
import threading
|
||||||
|
import time
|
||||||
|
|
||||||
def run_async_in_browser_loop(coro):
|
# Dedicated event loop for ALL browser steps sessions
|
||||||
"""Run async coroutine using the existing async worker event loop"""
|
_browser_steps_loop = None
|
||||||
from changedetectionio import worker_handler
|
_browser_steps_thread = None
|
||||||
|
_browser_steps_loop_lock = threading.Lock()
|
||||||
# Use the existing async worker event loop instead of creating a new one
|
|
||||||
if worker_handler.USE_ASYNC_WORKERS and worker_handler.async_loop and not worker_handler.async_loop.is_closed():
|
def _start_browser_steps_loop():
|
||||||
logger.debug("Browser steps using existing async worker event loop")
|
"""Start a dedicated event loop for browser steps in its own thread"""
|
||||||
future = asyncio.run_coroutine_threadsafe(coro, worker_handler.async_loop)
|
global _browser_steps_loop
|
||||||
return future.result()
|
|
||||||
else:
|
# Create and set the event loop for this thread
|
||||||
# Fallback: create a new event loop (for sync workers or if async loop not available)
|
loop = asyncio.new_event_loop()
|
||||||
logger.debug("Browser steps creating temporary event loop")
|
asyncio.set_event_loop(loop)
|
||||||
loop = asyncio.new_event_loop()
|
_browser_steps_loop = loop
|
||||||
asyncio.set_event_loop(loop)
|
|
||||||
|
logger.debug("Browser steps event loop started")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Run the loop forever - handles all browsersteps sessions
|
||||||
|
loop.run_forever()
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Browser steps event loop error: {e}")
|
||||||
|
finally:
|
||||||
try:
|
try:
|
||||||
return loop.run_until_complete(coro)
|
# Cancel all remaining tasks
|
||||||
|
pending = asyncio.all_tasks(loop)
|
||||||
|
for task in pending:
|
||||||
|
task.cancel()
|
||||||
|
|
||||||
|
# Wait for tasks to finish cancellation
|
||||||
|
if pending:
|
||||||
|
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error during browser steps loop cleanup: {e}")
|
||||||
finally:
|
finally:
|
||||||
loop.close()
|
loop.close()
|
||||||
|
logger.debug("Browser steps event loop closed")
|
||||||
|
|
||||||
|
def _ensure_browser_steps_loop():
|
||||||
|
"""Ensure the browser steps event loop is running"""
|
||||||
|
global _browser_steps_loop, _browser_steps_thread
|
||||||
|
|
||||||
|
with _browser_steps_loop_lock:
|
||||||
|
if _browser_steps_thread is None or not _browser_steps_thread.is_alive():
|
||||||
|
logger.debug("Starting browser steps event loop thread")
|
||||||
|
_browser_steps_thread = threading.Thread(
|
||||||
|
target=_start_browser_steps_loop,
|
||||||
|
daemon=True,
|
||||||
|
name="BrowserStepsEventLoop"
|
||||||
|
)
|
||||||
|
_browser_steps_thread.start()
|
||||||
|
|
||||||
|
# Wait for the loop to be ready
|
||||||
|
timeout = 5.0
|
||||||
|
start_time = time.time()
|
||||||
|
while _browser_steps_loop is None:
|
||||||
|
if time.time() - start_time > timeout:
|
||||||
|
raise RuntimeError("Browser steps event loop failed to start")
|
||||||
|
time.sleep(0.01)
|
||||||
|
|
||||||
|
logger.debug("Browser steps event loop thread started and ready")
|
||||||
|
|
||||||
|
def run_async_in_browser_loop(coro):
|
||||||
|
"""Run async coroutine using the dedicated browser steps event loop"""
|
||||||
|
_ensure_browser_steps_loop()
|
||||||
|
|
||||||
|
if _browser_steps_loop and not _browser_steps_loop.is_closed():
|
||||||
|
logger.debug("Browser steps using dedicated event loop")
|
||||||
|
future = asyncio.run_coroutine_threadsafe(coro, _browser_steps_loop)
|
||||||
|
return future.result()
|
||||||
|
else:
|
||||||
|
raise RuntimeError("Browser steps event loop is not available")
|
||||||
|
|
||||||
|
def cleanup_expired_sessions():
|
||||||
|
"""Remove expired browsersteps sessions and cleanup their resources"""
|
||||||
|
global browsersteps_sessions, browsersteps_watch_to_session
|
||||||
|
|
||||||
|
expired_session_ids = []
|
||||||
|
|
||||||
|
# Find expired sessions
|
||||||
|
for session_id, session_data in browsersteps_sessions.items():
|
||||||
|
browserstepper = session_data.get('browserstepper')
|
||||||
|
if browserstepper and browserstepper.has_expired:
|
||||||
|
expired_session_ids.append(session_id)
|
||||||
|
|
||||||
|
# Cleanup expired sessions
|
||||||
|
for session_id in expired_session_ids:
|
||||||
|
logger.debug(f"Cleaning up expired browsersteps session {session_id}")
|
||||||
|
session_data = browsersteps_sessions[session_id]
|
||||||
|
|
||||||
|
# Cleanup playwright resources asynchronously
|
||||||
|
browserstepper = session_data.get('browserstepper')
|
||||||
|
if browserstepper:
|
||||||
|
try:
|
||||||
|
run_async_in_browser_loop(browserstepper.cleanup())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error cleaning up session {session_id}: {e}")
|
||||||
|
|
||||||
|
# Remove from sessions dict
|
||||||
|
del browsersteps_sessions[session_id]
|
||||||
|
|
||||||
|
# Remove from watch mapping
|
||||||
|
for watch_uuid, mapped_session_id in list(browsersteps_watch_to_session.items()):
|
||||||
|
if mapped_session_id == session_id:
|
||||||
|
del browsersteps_watch_to_session[watch_uuid]
|
||||||
|
break
|
||||||
|
|
||||||
|
if expired_session_ids:
|
||||||
|
logger.info(f"Cleaned up {len(expired_session_ids)} expired browsersteps session(s)")
|
||||||
|
|
||||||
|
def cleanup_session_for_watch(watch_uuid):
|
||||||
|
"""Cleanup a specific browsersteps session for a watch UUID"""
|
||||||
|
global browsersteps_sessions, browsersteps_watch_to_session
|
||||||
|
|
||||||
|
session_id = browsersteps_watch_to_session.get(watch_uuid)
|
||||||
|
if not session_id:
|
||||||
|
logger.debug(f"No browsersteps session found for watch {watch_uuid}")
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug(f"Cleaning up browsersteps session {session_id} for watch {watch_uuid}")
|
||||||
|
|
||||||
|
session_data = browsersteps_sessions.get(session_id)
|
||||||
|
if session_data:
|
||||||
|
browserstepper = session_data.get('browserstepper')
|
||||||
|
if browserstepper:
|
||||||
|
try:
|
||||||
|
run_async_in_browser_loop(browserstepper.cleanup())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error cleaning up session {session_id} for watch {watch_uuid}: {e}")
|
||||||
|
|
||||||
|
# Remove from sessions dict
|
||||||
|
del browsersteps_sessions[session_id]
|
||||||
|
|
||||||
|
# Remove from watch mapping
|
||||||
|
del browsersteps_watch_to_session[watch_uuid]
|
||||||
|
|
||||||
|
logger.debug(f"Cleaned up session for watch {watch_uuid}")
|
||||||
|
|
||||||
|
# Opportunistically cleanup any other expired sessions
|
||||||
|
cleanup_expired_sessions()
|
||||||
|
|
||||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||||
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
|
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
|
||||||
@@ -123,6 +246,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
if not watch_uuid:
|
if not watch_uuid:
|
||||||
return make_response('No Watch UUID specified', 500)
|
return make_response('No Watch UUID specified', 500)
|
||||||
|
|
||||||
|
# Cleanup any existing session for this watch
|
||||||
|
cleanup_session_for_watch(watch_uuid)
|
||||||
|
|
||||||
logger.debug("Starting connection with playwright")
|
logger.debug("Starting connection with playwright")
|
||||||
logger.debug("browser_steps.py connecting")
|
logger.debug("browser_steps.py connecting")
|
||||||
|
|
||||||
@@ -131,6 +257,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
|
browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
|
||||||
start_browsersteps_session(watch_uuid)
|
start_browsersteps_session(watch_uuid)
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Store the mapping of watch_uuid -> browsersteps_session_id
|
||||||
|
browsersteps_watch_to_session[watch_uuid] = browsersteps_session_id
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
if 'ECONNREFUSED' in str(e):
|
if 'ECONNREFUSED' in str(e):
|
||||||
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
|
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
|
||||||
|
|||||||
@@ -47,9 +47,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
|||||||
if len(dates) < 2:
|
if len(dates) < 2:
|
||||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||||
|
|
||||||
# Add uuid to watch for proper functioning
|
|
||||||
watch['uuid'] = uuid
|
|
||||||
|
|
||||||
# Get the number of diffs to include (default: 5)
|
# Get the number of diffs to include (default: 5)
|
||||||
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
||||||
|
|
||||||
@@ -101,7 +98,7 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
|||||||
date_index_from, date_index_to)
|
date_index_from, date_index_to)
|
||||||
|
|
||||||
# Create and populate feed entry
|
# Create and populate feed entry
|
||||||
guid = f"{watch['uuid']}/{timestamp_to}"
|
guid = f"{uuid}/{timestamp_to}"
|
||||||
fe = fg.add_entry()
|
fe = fg.add_entry()
|
||||||
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
|
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
|
||||||
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
|
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
|
||||||
|
|||||||
@@ -63,11 +63,8 @@ def construct_tag_routes(rss_blueprint, datastore):
|
|||||||
|
|
||||||
# Only include unviewed watches
|
# Only include unviewed watches
|
||||||
if not watch.viewed:
|
if not watch.viewed:
|
||||||
# Add uuid to watch for proper functioning
|
# Include a link to the diff page (use uuid from loop, don't modify watch dict)
|
||||||
watch['uuid'] = uuid
|
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=uuid, _external=True)}
|
||||||
|
|
||||||
# Include a link to the diff page
|
|
||||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=watch['uuid'], _external=True)}
|
|
||||||
|
|
||||||
# Get watch label
|
# Get watch label
|
||||||
watch_label = get_watch_label(datastore, watch)
|
watch_label = get_watch_label(datastore, watch)
|
||||||
|
|||||||
@@ -50,7 +50,8 @@
|
|||||||
<td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
|
<td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
|
||||||
<td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
|
<td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
|
||||||
<td>
|
<td>
|
||||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
||||||
|
<a href="{{ url_for('ui.form_watch_checknow', tag=uuid) }}" class="pure-button pure-button-primary" >{{ _('Recheck') }}</a>
|
||||||
<a class="pure-button button-error"
|
<a class="pure-button button-error"
|
||||||
href="{{ url_for('tags.delete', uuid=uuid) }}"
|
href="{{ url_for('tags.delete', uuid=uuid) }}"
|
||||||
data-requires-confirm
|
data-requires-confirm
|
||||||
|
|||||||
@@ -238,6 +238,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
|
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
|
||||||
flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))
|
flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))
|
||||||
|
|
||||||
|
# Cleanup any browsersteps session for this watch
|
||||||
|
try:
|
||||||
|
from changedetectionio.blueprint.browser_steps import cleanup_session_for_watch
|
||||||
|
cleanup_session_for_watch(uuid)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error cleaning up browsersteps session: {e}")
|
||||||
|
|
||||||
# Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
|
# Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
|
||||||
# But in the case something is added we should save straight away
|
# But in the case something is added we should save straight away
|
||||||
datastore.needs_write_urgent = True
|
datastore.needs_write_urgent = True
|
||||||
@@ -325,8 +332,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
|
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
|
||||||
},
|
},
|
||||||
'settings_application': datastore.data['settings']['application'],
|
'settings_application': datastore.data['settings']['application'],
|
||||||
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
|
||||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
|
||||||
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||||
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||||
|
|||||||
@@ -206,7 +206,7 @@ Math: {{ 1 + 1 }}") }}
|
|||||||
|
|
||||||
<div class="tab-pane-inner" id="browser-steps">
|
<div class="tab-pane-inner" id="browser-steps">
|
||||||
{% if capabilities.supports_browser_steps %}
|
{% if capabilities.supports_browser_steps %}
|
||||||
{% if visual_selector_data_ready %}
|
{% if true %}
|
||||||
<img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
|
<img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
|
||||||
<fieldset>
|
<fieldset>
|
||||||
<div class="pure-control-group">
|
<div class="pure-control-group">
|
||||||
|
|||||||
@@ -2,7 +2,6 @@ import os
|
|||||||
import time
|
import time
|
||||||
|
|
||||||
from flask import Blueprint, request, make_response, render_template, redirect, url_for, flash, session
|
from flask import Blueprint, request, make_response, render_template, redirect, url_for, flash, session
|
||||||
from flask_login import current_user
|
|
||||||
from flask_paginate import Pagination, get_page_parameter
|
from flask_paginate import Pagination, get_page_parameter
|
||||||
|
|
||||||
from changedetectionio import forms
|
from changedetectionio import forms
|
||||||
@@ -85,6 +84,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
|
app_rss_token=datastore.data['settings']['application'].get('rss_access_token'),
|
||||||
datastore=datastore,
|
datastore=datastore,
|
||||||
errored_count=errored_count,
|
errored_count=errored_count,
|
||||||
|
extra_classes='has-queue' if len(update_q.queue) else '',
|
||||||
form=form,
|
form=form,
|
||||||
generate_tag_colors=processors.generate_processor_badge_colors,
|
generate_tag_colors=processors.generate_processor_badge_colors,
|
||||||
guid=datastore.data['app_guid'],
|
guid=datastore.data['app_guid'],
|
||||||
@@ -92,9 +92,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
hosted_sticky=os.getenv("SALTED_PASS", False) == False,
|
hosted_sticky=os.getenv("SALTED_PASS", False) == False,
|
||||||
now_time_server=round(time.time()),
|
now_time_server=round(time.time()),
|
||||||
pagination=pagination,
|
pagination=pagination,
|
||||||
|
processor_badge_css=processors.get_processor_badge_css(),
|
||||||
processor_badge_texts=processors.get_processor_badge_texts(),
|
processor_badge_texts=processors.get_processor_badge_texts(),
|
||||||
processor_descriptions=processors.get_processor_descriptions(),
|
processor_descriptions=processors.get_processor_descriptions(),
|
||||||
processor_badge_css=processors.get_processor_badge_css(),
|
queue_size=len(update_q.queue),
|
||||||
queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue],
|
queued_uuids=[q_uuid.item['uuid'] for q_uuid in update_q.queue],
|
||||||
search_q=request.args.get('q', '').strip(),
|
search_q=request.args.get('q', '').strip(),
|
||||||
sort_attribute=request.args.get('sort') if request.args.get('sort') else request.cookies.get('sort'),
|
sort_attribute=request.args.get('sort') if request.args.get('sort') else request.cookies.get('sort'),
|
||||||
|
|||||||
@@ -99,9 +99,14 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
|
|||||||
data-confirm-message="{{ _('<p>Are you sure you want to delete the selected watches?</strong></p><p>This action cannot be undone.</p>') }}"
|
data-confirm-message="{{ _('<p>Are you sure you want to delete the selected watches?</strong></p><p>This action cannot be undone.</p>') }}"
|
||||||
data-confirm-button="{{ _('Delete') }}"><i data-feather="trash" style="width: 14px; height: 14px; stroke: white; margin-right: 4px;"></i>{{ _('Delete') }}</button>
|
data-confirm-button="{{ _('Delete') }}"><i data-feather="trash" style="width: 14px; height: 14px; stroke: white; margin-right: 4px;"></i>{{ _('Delete') }}</button>
|
||||||
</div>
|
</div>
|
||||||
{%- if watches|length >= pagination.per_page -%}
|
|
||||||
{{ pagination.info }}
|
<div id="stats_row">
|
||||||
{%- endif -%}
|
<div class="left">{%- if watches|length >= pagination.per_page -%}{{ pagination.info }}{%- endif -%}</div>
|
||||||
|
<div class="right" >{{ _('Queued size') }}: <span id="queue-size-int">{{ queue_size }}</span></div>
|
||||||
|
</div>
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
{%- if search_q -%}<div id="search-result-info">{{ _('Searching') }} "<strong><i>{{search_q}}</i></strong>"</div>{%- endif -%}
|
{%- if search_q -%}<div id="search-result-info">{{ _('Searching') }} "<strong><i>{{search_q}}</i></strong>"</div>{%- endif -%}
|
||||||
<div>
|
<div>
|
||||||
<a href="{{url_for('watchlist.index')}}" class="pure-button button-tag {{'active' if not active_tag_uuid }}">{{ _('All') }}</a>
|
<a href="{{url_for('watchlist.index')}}" class="pure-button button-tag {{'active' if not active_tag_uuid }}">{{ _('All') }}</a>
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import gc
|
||||||
import json
|
import json
|
||||||
import os
|
import os
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
@@ -185,20 +186,33 @@ class fetcher(Fetcher):
|
|||||||
super().screenshot_step(step_n=step_n)
|
super().screenshot_step(step_n=step_n)
|
||||||
screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||||
|
|
||||||
|
# Request GC immediately after screenshot to free memory
|
||||||
|
# Screenshots can be large and browser steps take many of them
|
||||||
|
await self.page.request_gc()
|
||||||
|
|
||||||
if self.browser_steps_screenshot_path is not None:
|
if self.browser_steps_screenshot_path is not None:
|
||||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
|
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
|
||||||
logger.debug(f"Saving step screenshot to {destination}")
|
logger.debug(f"Saving step screenshot to {destination}")
|
||||||
with open(destination, 'wb') as f:
|
with open(destination, 'wb') as f:
|
||||||
f.write(screenshot)
|
f.write(screenshot)
|
||||||
|
# Clear local reference to allow screenshot bytes to be collected
|
||||||
|
del screenshot
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
async def save_step_html(self, step_n):
|
async def save_step_html(self, step_n):
|
||||||
super().save_step_html(step_n=step_n)
|
super().save_step_html(step_n=step_n)
|
||||||
content = await self.page.content()
|
content = await self.page.content()
|
||||||
|
|
||||||
|
# Request GC after getting page content
|
||||||
|
await self.page.request_gc()
|
||||||
|
|
||||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
|
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
|
||||||
logger.debug(f"Saving step HTML to {destination}")
|
logger.debug(f"Saving step HTML to {destination}")
|
||||||
with open(destination, 'w', encoding='utf-8') as f:
|
with open(destination, 'w', encoding='utf-8') as f:
|
||||||
f.write(content)
|
f.write(content)
|
||||||
|
# Clear local reference
|
||||||
|
del content
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
async def run(self,
|
async def run(self,
|
||||||
fetch_favicon=True,
|
fetch_favicon=True,
|
||||||
@@ -305,6 +319,12 @@ class fetcher(Fetcher):
|
|||||||
|
|
||||||
if self.status_code != 200 and not ignore_status_codes:
|
if self.status_code != 200 and not ignore_status_codes:
|
||||||
screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)
|
screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)
|
||||||
|
# Cleanup before raising to prevent memory leak
|
||||||
|
await self.page.close()
|
||||||
|
await context.close()
|
||||||
|
await browser.close()
|
||||||
|
# Force garbage collection to release Playwright resources immediately
|
||||||
|
gc.collect()
|
||||||
raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)
|
raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)
|
||||||
|
|
||||||
if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
|
if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
|
||||||
@@ -313,48 +333,52 @@ class fetcher(Fetcher):
|
|||||||
await browser.close()
|
await browser.close()
|
||||||
raise EmptyReply(url=url, status_code=response.status)
|
raise EmptyReply(url=url, status_code=response.status)
|
||||||
|
|
||||||
# Run Browser Steps here
|
# Wrap remaining operations in try/finally to ensure cleanup
|
||||||
if self.browser_steps_get_valid_steps():
|
|
||||||
await self.iterate_browser_steps(start_url=url)
|
|
||||||
|
|
||||||
await self.page.wait_for_timeout(extra_wait * 1000)
|
|
||||||
|
|
||||||
now = time.time()
|
|
||||||
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
|
||||||
if current_include_filters is not None:
|
|
||||||
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
|
||||||
else:
|
|
||||||
await self.page.evaluate("var include_filters=''")
|
|
||||||
await self.page.request_gc()
|
|
||||||
|
|
||||||
# request_gc before and after evaluate to free up memory
|
|
||||||
# @todo browsersteps etc
|
|
||||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
|
||||||
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
|
||||||
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
|
||||||
"max_height": MAX_TOTAL_HEIGHT
|
|
||||||
})
|
|
||||||
await self.page.request_gc()
|
|
||||||
|
|
||||||
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
|
||||||
await self.page.request_gc()
|
|
||||||
|
|
||||||
self.content = await self.page.content()
|
|
||||||
await self.page.request_gc()
|
|
||||||
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
|
||||||
|
|
||||||
|
|
||||||
# Bug 3 in Playwright screenshot handling
|
|
||||||
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
|
||||||
# JPEG is better here because the screenshots can be very very large
|
|
||||||
|
|
||||||
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
|
||||||
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
|
||||||
# acceptable screenshot quality here
|
|
||||||
try:
|
try:
|
||||||
|
# Run Browser Steps here
|
||||||
|
if self.browser_steps_get_valid_steps():
|
||||||
|
await self.iterate_browser_steps(start_url=url)
|
||||||
|
|
||||||
|
await self.page.wait_for_timeout(extra_wait * 1000)
|
||||||
|
|
||||||
|
now = time.time()
|
||||||
|
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
||||||
|
if current_include_filters is not None:
|
||||||
|
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
||||||
|
else:
|
||||||
|
await self.page.evaluate("var include_filters=''")
|
||||||
|
await self.page.request_gc()
|
||||||
|
|
||||||
|
# request_gc before and after evaluate to free up memory
|
||||||
|
# @todo browsersteps etc
|
||||||
|
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||||
|
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
||||||
|
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
||||||
|
"max_height": MAX_TOTAL_HEIGHT
|
||||||
|
})
|
||||||
|
await self.page.request_gc()
|
||||||
|
|
||||||
|
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
||||||
|
await self.page.request_gc()
|
||||||
|
|
||||||
|
self.content = await self.page.content()
|
||||||
|
await self.page.request_gc()
|
||||||
|
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
||||||
|
|
||||||
|
|
||||||
|
# Bug 3 in Playwright screenshot handling
|
||||||
|
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
||||||
|
# JPEG is better here because the screenshots can be very very large
|
||||||
|
|
||||||
|
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
||||||
|
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
||||||
|
# acceptable screenshot quality here
|
||||||
# The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
|
# The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
|
||||||
self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||||
|
|
||||||
|
except ScreenshotUnavailable:
|
||||||
|
# Re-raise screenshot unavailable exceptions
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# It's likely the screenshot was too long/big and something crashed
|
# It's likely the screenshot was too long/big and something crashed
|
||||||
raise ScreenshotUnavailable(url=url, status_code=self.status_code)
|
raise ScreenshotUnavailable(url=url, status_code=self.status_code)
|
||||||
@@ -389,6 +413,10 @@ class fetcher(Fetcher):
|
|||||||
pass
|
pass
|
||||||
browser = None
|
browser = None
|
||||||
|
|
||||||
|
# Force Python GC to release Playwright resources immediately
|
||||||
|
# Playwright objects can have circular references that delay cleanup
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
|
|
||||||
# Plugin registration for built-in fetcher
|
# Plugin registration for built-in fetcher
|
||||||
class PlaywrightFetcherPlugin:
|
class PlaywrightFetcherPlugin:
|
||||||
|
|||||||
@@ -15,7 +15,7 @@ class fetcher(Fetcher):
|
|||||||
proxy_url = None
|
proxy_url = None
|
||||||
|
|
||||||
# Capability flags
|
# Capability flags
|
||||||
supports_browser_steps = True
|
supports_browser_steps = False
|
||||||
supports_screenshots = True
|
supports_screenshots = True
|
||||||
supports_xpath_element_data = True
|
supports_xpath_element_data = True
|
||||||
|
|
||||||
@@ -156,6 +156,9 @@ class fetcher(Fetcher):
|
|||||||
from PIL import Image
|
from PIL import Image
|
||||||
import io
|
import io
|
||||||
img = Image.open(io.BytesIO(screenshot_png))
|
img = Image.open(io.BytesIO(screenshot_png))
|
||||||
|
# Convert to RGB if needed (JPEG doesn't support transparency)
|
||||||
|
if img.mode != 'RGB':
|
||||||
|
img = img.convert('RGB')
|
||||||
jpeg_buffer = io.BytesIO()
|
jpeg_buffer = io.BytesIO()
|
||||||
img.save(jpeg_buffer, format='JPEG', quality=int(os.getenv("SCREENSHOT_QUALITY", 72)))
|
img.save(jpeg_buffer, format='JPEG', quality=int(os.getenv("SCREENSHOT_QUALITY", 72)))
|
||||||
self.screenshot = jpeg_buffer.getvalue()
|
self.screenshot = jpeg_buffer.getvalue()
|
||||||
|
|||||||
@@ -57,14 +57,15 @@ class SignalPriorityQueue(queue.PriorityQueue):
|
|||||||
def put(self, item, block=True, timeout=None):
|
def put(self, item, block=True, timeout=None):
|
||||||
# Call the parent's put method first
|
# Call the parent's put method first
|
||||||
super().put(item, block, timeout)
|
super().put(item, block, timeout)
|
||||||
|
|
||||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||||
uuid = item.item['uuid']
|
uuid = item.item['uuid']
|
||||||
# Get the signal and send it if it exists
|
# Get the signal and send it if it exists
|
||||||
watch_check_update = signal('watch_check_update')
|
watch_check_update = signal('watch_check_update')
|
||||||
if watch_check_update:
|
if watch_check_update:
|
||||||
# Send the watch_uuid parameter
|
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||||
|
# Signal handlers may iterate the queue/datastore while holding locks
|
||||||
watch_check_update.send(watch_uuid=uuid)
|
watch_check_update.send(watch_uuid=uuid)
|
||||||
|
|
||||||
# Send queue_length signal with current queue size
|
# Send queue_length signal with current queue size
|
||||||
@@ -312,14 +313,15 @@ class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
|
|||||||
async def put(self, item):
|
async def put(self, item):
|
||||||
# Call the parent's put method first
|
# Call the parent's put method first
|
||||||
await super().put(item)
|
await super().put(item)
|
||||||
|
|
||||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||||
uuid = item.item['uuid']
|
uuid = item.item['uuid']
|
||||||
# Get the signal and send it if it exists
|
# Get the signal and send it if it exists
|
||||||
watch_check_update = signal('watch_check_update')
|
watch_check_update = signal('watch_check_update')
|
||||||
if watch_check_update:
|
if watch_check_update:
|
||||||
# Send the watch_uuid parameter
|
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||||
|
# Signal handlers may iterate the queue/datastore while holding locks
|
||||||
watch_check_update.send(watch_uuid=uuid)
|
watch_check_update.send(watch_uuid=uuid)
|
||||||
|
|
||||||
# Send queue_length signal with current queue size
|
# Send queue_length signal with current queue size
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ import threading
|
|||||||
import time
|
import time
|
||||||
import timeago
|
import timeago
|
||||||
from blinker import signal
|
from blinker import signal
|
||||||
|
from pathlib import Path
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from threading import Event
|
from threading import Event
|
||||||
@@ -84,6 +85,10 @@ app.config['NEW_VERSION_AVAILABLE'] = False
|
|||||||
if os.getenv('FLASK_SERVER_NAME'):
|
if os.getenv('FLASK_SERVER_NAME'):
|
||||||
app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME')
|
app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME')
|
||||||
|
|
||||||
|
# Babel/i18n configuration
|
||||||
|
app.config['BABEL_TRANSLATION_DIRECTORIES'] = str(Path(__file__).parent / 'translations')
|
||||||
|
app.config['BABEL_DEFAULT_LOCALE'] = 'en_GB'
|
||||||
|
|
||||||
#app.config["EXPLAIN_TEMPLATE_LOADING"] = True
|
#app.config["EXPLAIN_TEMPLATE_LOADING"] = True
|
||||||
|
|
||||||
|
|
||||||
@@ -395,13 +400,9 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
def get_locale():
|
def get_locale():
|
||||||
# 1. Try to get locale from session (user explicitly selected)
|
# 1. Try to get locale from session (user explicitly selected)
|
||||||
if 'locale' in session:
|
if 'locale' in session:
|
||||||
locale = session['locale']
|
return session['locale']
|
||||||
logger.trace(f"DEBUG: get_locale() returning from session: {locale}")
|
|
||||||
return locale
|
|
||||||
# 2. Fall back to Accept-Language header
|
# 2. Fall back to Accept-Language header
|
||||||
locale = request.accept_languages.best_match(language_codes)
|
return request.accept_languages.best_match(language_codes)
|
||||||
logger.trace(f"DEBUG: get_locale() returning from Accept-Language: {locale}")
|
|
||||||
return locale
|
|
||||||
|
|
||||||
# Initialize Babel with locale selector
|
# Initialize Babel with locale selector
|
||||||
babel = Babel(app, locale_selector=get_locale)
|
babel = Babel(app, locale_selector=get_locale)
|
||||||
@@ -518,9 +519,20 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
@app.route('/set-language/<locale>')
|
@app.route('/set-language/<locale>')
|
||||||
def set_language(locale):
|
def set_language(locale):
|
||||||
"""Set the user's preferred language in the session"""
|
"""Set the user's preferred language in the session"""
|
||||||
|
if not request.cookies:
|
||||||
|
logger.error("Cannot set language without session cookie")
|
||||||
|
flash("Cannot set language without session cookie", 'error')
|
||||||
|
return redirect(url_for('watchlist.index'))
|
||||||
|
|
||||||
# Validate the locale against available languages
|
# Validate the locale against available languages
|
||||||
if locale in language_codes:
|
if locale in language_codes:
|
||||||
session['locale'] = locale
|
session['locale'] = locale
|
||||||
|
|
||||||
|
# CRITICAL: Flask-Babel caches the locale in the request context (ctx.babel_locale)
|
||||||
|
# We must refresh to clear this cache so the new locale takes effect immediately
|
||||||
|
# This is especially important for tests where multiple requests happen rapidly
|
||||||
|
from flask_babel import refresh
|
||||||
|
refresh()
|
||||||
else:
|
else:
|
||||||
logger.error(f"Invalid locale {locale}, available: {language_codes}")
|
logger.error(f"Invalid locale {locale}, available: {language_codes}")
|
||||||
|
|
||||||
@@ -863,13 +875,13 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||||
|
|
||||||
# @todo handle ctrl break
|
# @todo handle ctrl break
|
||||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||||
threading.Thread(target=notification_runner).start()
|
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||||
|
|
||||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
# Check for new release version, but not when running in test/build or pytest
|
# Check for new release version, but not when running in test/build or pytest
|
||||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||||
threading.Thread(target=check_for_new_version).start()
|
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||||
|
|
||||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||||
# This avoids circular dependencies
|
# This avoids circular dependencies
|
||||||
|
|||||||
@@ -29,6 +29,9 @@ def get_timeago_locale(flask_locale):
|
|||||||
"""
|
"""
|
||||||
locale_map = {
|
locale_map = {
|
||||||
'zh': 'zh_CN', # Chinese Simplified
|
'zh': 'zh_CN', # Chinese Simplified
|
||||||
|
# timeago library just hasn't been updated to use the more modern locale naming convention, before BCP 47 / RFC 5646.
|
||||||
|
'zh_TW': 'zh_TW', # Chinese Traditional (timeago uses zh_TW)
|
||||||
|
'zh_Hant_TW': 'zh_TW', # Flask-Babel normalizes zh_TW to zh_Hant_TW, map back to timeago's zh_TW
|
||||||
'pt': 'pt_PT', # Portuguese (Portugal)
|
'pt': 'pt_PT', # Portuguese (Portugal)
|
||||||
'sv': 'sv_SE', # Swedish
|
'sv': 'sv_SE', # Swedish
|
||||||
'no': 'nb_NO', # Norwegian Bokmål
|
'no': 'nb_NO', # Norwegian Bokmål
|
||||||
@@ -53,7 +56,7 @@ LANGUAGE_DATA = {
|
|||||||
'it': {'flag': 'fi fi-it fis', 'name': 'Italiano'},
|
'it': {'flag': 'fi fi-it fis', 'name': 'Italiano'},
|
||||||
'ja': {'flag': 'fi fi-jp fis', 'name': '日本語'},
|
'ja': {'flag': 'fi fi-jp fis', 'name': '日本語'},
|
||||||
'zh': {'flag': 'fi fi-cn fis', 'name': '中文 (简体)'},
|
'zh': {'flag': 'fi fi-cn fis', 'name': '中文 (简体)'},
|
||||||
'zh_TW': {'flag': 'fi fi-tw fis', 'name': '繁體中文'},
|
'zh_Hant_TW': {'flag': 'fi fi-tw fis', 'name': '繁體中文'},
|
||||||
'ru': {'flag': 'fi fi-ru fis', 'name': 'Русский'},
|
'ru': {'flag': 'fi fi-ru fis', 'name': 'Русский'},
|
||||||
'pl': {'flag': 'fi fi-pl fis', 'name': 'Polski'},
|
'pl': {'flag': 'fi fi-pl fis', 'name': 'Polski'},
|
||||||
'nl': {'flag': 'fi fi-nl fis', 'name': 'Nederlands'},
|
'nl': {'flag': 'fi fi-nl fis', 'name': 'Nederlands'},
|
||||||
|
|||||||
@@ -10,54 +10,23 @@ from pathlib import Path
|
|||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from .. import jinja2_custom as safe_jinja
|
from .. import jinja2_custom as safe_jinja
|
||||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
|
||||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||||
|
|
||||||
|
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||||
|
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||||
|
|
||||||
def _brotli_compress_worker(conn, filepath, mode=None):
|
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||||
|
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||||
|
|
||||||
|
def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
|
||||||
"""
|
"""
|
||||||
Worker function to compress data with brotli in a separate process.
|
Save compressed data using native brotli.
|
||||||
This isolates memory - when process exits, OS reclaims all memory.
|
Testing shows no memory leak when using gc.collect() after compression.
|
||||||
|
|
||||||
Args:
|
|
||||||
conn: multiprocessing.Pipe connection to receive data
|
|
||||||
filepath: destination file path
|
|
||||||
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
|
||||||
"""
|
|
||||||
import brotli
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Receive data from parent process via pipe (avoids pickle overhead)
|
|
||||||
contents = conn.recv()
|
|
||||||
|
|
||||||
if mode is not None:
|
|
||||||
compressed_data = brotli.compress(contents, mode=mode)
|
|
||||||
else:
|
|
||||||
compressed_data = brotli.compress(contents)
|
|
||||||
|
|
||||||
with open(filepath, 'wb') as f:
|
|
||||||
f.write(compressed_data)
|
|
||||||
|
|
||||||
# Send success status back
|
|
||||||
conn.send(True)
|
|
||||||
# No need for explicit cleanup - process exit frees all memory
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Brotli compression worker failed: {e}")
|
|
||||||
conn.send(False)
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
|
||||||
def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_uncompressed=False):
|
|
||||||
"""
|
|
||||||
Save compressed data using subprocess to isolate memory.
|
|
||||||
Uses Pipe to avoid pickle overhead for large data.
|
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
contents: data to compress (str or bytes)
|
contents: data to compress (str or bytes)
|
||||||
filepath: destination file path
|
filepath: destination file path
|
||||||
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
||||||
timeout: subprocess timeout in seconds
|
|
||||||
fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
|
fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
@@ -67,88 +36,43 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
|||||||
Exception: if compression fails and fallback_uncompressed is False
|
Exception: if compression fails and fallback_uncompressed is False
|
||||||
"""
|
"""
|
||||||
import brotli
|
import brotli
|
||||||
import multiprocessing
|
import gc
|
||||||
import sys
|
|
||||||
|
|
||||||
# Ensure contents are bytes
|
# Ensure contents are bytes
|
||||||
if isinstance(contents, str):
|
if isinstance(contents, str):
|
||||||
contents = contents.encode('utf-8')
|
contents = contents.encode('utf-8')
|
||||||
|
|
||||||
# Use explicit spawn context for thread safety (avoids fork() with multi-threaded parent)
|
|
||||||
# Always use spawn - consistent behavior in tests and production
|
|
||||||
ctx = multiprocessing.get_context('spawn')
|
|
||||||
parent_conn, child_conn = ctx.Pipe()
|
|
||||||
|
|
||||||
# Run compression in subprocess using spawn (not fork)
|
|
||||||
proc = ctx.Process(target=_brotli_compress_worker, args=(child_conn, filepath, mode))
|
|
||||||
|
|
||||||
# Windows-safe: Set daemon=False explicitly to avoid issues with process cleanup
|
|
||||||
proc.daemon = False
|
|
||||||
proc.start()
|
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Send data to subprocess via pipe (avoids pickle)
|
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
||||||
parent_conn.send(contents)
|
|
||||||
|
|
||||||
# Wait for result with timeout
|
if mode is not None:
|
||||||
if parent_conn.poll(timeout):
|
compressed_data = brotli.compress(contents, mode=mode)
|
||||||
success = parent_conn.recv()
|
|
||||||
else:
|
else:
|
||||||
success = False
|
compressed_data = brotli.compress(contents)
|
||||||
logger.warning(f"Brotli compression subprocess timed out after {timeout}s")
|
|
||||||
# Graceful termination with platform-aware cleanup
|
|
||||||
try:
|
|
||||||
proc.terminate()
|
|
||||||
except Exception as term_error:
|
|
||||||
logger.debug(f"Process termination issue (may be normal on Windows): {term_error}")
|
|
||||||
|
|
||||||
parent_conn.close()
|
with open(filepath, 'wb') as f:
|
||||||
proc.join(timeout=5)
|
f.write(compressed_data)
|
||||||
|
|
||||||
# Force kill if still alive after graceful termination
|
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
||||||
if proc.is_alive():
|
|
||||||
try:
|
|
||||||
if sys.platform == 'win32':
|
|
||||||
# Windows: use kill() which is more forceful
|
|
||||||
proc.kill()
|
|
||||||
else:
|
|
||||||
# Unix: terminate() already sent SIGTERM, now try SIGKILL
|
|
||||||
proc.kill()
|
|
||||||
proc.join(timeout=2)
|
|
||||||
except Exception as kill_error:
|
|
||||||
logger.warning(f"Failed to kill brotli compression process: {kill_error}")
|
|
||||||
|
|
||||||
# Check if file was created successfully
|
# Force garbage collection to prevent memory buildup
|
||||||
if success and os.path.exists(filepath):
|
gc.collect()
|
||||||
return filepath
|
|
||||||
|
return filepath
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Brotli compression error: {e}")
|
logger.error(f"Brotli compression error: {e}")
|
||||||
try:
|
|
||||||
parent_conn.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
proc.terminate()
|
|
||||||
proc.join(timeout=2)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Compression failed
|
# Compression failed
|
||||||
if fallback_uncompressed:
|
if fallback_uncompressed:
|
||||||
logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
|
logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
|
||||||
fallback_path = filepath.replace('.br', '')
|
fallback_path = filepath.replace('.br', '')
|
||||||
with open(fallback_path, 'wb') as f:
|
with open(fallback_path, 'wb') as f:
|
||||||
f.write(contents)
|
f.write(contents)
|
||||||
return fallback_path
|
return fallback_path
|
||||||
else:
|
else:
|
||||||
raise Exception(f"Brotli compression subprocess failed for {filepath}")
|
raise Exception(f"Brotli compression failed for {filepath}: {e}")
|
||||||
|
|
||||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
|
||||||
|
|
||||||
|
|
||||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
|
||||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
|
||||||
|
|
||||||
class model(watch_base):
|
class model(watch_base):
|
||||||
__newest_history_key = None
|
__newest_history_key = None
|
||||||
@@ -492,7 +416,6 @@ class model(watch_base):
|
|||||||
|
|
||||||
self.ensure_data_dir_exists()
|
self.ensure_data_dir_exists()
|
||||||
|
|
||||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
|
||||||
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
||||||
|
|
||||||
# Binary data - detect file type and save without compression
|
# Binary data - detect file type and save without compression
|
||||||
@@ -516,7 +439,7 @@ class model(watch_base):
|
|||||||
|
|
||||||
# Text data - use brotli compression if enabled and above threshold
|
# Text data - use brotli compression if enabled and above threshold
|
||||||
else:
|
else:
|
||||||
if not skip_brotli and len(contents) > threshold:
|
if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
|
||||||
# Compressed text
|
# Compressed text
|
||||||
import brotli
|
import brotli
|
||||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||||
@@ -524,7 +447,7 @@ class model(watch_base):
|
|||||||
|
|
||||||
if not os.path.exists(dest):
|
if not os.path.exists(dest):
|
||||||
try:
|
try:
|
||||||
actual_dest = _brotli_subprocess_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
|
actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
|
||||||
if actual_dest != dest:
|
if actual_dest != dest:
|
||||||
snapshot_fname = os.path.basename(actual_dest)
|
snapshot_fname = os.path.basename(actual_dest)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -950,13 +873,13 @@ class model(watch_base):
|
|||||||
def save_last_text_fetched_before_filters(self, contents):
|
def save_last_text_fetched_before_filters(self, contents):
|
||||||
import brotli
|
import brotli
|
||||||
filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
|
filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
|
||||||
_brotli_subprocess_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
|
_brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
|
||||||
|
|
||||||
def save_last_fetched_html(self, timestamp, contents):
|
def save_last_fetched_html(self, timestamp, contents):
|
||||||
self.ensure_data_dir_exists()
|
self.ensure_data_dir_exists()
|
||||||
snapshot_fname = f"{timestamp}.html.br"
|
snapshot_fname = f"{timestamp}.html.br"
|
||||||
filepath = os.path.join(self.watch_data_dir, snapshot_fname)
|
filepath = os.path.join(self.watch_data_dir, snapshot_fname)
|
||||||
_brotli_subprocess_save(contents, filepath, mode=None, fallback_uncompressed=True)
|
_brotli_save(contents, filepath, mode=None, fallback_uncompressed=True)
|
||||||
self._prune_last_fetched_html_snapshots()
|
self._prune_last_fetched_html_snapshots()
|
||||||
|
|
||||||
def get_fetched_html(self, timestamp):
|
def get_fetched_html(self, timestamp):
|
||||||
|
|||||||
@@ -13,14 +13,9 @@ Research: https://github.com/libvips/pyvips/issues/234
|
|||||||
|
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
|
||||||
# CRITICAL: Use 'spawn' instead of 'fork' to avoid inheriting parent's
|
# CRITICAL: Use 'spawn' context instead of 'fork' to avoid inheriting parent's
|
||||||
# LibVIPS threading state which can cause hangs in gaussblur operations
|
# LibVIPS threading state which can cause hangs in gaussblur operations
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
try:
|
|
||||||
multiprocessing.set_start_method('spawn', force=False)
|
|
||||||
except RuntimeError:
|
|
||||||
# Already set, ignore
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
|
def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
|
||||||
@@ -95,9 +90,10 @@ def generate_diff_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
Returns:
|
Returns:
|
||||||
bytes: JPEG diff image or None on failure
|
bytes: JPEG diff image or None on failure
|
||||||
"""
|
"""
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
p = multiprocessing.Process(
|
p = ctx.Process(
|
||||||
target=_worker_generate_diff,
|
target=_worker_generate_diff,
|
||||||
args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
|
args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
|
||||||
)
|
)
|
||||||
@@ -140,7 +136,8 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
|
|||||||
Returns:
|
Returns:
|
||||||
float: Change percentage
|
float: Change percentage
|
||||||
"""
|
"""
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
def _worker_calculate(conn):
|
def _worker_calculate(conn):
|
||||||
try:
|
try:
|
||||||
@@ -185,7 +182,7 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
p = multiprocessing.Process(target=_worker_calculate, args=(child_conn,))
|
p = ctx.Process(target=_worker_calculate, args=(child_conn,))
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
result = 0.0
|
result = 0.0
|
||||||
@@ -233,7 +230,8 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
tuple: (changed_detected, change_percentage)
|
tuple: (changed_detected, change_percentage)
|
||||||
"""
|
"""
|
||||||
print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
|
print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
def _worker_compare(conn):
|
def _worker_compare(conn):
|
||||||
try:
|
try:
|
||||||
@@ -301,7 +299,7 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
p = multiprocessing.Process(target=_worker_compare, args=(child_conn,))
|
p = ctx.Process(target=_worker_compare, args=(child_conn,))
|
||||||
print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
|
print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
|
||||||
p.start()
|
p.start()
|
||||||
print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)
|
print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)
|
||||||
|
|||||||
@@ -86,6 +86,7 @@ class RecheckPriorityQueue:
|
|||||||
|
|
||||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||||
"""Thread-safe sync get with priority ordering"""
|
"""Thread-safe sync get with priority ordering"""
|
||||||
|
import queue
|
||||||
try:
|
try:
|
||||||
# Wait for notification
|
# Wait for notification
|
||||||
self.sync_q.get(block=block, timeout=timeout)
|
self.sync_q.get(block=block, timeout=timeout)
|
||||||
@@ -103,8 +104,11 @@ class RecheckPriorityQueue:
|
|||||||
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
||||||
return item
|
return item
|
||||||
|
|
||||||
|
except queue.Empty:
|
||||||
|
# Queue is empty with timeout - expected behavior, re-raise without logging
|
||||||
|
raise
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
# Re-raise without logging - caller (worker) will handle and log appropriately
|
||||||
raise
|
raise
|
||||||
|
|
||||||
# ASYNC INTERFACE (for workers)
|
# ASYNC INTERFACE (for workers)
|
||||||
|
|||||||
@@ -98,11 +98,12 @@ pytest -vv -s --maxfail=1 tests/test_rss.py
|
|||||||
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
||||||
|
|
||||||
# Try high concurrency
|
# Try high concurrency
|
||||||
FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
||||||
|
|
||||||
# Check file:// will pickup a file when enabled
|
# Check file:// will pickup a file when enabled
|
||||||
echo "Hello world" > /tmp/test-file.txt
|
echo "Hello world" > /tmp/test-file.txt
|
||||||
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
|
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
|
||||||
|
|
||||||
|
|
||||||
|
# Run it again so that brotli kicks in
|
||||||
|
TEST_WITH_BROTLI=1 SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=100 FETCH_WORKERS=20 pytest tests/test_history_consistency.py -vv -l -s
|
||||||
|
|||||||
@@ -76,7 +76,7 @@ $(document).ready(function () {
|
|||||||
|
|
||||||
// Cache DOM elements for performance
|
// Cache DOM elements for performance
|
||||||
const queueBubble = document.getElementById('queue-bubble');
|
const queueBubble = document.getElementById('queue-bubble');
|
||||||
|
const queueSizePagerInfoText = document.getElementById('queue-size-int');
|
||||||
// Only try to connect if authentication isn't required or user is authenticated
|
// Only try to connect if authentication isn't required or user is authenticated
|
||||||
// The 'is_authenticated' variable will be set in the template
|
// The 'is_authenticated' variable will be set in the template
|
||||||
if (typeof is_authenticated !== 'undefined' ? is_authenticated : true) {
|
if (typeof is_authenticated !== 'undefined' ? is_authenticated : true) {
|
||||||
@@ -118,6 +118,10 @@ $(document).ready(function () {
|
|||||||
|
|
||||||
socket.on('queue_size', function (data) {
|
socket.on('queue_size', function (data) {
|
||||||
console.log(`${data.event_timestamp} - Queue size update: ${data.q_length}`);
|
console.log(`${data.event_timestamp} - Queue size update: ${data.q_length}`);
|
||||||
|
if(queueSizePagerInfoText) {
|
||||||
|
queueSizePagerInfoText.textContent = parseInt(data.q_length).toLocaleString() || 'None';
|
||||||
|
}
|
||||||
|
document.body.classList.toggle('has-queue', parseInt(data.q_length) > 0);
|
||||||
|
|
||||||
// Update queue bubble in action sidebar
|
// Update queue bubble in action sidebar
|
||||||
//if (queueBubble) {
|
//if (queueBubble) {
|
||||||
|
|||||||
@@ -53,7 +53,7 @@
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
.language-modal {
|
#language-modal {
|
||||||
.language-list {
|
.language-list {
|
||||||
.lang-option {
|
.lang-option {
|
||||||
display: inline-block;
|
display: inline-block;
|
||||||
|
|||||||
@@ -1,6 +1,4 @@
|
|||||||
.pagination-page-info {
|
.pagination-page-info {
|
||||||
color: #fff;
|
|
||||||
font-size: 0.85rem;
|
|
||||||
text-transform: capitalize;
|
text-transform: capitalize;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,4 +1,32 @@
|
|||||||
/* table related */
|
/* table related */
|
||||||
|
#stats_row {
|
||||||
|
display: flex;
|
||||||
|
align-items: center;
|
||||||
|
width: 100%;
|
||||||
|
color: #fff;
|
||||||
|
font-size: 0.85rem;
|
||||||
|
>* {
|
||||||
|
padding-bottom: 0.5rem;
|
||||||
|
}
|
||||||
|
.left {
|
||||||
|
text-align: left;
|
||||||
|
}
|
||||||
|
|
||||||
|
.right {
|
||||||
|
opacity: 0.5;
|
||||||
|
transition: opacity 0.6s ease;
|
||||||
|
margin-left: auto; /* pushes it to the far right */
|
||||||
|
text-align: right;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
body.has-queue {
|
||||||
|
#stats_row {
|
||||||
|
.right {
|
||||||
|
opacity: 1.0;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
.watch-table {
|
.watch-table {
|
||||||
width: 100%;
|
width: 100%;
|
||||||
font-size: 80%;
|
font-size: 80%;
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -186,7 +186,7 @@ class ChangeDetectionStore:
|
|||||||
# Finally start the thread that will manage periodic data saves to JSON
|
# Finally start the thread that will manage periodic data saves to JSON
|
||||||
# Only start if thread is not already running (reload_state might be called multiple times)
|
# Only start if thread is not already running (reload_state might be called multiple times)
|
||||||
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
||||||
self.save_data_thread = threading.Thread(target=self.save_datastore)
|
self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
|
||||||
self.save_data_thread.start()
|
self.save_data_thread.start()
|
||||||
|
|
||||||
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
||||||
|
|||||||
@@ -17,6 +17,8 @@ _MAP = {
|
|||||||
|
|
||||||
|
|
||||||
def strtobool(value):
|
def strtobool(value):
|
||||||
|
if not value:
|
||||||
|
return False
|
||||||
try:
|
try:
|
||||||
return _MAP[str(value).lower()]
|
return _MAP[str(value).lower()]
|
||||||
except KeyError:
|
except KeyError:
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
<!DOCTYPE html>
|
<!DOCTYPE html>
|
||||||
<html lang="{{ get_locale() }}" data-darkmode="{{ get_darkmode_state() }}">
|
<html lang="{{ get_locale()|replace('_', '-') }}" data-darkmode="{{ get_darkmode_state() }}">
|
||||||
|
|
||||||
<head>
|
<head>
|
||||||
<meta charset="utf-8" >
|
<meta charset="utf-8" >
|
||||||
|
|||||||
@@ -2,6 +2,7 @@
|
|||||||
import psutil
|
import psutil
|
||||||
import time
|
import time
|
||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
import arrow
|
import arrow
|
||||||
@@ -97,6 +98,34 @@ def cleanup(datastore_path):
|
|||||||
if os.path.isfile(f):
|
if os.path.isfile(f):
|
||||||
os.unlink(f)
|
os.unlink(f)
|
||||||
|
|
||||||
|
def pytest_configure(config):
|
||||||
|
"""Configure pytest environment before tests run.
|
||||||
|
|
||||||
|
CRITICAL: Set multiprocessing start method to 'fork' for Python 3.14+ compatibility.
|
||||||
|
|
||||||
|
Python 3.14 changed the default start method from 'fork' to 'forkserver' on Linux.
|
||||||
|
The forkserver method requires all objects to be picklable, but pytest-flask's
|
||||||
|
LiveServer uses nested functions that can't be pickled.
|
||||||
|
|
||||||
|
Setting 'fork' explicitly:
|
||||||
|
- Maintains compatibility with Python 3.10-3.13 (where 'fork' was already default)
|
||||||
|
- Fixes Python 3.14 pickling errors
|
||||||
|
- Only affects Unix-like systems (Windows uses 'spawn' regardless)
|
||||||
|
|
||||||
|
See: https://github.com/python/cpython/issues/126831
|
||||||
|
See: https://docs.python.org/3/whatsnew/3.14.html
|
||||||
|
"""
|
||||||
|
# Only set if not already set (respects existing configuration)
|
||||||
|
if multiprocessing.get_start_method(allow_none=True) is None:
|
||||||
|
try:
|
||||||
|
# 'fork' is available on Unix-like systems (Linux, macOS)
|
||||||
|
# On Windows, this will have no effect as 'spawn' is the only option
|
||||||
|
multiprocessing.set_start_method('fork', force=False)
|
||||||
|
logger.debug("Set multiprocessing start method to 'fork' for Python 3.14+ compatibility")
|
||||||
|
except (ValueError, RuntimeError):
|
||||||
|
# Already set, not available on this platform, or context already created
|
||||||
|
pass
|
||||||
|
|
||||||
def pytest_addoption(parser):
|
def pytest_addoption(parser):
|
||||||
"""Add custom command-line options for pytest.
|
"""Add custom command-line options for pytest.
|
||||||
|
|
||||||
@@ -270,3 +299,6 @@ def app(request, datastore_path):
|
|||||||
|
|
||||||
request.addfinalizer(teardown)
|
request.addfinalizer(teardown)
|
||||||
yield app
|
yield app
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -206,11 +206,10 @@ def test_regex_error_handling(client, live_server, measure_memory_usage, datasto
|
|||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
time.sleep(0.2)
|
||||||
|
|
||||||
### test regex error handling
|
### test regex error handling
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
data={"extract_text": '/something bad\d{3/XYZ',
|
data={"extract_text": '/something bad\d{3/XYZ',
|
||||||
"url": test_url,
|
"url": test_url,
|
||||||
"fetch_backend": "html_requests",
|
"fetch_backend": "html_requests",
|
||||||
|
|||||||
@@ -4,25 +4,47 @@ import time
|
|||||||
import os
|
import os
|
||||||
import json
|
import json
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
from loguru import logger
|
||||||
|
from .. import strtobool
|
||||||
from .util import wait_for_all_checks, delete_all_watches
|
from .util import wait_for_all_checks, delete_all_watches
|
||||||
from urllib.parse import urlparse, parse_qs
|
import brotli
|
||||||
|
|
||||||
|
|
||||||
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
|
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
|
||||||
# live_server_setup(live_server) # Setup on conftest per function
|
|
||||||
workers = int(os.getenv("FETCH_WORKERS", 10))
|
|
||||||
r = range(1, 10+workers)
|
|
||||||
|
|
||||||
for one in r:
|
uuids = set()
|
||||||
test_url = url_for('test_endpoint', content_type="text/html", content=str(one), _external=True)
|
sys_fetch_workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||||
res = client.post(
|
workers = range(1, sys_fetch_workers)
|
||||||
url_for("imports.import_page"),
|
now = time.time()
|
||||||
data={"urls": test_url},
|
|
||||||
follow_redirects=True
|
|
||||||
)
|
|
||||||
|
|
||||||
assert b"1 Imported" in res.data
|
for one in workers:
|
||||||
|
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||||
|
# A very long string that WILL trigger Brotli compression of the snapshot
|
||||||
|
# BROTLI_COMPRESS_SIZE_THRESHOLD should be set to say 200
|
||||||
|
from ..model.Watch import BROTLI_COMPRESS_SIZE_THRESHOLD
|
||||||
|
content = str(one) + "x" + str(one) * (BROTLI_COMPRESS_SIZE_THRESHOLD + 10)
|
||||||
|
else:
|
||||||
|
# Just enough to test datastore
|
||||||
|
content = str(one)+'x'
|
||||||
|
|
||||||
|
test_url = url_for('test_endpoint', content_type="text/html", content=content, _external=True)
|
||||||
|
uuids.add(client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'title': str(one)}))
|
||||||
|
|
||||||
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
duration = time.time() - now
|
||||||
|
per_worker = duration/sys_fetch_workers
|
||||||
|
if sys_fetch_workers < 20:
|
||||||
|
per_worker_threshold=0.6
|
||||||
|
elif sys_fetch_workers < 50:
|
||||||
|
per_worker_threshold = 0.8
|
||||||
|
else:
|
||||||
|
per_worker_threshold = 1.5
|
||||||
|
|
||||||
|
logger.debug(f"All fetched in {duration:.2f}s, {per_worker}s per worker")
|
||||||
|
# Problematic on github
|
||||||
|
#assert per_worker < per_worker_threshold, f"If concurrency is working good, no blocking async problems, each worker ({sys_fetch_workers} workers) should have done his job in under {per_worker_threshold}s, got {per_worker:.2f}s per worker, total duration was {duration:.2f}s"
|
||||||
|
|
||||||
# Essentially just triggers the DB write/update
|
# Essentially just triggers the DB write/update
|
||||||
res = client.post(
|
res = client.post(
|
||||||
@@ -34,7 +56,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
|||||||
)
|
)
|
||||||
assert b"Settings updated." in res.data
|
assert b"Settings updated." in res.data
|
||||||
|
|
||||||
|
# Wait for the sync DB save to happen
|
||||||
time.sleep(2)
|
time.sleep(2)
|
||||||
|
|
||||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||||
@@ -44,14 +66,18 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
|||||||
json_obj = json.load(f)
|
json_obj = json.load(f)
|
||||||
|
|
||||||
# assert the right amount of watches was found in the JSON
|
# assert the right amount of watches was found in the JSON
|
||||||
assert len(json_obj['watching']) == len(r), "Correct number of watches was found in the JSON"
|
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
|
||||||
i=0
|
|
||||||
|
i = 0
|
||||||
# each one should have a history.txt containing just one line
|
# each one should have a history.txt containing just one line
|
||||||
for w in json_obj['watching'].keys():
|
for w in json_obj['watching'].keys():
|
||||||
i+=1
|
i += 1
|
||||||
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
|
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
|
||||||
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
|
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
|
||||||
|
|
||||||
|
# Should be no errors (could be from brotli etc)
|
||||||
|
assert not live_server.app.config['DATASTORE'].data['watching'][w].get('last_error')
|
||||||
|
|
||||||
# Same like in model.Watch
|
# Same like in model.Watch
|
||||||
with open(history_txt_index_file, "r") as f:
|
with open(history_txt_index_file, "r") as f:
|
||||||
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
|
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
|
||||||
@@ -63,15 +89,21 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
|||||||
# Find the snapshot one
|
# Find the snapshot one
|
||||||
for fname in files_in_watch_dir:
|
for fname in files_in_watch_dir:
|
||||||
if fname != 'history.txt' and 'html' not in fname:
|
if fname != 'history.txt' and 'html' not in fname:
|
||||||
|
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||||
|
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
|
||||||
|
|
||||||
|
full_snapshot_history_path = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname)
|
||||||
# contents should match what we requested as content returned from the test url
|
# contents should match what we requested as content returned from the test url
|
||||||
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f:
|
if fname.endswith('.br'):
|
||||||
contents = snapshot_f.read()
|
with open(full_snapshot_history_path, 'rb') as f:
|
||||||
watch_url = json_obj['watching'][w]['url']
|
contents = brotli.decompress(f.read()).decode('utf-8')
|
||||||
u = urlparse(watch_url)
|
else:
|
||||||
q = parse_qs(u[4])
|
with open(full_snapshot_history_path, 'r') as snapshot_f:
|
||||||
assert q['content'][0] == contents.strip(), f"Snapshot file {fname} should contain {q['content'][0]}"
|
contents = snapshot_f.read()
|
||||||
|
|
||||||
|
|
||||||
|
watch_title = json_obj['watching'][w]['title']
|
||||||
|
assert json_obj['watching'][w]['title'], "Watch should have a title set"
|
||||||
|
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
|
||||||
|
|
||||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,71 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
|
||||||
|
|
||||||
|
def test_zh_TW(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
import time
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|
||||||
|
# Be sure we got a session cookie
|
||||||
|
res = client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="zh_Hant_TW"), # Traditional
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
# HTML follows BCP 47 language tag rules, not underscore-based locale formats.
|
||||||
|
assert b'<html lang="zh-Hant-TW"' in res.data
|
||||||
|
assert b'Cannot set language without session cookie' not in res.data
|
||||||
|
assert '選擇語言'.encode() in res.data
|
||||||
|
|
||||||
|
# Check second set works
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="en_GB"),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b'Cannot set language without session cookie' not in res.data
|
||||||
|
res = client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
assert b"Select Language" in res.data, "Second set of language worked"
|
||||||
|
|
||||||
|
# Check arbitration between zh_Hant_TW<->zh
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="zh"), # Simplified chinese
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
res = client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
assert "选择语言".encode() in res.data, "Simplified chinese worked and it means the flask-babel cache worked"
|
||||||
|
|
||||||
|
|
||||||
|
# timeago library just hasn't been updated to use the more modern locale naming convention, before BCP 47 / RFC 5646.
|
||||||
|
# The Python timeago library (https://github.com/hustcc/timeago) supports 48 locales but uses different naming conventions than Flask-Babel.
|
||||||
|
def test_zh_Hant_TW_timeago_integration():
|
||||||
|
"""Test that zh_Hant_TW mapping works and timeago renders Traditional Chinese correctly"""
|
||||||
|
import timeago
|
||||||
|
from datetime import datetime, timedelta
|
||||||
|
from changedetectionio.languages import get_timeago_locale
|
||||||
|
|
||||||
|
# 1. Test the mapping
|
||||||
|
mapped_locale = get_timeago_locale('zh_Hant_TW')
|
||||||
|
assert mapped_locale == 'zh_TW', "zh_Hant_TW should map to timeago's zh_TW"
|
||||||
|
assert get_timeago_locale('zh_TW') == 'zh_TW', "zh_TW should also map to zh_TW"
|
||||||
|
|
||||||
|
# 2. Test timeago library renders Traditional Chinese with the mapped locale
|
||||||
|
now = datetime.now()
|
||||||
|
|
||||||
|
# Test various time periods with Traditional Chinese strings
|
||||||
|
result_15s = timeago.format(now - timedelta(seconds=15), now, mapped_locale)
|
||||||
|
assert '秒前' in result_15s, f"Expected '秒前' in '{result_15s}'"
|
||||||
|
|
||||||
|
result_5m = timeago.format(now - timedelta(minutes=5), now, mapped_locale)
|
||||||
|
assert '分鐘前' in result_5m, f"Expected '分鐘前' in '{result_5m}'"
|
||||||
|
|
||||||
|
result_2h = timeago.format(now - timedelta(hours=2), now, mapped_locale)
|
||||||
|
assert '小時前' in result_2h, f"Expected '小時前' in '{result_2h}'"
|
||||||
|
|
||||||
|
result_3d = timeago.format(now - timedelta(days=3), now, mapped_locale)
|
||||||
|
assert '天前' in result_3d, f"Expected '天前' in '{result_3d}'"
|
||||||
|
|
||||||
|
|
||||||
def test_language_switching(client, live_server, measure_memory_usage, datastore_path):
|
def test_language_switching(client, live_server, measure_memory_usage, datastore_path):
|
||||||
@@ -13,6 +77,9 @@ def test_language_switching(client, live_server, measure_memory_usage, datastore
|
|||||||
3. Switch back to English and verify English text appears
|
3. Switch back to English and verify English text appears
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Establish session cookie
|
||||||
|
client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
|
||||||
# Step 1: Set the language to Italian using the /set-language endpoint
|
# Step 1: Set the language to Italian using the /set-language endpoint
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("set_language", locale="it"),
|
url_for("set_language", locale="it"),
|
||||||
@@ -61,6 +128,9 @@ def test_invalid_locale(client, live_server, measure_memory_usage, datastore_pat
|
|||||||
The app should ignore invalid locales and continue working.
|
The app should ignore invalid locales and continue working.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Establish session cookie
|
||||||
|
client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
|
||||||
# First set to English
|
# First set to English
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("set_language", locale="en"),
|
url_for("set_language", locale="en"),
|
||||||
@@ -93,6 +163,9 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
|
|||||||
within the same session.
|
within the same session.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
# Establish session cookie
|
||||||
|
client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
|
||||||
# Set language to Italian
|
# Set language to Italian
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("set_language", locale="it"),
|
url_for("set_language", locale="it"),
|
||||||
@@ -119,6 +192,9 @@ def test_set_language_with_redirect(client, live_server, measure_memory_usage, d
|
|||||||
"""
|
"""
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
|
|
||||||
|
# Establish session cookie
|
||||||
|
client.get(url_for("watchlist.index"), follow_redirects=True)
|
||||||
|
|
||||||
# Set language with a redirect parameter (simulating language change from /settings)
|
# Set language with a redirect parameter (simulating language change from /settings)
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("set_language", locale="de", redirect="/settings"),
|
url_for("set_language", locale="de", redirect="/settings"),
|
||||||
|
|||||||
@@ -25,12 +25,13 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage,
|
|||||||
|
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|
||||||
res = client.post(
|
|
||||||
url_for("ui.ui_views.form_quick_watch_add"),
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
data={"url": test_url, "tags": ''},
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
follow_redirects=True
|
assert b'Queued 1 watch for rechecking.' in res.data
|
||||||
)
|
|
||||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
data={
|
data={
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
from .util import live_server_setup, wait_for_all_checks, wait_for_watch_history, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||||
import os
|
import os
|
||||||
|
|
||||||
|
|
||||||
@@ -87,6 +87,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
|||||||
# Wait for initial checks to complete
|
# Wait for initial checks to complete
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Ensure initial snapshots are saved
|
||||||
|
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Watches did not save initial snapshots"
|
||||||
|
|
||||||
# Trigger a change
|
# Trigger a change
|
||||||
set_modified_response(datastore_path=datastore_path)
|
set_modified_response(datastore_path=datastore_path)
|
||||||
|
|
||||||
@@ -94,6 +97,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
|||||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Ensure all watches have sufficient history for RSS generation
|
||||||
|
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "Watches did not accumulate sufficient history"
|
||||||
|
|
||||||
# Get RSS token
|
# Get RSS token
|
||||||
rss_token = extract_rss_token_from_UI(client)
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
assert rss_token is not None
|
assert rss_token is not None
|
||||||
@@ -216,11 +222,13 @@ def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, data
|
|||||||
assert b"Watch added" in res.data
|
assert b"Watch added" in res.data
|
||||||
|
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Initial snapshots not saved"
|
||||||
|
|
||||||
# Trigger changes
|
# Trigger changes
|
||||||
set_modified_response(datastore_path=datastore_path)
|
set_modified_response(datastore_path=datastore_path)
|
||||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "History not accumulated"
|
||||||
|
|
||||||
# Get RSS token
|
# Get RSS token
|
||||||
rss_token = extract_rss_token_from_UI(client)
|
rss_token = extract_rss_token_from_UI(client)
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
import sys
|
import sys
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from changedetectionio import html_tools
|
||||||
|
|
||||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||||
import html_tools
|
|
||||||
|
|
||||||
# test generation guide.
|
# test generation guide.
|
||||||
# 1. Do not include encoding in the xml declaration if the test object is a str type.
|
# 1. Do not include encoding in the xml declaration if the test object is a str type.
|
||||||
|
|||||||
@@ -164,14 +164,45 @@ def wait_for_all_checks(client=None):
|
|||||||
if q_length == 0 and not any_workers_busy:
|
if q_length == 0 and not any_workers_busy:
|
||||||
if empty_since is None:
|
if empty_since is None:
|
||||||
empty_since = time.time()
|
empty_since = time.time()
|
||||||
elif time.time() - empty_since >= 0.15: # Shorter wait
|
# Brief stabilization period for async workers
|
||||||
|
elif time.time() - empty_since >= 0.3:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
empty_since = None
|
empty_since = None
|
||||||
|
|
||||||
attempt += 1
|
attempt += 1
|
||||||
time.sleep(0.3)
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||||
|
"""
|
||||||
|
Wait for watches to have sufficient history entries.
|
||||||
|
Useful after wait_for_all_checks() when you need to ensure history is populated.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
client: Test client with access to datastore
|
||||||
|
min_history_count: Minimum number of history entries required
|
||||||
|
timeout: Maximum time to wait in seconds
|
||||||
|
"""
|
||||||
|
datastore = client.application.config.get('DATASTORE')
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
while time.time() - start_time < timeout:
|
||||||
|
all_have_history = True
|
||||||
|
for uuid, watch in datastore.data['watching'].items():
|
||||||
|
history_count = len(watch.history.keys())
|
||||||
|
if history_count < min_history_count:
|
||||||
|
all_have_history = False
|
||||||
|
break
|
||||||
|
|
||||||
|
if all_have_history:
|
||||||
|
return True
|
||||||
|
|
||||||
|
time.sleep(0.2)
|
||||||
|
|
||||||
|
# Timeout - return False
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
# Replaced by new_live_server_setup and calling per function scope in conftest.py
|
# Replaced by new_live_server_setup and calling per function scope in conftest.py
|
||||||
def live_server_setup(live_server):
|
def live_server_setup(live_server):
|
||||||
return True
|
return True
|
||||||
@@ -189,6 +220,8 @@ def new_live_server_setup(live_server):
|
|||||||
|
|
||||||
@live_server.app.route('/test-endpoint')
|
@live_server.app.route('/test-endpoint')
|
||||||
def test_endpoint():
|
def test_endpoint():
|
||||||
|
from loguru import logger
|
||||||
|
logger.debug(f"/test-endpoint hit {request}")
|
||||||
ctype = request.args.get('content_type')
|
ctype = request.args.get('content_type')
|
||||||
status_code = request.args.get('status_code')
|
status_code = request.args.get('status_code')
|
||||||
content = request.args.get('content') or None
|
content = request.args.get('content') or None
|
||||||
|
|||||||
@@ -144,7 +144,6 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
|
|||||||
|
|
||||||
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
|
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
|
||||||
|
|
||||||
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
|
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
|
||||||
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
|
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
|
||||||
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
|
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
|
||||||
@@ -186,3 +185,65 @@ def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_
|
|||||||
url_for("ui.form_delete", uuid="all"),
|
url_for("ui.form_delete", uuid="all"),
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def test_browsersteps_edit_UI_startsession(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
|
||||||
|
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
|
||||||
|
|
||||||
|
# Add a watch first
|
||||||
|
test_url = url_for('test_interactive_html_endpoint', _external=True)
|
||||||
|
test_url = test_url.replace('localhost.localdomain', 'cdio')
|
||||||
|
test_url = test_url.replace('localhost', 'cdio')
|
||||||
|
|
||||||
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'fetch_backend': 'html_webdriver', 'paused': True})
|
||||||
|
|
||||||
|
# Test starting a browsersteps session
|
||||||
|
res = client.get(
|
||||||
|
url_for("browser_steps.browsersteps_start_session", uuid=uuid),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert res.is_json
|
||||||
|
json_data = res.get_json()
|
||||||
|
assert 'browsersteps_session_id' in json_data
|
||||||
|
assert json_data['browsersteps_session_id'] # Not empty
|
||||||
|
|
||||||
|
browsersteps_session_id = json_data['browsersteps_session_id']
|
||||||
|
|
||||||
|
# Verify the session exists in browsersteps_sessions
|
||||||
|
from changedetectionio.blueprint.browser_steps import browsersteps_sessions, browsersteps_watch_to_session
|
||||||
|
assert browsersteps_session_id in browsersteps_sessions
|
||||||
|
assert uuid in browsersteps_watch_to_session
|
||||||
|
assert browsersteps_watch_to_session[uuid] == browsersteps_session_id
|
||||||
|
|
||||||
|
# Verify browsersteps UI shows up on edit page
|
||||||
|
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
|
||||||
|
assert b'browsersteps-click-start' in res.data, "Browsersteps manual UI shows up"
|
||||||
|
|
||||||
|
# Session should still exist after GET (not cleaned up yet)
|
||||||
|
assert browsersteps_session_id in browsersteps_sessions
|
||||||
|
assert uuid in browsersteps_watch_to_session
|
||||||
|
|
||||||
|
# Test cleanup happens on save (POST)
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
|
data={
|
||||||
|
"url": test_url,
|
||||||
|
"tags": "",
|
||||||
|
'fetch_backend': "html_webdriver",
|
||||||
|
"time_between_check_use_default": "y",
|
||||||
|
},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Updated watch" in res.data
|
||||||
|
|
||||||
|
# NOW verify the session was cleaned up after save
|
||||||
|
assert browsersteps_session_id not in browsersteps_sessions
|
||||||
|
assert uuid not in browsersteps_watch_to_session
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
client.get(
|
||||||
|
url_for("ui.form_delete", uuid="all"),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|||||||
Binary file not shown.
File diff suppressed because it is too large
Load Diff
Binary file not shown.
File diff suppressed because it is too large
Load Diff
Binary file not shown.
@@ -2,19 +2,18 @@
|
|||||||
Worker management module for changedetection.io
|
Worker management module for changedetection.io
|
||||||
|
|
||||||
Handles asynchronous workers for dynamic worker scaling.
|
Handles asynchronous workers for dynamic worker scaling.
|
||||||
Sync worker support has been removed in favor of async-only architecture.
|
Each worker runs in its own thread with its own event loop for isolation.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
import asyncio
|
import asyncio
|
||||||
import os
|
import os
|
||||||
import threading
|
import threading
|
||||||
import time
|
import time
|
||||||
|
from concurrent.futures import ThreadPoolExecutor
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
# Global worker state
|
# Global worker state - each worker has its own thread and event loop
|
||||||
running_async_tasks = []
|
worker_threads = [] # List of WorkerThread objects
|
||||||
async_loop = None
|
|
||||||
async_loop_thread = None
|
|
||||||
|
|
||||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||||
currently_processing_uuids = {}
|
currently_processing_uuids = {}
|
||||||
@@ -22,89 +21,118 @@ currently_processing_uuids = {}
|
|||||||
# Configuration - async workers only
|
# Configuration - async workers only
|
||||||
USE_ASYNC_WORKERS = True
|
USE_ASYNC_WORKERS = True
|
||||||
|
|
||||||
|
# Custom ThreadPoolExecutor for queue operations with named threads
|
||||||
|
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
|
||||||
|
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
|
||||||
|
queue_executor = ThreadPoolExecutor(
|
||||||
|
max_workers=_max_executor_workers,
|
||||||
|
thread_name_prefix="QueueGetter-"
|
||||||
|
)
|
||||||
|
|
||||||
def start_async_event_loop():
|
|
||||||
"""Start a dedicated event loop for async workers in a separate thread"""
|
class WorkerThread:
|
||||||
global async_loop
|
"""Container for a worker thread with its own event loop"""
|
||||||
logger.info("Starting async event loop for workers")
|
def __init__(self, worker_id, update_q, notification_q, app, datastore):
|
||||||
|
self.worker_id = worker_id
|
||||||
try:
|
self.update_q = update_q
|
||||||
# Create a new event loop for this thread
|
self.notification_q = notification_q
|
||||||
async_loop = asyncio.new_event_loop()
|
self.app = app
|
||||||
# Set it as the event loop for this thread
|
self.datastore = datastore
|
||||||
asyncio.set_event_loop(async_loop)
|
self.thread = None
|
||||||
|
self.loop = None
|
||||||
logger.debug(f"Event loop created and set: {async_loop}")
|
self.running = False
|
||||||
|
|
||||||
# Run the event loop forever
|
def run(self):
|
||||||
async_loop.run_forever()
|
"""Run the worker in its own event loop"""
|
||||||
except Exception as e:
|
try:
|
||||||
logger.error(f"Async event loop error: {e}")
|
# Create a new event loop for this thread
|
||||||
finally:
|
self.loop = asyncio.new_event_loop()
|
||||||
# Clean up
|
asyncio.set_event_loop(self.loop)
|
||||||
if async_loop and not async_loop.is_closed():
|
self.running = True
|
||||||
async_loop.close()
|
|
||||||
async_loop = None
|
# Run the worker coroutine
|
||||||
logger.info("Async event loop stopped")
|
self.loop.run_until_complete(
|
||||||
|
start_single_async_worker(
|
||||||
|
self.worker_id,
|
||||||
|
self.update_q,
|
||||||
|
self.notification_q,
|
||||||
|
self.app,
|
||||||
|
self.datastore,
|
||||||
|
queue_executor
|
||||||
|
)
|
||||||
|
)
|
||||||
|
except asyncio.CancelledError:
|
||||||
|
# Normal shutdown - worker was cancelled
|
||||||
|
import os
|
||||||
|
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
|
if not in_pytest:
|
||||||
|
logger.info(f"Worker {self.worker_id} shutting down gracefully")
|
||||||
|
except RuntimeError as e:
|
||||||
|
# Ignore expected shutdown errors
|
||||||
|
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
|
||||||
|
logger.error(f"Worker {self.worker_id} runtime error: {e}")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Worker {self.worker_id} thread error: {e}")
|
||||||
|
finally:
|
||||||
|
# Clean up
|
||||||
|
if self.loop and not self.loop.is_closed():
|
||||||
|
self.loop.close()
|
||||||
|
self.running = False
|
||||||
|
self.loop = None
|
||||||
|
|
||||||
|
def start(self):
|
||||||
|
"""Start the worker thread"""
|
||||||
|
self.thread = threading.Thread(
|
||||||
|
target=self.run,
|
||||||
|
daemon=True,
|
||||||
|
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
|
||||||
|
)
|
||||||
|
self.thread.start()
|
||||||
|
|
||||||
|
def stop(self):
|
||||||
|
"""Stop the worker thread"""
|
||||||
|
if self.loop and self.running:
|
||||||
|
try:
|
||||||
|
# Signal the loop to stop
|
||||||
|
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||||
|
except RuntimeError:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if self.thread and self.thread.is_alive():
|
||||||
|
self.thread.join(timeout=2.0)
|
||||||
|
|
||||||
|
|
||||||
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
||||||
"""Start the async worker management system"""
|
"""Start async workers, each with its own thread and event loop for isolation"""
|
||||||
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids
|
global worker_threads, currently_processing_uuids
|
||||||
|
|
||||||
# Clear any stale UUID tracking state
|
# Clear any stale state
|
||||||
currently_processing_uuids.clear()
|
currently_processing_uuids.clear()
|
||||||
|
|
||||||
# Start the event loop in a separate thread
|
# Start each worker in its own thread with its own event loop
|
||||||
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
|
logger.info(f"Starting {n_workers} async workers (isolated threads)")
|
||||||
async_loop_thread.start()
|
|
||||||
|
|
||||||
# Wait for the loop to be available (with timeout for safety)
|
|
||||||
max_wait_time = 5.0
|
|
||||||
wait_start = time.time()
|
|
||||||
while async_loop is None and (time.time() - wait_start) < max_wait_time:
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
if async_loop is None:
|
|
||||||
logger.error("Failed to start async event loop within timeout")
|
|
||||||
return
|
|
||||||
|
|
||||||
# Additional brief wait to ensure loop is running
|
|
||||||
time.sleep(0.2)
|
|
||||||
|
|
||||||
# Start async workers
|
|
||||||
logger.info(f"Starting {n_workers} async workers")
|
|
||||||
for i in range(n_workers):
|
for i in range(n_workers):
|
||||||
try:
|
try:
|
||||||
# Use a factory function to create named worker coroutines
|
worker = WorkerThread(i, update_q, notification_q, app, datastore)
|
||||||
def create_named_worker(worker_id):
|
worker.start()
|
||||||
async def named_worker():
|
worker_threads.append(worker)
|
||||||
task = asyncio.current_task()
|
# No sleep needed - threads start independently and asynchronously
|
||||||
if task:
|
except Exception as e:
|
||||||
task.set_name(f"async-worker-{worker_id}")
|
|
||||||
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
|
|
||||||
return named_worker()
|
|
||||||
|
|
||||||
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
|
|
||||||
running_async_tasks.append(task_future)
|
|
||||||
except RuntimeError as e:
|
|
||||||
logger.error(f"Failed to start async worker {i}: {e}")
|
logger.error(f"Failed to start async worker {i}: {e}")
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
|
||||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
|
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
|
||||||
"""Start a single async worker with auto-restart capability"""
|
"""Start a single async worker with auto-restart capability"""
|
||||||
from changedetectionio.async_update_worker import async_update_worker
|
from changedetectionio.async_update_worker import async_update_worker
|
||||||
|
|
||||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||||
import os
|
import os
|
||||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
try:
|
try:
|
||||||
if not in_pytest:
|
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||||
logger.info(f"Starting async worker {worker_id}")
|
|
||||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
|
|
||||||
# If we reach here, worker exited cleanly
|
# If we reach here, worker exited cleanly
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||||
@@ -131,39 +159,38 @@ def start_workers(n_workers, update_q, notification_q, app, datastore):
|
|||||||
|
|
||||||
def add_worker(update_q, notification_q, app, datastore):
|
def add_worker(update_q, notification_q, app, datastore):
|
||||||
"""Add a new async worker (for dynamic scaling)"""
|
"""Add a new async worker (for dynamic scaling)"""
|
||||||
global running_async_tasks
|
global worker_threads
|
||||||
|
|
||||||
if not async_loop:
|
worker_id = len(worker_threads)
|
||||||
logger.error("Async loop not running, cannot add worker")
|
|
||||||
return False
|
|
||||||
|
|
||||||
worker_id = len(running_async_tasks)
|
|
||||||
logger.info(f"Adding async worker {worker_id}")
|
logger.info(f"Adding async worker {worker_id}")
|
||||||
|
|
||||||
task_future = asyncio.run_coroutine_threadsafe(
|
try:
|
||||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
|
worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
|
||||||
)
|
worker.start()
|
||||||
running_async_tasks.append(task_future)
|
worker_threads.append(worker)
|
||||||
return True
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Failed to add worker {worker_id}: {e}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def remove_worker():
|
def remove_worker():
|
||||||
"""Remove an async worker (for dynamic scaling)"""
|
"""Remove an async worker (for dynamic scaling)"""
|
||||||
global running_async_tasks
|
global worker_threads
|
||||||
|
|
||||||
if not running_async_tasks:
|
if not worker_threads:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Cancel the last worker
|
# Stop the last worker
|
||||||
task_future = running_async_tasks.pop()
|
worker = worker_threads.pop()
|
||||||
task_future.cancel()
|
worker.stop()
|
||||||
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
|
logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
|
||||||
def get_worker_count():
|
def get_worker_count():
|
||||||
"""Get current number of async workers"""
|
"""Get current number of async workers"""
|
||||||
return len(running_async_tasks)
|
return len(worker_threads)
|
||||||
|
|
||||||
|
|
||||||
def get_running_uuids():
|
def get_running_uuids():
|
||||||
@@ -249,38 +276,21 @@ def queue_item_async_safe(update_q, item, silent=False):
|
|||||||
|
|
||||||
def shutdown_workers():
|
def shutdown_workers():
|
||||||
"""Shutdown all async workers fast and aggressively"""
|
"""Shutdown all async workers fast and aggressively"""
|
||||||
global async_loop, async_loop_thread, running_async_tasks
|
global worker_threads
|
||||||
|
|
||||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||||
import os
|
import os
|
||||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
|
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info("Fast shutdown of async workers initiated...")
|
logger.info("Fast shutdown of async workers initiated...")
|
||||||
|
|
||||||
# Cancel all async tasks immediately
|
# Stop all worker threads
|
||||||
for task_future in running_async_tasks:
|
for worker in worker_threads:
|
||||||
if not task_future.done():
|
worker.stop()
|
||||||
task_future.cancel()
|
|
||||||
|
worker_threads.clear()
|
||||||
# Stop the async event loop immediately
|
|
||||||
if async_loop and not async_loop.is_closed():
|
|
||||||
try:
|
|
||||||
async_loop.call_soon_threadsafe(async_loop.stop)
|
|
||||||
except RuntimeError:
|
|
||||||
# Loop might already be stopped
|
|
||||||
pass
|
|
||||||
|
|
||||||
running_async_tasks.clear()
|
|
||||||
async_loop = None
|
|
||||||
|
|
||||||
# Give async thread minimal time to finish, then continue
|
|
||||||
if async_loop_thread and async_loop_thread.is_alive():
|
|
||||||
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
|
|
||||||
if async_loop_thread.is_alive() and not in_pytest:
|
|
||||||
logger.info("Async thread still running after timeout - continuing with shutdown")
|
|
||||||
async_loop_thread = None
|
|
||||||
|
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info("Async workers fast shutdown complete")
|
logger.info("Async workers fast shutdown complete")
|
||||||
|
|
||||||
@@ -290,69 +300,57 @@ def shutdown_workers():
|
|||||||
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||||
"""
|
"""
|
||||||
Dynamically adjust the number of async workers.
|
Dynamically adjust the number of async workers.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
new_count: Target number of workers
|
new_count: Target number of workers
|
||||||
update_q, notification_q, app, datastore: Required for adding new workers
|
update_q, notification_q, app, datastore: Required for adding new workers
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: Status of the adjustment operation
|
dict: Status of the adjustment operation
|
||||||
"""
|
"""
|
||||||
global running_async_tasks
|
global worker_threads
|
||||||
|
|
||||||
current_count = get_worker_count()
|
current_count = get_worker_count()
|
||||||
|
|
||||||
if new_count == current_count:
|
if new_count == current_count:
|
||||||
return {
|
return {
|
||||||
'status': 'no_change',
|
'status': 'no_change',
|
||||||
'message': f'Worker count already at {current_count}',
|
'message': f'Worker count already at {current_count}',
|
||||||
'current_count': current_count
|
'current_count': current_count
|
||||||
}
|
}
|
||||||
|
|
||||||
if new_count > current_count:
|
if new_count > current_count:
|
||||||
# Add workers
|
# Add workers
|
||||||
workers_to_add = new_count - current_count
|
workers_to_add = new_count - current_count
|
||||||
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
||||||
|
|
||||||
if not all([update_q, notification_q, app, datastore]):
|
if not all([update_q, notification_q, app, datastore]):
|
||||||
return {
|
return {
|
||||||
'status': 'error',
|
'status': 'error',
|
||||||
'message': 'Missing required parameters to add workers',
|
'message': 'Missing required parameters to add workers',
|
||||||
'current_count': current_count
|
'current_count': current_count
|
||||||
}
|
}
|
||||||
|
|
||||||
for i in range(workers_to_add):
|
for i in range(workers_to_add):
|
||||||
worker_id = len(running_async_tasks)
|
add_worker(update_q, notification_q, app, datastore)
|
||||||
task_future = asyncio.run_coroutine_threadsafe(
|
|
||||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
|
||||||
async_loop
|
|
||||||
)
|
|
||||||
running_async_tasks.append(task_future)
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 'success',
|
'status': 'success',
|
||||||
'message': f'Added {workers_to_add} workers',
|
'message': f'Added {workers_to_add} workers',
|
||||||
'previous_count': current_count,
|
'previous_count': current_count,
|
||||||
'current_count': new_count
|
'current_count': len(worker_threads)
|
||||||
}
|
}
|
||||||
|
|
||||||
else:
|
else:
|
||||||
# Remove workers
|
# Remove workers
|
||||||
workers_to_remove = current_count - new_count
|
workers_to_remove = current_count - new_count
|
||||||
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
||||||
|
|
||||||
removed_count = 0
|
removed_count = 0
|
||||||
for _ in range(workers_to_remove):
|
for _ in range(workers_to_remove):
|
||||||
if running_async_tasks:
|
if remove_worker():
|
||||||
task_future = running_async_tasks.pop()
|
|
||||||
task_future.cancel()
|
|
||||||
# Wait for the task to actually stop
|
|
||||||
try:
|
|
||||||
task_future.result(timeout=5) # 5 second timeout
|
|
||||||
except Exception:
|
|
||||||
pass # Task was cancelled, which is expected
|
|
||||||
removed_count += 1
|
removed_count += 1
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 'success',
|
'status': 'success',
|
||||||
'message': f'Removed {removed_count} workers',
|
'message': f'Removed {removed_count} workers',
|
||||||
@@ -367,72 +365,58 @@ def get_worker_status():
|
|||||||
'worker_type': 'async',
|
'worker_type': 'async',
|
||||||
'worker_count': get_worker_count(),
|
'worker_count': get_worker_count(),
|
||||||
'running_uuids': get_running_uuids(),
|
'running_uuids': get_running_uuids(),
|
||||||
'async_loop_running': async_loop is not None,
|
'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||||
"""
|
"""
|
||||||
Check if the expected number of async workers are running and restart any missing ones.
|
Check if the expected number of async workers are running and restart any missing ones.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
expected_count: Expected number of workers
|
expected_count: Expected number of workers
|
||||||
update_q, notification_q, app, datastore: Required for restarting workers
|
update_q, notification_q, app, datastore: Required for restarting workers
|
||||||
|
|
||||||
Returns:
|
Returns:
|
||||||
dict: Health check results
|
dict: Health check results
|
||||||
"""
|
"""
|
||||||
global running_async_tasks
|
global worker_threads
|
||||||
|
|
||||||
current_count = get_worker_count()
|
current_count = get_worker_count()
|
||||||
|
|
||||||
if current_count == expected_count:
|
# Check which workers are actually alive
|
||||||
|
alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
|
||||||
|
|
||||||
|
if alive_count == expected_count:
|
||||||
return {
|
return {
|
||||||
'status': 'healthy',
|
'status': 'healthy',
|
||||||
'expected_count': expected_count,
|
'expected_count': expected_count,
|
||||||
'actual_count': current_count,
|
'actual_count': alive_count,
|
||||||
'message': f'All {expected_count} async workers running'
|
'message': f'All {expected_count} async workers running'
|
||||||
}
|
}
|
||||||
|
|
||||||
# Check for crashed async workers
|
# Find dead workers
|
||||||
dead_workers = []
|
dead_workers = []
|
||||||
alive_count = 0
|
for i, worker in enumerate(worker_threads[:]):
|
||||||
|
if not worker.thread or not worker.thread.is_alive():
|
||||||
for i, task_future in enumerate(running_async_tasks[:]):
|
dead_workers.append(i)
|
||||||
if task_future.done():
|
logger.warning(f"Async worker {worker.worker_id} thread is dead")
|
||||||
try:
|
|
||||||
result = task_future.result()
|
|
||||||
dead_workers.append(i)
|
|
||||||
logger.warning(f"Async worker {i} completed unexpectedly")
|
|
||||||
except Exception as e:
|
|
||||||
dead_workers.append(i)
|
|
||||||
logger.error(f"Async worker {i} crashed: {e}")
|
|
||||||
else:
|
|
||||||
alive_count += 1
|
|
||||||
|
|
||||||
# Remove dead workers from tracking
|
# Remove dead workers from tracking
|
||||||
for i in reversed(dead_workers):
|
for i in reversed(dead_workers):
|
||||||
if i < len(running_async_tasks):
|
if i < len(worker_threads):
|
||||||
running_async_tasks.pop(i)
|
worker_threads.pop(i)
|
||||||
|
|
||||||
missing_workers = expected_count - alive_count
|
missing_workers = expected_count - alive_count
|
||||||
restarted_count = 0
|
restarted_count = 0
|
||||||
|
|
||||||
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
||||||
logger.info(f"Restarting {missing_workers} crashed async workers")
|
logger.info(f"Restarting {missing_workers} crashed async workers")
|
||||||
|
|
||||||
for i in range(missing_workers):
|
for i in range(missing_workers):
|
||||||
worker_id = alive_count + i
|
if add_worker(update_q, notification_q, app, datastore):
|
||||||
try:
|
|
||||||
task_future = asyncio.run_coroutine_threadsafe(
|
|
||||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
|
||||||
async_loop
|
|
||||||
)
|
|
||||||
running_async_tasks.append(task_future)
|
|
||||||
restarted_count += 1
|
restarted_count += 1
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Failed to restart worker {worker_id}: {e}")
|
|
||||||
|
|
||||||
return {
|
return {
|
||||||
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
||||||
'expected_count': expected_count,
|
'expected_count': expected_count,
|
||||||
|
|||||||
@@ -42,7 +42,7 @@ orjson~=3.11
|
|||||||
# jq not available on Windows so must be installed manually
|
# jq not available on Windows so must be installed manually
|
||||||
|
|
||||||
# Notification library
|
# Notification library
|
||||||
apprise==1.9.5
|
apprise==1.9.6
|
||||||
|
|
||||||
diff_match_patch
|
diff_match_patch
|
||||||
|
|
||||||
@@ -70,7 +70,7 @@ lxml >=4.8.0,!=5.2.0,!=5.2.1,<7
|
|||||||
|
|
||||||
# XPath 2.0-3.1 support - 4.2.0 had issues, 4.1.5 stable
|
# XPath 2.0-3.1 support - 4.2.0 had issues, 4.1.5 stable
|
||||||
# Consider updating to latest stable version periodically
|
# Consider updating to latest stable version periodically
|
||||||
elementpath==5.0.4
|
elementpath==5.1.0
|
||||||
|
|
||||||
# For fast image comparison in screenshot change detection
|
# For fast image comparison in screenshot change detection
|
||||||
# opencv-python-headless is OPTIONAL (excluded from requirements.txt)
|
# opencv-python-headless is OPTIONAL (excluded from requirements.txt)
|
||||||
@@ -91,7 +91,7 @@ jq~=1.3; python_version >= "3.8" and sys_platform == "linux"
|
|||||||
|
|
||||||
# playwright is installed at Dockerfile build time because it's not available on all platforms
|
# playwright is installed at Dockerfile build time because it's not available on all platforms
|
||||||
|
|
||||||
pyppeteer-ng==2.0.0rc10
|
pyppeteer-ng==2.0.0rc11
|
||||||
pyppeteerstealth>=0.0.4
|
pyppeteerstealth>=0.0.4
|
||||||
|
|
||||||
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
|
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
|
||||||
@@ -100,7 +100,7 @@ pytest-flask ~=1.3
|
|||||||
pytest-mock ~=3.15
|
pytest-mock ~=3.15
|
||||||
|
|
||||||
# Anything 4.0 and up but not 5.0
|
# Anything 4.0 and up but not 5.0
|
||||||
jsonschema ~= 4.25
|
jsonschema ~= 4.26
|
||||||
|
|
||||||
# OpenAPI validation support
|
# OpenAPI validation support
|
||||||
openapi-core[flask] >= 0.19.0
|
openapi-core[flask] >= 0.19.0
|
||||||
|
|||||||
Reference in New Issue
Block a user