mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-15 19:50:34 +00:00
Compare commits
15 Commits
RSS-data-U
...
processor-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7e853a4b46 | ||
|
|
4dc5301de4 | ||
|
|
edf0989cd4 | ||
|
|
423b546948 | ||
|
|
c1c810a79a | ||
|
|
51d531d732 | ||
|
|
e40c4ca97d | ||
|
|
b8ede70f3a | ||
|
|
50b349b464 | ||
|
|
67d097cca7 | ||
|
|
494385a379 | ||
|
|
c2ee84b753 | ||
|
|
c1e0296cda | ||
|
|
f041223c38 | ||
|
|
d36738d7ef |
@@ -132,6 +132,15 @@ ENV LOGGER_LEVEL="$LOGGER_LEVEL"
|
||||
ENV LC_ALL=en_US.UTF-8
|
||||
|
||||
WORKDIR /app
|
||||
|
||||
# Copy and set up entrypoint script for installing extra packages
|
||||
COPY docker-entrypoint.sh /docker-entrypoint.sh
|
||||
RUN chmod +x /docker-entrypoint.sh
|
||||
|
||||
# Set entrypoint to handle EXTRA_PACKAGES env var
|
||||
ENTRYPOINT ["/docker-entrypoint.sh"]
|
||||
|
||||
# Default command (can be overridden in docker-compose.yml)
|
||||
CMD ["python", "./changedetection.py", "-d", "/datastore"]
|
||||
|
||||
|
||||
|
||||
@@ -11,6 +11,7 @@ recursive-include changedetectionio/realtime *
|
||||
recursive-include changedetectionio/static *
|
||||
recursive-include changedetectionio/templates *
|
||||
recursive-include changedetectionio/tests *
|
||||
recursive-include changedetectionio/translations *
|
||||
recursive-include changedetectionio/widgets *
|
||||
prune changedetectionio/static/package-lock.json
|
||||
prune changedetectionio/static/styles/node_modules
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
# Semver means never use .01, or 00. Should be .1.
|
||||
__version__ = '0.52.1'
|
||||
__version__ = '0.52.4'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -139,58 +139,18 @@ class Watch(Resource):
|
||||
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
from changedetectionio import processors
|
||||
processor_config_data = {}
|
||||
regular_data = {}
|
||||
|
||||
for key, value in request.json.items():
|
||||
if key.startswith('processor_config_'):
|
||||
config_key = key.replace('processor_config_', '')
|
||||
if value: # Only save non-empty values
|
||||
processor_config_data[config_key] = value
|
||||
else:
|
||||
regular_data[key] = value
|
||||
# Make a mutable copy of request.json for modification
|
||||
json_data = dict(request.json)
|
||||
|
||||
# Extract and remove processor config fields from json_data
|
||||
processor_config_data = processors.extract_processor_config_from_form_data(json_data)
|
||||
|
||||
# Update watch with regular (non-processor-config) fields
|
||||
watch.update(regular_data)
|
||||
watch.update(json_data)
|
||||
|
||||
# Save processor config to JSON file if any config data exists
|
||||
if processor_config_data:
|
||||
try:
|
||||
processor_name = request.json.get('processor', watch.get('processor'))
|
||||
if processor_name:
|
||||
# Create a processor instance to access config methods
|
||||
from changedetectionio.processors import difference_detection_processor
|
||||
processor_instance = difference_detection_processor(self.datastore, uuid)
|
||||
# Use processor name as filename so each processor keeps its own config
|
||||
config_filename = f'{processor_name}.json'
|
||||
processor_instance.update_extra_watch_config(config_filename, processor_config_data)
|
||||
logger.debug(f"API: Saved processor config to {config_filename}: {processor_config_data}")
|
||||
|
||||
# Call optional edit_hook if processor has one
|
||||
try:
|
||||
import importlib
|
||||
edit_hook_module_name = f'changedetectionio.processors.{processor_name}.edit_hook'
|
||||
|
||||
try:
|
||||
edit_hook = importlib.import_module(edit_hook_module_name)
|
||||
logger.debug(f"API: Found edit_hook module for {processor_name}")
|
||||
|
||||
if hasattr(edit_hook, 'on_config_save'):
|
||||
logger.info(f"API: Calling edit_hook.on_config_save for {processor_name}")
|
||||
# Call hook and get updated config
|
||||
updated_config = edit_hook.on_config_save(watch, processor_config_data, self.datastore)
|
||||
# Save updated config back to file
|
||||
processor_instance.update_extra_watch_config(config_filename, updated_config)
|
||||
logger.info(f"API: Edit hook updated config: {updated_config}")
|
||||
else:
|
||||
logger.debug(f"API: Edit hook module found but no on_config_save function")
|
||||
except ModuleNotFoundError:
|
||||
logger.debug(f"API: No edit_hook module for processor {processor_name} (this is normal)")
|
||||
except Exception as hook_error:
|
||||
logger.error(f"API: Edit hook error (non-fatal): {hook_error}", exc_info=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"API: Failed to save processor config: {e}")
|
||||
# Save processor config to JSON file
|
||||
processors.save_processor_config(self.datastore, uuid, processor_config_data)
|
||||
|
||||
return "OK", 200
|
||||
|
||||
|
||||
@@ -1,5 +1,4 @@
|
||||
from blinker import signal
|
||||
|
||||
from .processors.exceptions import ProcessorException
|
||||
import changedetectionio.content_fetchers.exceptions as content_fetchers_exceptions
|
||||
from changedetectionio.processors.text_json_diff.processor import FilterNotFoundInResponse
|
||||
@@ -9,7 +8,6 @@ from changedetectionio.flask_app import watch_check_update
|
||||
import asyncio
|
||||
import importlib
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
|
||||
from loguru import logger
|
||||
@@ -17,36 +15,48 @@ from loguru import logger
|
||||
# Async version of update_worker
|
||||
# Processes jobs from AsyncSignalPriorityQueue instead of threaded queue
|
||||
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
async def async_update_worker(worker_id, q, notification_q, app, datastore, executor=None):
|
||||
"""
|
||||
Async worker function that processes watch check jobs from the queue.
|
||||
|
||||
|
||||
Args:
|
||||
worker_id: Unique identifier for this worker
|
||||
q: AsyncSignalPriorityQueue containing jobs to process
|
||||
notification_q: Standard queue for notifications
|
||||
app: Flask application instance
|
||||
datastore: Application datastore
|
||||
executor: ThreadPoolExecutor for queue operations (optional)
|
||||
"""
|
||||
# Set a descriptive name for this task
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
|
||||
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
update_handler = None
|
||||
watch = None
|
||||
|
||||
try:
|
||||
# Use native janus async interface - no threads needed!
|
||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
||||
# Use sync interface via run_in_executor since each worker has its own event loop
|
||||
loop = asyncio.get_event_loop()
|
||||
queued_item_data = await asyncio.wait_for(
|
||||
loop.run_in_executor(executor, q.get, True, 1.0), # block=True, timeout=1.0
|
||||
timeout=1.5
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No jobs available, continue loop
|
||||
continue
|
||||
except Exception as e:
|
||||
# Handle expected Empty exception from queue timeout
|
||||
import queue
|
||||
if isinstance(e, queue.Empty):
|
||||
# Queue is empty, normal behavior - just continue
|
||||
continue
|
||||
|
||||
# Unexpected exception - log as critical
|
||||
logger.critical(f"CRITICAL: Worker {worker_id} failed to get queue item: {type(e).__name__}: {e}")
|
||||
|
||||
# Log queue health for debugging
|
||||
@@ -104,11 +114,14 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
processor = watch.get('processor', 'text_json_diff')
|
||||
|
||||
# Init a new 'difference_detection_processor'
|
||||
try:
|
||||
processor_module = importlib.import_module(f"changedetectionio.processors.{processor}.processor")
|
||||
except ModuleNotFoundError as e:
|
||||
print(f"Processor module '{processor}' not found.")
|
||||
raise e
|
||||
# Use get_processor_module() to support both built-in and plugin processors
|
||||
from changedetectionio.processors import get_processor_module
|
||||
processor_module = get_processor_module(processor)
|
||||
|
||||
if not processor_module:
|
||||
error_msg = f"Processor module '{processor}' not found."
|
||||
logger.error(error_msg)
|
||||
raise ModuleNotFoundError(error_msg)
|
||||
|
||||
update_handler = processor_module.perform_site_check(datastore=datastore,
|
||||
watch_uuid=uuid)
|
||||
@@ -414,14 +427,13 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
||||
|
||||
finally:
|
||||
|
||||
try:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
|
||||
# Always cleanup - this runs whether there was an exception or not
|
||||
if uuid:
|
||||
try:
|
||||
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
try:
|
||||
# Mark UUID as no longer being processed by this worker
|
||||
worker_handler.set_uuid_processing(uuid, worker_id=worker_id, processing=False)
|
||||
@@ -460,7 +472,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||
except Exception as cleanup_error:
|
||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||
|
||||
|
||||
del(uuid)
|
||||
|
||||
# Brief pause before continuing to avoid tight error loops (only on error)
|
||||
if 'e' in locals():
|
||||
await asyncio.sleep(1.0)
|
||||
|
||||
@@ -92,7 +92,12 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
zip_thread = threading.Thread(target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")))
|
||||
zip_thread = threading.Thread(
|
||||
target=create_backup,
|
||||
args=(datastore.datastore_path, datastore.data.get("watching")),
|
||||
daemon=True,
|
||||
name="BackupCreator"
|
||||
)
|
||||
zip_thread.start()
|
||||
backup_threads.append(zip_thread)
|
||||
flash(gettext("Backup building in background, check back in a few minutes."))
|
||||
|
||||
@@ -21,31 +21,154 @@ from changedetectionio.flask_app import login_optionally_required
|
||||
from loguru import logger
|
||||
|
||||
browsersteps_sessions = {}
|
||||
browsersteps_watch_to_session = {} # Maps watch_uuid -> browsersteps_session_id
|
||||
io_interface_context = None
|
||||
import json
|
||||
import hashlib
|
||||
from flask import Response
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
|
||||
def run_async_in_browser_loop(coro):
|
||||
"""Run async coroutine using the existing async worker event loop"""
|
||||
from changedetectionio import worker_handler
|
||||
|
||||
# Use the existing async worker event loop instead of creating a new one
|
||||
if worker_handler.USE_ASYNC_WORKERS and worker_handler.async_loop and not worker_handler.async_loop.is_closed():
|
||||
logger.debug("Browser steps using existing async worker event loop")
|
||||
future = asyncio.run_coroutine_threadsafe(coro, worker_handler.async_loop)
|
||||
return future.result()
|
||||
else:
|
||||
# Fallback: create a new event loop (for sync workers or if async loop not available)
|
||||
logger.debug("Browser steps creating temporary event loop")
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
# Dedicated event loop for ALL browser steps sessions
|
||||
_browser_steps_loop = None
|
||||
_browser_steps_thread = None
|
||||
_browser_steps_loop_lock = threading.Lock()
|
||||
|
||||
def _start_browser_steps_loop():
|
||||
"""Start a dedicated event loop for browser steps in its own thread"""
|
||||
global _browser_steps_loop
|
||||
|
||||
# Create and set the event loop for this thread
|
||||
loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(loop)
|
||||
_browser_steps_loop = loop
|
||||
|
||||
logger.debug("Browser steps event loop started")
|
||||
|
||||
try:
|
||||
# Run the loop forever - handles all browsersteps sessions
|
||||
loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Browser steps event loop error: {e}")
|
||||
finally:
|
||||
try:
|
||||
return loop.run_until_complete(coro)
|
||||
# Cancel all remaining tasks
|
||||
pending = asyncio.all_tasks(loop)
|
||||
for task in pending:
|
||||
task.cancel()
|
||||
|
||||
# Wait for tasks to finish cancellation
|
||||
if pending:
|
||||
loop.run_until_complete(asyncio.gather(*pending, return_exceptions=True))
|
||||
except Exception as e:
|
||||
logger.debug(f"Error during browser steps loop cleanup: {e}")
|
||||
finally:
|
||||
loop.close()
|
||||
logger.debug("Browser steps event loop closed")
|
||||
|
||||
def _ensure_browser_steps_loop():
|
||||
"""Ensure the browser steps event loop is running"""
|
||||
global _browser_steps_loop, _browser_steps_thread
|
||||
|
||||
with _browser_steps_loop_lock:
|
||||
if _browser_steps_thread is None or not _browser_steps_thread.is_alive():
|
||||
logger.debug("Starting browser steps event loop thread")
|
||||
_browser_steps_thread = threading.Thread(
|
||||
target=_start_browser_steps_loop,
|
||||
daemon=True,
|
||||
name="BrowserStepsEventLoop"
|
||||
)
|
||||
_browser_steps_thread.start()
|
||||
|
||||
# Wait for the loop to be ready
|
||||
timeout = 5.0
|
||||
start_time = time.time()
|
||||
while _browser_steps_loop is None:
|
||||
if time.time() - start_time > timeout:
|
||||
raise RuntimeError("Browser steps event loop failed to start")
|
||||
time.sleep(0.01)
|
||||
|
||||
logger.debug("Browser steps event loop thread started and ready")
|
||||
|
||||
def run_async_in_browser_loop(coro):
|
||||
"""Run async coroutine using the dedicated browser steps event loop"""
|
||||
_ensure_browser_steps_loop()
|
||||
|
||||
if _browser_steps_loop and not _browser_steps_loop.is_closed():
|
||||
logger.debug("Browser steps using dedicated event loop")
|
||||
future = asyncio.run_coroutine_threadsafe(coro, _browser_steps_loop)
|
||||
return future.result()
|
||||
else:
|
||||
raise RuntimeError("Browser steps event loop is not available")
|
||||
|
||||
def cleanup_expired_sessions():
|
||||
"""Remove expired browsersteps sessions and cleanup their resources"""
|
||||
global browsersteps_sessions, browsersteps_watch_to_session
|
||||
|
||||
expired_session_ids = []
|
||||
|
||||
# Find expired sessions
|
||||
for session_id, session_data in browsersteps_sessions.items():
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper and browserstepper.has_expired:
|
||||
expired_session_ids.append(session_id)
|
||||
|
||||
# Cleanup expired sessions
|
||||
for session_id in expired_session_ids:
|
||||
logger.debug(f"Cleaning up expired browsersteps session {session_id}")
|
||||
session_data = browsersteps_sessions[session_id]
|
||||
|
||||
# Cleanup playwright resources asynchronously
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper:
|
||||
try:
|
||||
run_async_in_browser_loop(browserstepper.cleanup())
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up session {session_id}: {e}")
|
||||
|
||||
# Remove from sessions dict
|
||||
del browsersteps_sessions[session_id]
|
||||
|
||||
# Remove from watch mapping
|
||||
for watch_uuid, mapped_session_id in list(browsersteps_watch_to_session.items()):
|
||||
if mapped_session_id == session_id:
|
||||
del browsersteps_watch_to_session[watch_uuid]
|
||||
break
|
||||
|
||||
if expired_session_ids:
|
||||
logger.info(f"Cleaned up {len(expired_session_ids)} expired browsersteps session(s)")
|
||||
|
||||
def cleanup_session_for_watch(watch_uuid):
|
||||
"""Cleanup a specific browsersteps session for a watch UUID"""
|
||||
global browsersteps_sessions, browsersteps_watch_to_session
|
||||
|
||||
session_id = browsersteps_watch_to_session.get(watch_uuid)
|
||||
if not session_id:
|
||||
logger.debug(f"No browsersteps session found for watch {watch_uuid}")
|
||||
return
|
||||
|
||||
logger.debug(f"Cleaning up browsersteps session {session_id} for watch {watch_uuid}")
|
||||
|
||||
session_data = browsersteps_sessions.get(session_id)
|
||||
if session_data:
|
||||
browserstepper = session_data.get('browserstepper')
|
||||
if browserstepper:
|
||||
try:
|
||||
run_async_in_browser_loop(browserstepper.cleanup())
|
||||
except Exception as e:
|
||||
logger.error(f"Error cleaning up session {session_id} for watch {watch_uuid}: {e}")
|
||||
|
||||
# Remove from sessions dict
|
||||
del browsersteps_sessions[session_id]
|
||||
|
||||
# Remove from watch mapping
|
||||
del browsersteps_watch_to_session[watch_uuid]
|
||||
|
||||
logger.debug(f"Cleaned up session for watch {watch_uuid}")
|
||||
|
||||
# Opportunistically cleanup any other expired sessions
|
||||
cleanup_expired_sessions()
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
|
||||
@@ -123,6 +246,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
if not watch_uuid:
|
||||
return make_response('No Watch UUID specified', 500)
|
||||
|
||||
# Cleanup any existing session for this watch
|
||||
cleanup_session_for_watch(watch_uuid)
|
||||
|
||||
logger.debug("Starting connection with playwright")
|
||||
logger.debug("browser_steps.py connecting")
|
||||
|
||||
@@ -131,6 +257,10 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
browsersteps_sessions[browsersteps_session_id] = run_async_in_browser_loop(
|
||||
start_browsersteps_session(watch_uuid)
|
||||
)
|
||||
|
||||
# Store the mapping of watch_uuid -> browsersteps_session_id
|
||||
browsersteps_watch_to_session[watch_uuid] = browsersteps_session_id
|
||||
|
||||
except Exception as e:
|
||||
if 'ECONNREFUSED' in str(e):
|
||||
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
|
||||
|
||||
@@ -47,9 +47,6 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
if len(dates) < 2:
|
||||
return f"Watch {uuid} does not have enough history snapshots to show changes (need at least 2)", 400
|
||||
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Get the number of diffs to include (default: 5)
|
||||
rss_diff_length = datastore.data['settings']['application'].get('rss_diff_length', 5)
|
||||
|
||||
@@ -101,7 +98,7 @@ def construct_single_watch_routes(rss_blueprint, datastore):
|
||||
date_index_from, date_index_to)
|
||||
|
||||
# Create and populate feed entry
|
||||
guid = f"{watch['uuid']}/{timestamp_to}"
|
||||
guid = f"{uuid}/{timestamp_to}"
|
||||
fe = fg.add_entry()
|
||||
title_suffix = f"Change @ {res['original_context']['change_datetime']}"
|
||||
populate_feed_entry(fe, watch, res.get('body', ''), guid, timestamp_to,
|
||||
|
||||
@@ -63,11 +63,8 @@ def construct_tag_routes(rss_blueprint, datastore):
|
||||
|
||||
# Only include unviewed watches
|
||||
if not watch.viewed:
|
||||
# Add uuid to watch for proper functioning
|
||||
watch['uuid'] = uuid
|
||||
|
||||
# Include a link to the diff page
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=watch['uuid'], _external=True)}
|
||||
# Include a link to the diff page (use uuid from loop, don't modify watch dict)
|
||||
diff_link = {'href': url_for('ui.ui_diff.diff_history_page', uuid=uuid, _external=True)}
|
||||
|
||||
# Get watch label
|
||||
watch_label = get_watch_label(datastore, watch)
|
||||
|
||||
@@ -50,7 +50,8 @@
|
||||
<td>{{ "{:,}".format(tag_count[uuid]) if uuid in tag_count else 0 }}</td>
|
||||
<td class="title-col inline"> <a href="{{url_for('watchlist.index', tag=uuid) }}">{{ tag.title }}</a></td>
|
||||
<td>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
||||
<a class="pure-button pure-button-primary" href="{{ url_for('tags.form_tag_edit', uuid=uuid) }}">{{ _('Edit') }}</a>
|
||||
<a href="{{ url_for('ui.form_watch_checknow', tag=uuid) }}" class="pure-button pure-button-primary" >{{ _('Recheck') }}</a>
|
||||
<a class="pure-button button-error"
|
||||
href="{{ url_for('tags.delete', uuid=uuid) }}"
|
||||
data-requires-confirm
|
||||
|
||||
@@ -101,23 +101,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's difference module
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
|
||||
# Try to get the processor's difference module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'difference')
|
||||
|
||||
# Call the processor's render() function
|
||||
if hasattr(processor_module, 'render'):
|
||||
return processor_module.render(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Processor {processor_name} does not have a difference module, falling back to text_json_diff: {e}")
|
||||
# Call the processor's render() function
|
||||
if processor_module and hasattr(processor_module, 'render'):
|
||||
return processor_module.render(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
|
||||
# Fallback: if processor doesn't have difference module, use text_json_diff as default
|
||||
from changedetectionio.processors.text_json_diff.difference import render as default_render
|
||||
@@ -157,23 +155,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's extract module
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.extract')
|
||||
# Try to get the processor's extract module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'extract')
|
||||
|
||||
# Call the processor's render_form() function
|
||||
if hasattr(processor_module, 'render_form'):
|
||||
return processor_module.render_form(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Processor {processor_name} does not have an extract module, falling back to base extractor: {e}")
|
||||
# Call the processor's render_form() function
|
||||
if processor_module and hasattr(processor_module, 'render_form'):
|
||||
return processor_module.render_form(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
|
||||
# Fallback: if processor doesn't have extract module, use base processors.extract as default
|
||||
from changedetectionio.processors.extract import render_form as default_render_form
|
||||
@@ -213,24 +209,22 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's extract module
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.extract')
|
||||
# Try to get the processor's extract module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'extract')
|
||||
|
||||
# Call the processor's process_extraction() function
|
||||
if hasattr(processor_module, 'process_extraction'):
|
||||
return processor_module.process_extraction(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
make_response=make_response,
|
||||
send_from_directory=send_from_directory,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Processor {processor_name} does not have an extract module, falling back to base extractor: {e}")
|
||||
# Call the processor's process_extraction() function
|
||||
if processor_module and hasattr(processor_module, 'process_extraction'):
|
||||
return processor_module.process_extraction(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
make_response=make_response,
|
||||
send_from_directory=send_from_directory,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
|
||||
# Fallback: if processor doesn't have extract module, use base processors.extract as default
|
||||
from changedetectionio.processors.extract import process_extraction as default_process_extraction
|
||||
@@ -280,38 +274,33 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's difference module
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
|
||||
# Try to get the processor's difference module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'difference')
|
||||
|
||||
# Call the processor's get_asset() function
|
||||
if hasattr(processor_module, 'get_asset'):
|
||||
result = processor_module.get_asset(
|
||||
asset_name=asset_name,
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request
|
||||
)
|
||||
# Call the processor's get_asset() function
|
||||
if processor_module and hasattr(processor_module, 'get_asset'):
|
||||
result = processor_module.get_asset(
|
||||
asset_name=asset_name,
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request
|
||||
)
|
||||
|
||||
if result is None:
|
||||
from flask import abort
|
||||
abort(404, description=f"Asset '{asset_name}' not found")
|
||||
|
||||
binary_data, content_type, cache_control = result
|
||||
|
||||
response = make_response(binary_data)
|
||||
response.headers['Content-Type'] = content_type
|
||||
if cache_control:
|
||||
response.headers['Cache-Control'] = cache_control
|
||||
return response
|
||||
else:
|
||||
logger.warning(f"Processor {processor_name} does not implement get_asset()")
|
||||
if result is None:
|
||||
from flask import abort
|
||||
abort(404, description=f"Processor '{processor_name}' does not support assets")
|
||||
abort(404, description=f"Asset '{asset_name}' not found")
|
||||
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Processor {processor_name} does not have a difference module: {e}")
|
||||
binary_data, content_type, cache_control = result
|
||||
|
||||
response = make_response(binary_data)
|
||||
response.headers['Content-Type'] = content_type
|
||||
if cache_control:
|
||||
response.headers['Cache-Control'] = cache_control
|
||||
return response
|
||||
else:
|
||||
logger.warning(f"Processor {processor_name} does not implement get_asset()")
|
||||
from flask import abort
|
||||
abort(404, description=f"Processor '{processor_name}' not found")
|
||||
abort(404, description=f"Processor '{processor_name}' does not support assets")
|
||||
|
||||
return diff_blueprint
|
||||
|
||||
@@ -66,8 +66,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
processor_name = datastore.data['watching'][uuid].get('processor', '')
|
||||
processor_classes = next((tpl for tpl in processors.find_processors() if tpl[1] == processor_name), None)
|
||||
if not processor_classes:
|
||||
flash(gettext("Cannot load the edit form for processor/plugin '{}', plugin missing?").format(processor_classes[1]), 'error')
|
||||
return redirect(url_for('watchlist.index'))
|
||||
flash(gettext("Could not load '{}' processor, processor plugin might be missing. Please select a different processor.").format(processor_name), 'error')
|
||||
# Fall back to default processor so user can still edit and change processor
|
||||
processor_classes = next((tpl for tpl in processors.find_processors() if tpl[1] == 'text_json_diff'), None)
|
||||
if not processor_classes:
|
||||
# If even text_json_diff is missing, something is very wrong
|
||||
flash(gettext("Could not load '{}' processor, processor plugin might be missing.").format(processor_name), 'error')
|
||||
return redirect(url_for('watchlist.index'))
|
||||
|
||||
parent_module = processors.get_parent_module(processor_classes[0])
|
||||
|
||||
@@ -144,58 +149,10 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
extra_update_obj['time_between_check'] = form.time_between_check.data
|
||||
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
processor_config_data = {}
|
||||
fields_to_remove = []
|
||||
for field_name, field_value in form.data.items():
|
||||
if field_name.startswith('processor_config_'):
|
||||
config_key = field_name.replace('processor_config_', '')
|
||||
if field_value: # Only save non-empty values
|
||||
processor_config_data[config_key] = field_value
|
||||
fields_to_remove.append(field_name)
|
||||
|
||||
# Save processor config to JSON file if any config data exists
|
||||
if processor_config_data:
|
||||
try:
|
||||
processor_name = form.data.get('processor')
|
||||
# Create a processor instance to access config methods
|
||||
processor_instance = processors.difference_detection_processor(datastore, uuid)
|
||||
# Use processor name as filename so each processor keeps its own config
|
||||
config_filename = f'{processor_name}.json'
|
||||
processor_instance.update_extra_watch_config(config_filename, processor_config_data)
|
||||
logger.debug(f"Saved processor config to {config_filename}: {processor_config_data}")
|
||||
|
||||
# Call optional edit_hook if processor has one
|
||||
try:
|
||||
# Try to import the edit_hook module from the processor package
|
||||
import importlib
|
||||
edit_hook_module_name = f'changedetectionio.processors.{processor_name}.edit_hook'
|
||||
|
||||
try:
|
||||
edit_hook = importlib.import_module(edit_hook_module_name)
|
||||
logger.debug(f"Found edit_hook module for {processor_name}")
|
||||
|
||||
if hasattr(edit_hook, 'on_config_save'):
|
||||
logger.info(f"Calling edit_hook.on_config_save for {processor_name}")
|
||||
watch_obj = datastore.data['watching'][uuid]
|
||||
# Call hook and get updated config
|
||||
updated_config = edit_hook.on_config_save(watch_obj, processor_config_data, datastore)
|
||||
# Save updated config back to file
|
||||
processor_instance.update_extra_watch_config(config_filename, updated_config)
|
||||
logger.info(f"Edit hook updated config: {updated_config}")
|
||||
else:
|
||||
logger.debug(f"Edit hook module found but no on_config_save function")
|
||||
except ModuleNotFoundError:
|
||||
logger.debug(f"No edit_hook module for processor {processor_name} (this is normal)")
|
||||
except Exception as hook_error:
|
||||
logger.error(f"Edit hook error (non-fatal): {hook_error}", exc_info=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save processor config: {e}")
|
||||
|
||||
# Remove processor-config-* fields from form.data before updating datastore
|
||||
for field_name in fields_to_remove:
|
||||
form.data.pop(field_name, None)
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
# IMPORTANT: These must NOT be saved to url-watches.json, only to the processor-specific JSON file
|
||||
processor_config_data = processors.extract_processor_config_from_form_data(form.data)
|
||||
processors.save_processor_config(datastore, uuid, processor_config_data)
|
||||
|
||||
# Ignore text
|
||||
form_ignore_text = form.ignore_text.data
|
||||
@@ -238,6 +195,13 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
|
||||
flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))
|
||||
|
||||
# Cleanup any browsersteps session for this watch
|
||||
try:
|
||||
from changedetectionio.blueprint.browser_steps import cleanup_session_for_watch
|
||||
cleanup_session_for_watch(uuid)
|
||||
except Exception as e:
|
||||
logger.debug(f"Error cleaning up browsersteps session: {e}")
|
||||
|
||||
# Re #286 - We wait for syncing new data to disk in another thread every 60 seconds
|
||||
# But in the case something is added we should save straight away
|
||||
datastore.needs_write_urgent = True
|
||||
@@ -325,8 +289,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
'url': url_for('rss.rss_single_watch', uuid=watch['uuid'], token=app_rss_token)
|
||||
},
|
||||
'settings_application': datastore.data['settings']['application'],
|
||||
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||
|
||||
@@ -39,24 +39,21 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's preview module
|
||||
import importlib
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.preview')
|
||||
# Try to get the processor's preview module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'preview')
|
||||
|
||||
# Call the processor's render() function
|
||||
if hasattr(processor_module, 'render'):
|
||||
return processor_module.render(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.debug(f"Processor {processor_name} does not have a preview module, using default preview: {e}")
|
||||
# Call the processor's render() function
|
||||
if processor_module and hasattr(processor_module, 'render'):
|
||||
return processor_module.render(
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request,
|
||||
url_for=url_for,
|
||||
render_template=render_template,
|
||||
flash=flash,
|
||||
redirect=redirect
|
||||
)
|
||||
|
||||
# Fallback: if processor doesn't have preview module, use default text preview
|
||||
content = []
|
||||
@@ -163,39 +160,33 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Get the processor type for this watch
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
try:
|
||||
# Try to import the processor's preview module
|
||||
import importlib
|
||||
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.preview')
|
||||
# Try to get the processor's preview module (works for both built-in and plugin processors)
|
||||
from changedetectionio.processors import get_processor_submodule
|
||||
processor_module = get_processor_submodule(processor_name, 'preview')
|
||||
|
||||
# Call the processor's get_asset() function
|
||||
if hasattr(processor_module, 'get_asset'):
|
||||
result = processor_module.get_asset(
|
||||
asset_name=asset_name,
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request
|
||||
)
|
||||
# Call the processor's get_asset() function
|
||||
if processor_module and hasattr(processor_module, 'get_asset'):
|
||||
result = processor_module.get_asset(
|
||||
asset_name=asset_name,
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
request=request
|
||||
)
|
||||
|
||||
if result is None:
|
||||
from flask import abort
|
||||
abort(404, description=f"Asset '{asset_name}' not found")
|
||||
|
||||
binary_data, content_type, cache_control = result
|
||||
|
||||
response = make_response(binary_data)
|
||||
response.headers['Content-Type'] = content_type
|
||||
if cache_control:
|
||||
response.headers['Cache-Control'] = cache_control
|
||||
return response
|
||||
else:
|
||||
logger.warning(f"Processor {processor_name} does not implement get_asset()")
|
||||
if result is None:
|
||||
from flask import abort
|
||||
abort(404, description=f"Processor '{processor_name}' does not support assets")
|
||||
abort(404, description=f"Asset '{asset_name}' not found")
|
||||
|
||||
except (ImportError, ModuleNotFoundError) as e:
|
||||
logger.warning(f"Processor {processor_name} does not have a preview module: {e}")
|
||||
binary_data, content_type, cache_control = result
|
||||
|
||||
response = make_response(binary_data)
|
||||
response.headers['Content-Type'] = content_type
|
||||
if cache_control:
|
||||
response.headers['Cache-Control'] = cache_control
|
||||
return response
|
||||
else:
|
||||
logger.warning(f"Processor {processor_name} does not implement get_asset()")
|
||||
from flask import abort
|
||||
abort(404, description=f"Processor '{processor_name}' not found")
|
||||
abort(404, description=f"Processor '{processor_name}' does not support assets")
|
||||
|
||||
return preview_blueprint
|
||||
|
||||
@@ -206,7 +206,7 @@ Math: {{ 1 + 1 }}") }}
|
||||
|
||||
<div class="tab-pane-inner" id="browser-steps">
|
||||
{% if capabilities.supports_browser_steps %}
|
||||
{% if visual_selector_data_ready %}
|
||||
{% if true %}
|
||||
<img class="beta-logo" src="{{url_for('static_content', group='images', filename='beta-logo.png')}}" alt="New beta functionality">
|
||||
<fieldset>
|
||||
<div class="pure-control-group">
|
||||
|
||||
@@ -200,23 +200,24 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
|
||||
</div>
|
||||
{% endif %}
|
||||
<div>
|
||||
<span class="watch-title">
|
||||
{% if system_use_url_watchlist or watch.get('use_page_title_in_list') %}
|
||||
{{ watch.label }}
|
||||
{% else %}
|
||||
{{ watch.get('title') or watch.link }}
|
||||
{% endif %}
|
||||
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}"> </a>
|
||||
</span>
|
||||
{%- if watch['processor'] and watch['processor'] in processor_badge_texts -%}
|
||||
<span class="processor-badge processor-badge-{{ watch['processor'] }}" title="{{ processor_descriptions.get(watch['processor'], watch['processor']) }}">{{ processor_badge_texts[watch['processor']] }}</span>
|
||||
{%- endif -%}
|
||||
<span class="watch-title">
|
||||
{% if system_use_url_watchlist or watch.get('use_page_title_in_list') %}
|
||||
{{ watch.label }}
|
||||
{% else %}
|
||||
{{ watch.get('title') or watch.link }}
|
||||
{% endif %}
|
||||
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}"> </a>
|
||||
</span>
|
||||
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list)|safe }}</div>
|
||||
{%- if watch['processor'] == 'text_json_diff' -%}
|
||||
{%- if watch['has_ldjson_price_data'] and not watch['track_ldjson_price_data'] -%}
|
||||
<div class="ldjson-price-track-offer">Switch to Restock & Price watch mode? <a href="{{url_for('price_data_follower.accept', uuid=watch.uuid)}}" class="pure-button button-xsmall">Yes</a> <a href="{{url_for('price_data_follower.reject', uuid=watch.uuid)}}" class="">No</a></div>
|
||||
{%- endif -%}
|
||||
{%- endif -%}
|
||||
{%- if watch['processor'] and watch['processor'] in processor_badge_texts -%}
|
||||
<span class="processor-badge processor-badge-{{ watch['processor'] }}" title="{{ processor_descriptions.get(watch['processor'], watch['processor']) }}">{{ processor_badge_texts[watch['processor']] }}</span>
|
||||
{%- endif -%}
|
||||
|
||||
{%- for watch_tag_uuid, watch_tag in datastore.get_all_tags_for_watch(watch['uuid']).items() -%}
|
||||
<span class="watch-tag-list tag-{{ watch_tag.title|sanitize_tag_class }}">{{ watch_tag.title }}</span>
|
||||
{%- endfor -%}
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import gc
|
||||
import json
|
||||
import os
|
||||
from urllib.parse import urlparse
|
||||
@@ -185,20 +186,33 @@ class fetcher(Fetcher):
|
||||
super().screenshot_step(step_n=step_n)
|
||||
screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||
|
||||
# Request GC immediately after screenshot to free memory
|
||||
# Screenshots can be large and browser steps take many of them
|
||||
await self.page.request_gc()
|
||||
|
||||
if self.browser_steps_screenshot_path is not None:
|
||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.jpeg'.format(step_n))
|
||||
logger.debug(f"Saving step screenshot to {destination}")
|
||||
with open(destination, 'wb') as f:
|
||||
f.write(screenshot)
|
||||
# Clear local reference to allow screenshot bytes to be collected
|
||||
del screenshot
|
||||
gc.collect()
|
||||
|
||||
async def save_step_html(self, step_n):
|
||||
super().save_step_html(step_n=step_n)
|
||||
content = await self.page.content()
|
||||
|
||||
# Request GC after getting page content
|
||||
await self.page.request_gc()
|
||||
|
||||
destination = os.path.join(self.browser_steps_screenshot_path, 'step_{}.html'.format(step_n))
|
||||
logger.debug(f"Saving step HTML to {destination}")
|
||||
with open(destination, 'w', encoding='utf-8') as f:
|
||||
f.write(content)
|
||||
# Clear local reference
|
||||
del content
|
||||
gc.collect()
|
||||
|
||||
async def run(self,
|
||||
fetch_favicon=True,
|
||||
@@ -305,6 +319,12 @@ class fetcher(Fetcher):
|
||||
|
||||
if self.status_code != 200 and not ignore_status_codes:
|
||||
screenshot = await capture_full_page_async(self.page, screenshot_format=self.screenshot_format)
|
||||
# Cleanup before raising to prevent memory leak
|
||||
await self.page.close()
|
||||
await context.close()
|
||||
await browser.close()
|
||||
# Force garbage collection to release Playwright resources immediately
|
||||
gc.collect()
|
||||
raise Non200ErrorCodeReceived(url=url, status_code=self.status_code, screenshot=screenshot)
|
||||
|
||||
if not empty_pages_are_a_change and len((await self.page.content()).strip()) == 0:
|
||||
@@ -313,48 +333,52 @@ class fetcher(Fetcher):
|
||||
await browser.close()
|
||||
raise EmptyReply(url=url, status_code=response.status)
|
||||
|
||||
# Run Browser Steps here
|
||||
if self.browser_steps_get_valid_steps():
|
||||
await self.iterate_browser_steps(start_url=url)
|
||||
|
||||
await self.page.wait_for_timeout(extra_wait * 1000)
|
||||
|
||||
now = time.time()
|
||||
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
||||
if current_include_filters is not None:
|
||||
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
||||
else:
|
||||
await self.page.evaluate("var include_filters=''")
|
||||
await self.page.request_gc()
|
||||
|
||||
# request_gc before and after evaluate to free up memory
|
||||
# @todo browsersteps etc
|
||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
||||
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
||||
"max_height": MAX_TOTAL_HEIGHT
|
||||
})
|
||||
await self.page.request_gc()
|
||||
|
||||
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
||||
await self.page.request_gc()
|
||||
|
||||
self.content = await self.page.content()
|
||||
await self.page.request_gc()
|
||||
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
||||
|
||||
|
||||
# Bug 3 in Playwright screenshot handling
|
||||
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
||||
# JPEG is better here because the screenshots can be very very large
|
||||
|
||||
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
||||
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
||||
# acceptable screenshot quality here
|
||||
# Wrap remaining operations in try/finally to ensure cleanup
|
||||
try:
|
||||
# Run Browser Steps here
|
||||
if self.browser_steps_get_valid_steps():
|
||||
await self.iterate_browser_steps(start_url=url)
|
||||
|
||||
await self.page.wait_for_timeout(extra_wait * 1000)
|
||||
|
||||
now = time.time()
|
||||
# So we can find an element on the page where its selector was entered manually (maybe not xPath etc)
|
||||
if current_include_filters is not None:
|
||||
await self.page.evaluate("var include_filters={}".format(json.dumps(current_include_filters)))
|
||||
else:
|
||||
await self.page.evaluate("var include_filters=''")
|
||||
await self.page.request_gc()
|
||||
|
||||
# request_gc before and after evaluate to free up memory
|
||||
# @todo browsersteps etc
|
||||
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
|
||||
self.xpath_data = await self.page.evaluate(XPATH_ELEMENT_JS, {
|
||||
"visualselector_xpath_selectors": visualselector_xpath_selectors,
|
||||
"max_height": MAX_TOTAL_HEIGHT
|
||||
})
|
||||
await self.page.request_gc()
|
||||
|
||||
self.instock_data = await self.page.evaluate(INSTOCK_DATA_JS)
|
||||
await self.page.request_gc()
|
||||
|
||||
self.content = await self.page.content()
|
||||
await self.page.request_gc()
|
||||
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
|
||||
|
||||
|
||||
# Bug 3 in Playwright screenshot handling
|
||||
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
|
||||
# JPEG is better here because the screenshots can be very very large
|
||||
|
||||
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
|
||||
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
|
||||
# acceptable screenshot quality here
|
||||
# The actual screenshot - this always base64 and needs decoding! horrible! huge CPU usage
|
||||
self.screenshot = await capture_full_page_async(page=self.page, screenshot_format=self.screenshot_format)
|
||||
|
||||
except ScreenshotUnavailable:
|
||||
# Re-raise screenshot unavailable exceptions
|
||||
raise
|
||||
except Exception as e:
|
||||
# It's likely the screenshot was too long/big and something crashed
|
||||
raise ScreenshotUnavailable(url=url, status_code=self.status_code)
|
||||
@@ -389,6 +413,10 @@ class fetcher(Fetcher):
|
||||
pass
|
||||
browser = None
|
||||
|
||||
# Force Python GC to release Playwright resources immediately
|
||||
# Playwright objects can have circular references that delay cleanup
|
||||
gc.collect()
|
||||
|
||||
|
||||
# Plugin registration for built-in fetcher
|
||||
class PlaywrightFetcherPlugin:
|
||||
|
||||
@@ -15,7 +15,7 @@ class fetcher(Fetcher):
|
||||
proxy_url = None
|
||||
|
||||
# Capability flags
|
||||
supports_browser_steps = True
|
||||
supports_browser_steps = False
|
||||
supports_screenshots = True
|
||||
supports_xpath_element_data = True
|
||||
|
||||
|
||||
@@ -57,14 +57,15 @@ class SignalPriorityQueue(queue.PriorityQueue):
|
||||
def put(self, item, block=True, timeout=None):
|
||||
# Call the parent's put method first
|
||||
super().put(item, block, timeout)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
@@ -312,14 +313,15 @@ class AsyncSignalPriorityQueue(asyncio.PriorityQueue):
|
||||
async def put(self, item):
|
||||
# Call the parent's put method first
|
||||
await super().put(item)
|
||||
|
||||
|
||||
# After putting the item in the queue, check if it has a UUID and emit signal
|
||||
if hasattr(item, 'item') and isinstance(item.item, dict) and 'uuid' in item.item:
|
||||
uuid = item.item['uuid']
|
||||
# Get the signal and send it if it exists
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
# Send the watch_uuid parameter
|
||||
# NOTE: This would block other workers from .put/.get while this signal sends
|
||||
# Signal handlers may iterate the queue/datastore while holding locks
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
# Send queue_length signal with current queue size
|
||||
|
||||
@@ -863,13 +863,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore)
|
||||
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
threading.Thread(target=notification_runner).start()
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start()
|
||||
threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start()
|
||||
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
threading.Thread(target=check_for_new_version).start()
|
||||
threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start()
|
||||
|
||||
# Return the Flask app - the Socket.IO will be attached to it but initialized separately
|
||||
# This avoids circular dependencies
|
||||
|
||||
@@ -29,6 +29,7 @@ def get_timeago_locale(flask_locale):
|
||||
"""
|
||||
locale_map = {
|
||||
'zh': 'zh_CN', # Chinese Simplified
|
||||
'zh_Hant_TW': 'zh_TW', # Flask-Babel normalizes zh_TW to zh_Hant_TW
|
||||
'pt': 'pt_PT', # Portuguese (Portugal)
|
||||
'sv': 'sv_SE', # Swedish
|
||||
'no': 'nb_NO', # Norwegian Bokmål
|
||||
|
||||
@@ -10,9 +10,13 @@ from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
from .. import jinja2_custom as safe_jinja
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
"""
|
||||
@@ -29,6 +33,7 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
try:
|
||||
# Receive data from parent process via pipe (avoids pickle overhead)
|
||||
contents = conn.recv()
|
||||
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
||||
|
||||
if mode is not None:
|
||||
compressed_data = brotli.compress(contents, mode=mode)
|
||||
@@ -40,9 +45,10 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
||||
|
||||
# Send success status back
|
||||
conn.send(True)
|
||||
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
||||
# No need for explicit cleanup - process exit frees all memory
|
||||
except Exception as e:
|
||||
logger.error(f"Brotli compression worker failed: {e}")
|
||||
logger.critical(f"Brotli compression worker failed: {e}")
|
||||
conn.send(False)
|
||||
finally:
|
||||
conn.close()
|
||||
@@ -66,7 +72,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
Raises:
|
||||
Exception: if compression fails and fallback_uncompressed is False
|
||||
"""
|
||||
import brotli
|
||||
import multiprocessing
|
||||
import sys
|
||||
|
||||
@@ -144,11 +149,6 @@ def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_
|
||||
else:
|
||||
raise Exception(f"Brotli compression subprocess failed for {filepath}")
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
class model(watch_base):
|
||||
__newest_history_key = None
|
||||
@@ -492,7 +492,6 @@ class model(watch_base):
|
||||
|
||||
self.ensure_data_dir_exists()
|
||||
|
||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
||||
|
||||
# Binary data - detect file type and save without compression
|
||||
@@ -516,7 +515,7 @@ class model(watch_base):
|
||||
|
||||
# Text data - use brotli compression if enabled and above threshold
|
||||
else:
|
||||
if not skip_brotli and len(contents) > threshold:
|
||||
if not skip_brotli and len(contents) > BROTLI_COMPRESS_SIZE_THRESHOLD:
|
||||
# Compressed text
|
||||
import brotli
|
||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||
|
||||
@@ -105,6 +105,30 @@ class ChangeDetectionSpec:
|
||||
"""
|
||||
pass
|
||||
|
||||
@hookspec
|
||||
def register_processor(self):
|
||||
"""Register an external processor plugin.
|
||||
|
||||
External packages can implement this hook to register custom processors
|
||||
that will be discovered alongside built-in processors.
|
||||
|
||||
Returns:
|
||||
dict or None: Dictionary with processor information:
|
||||
{
|
||||
'processor_name': str, # Machine name (e.g., 'osint_recon')
|
||||
'processor_module': module, # Module containing processor.py
|
||||
'processor_class': class, # The perform_site_check class
|
||||
'metadata': { # Optional metadata
|
||||
'name': str, # Display name
|
||||
'description': str, # Description
|
||||
'processor_weight': int,# Sort weight (lower = higher priority)
|
||||
'list_badge_text': str, # Badge text for UI
|
||||
}
|
||||
}
|
||||
Return None if this plugin doesn't provide a processor
|
||||
"""
|
||||
pass
|
||||
|
||||
|
||||
# Set up Plugin Manager
|
||||
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
||||
|
||||
@@ -17,9 +17,11 @@ def find_sub_packages(package_name):
|
||||
return [name for _, name, is_pkg in pkgutil.iter_modules(package.__path__) if is_pkg]
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def find_processors():
|
||||
"""
|
||||
Find all subclasses of DifferenceDetectionProcessor in the specified package.
|
||||
Results are cached to avoid repeated discovery.
|
||||
|
||||
:param package_name: The name of the package to scan for processor modules.
|
||||
:return: A list of (module, class) tuples.
|
||||
@@ -46,6 +48,22 @@ def find_processors():
|
||||
except (ModuleNotFoundError, ImportError) as e:
|
||||
logger.warning(f"Failed to import module {module_name}: {e} (find_processors())")
|
||||
|
||||
# Discover plugin processors via pluggy
|
||||
try:
|
||||
from changedetectionio.pluggy_interface import plugin_manager
|
||||
plugin_results = plugin_manager.hook.register_processor()
|
||||
|
||||
for result in plugin_results:
|
||||
if result and isinstance(result, dict):
|
||||
processor_module = result.get('processor_module')
|
||||
processor_name = result.get('processor_name')
|
||||
|
||||
if processor_module and processor_name:
|
||||
processors.append((processor_module, processor_name))
|
||||
logger.info(f"Registered plugin processor: {processor_name}")
|
||||
except Exception as e:
|
||||
logger.warning(f"Error loading plugin processors: {e}")
|
||||
|
||||
return processors
|
||||
|
||||
|
||||
@@ -97,54 +115,137 @@ def find_processor_module(processor_name):
|
||||
return None
|
||||
|
||||
|
||||
def get_processor_module(processor_name):
|
||||
"""
|
||||
Get the actual processor module (with perform_site_check class) by name.
|
||||
Works for both built-in and plugin processors.
|
||||
|
||||
Args:
|
||||
processor_name: Processor machine name (e.g., 'text_json_diff', 'osint_recon')
|
||||
|
||||
Returns:
|
||||
module: The processor module containing perform_site_check, or None if not found
|
||||
"""
|
||||
processor_classes = find_processors()
|
||||
processor_tuple = next((tpl for tpl in processor_classes if tpl[1] == processor_name), None)
|
||||
|
||||
if processor_tuple:
|
||||
# Return the actual processor module (first element of tuple)
|
||||
return processor_tuple[0]
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_processor_submodule(processor_name, submodule_name):
|
||||
"""
|
||||
Get an optional submodule from a processor (e.g., 'difference', 'extract', 'preview').
|
||||
Works for both built-in and plugin processors.
|
||||
|
||||
Args:
|
||||
processor_name: Processor machine name (e.g., 'text_json_diff', 'osint_recon')
|
||||
submodule_name: Name of the submodule (e.g., 'difference', 'extract', 'preview')
|
||||
|
||||
Returns:
|
||||
module: The submodule if it exists, or None if not found
|
||||
"""
|
||||
processor_classes = find_processors()
|
||||
processor_tuple = next((tpl for tpl in processor_classes if tpl[1] == processor_name), None)
|
||||
|
||||
if not processor_tuple:
|
||||
return None
|
||||
|
||||
processor_module = processor_tuple[0]
|
||||
parent_module = get_parent_module(processor_module)
|
||||
|
||||
if not parent_module:
|
||||
return None
|
||||
|
||||
# Try to import the submodule
|
||||
try:
|
||||
# For built-in processors: changedetectionio.processors.text_json_diff.difference
|
||||
# For plugin processors: changedetectionio_osint.difference
|
||||
parent_module_name = parent_module.__name__
|
||||
submodule_full_name = f"{parent_module_name}.{submodule_name}"
|
||||
return importlib.import_module(submodule_full_name)
|
||||
except (ModuleNotFoundError, ImportError):
|
||||
return None
|
||||
|
||||
|
||||
@lru_cache(maxsize=1)
|
||||
def get_plugin_processor_metadata():
|
||||
"""Get metadata from plugin processors."""
|
||||
metadata = {}
|
||||
try:
|
||||
from changedetectionio.pluggy_interface import plugin_manager
|
||||
plugin_results = plugin_manager.hook.register_processor()
|
||||
|
||||
for result in plugin_results:
|
||||
if result and isinstance(result, dict):
|
||||
processor_name = result.get('processor_name')
|
||||
meta = result.get('metadata', {})
|
||||
if processor_name:
|
||||
metadata[processor_name] = meta
|
||||
except Exception as e:
|
||||
logger.warning(f"Error getting plugin processor metadata: {e}")
|
||||
return metadata
|
||||
|
||||
|
||||
def available_processors():
|
||||
"""
|
||||
Get a list of processors by name and description for the UI elements.
|
||||
Can be filtered via ALLOWED_PROCESSORS environment variable (comma-separated list).
|
||||
Can be filtered via DISABLED_PROCESSORS environment variable (comma-separated list).
|
||||
:return: A list :)
|
||||
"""
|
||||
|
||||
processor_classes = find_processors()
|
||||
|
||||
# Check if ALLOWED_PROCESSORS env var is set
|
||||
# For now we disable it, need to make a deploy with lots of new code and this will be an overload
|
||||
allowed_processors_env = os.getenv('ALLOWED_PROCESSORS', 'text_json_diff, restock_diff').strip()
|
||||
allowed_processors = None
|
||||
if allowed_processors_env:
|
||||
# Check if DISABLED_PROCESSORS env var is set
|
||||
disabled_processors_env = os.getenv('DISABLED_PROCESSORS', 'image_ssim_diff').strip()
|
||||
disabled_processors = []
|
||||
if disabled_processors_env:
|
||||
# Parse comma-separated list and strip whitespace
|
||||
allowed_processors = [p.strip() for p in allowed_processors_env.split(',') if p.strip()]
|
||||
logger.info(f"ALLOWED_PROCESSORS set, filtering to: {allowed_processors}")
|
||||
disabled_processors = [p.strip() for p in disabled_processors_env.split(',') if p.strip()]
|
||||
logger.info(f"DISABLED_PROCESSORS set, disabling: {disabled_processors}")
|
||||
|
||||
available = []
|
||||
plugin_metadata = get_plugin_processor_metadata()
|
||||
|
||||
for module, sub_package_name in processor_classes:
|
||||
# Filter by allowed processors if set
|
||||
if allowed_processors and sub_package_name not in allowed_processors:
|
||||
logger.debug(f"Skipping processor '{sub_package_name}' (not in ALLOWED_PROCESSORS)")
|
||||
# Skip disabled processors
|
||||
if sub_package_name in disabled_processors:
|
||||
logger.debug(f"Skipping processor '{sub_package_name}' (in DISABLED_PROCESSORS)")
|
||||
continue
|
||||
|
||||
# Try to get the 'name' attribute from the processor module first
|
||||
if hasattr(module, 'name'):
|
||||
description = gettext(module.name)
|
||||
# Check if this is a plugin processor
|
||||
if sub_package_name in plugin_metadata:
|
||||
meta = plugin_metadata[sub_package_name]
|
||||
description = gettext(meta.get('name', sub_package_name))
|
||||
# Plugin processors start from weight 10 to separate them from built-in processors
|
||||
weight = 100 + meta.get('processor_weight', 0)
|
||||
else:
|
||||
# Fall back to processor_description from parent module's __init__.py
|
||||
parent_module = get_parent_module(module)
|
||||
if parent_module and hasattr(parent_module, 'processor_description'):
|
||||
description = gettext(parent_module.processor_description)
|
||||
# Try to get the 'name' attribute from the processor module first
|
||||
if hasattr(module, 'name'):
|
||||
description = gettext(module.name)
|
||||
else:
|
||||
# Final fallback to a readable name
|
||||
description = sub_package_name.replace('_', ' ').title()
|
||||
# Fall back to processor_description from parent module's __init__.py
|
||||
parent_module = get_parent_module(module)
|
||||
if parent_module and hasattr(parent_module, 'processor_description'):
|
||||
description = gettext(parent_module.processor_description)
|
||||
else:
|
||||
# Final fallback to a readable name
|
||||
description = sub_package_name.replace('_', ' ').title()
|
||||
|
||||
# Get weight for sorting (lower weight = higher in list)
|
||||
weight = 0 # Default weight for processors without explicit weight
|
||||
# Get weight for sorting (lower weight = higher in list)
|
||||
weight = 0 # Default weight for processors without explicit weight
|
||||
|
||||
# Check processor module itself first
|
||||
if hasattr(module, 'processor_weight'):
|
||||
weight = module.processor_weight
|
||||
else:
|
||||
# Fall back to parent module (package __init__.py)
|
||||
parent_module = get_parent_module(module)
|
||||
if parent_module and hasattr(parent_module, 'processor_weight'):
|
||||
weight = parent_module.processor_weight
|
||||
# Check processor module itself first
|
||||
if hasattr(module, 'processor_weight'):
|
||||
weight = module.processor_weight
|
||||
else:
|
||||
# Fall back to parent module (package __init__.py)
|
||||
parent_module = get_parent_module(module)
|
||||
if parent_module and hasattr(parent_module, 'processor_weight'):
|
||||
weight = parent_module.processor_weight
|
||||
|
||||
available.append((sub_package_name, description, weight))
|
||||
|
||||
@@ -279,3 +380,76 @@ def get_processor_badge_css():
|
||||
|
||||
return '\n\n'.join(css_rules)
|
||||
|
||||
|
||||
def save_processor_config(datastore, watch_uuid, config_data):
|
||||
"""
|
||||
Save processor-specific configuration to JSON file.
|
||||
|
||||
This is a shared helper function used by both the UI edit form and API endpoints
|
||||
to consistently handle processor configuration storage.
|
||||
|
||||
Args:
|
||||
datastore: The application datastore instance
|
||||
watch_uuid: UUID of the watch
|
||||
config_data: Dictionary of configuration data to save (with processor_config_* prefix removed)
|
||||
|
||||
Returns:
|
||||
bool: True if saved successfully, False otherwise
|
||||
"""
|
||||
if not config_data:
|
||||
return True
|
||||
|
||||
try:
|
||||
from changedetectionio.processors.base import difference_detection_processor
|
||||
|
||||
# Get processor name from watch
|
||||
watch = datastore.data['watching'].get(watch_uuid)
|
||||
if not watch:
|
||||
logger.error(f"Cannot save processor config: watch {watch_uuid} not found")
|
||||
return False
|
||||
|
||||
processor_name = watch.get('processor', 'text_json_diff')
|
||||
|
||||
# Create a processor instance to access config methods
|
||||
processor_instance = difference_detection_processor(datastore, watch_uuid)
|
||||
|
||||
# Use processor name as filename so each processor keeps its own config
|
||||
config_filename = f'{processor_name}.json'
|
||||
processor_instance.update_extra_watch_config(config_filename, config_data)
|
||||
|
||||
logger.debug(f"Saved processor config to {config_filename}: {config_data}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save processor config: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def extract_processor_config_from_form_data(form_data):
|
||||
"""
|
||||
Extract processor_config_* fields from form data and return separate dicts.
|
||||
|
||||
This is a shared helper function used by both the UI edit form and API endpoints
|
||||
to consistently handle processor configuration extraction.
|
||||
|
||||
IMPORTANT: This function modifies form_data in-place by removing processor_config_* fields.
|
||||
|
||||
Args:
|
||||
form_data: Dictionary of form data (will be modified in-place)
|
||||
|
||||
Returns:
|
||||
dict: Dictionary of processor config data (with processor_config_* prefix removed)
|
||||
"""
|
||||
processor_config_data = {}
|
||||
|
||||
# Use list() to create a copy of keys since we're modifying the dict
|
||||
for field_name in list(form_data.keys()):
|
||||
if field_name.startswith('processor_config_'):
|
||||
config_key = field_name.replace('processor_config_', '')
|
||||
# Save all values (including empty strings) to allow explicit clearing of settings
|
||||
processor_config_data[config_key] = form_data[field_name]
|
||||
# Remove from form_data to prevent it from reaching datastore
|
||||
del form_data[field_name]
|
||||
|
||||
return processor_config_data
|
||||
|
||||
|
||||
@@ -86,6 +86,7 @@ class RecheckPriorityQueue:
|
||||
|
||||
def get(self, block: bool = True, timeout: Optional[float] = None):
|
||||
"""Thread-safe sync get with priority ordering"""
|
||||
import queue
|
||||
try:
|
||||
# Wait for notification
|
||||
self.sync_q.get(block=block, timeout=timeout)
|
||||
@@ -103,8 +104,11 @@ class RecheckPriorityQueue:
|
||||
logger.debug(f"Successfully retrieved item: {self._get_item_uuid(item)}")
|
||||
return item
|
||||
|
||||
except queue.Empty:
|
||||
# Queue is empty with timeout - expected behavior, re-raise without logging
|
||||
raise
|
||||
except Exception as e:
|
||||
logger.critical(f"CRITICAL: Failed to get item from queue: {str(e)}")
|
||||
# Re-raise without logging - caller (worker) will handle and log appropriately
|
||||
raise
|
||||
|
||||
# ASYNC INTERFACE (for workers)
|
||||
|
||||
@@ -98,11 +98,12 @@ pytest -vv -s --maxfail=1 tests/test_rss.py
|
||||
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
||||
|
||||
# Try high concurrency
|
||||
FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
||||
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
# Check file:// will pickup a file when enabled
|
||||
echo "Hello world" > /tmp/test-file.txt
|
||||
ALLOW_FILE_URI=yes pytest -vv -s tests/test_security.py
|
||||
|
||||
|
||||
|
||||
# Run it again so that brotli kicks in
|
||||
TEST_WITH_BROTLI=1 SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=100 FETCH_WORKERS=20 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
@@ -186,7 +186,7 @@ class ChangeDetectionStore:
|
||||
# Finally start the thread that will manage periodic data saves to JSON
|
||||
# Only start if thread is not already running (reload_state might be called multiple times)
|
||||
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore)
|
||||
self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
|
||||
self.save_data_thread.start()
|
||||
|
||||
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
||||
|
||||
@@ -17,6 +17,8 @@ _MAP = {
|
||||
|
||||
|
||||
def strtobool(value):
|
||||
if not value:
|
||||
return False
|
||||
try:
|
||||
return _MAP[str(value).lower()]
|
||||
except KeyError:
|
||||
|
||||
@@ -270,3 +270,6 @@ def app(request, datastore_path):
|
||||
|
||||
request.addfinalizer(teardown)
|
||||
yield app
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -206,11 +206,10 @@ def test_regex_error_handling(client, live_server, measure_memory_usage, datasto
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
time.sleep(0.2)
|
||||
### test regex error handling
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={"extract_text": '/something bad\d{3/XYZ',
|
||||
"url": test_url,
|
||||
"fetch_backend": "html_requests",
|
||||
|
||||
@@ -4,25 +4,47 @@ import time
|
||||
import os
|
||||
import json
|
||||
from flask import url_for
|
||||
from loguru import logger
|
||||
from .. import strtobool
|
||||
from .util import wait_for_all_checks, delete_all_watches
|
||||
from urllib.parse import urlparse, parse_qs
|
||||
import brotli
|
||||
|
||||
|
||||
def test_consistent_history(client, live_server, measure_memory_usage, datastore_path):
|
||||
# live_server_setup(live_server) # Setup on conftest per function
|
||||
workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
r = range(1, 10+workers)
|
||||
|
||||
for one in r:
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=str(one), _external=True)
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": test_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuids = set()
|
||||
sys_fetch_workers = int(os.getenv("FETCH_WORKERS", 10))
|
||||
workers = range(1, sys_fetch_workers)
|
||||
now = time.time()
|
||||
|
||||
assert b"1 Imported" in res.data
|
||||
for one in workers:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
# A very long string that WILL trigger Brotli compression of the snapshot
|
||||
# BROTLI_COMPRESS_SIZE_THRESHOLD should be set to say 200
|
||||
from ..model.Watch import BROTLI_COMPRESS_SIZE_THRESHOLD
|
||||
content = str(one) + "x" + str(one) * (BROTLI_COMPRESS_SIZE_THRESHOLD + 10)
|
||||
else:
|
||||
# Just enough to test datastore
|
||||
content = str(one)+'x'
|
||||
|
||||
test_url = url_for('test_endpoint', content_type="text/html", content=content, _external=True)
|
||||
uuids.add(client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'title': str(one)}))
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
duration = time.time() - now
|
||||
per_worker = duration/sys_fetch_workers
|
||||
if sys_fetch_workers < 20:
|
||||
per_worker_threshold=0.6
|
||||
elif sys_fetch_workers < 50:
|
||||
per_worker_threshold = 0.8
|
||||
else:
|
||||
per_worker_threshold = 1.5
|
||||
|
||||
logger.debug(f"All fetched in {duration:.2f}s, {per_worker}s per worker")
|
||||
# Problematic on github
|
||||
#assert per_worker < per_worker_threshold, f"If concurrency is working good, no blocking async problems, each worker ({sys_fetch_workers} workers) should have done his job in under {per_worker_threshold}s, got {per_worker:.2f}s per worker, total duration was {duration:.2f}s"
|
||||
|
||||
# Essentially just triggers the DB write/update
|
||||
res = client.post(
|
||||
@@ -34,7 +56,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
|
||||
# Wait for the sync DB save to happen
|
||||
time.sleep(2)
|
||||
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
@@ -44,14 +66,18 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
json_obj = json.load(f)
|
||||
|
||||
# assert the right amount of watches was found in the JSON
|
||||
assert len(json_obj['watching']) == len(r), "Correct number of watches was found in the JSON"
|
||||
i=0
|
||||
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
|
||||
|
||||
i = 0
|
||||
# each one should have a history.txt containing just one line
|
||||
for w in json_obj['watching'].keys():
|
||||
i+=1
|
||||
i += 1
|
||||
history_txt_index_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, 'history.txt')
|
||||
assert os.path.isfile(history_txt_index_file), f"History.txt should exist where I expect it at {history_txt_index_file}"
|
||||
|
||||
# Should be no errors (could be from brotli etc)
|
||||
assert not live_server.app.config['DATASTORE'].data['watching'][w].get('last_error')
|
||||
|
||||
# Same like in model.Watch
|
||||
with open(history_txt_index_file, "r") as f:
|
||||
tmp_history = dict(i.strip().split(',', 2) for i in f.readlines())
|
||||
@@ -63,15 +89,21 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
# Find the snapshot one
|
||||
for fname in files_in_watch_dir:
|
||||
if fname != 'history.txt' and 'html' not in fname:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
|
||||
|
||||
full_snapshot_history_path = os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname)
|
||||
# contents should match what we requested as content returned from the test url
|
||||
with open(os.path.join(live_server.app.config['DATASTORE'].datastore_path, w, fname), 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
watch_url = json_obj['watching'][w]['url']
|
||||
u = urlparse(watch_url)
|
||||
q = parse_qs(u[4])
|
||||
assert q['content'][0] == contents.strip(), f"Snapshot file {fname} should contain {q['content'][0]}"
|
||||
|
||||
if fname.endswith('.br'):
|
||||
with open(full_snapshot_history_path, 'rb') as f:
|
||||
contents = brotli.decompress(f.read()).decode('utf-8')
|
||||
else:
|
||||
with open(full_snapshot_history_path, 'r') as snapshot_f:
|
||||
contents = snapshot_f.read()
|
||||
|
||||
watch_title = json_obj['watching'][w]['title']
|
||||
assert json_obj['watching'][w]['title'], "Watch should have a title set"
|
||||
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
|
||||
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
|
||||
|
||||
@@ -25,12 +25,13 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage,
|
||||
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": ''},
|
||||
follow_redirects=True
|
||||
)
|
||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
assert b'Queued 1 watch for rechecking.' in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
from .util import live_server_setup, wait_for_all_checks, wait_for_watch_history, extract_rss_token_from_UI, get_UUID_for_tag_name, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
@@ -87,6 +87,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
# Wait for initial checks to complete
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure initial snapshots are saved
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Watches did not save initial snapshots"
|
||||
|
||||
# Trigger a change
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
|
||||
@@ -94,6 +97,9 @@ def test_rss_group(client, live_server, measure_memory_usage, datastore_path):
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Ensure all watches have sufficient history for RSS generation
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "Watches did not accumulate sufficient history"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
assert rss_token is not None
|
||||
@@ -216,11 +222,13 @@ def test_rss_group_only_unviewed(client, live_server, measure_memory_usage, data
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=1, timeout=10), "Initial snapshots not saved"
|
||||
|
||||
# Trigger changes
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
assert wait_for_watch_history(client, min_history_count=2, timeout=10), "History not accumulated"
|
||||
|
||||
# Get RSS token
|
||||
rss_token = extract_rss_token_from_UI(client)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
from changedetectionio import html_tools
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import html_tools
|
||||
|
||||
# test generation guide.
|
||||
# 1. Do not include encoding in the xml declaration if the test object is a str type.
|
||||
|
||||
@@ -164,14 +164,45 @@ def wait_for_all_checks(client=None):
|
||||
if q_length == 0 and not any_workers_busy:
|
||||
if empty_since is None:
|
||||
empty_since = time.time()
|
||||
elif time.time() - empty_since >= 0.15: # Shorter wait
|
||||
# Brief stabilization period for async workers
|
||||
elif time.time() - empty_since >= 0.3:
|
||||
break
|
||||
else:
|
||||
empty_since = None
|
||||
|
||||
|
||||
attempt += 1
|
||||
time.sleep(0.3)
|
||||
|
||||
def wait_for_watch_history(client, min_history_count=2, timeout=10):
|
||||
"""
|
||||
Wait for watches to have sufficient history entries.
|
||||
Useful after wait_for_all_checks() when you need to ensure history is populated.
|
||||
|
||||
Args:
|
||||
client: Test client with access to datastore
|
||||
min_history_count: Minimum number of history entries required
|
||||
timeout: Maximum time to wait in seconds
|
||||
"""
|
||||
datastore = client.application.config.get('DATASTORE')
|
||||
start_time = time.time()
|
||||
|
||||
while time.time() - start_time < timeout:
|
||||
all_have_history = True
|
||||
for uuid, watch in datastore.data['watching'].items():
|
||||
history_count = len(watch.history.keys())
|
||||
if history_count < min_history_count:
|
||||
all_have_history = False
|
||||
break
|
||||
|
||||
if all_have_history:
|
||||
return True
|
||||
|
||||
time.sleep(0.2)
|
||||
|
||||
# Timeout - return False
|
||||
return False
|
||||
|
||||
|
||||
# Replaced by new_live_server_setup and calling per function scope in conftest.py
|
||||
def live_server_setup(live_server):
|
||||
return True
|
||||
@@ -189,6 +220,8 @@ def new_live_server_setup(live_server):
|
||||
|
||||
@live_server.app.route('/test-endpoint')
|
||||
def test_endpoint():
|
||||
from loguru import logger
|
||||
logger.debug(f"/test-endpoint hit {request}")
|
||||
ctype = request.args.get('content_type')
|
||||
status_code = request.args.get('status_code')
|
||||
content = request.args.get('content') or None
|
||||
|
||||
@@ -144,7 +144,6 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
|
||||
|
||||
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)
|
||||
four_o_four_url = four_o_four_url.replace('localhost.localdomain', 'cdio')
|
||||
four_o_four_url = four_o_four_url.replace('localhost', 'cdio')
|
||||
@@ -186,3 +185,65 @@ def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_
|
||||
url_for("ui.form_delete", uuid="all"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
def test_browsersteps_edit_UI_startsession(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
|
||||
|
||||
# Add a watch first
|
||||
test_url = url_for('test_interactive_html_endpoint', _external=True)
|
||||
test_url = test_url.replace('localhost.localdomain', 'cdio')
|
||||
test_url = test_url.replace('localhost', 'cdio')
|
||||
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'fetch_backend': 'html_webdriver', 'paused': True})
|
||||
|
||||
# Test starting a browsersteps session
|
||||
res = client.get(
|
||||
url_for("browser_steps.browsersteps_start_session", uuid=uuid),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
assert res.is_json
|
||||
json_data = res.get_json()
|
||||
assert 'browsersteps_session_id' in json_data
|
||||
assert json_data['browsersteps_session_id'] # Not empty
|
||||
|
||||
browsersteps_session_id = json_data['browsersteps_session_id']
|
||||
|
||||
# Verify the session exists in browsersteps_sessions
|
||||
from changedetectionio.blueprint.browser_steps import browsersteps_sessions, browsersteps_watch_to_session
|
||||
assert browsersteps_session_id in browsersteps_sessions
|
||||
assert uuid in browsersteps_watch_to_session
|
||||
assert browsersteps_watch_to_session[uuid] == browsersteps_session_id
|
||||
|
||||
# Verify browsersteps UI shows up on edit page
|
||||
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
|
||||
assert b'browsersteps-click-start' in res.data, "Browsersteps manual UI shows up"
|
||||
|
||||
# Session should still exist after GET (not cleaned up yet)
|
||||
assert browsersteps_session_id in browsersteps_sessions
|
||||
assert uuid in browsersteps_watch_to_session
|
||||
|
||||
# Test cleanup happens on save (POST)
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={
|
||||
"url": test_url,
|
||||
"tags": "",
|
||||
'fetch_backend': "html_webdriver",
|
||||
"time_between_check_use_default": "y",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Updated watch" in res.data
|
||||
|
||||
# NOW verify the session was cleaned up after save
|
||||
assert browsersteps_session_id not in browsersteps_sessions
|
||||
assert uuid not in browsersteps_watch_to_session
|
||||
|
||||
# Cleanup
|
||||
client.get(
|
||||
url_for("ui.form_delete", uuid="all"),
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
@@ -2,19 +2,18 @@
|
||||
Worker management module for changedetection.io
|
||||
|
||||
Handles asynchronous workers for dynamic worker scaling.
|
||||
Sync worker support has been removed in favor of async-only architecture.
|
||||
Each worker runs in its own thread with its own event loop for isolation.
|
||||
"""
|
||||
|
||||
import asyncio
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
from concurrent.futures import ThreadPoolExecutor
|
||||
from loguru import logger
|
||||
|
||||
# Global worker state
|
||||
running_async_tasks = []
|
||||
async_loop = None
|
||||
async_loop_thread = None
|
||||
# Global worker state - each worker has its own thread and event loop
|
||||
worker_threads = [] # List of WorkerThread objects
|
||||
|
||||
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
|
||||
currently_processing_uuids = {}
|
||||
@@ -22,89 +21,118 @@ currently_processing_uuids = {}
|
||||
# Configuration - async workers only
|
||||
USE_ASYNC_WORKERS = True
|
||||
|
||||
# Custom ThreadPoolExecutor for queue operations with named threads
|
||||
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
|
||||
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
|
||||
queue_executor = ThreadPoolExecutor(
|
||||
max_workers=_max_executor_workers,
|
||||
thread_name_prefix="QueueGetter-"
|
||||
)
|
||||
|
||||
def start_async_event_loop():
|
||||
"""Start a dedicated event loop for async workers in a separate thread"""
|
||||
global async_loop
|
||||
logger.info("Starting async event loop for workers")
|
||||
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
async_loop = asyncio.new_event_loop()
|
||||
# Set it as the event loop for this thread
|
||||
asyncio.set_event_loop(async_loop)
|
||||
|
||||
logger.debug(f"Event loop created and set: {async_loop}")
|
||||
|
||||
# Run the event loop forever
|
||||
async_loop.run_forever()
|
||||
except Exception as e:
|
||||
logger.error(f"Async event loop error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if async_loop and not async_loop.is_closed():
|
||||
async_loop.close()
|
||||
async_loop = None
|
||||
logger.info("Async event loop stopped")
|
||||
|
||||
class WorkerThread:
|
||||
"""Container for a worker thread with its own event loop"""
|
||||
def __init__(self, worker_id, update_q, notification_q, app, datastore):
|
||||
self.worker_id = worker_id
|
||||
self.update_q = update_q
|
||||
self.notification_q = notification_q
|
||||
self.app = app
|
||||
self.datastore = datastore
|
||||
self.thread = None
|
||||
self.loop = None
|
||||
self.running = False
|
||||
|
||||
def run(self):
|
||||
"""Run the worker in its own event loop"""
|
||||
try:
|
||||
# Create a new event loop for this thread
|
||||
self.loop = asyncio.new_event_loop()
|
||||
asyncio.set_event_loop(self.loop)
|
||||
self.running = True
|
||||
|
||||
# Run the worker coroutine
|
||||
self.loop.run_until_complete(
|
||||
start_single_async_worker(
|
||||
self.worker_id,
|
||||
self.update_q,
|
||||
self.notification_q,
|
||||
self.app,
|
||||
self.datastore,
|
||||
queue_executor
|
||||
)
|
||||
)
|
||||
except asyncio.CancelledError:
|
||||
# Normal shutdown - worker was cancelled
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
if not in_pytest:
|
||||
logger.info(f"Worker {self.worker_id} shutting down gracefully")
|
||||
except RuntimeError as e:
|
||||
# Ignore expected shutdown errors
|
||||
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
|
||||
logger.error(f"Worker {self.worker_id} runtime error: {e}")
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {self.worker_id} thread error: {e}")
|
||||
finally:
|
||||
# Clean up
|
||||
if self.loop and not self.loop.is_closed():
|
||||
self.loop.close()
|
||||
self.running = False
|
||||
self.loop = None
|
||||
|
||||
def start(self):
|
||||
"""Start the worker thread"""
|
||||
self.thread = threading.Thread(
|
||||
target=self.run,
|
||||
daemon=True,
|
||||
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
|
||||
)
|
||||
self.thread.start()
|
||||
|
||||
def stop(self):
|
||||
"""Stop the worker thread"""
|
||||
if self.loop and self.running:
|
||||
try:
|
||||
# Signal the loop to stop
|
||||
self.loop.call_soon_threadsafe(self.loop.stop)
|
||||
except RuntimeError:
|
||||
pass
|
||||
|
||||
if self.thread and self.thread.is_alive():
|
||||
self.thread.join(timeout=2.0)
|
||||
|
||||
|
||||
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
"""Start the async worker management system"""
|
||||
global async_loop_thread, async_loop, running_async_tasks, currently_processing_uuids
|
||||
|
||||
# Clear any stale UUID tracking state
|
||||
"""Start async workers, each with its own thread and event loop for isolation"""
|
||||
global worker_threads, currently_processing_uuids
|
||||
|
||||
# Clear any stale state
|
||||
currently_processing_uuids.clear()
|
||||
|
||||
# Start the event loop in a separate thread
|
||||
async_loop_thread = threading.Thread(target=start_async_event_loop, daemon=True)
|
||||
async_loop_thread.start()
|
||||
|
||||
# Wait for the loop to be available (with timeout for safety)
|
||||
max_wait_time = 5.0
|
||||
wait_start = time.time()
|
||||
while async_loop is None and (time.time() - wait_start) < max_wait_time:
|
||||
time.sleep(0.1)
|
||||
|
||||
if async_loop is None:
|
||||
logger.error("Failed to start async event loop within timeout")
|
||||
return
|
||||
|
||||
# Additional brief wait to ensure loop is running
|
||||
time.sleep(0.2)
|
||||
|
||||
# Start async workers
|
||||
logger.info(f"Starting {n_workers} async workers")
|
||||
|
||||
# Start each worker in its own thread with its own event loop
|
||||
logger.info(f"Starting {n_workers} async workers (isolated threads)")
|
||||
for i in range(n_workers):
|
||||
try:
|
||||
# Use a factory function to create named worker coroutines
|
||||
def create_named_worker(worker_id):
|
||||
async def named_worker():
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
return await start_single_async_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
return named_worker()
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(create_named_worker(i), async_loop)
|
||||
running_async_tasks.append(task_future)
|
||||
except RuntimeError as e:
|
||||
worker = WorkerThread(i, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
# No sleep needed - threads start independently and asynchronously
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to start async worker {i}: {e}")
|
||||
continue
|
||||
|
||||
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore):
|
||||
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
|
||||
"""Start a single async worker with auto-restart capability"""
|
||||
from changedetectionio.async_update_worker import async_update_worker
|
||||
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
try:
|
||||
if not in_pytest:
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore)
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||
# If we reach here, worker exited cleanly
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||
@@ -131,39 +159,38 @@ def start_workers(n_workers, update_q, notification_q, app, datastore):
|
||||
|
||||
def add_worker(update_q, notification_q, app, datastore):
|
||||
"""Add a new async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not async_loop:
|
||||
logger.error("Async loop not running, cannot add worker")
|
||||
return False
|
||||
|
||||
worker_id = len(running_async_tasks)
|
||||
global worker_threads
|
||||
|
||||
worker_id = len(worker_threads)
|
||||
logger.info(f"Adding async worker {worker_id}")
|
||||
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore), async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
return True
|
||||
|
||||
try:
|
||||
worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
|
||||
worker.start()
|
||||
worker_threads.append(worker)
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add worker {worker_id}: {e}")
|
||||
return False
|
||||
|
||||
|
||||
def remove_worker():
|
||||
"""Remove an async worker (for dynamic scaling)"""
|
||||
global running_async_tasks
|
||||
|
||||
if not running_async_tasks:
|
||||
global worker_threads
|
||||
|
||||
if not worker_threads:
|
||||
return False
|
||||
|
||||
# Cancel the last worker
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
logger.info(f"Removed async worker, {len(running_async_tasks)} workers remaining")
|
||||
|
||||
# Stop the last worker
|
||||
worker = worker_threads.pop()
|
||||
worker.stop()
|
||||
logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
|
||||
return True
|
||||
|
||||
|
||||
def get_worker_count():
|
||||
"""Get current number of async workers"""
|
||||
return len(running_async_tasks)
|
||||
return len(worker_threads)
|
||||
|
||||
|
||||
def get_running_uuids():
|
||||
@@ -249,38 +276,21 @@ def queue_item_async_safe(update_q, item, silent=False):
|
||||
|
||||
def shutdown_workers():
|
||||
"""Shutdown all async workers fast and aggressively"""
|
||||
global async_loop, async_loop_thread, running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import os
|
||||
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Fast shutdown of async workers initiated...")
|
||||
|
||||
# Cancel all async tasks immediately
|
||||
for task_future in running_async_tasks:
|
||||
if not task_future.done():
|
||||
task_future.cancel()
|
||||
|
||||
# Stop the async event loop immediately
|
||||
if async_loop and not async_loop.is_closed():
|
||||
try:
|
||||
async_loop.call_soon_threadsafe(async_loop.stop)
|
||||
except RuntimeError:
|
||||
# Loop might already be stopped
|
||||
pass
|
||||
|
||||
running_async_tasks.clear()
|
||||
async_loop = None
|
||||
|
||||
# Give async thread minimal time to finish, then continue
|
||||
if async_loop_thread and async_loop_thread.is_alive():
|
||||
async_loop_thread.join(timeout=1.0) # Only 1 second timeout
|
||||
if async_loop_thread.is_alive() and not in_pytest:
|
||||
logger.info("Async thread still running after timeout - continuing with shutdown")
|
||||
async_loop_thread = None
|
||||
|
||||
|
||||
# Stop all worker threads
|
||||
for worker in worker_threads:
|
||||
worker.stop()
|
||||
|
||||
worker_threads.clear()
|
||||
|
||||
if not in_pytest:
|
||||
logger.info("Async workers fast shutdown complete")
|
||||
|
||||
@@ -290,69 +300,57 @@ def shutdown_workers():
|
||||
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Dynamically adjust the number of async workers.
|
||||
|
||||
|
||||
Args:
|
||||
new_count: Target number of workers
|
||||
update_q, notification_q, app, datastore: Required for adding new workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Status of the adjustment operation
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
|
||||
if new_count == current_count:
|
||||
return {
|
||||
'status': 'no_change',
|
||||
'message': f'Worker count already at {current_count}',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
if new_count > current_count:
|
||||
# Add workers
|
||||
workers_to_add = new_count - current_count
|
||||
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
if not all([update_q, notification_q, app, datastore]):
|
||||
return {
|
||||
'status': 'error',
|
||||
'message': 'Missing required parameters to add workers',
|
||||
'current_count': current_count
|
||||
}
|
||||
|
||||
|
||||
for i in range(workers_to_add):
|
||||
worker_id = len(running_async_tasks)
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
|
||||
add_worker(update_q, notification_q, app, datastore)
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Added {workers_to_add} workers',
|
||||
'previous_count': current_count,
|
||||
'current_count': new_count
|
||||
'current_count': len(worker_threads)
|
||||
}
|
||||
|
||||
|
||||
else:
|
||||
# Remove workers
|
||||
workers_to_remove = current_count - new_count
|
||||
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
|
||||
|
||||
|
||||
removed_count = 0
|
||||
for _ in range(workers_to_remove):
|
||||
if running_async_tasks:
|
||||
task_future = running_async_tasks.pop()
|
||||
task_future.cancel()
|
||||
# Wait for the task to actually stop
|
||||
try:
|
||||
task_future.result(timeout=5) # 5 second timeout
|
||||
except Exception:
|
||||
pass # Task was cancelled, which is expected
|
||||
if remove_worker():
|
||||
removed_count += 1
|
||||
|
||||
|
||||
return {
|
||||
'status': 'success',
|
||||
'message': f'Removed {removed_count} workers',
|
||||
@@ -367,72 +365,58 @@ def get_worker_status():
|
||||
'worker_type': 'async',
|
||||
'worker_count': get_worker_count(),
|
||||
'running_uuids': get_running_uuids(),
|
||||
'async_loop_running': async_loop is not None,
|
||||
'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
|
||||
}
|
||||
|
||||
|
||||
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
|
||||
"""
|
||||
Check if the expected number of async workers are running and restart any missing ones.
|
||||
|
||||
|
||||
Args:
|
||||
expected_count: Expected number of workers
|
||||
update_q, notification_q, app, datastore: Required for restarting workers
|
||||
|
||||
|
||||
Returns:
|
||||
dict: Health check results
|
||||
"""
|
||||
global running_async_tasks
|
||||
|
||||
global worker_threads
|
||||
|
||||
current_count = get_worker_count()
|
||||
|
||||
if current_count == expected_count:
|
||||
|
||||
# Check which workers are actually alive
|
||||
alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
|
||||
|
||||
if alive_count == expected_count:
|
||||
return {
|
||||
'status': 'healthy',
|
||||
'expected_count': expected_count,
|
||||
'actual_count': current_count,
|
||||
'actual_count': alive_count,
|
||||
'message': f'All {expected_count} async workers running'
|
||||
}
|
||||
|
||||
# Check for crashed async workers
|
||||
|
||||
# Find dead workers
|
||||
dead_workers = []
|
||||
alive_count = 0
|
||||
|
||||
for i, task_future in enumerate(running_async_tasks[:]):
|
||||
if task_future.done():
|
||||
try:
|
||||
result = task_future.result()
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {i} completed unexpectedly")
|
||||
except Exception as e:
|
||||
dead_workers.append(i)
|
||||
logger.error(f"Async worker {i} crashed: {e}")
|
||||
else:
|
||||
alive_count += 1
|
||||
|
||||
for i, worker in enumerate(worker_threads[:]):
|
||||
if not worker.thread or not worker.thread.is_alive():
|
||||
dead_workers.append(i)
|
||||
logger.warning(f"Async worker {worker.worker_id} thread is dead")
|
||||
|
||||
# Remove dead workers from tracking
|
||||
for i in reversed(dead_workers):
|
||||
if i < len(running_async_tasks):
|
||||
running_async_tasks.pop(i)
|
||||
|
||||
if i < len(worker_threads):
|
||||
worker_threads.pop(i)
|
||||
|
||||
missing_workers = expected_count - alive_count
|
||||
restarted_count = 0
|
||||
|
||||
|
||||
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
|
||||
logger.info(f"Restarting {missing_workers} crashed async workers")
|
||||
|
||||
|
||||
for i in range(missing_workers):
|
||||
worker_id = alive_count + i
|
||||
try:
|
||||
task_future = asyncio.run_coroutine_threadsafe(
|
||||
start_single_async_worker(worker_id, update_q, notification_q, app, datastore),
|
||||
async_loop
|
||||
)
|
||||
running_async_tasks.append(task_future)
|
||||
if add_worker(update_q, notification_q, app, datastore):
|
||||
restarted_count += 1
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to restart worker {worker_id}: {e}")
|
||||
|
||||
|
||||
return {
|
||||
'status': 'repaired' if restarted_count > 0 else 'degraded',
|
||||
'expected_count': expected_count,
|
||||
|
||||
@@ -16,6 +16,13 @@ services:
|
||||
# Log output levels: TRACE, DEBUG(default), INFO, SUCCESS, WARNING, ERROR, CRITICAL
|
||||
# - LOGGER_LEVEL=TRACE
|
||||
#
|
||||
# Install additional Python packages (processor plugins, etc.)
|
||||
# Packages are installed at container startup and cached to avoid reinstalling on every restart
|
||||
# Example: Install the OSINT reconnaissance processor plugin
|
||||
# - EXTRA_PACKAGES=changedetection-osint-processor
|
||||
# Multiple packages can be installed by separating with spaces:
|
||||
# - EXTRA_PACKAGES=changedetection-osint-processor another-plugin
|
||||
#
|
||||
#
|
||||
# Uncomment below and the "sockpuppetbrowser" to use a real Chrome browser (It uses the "playwright" protocol)
|
||||
# - PLAYWRIGHT_DRIVER_URL=ws://browser-sockpuppet-chrome:3000
|
||||
|
||||
28
docker-entrypoint.sh
Executable file
28
docker-entrypoint.sh
Executable file
@@ -0,0 +1,28 @@
|
||||
#!/bin/bash
|
||||
set -e
|
||||
|
||||
# Install additional packages from EXTRA_PACKAGES env var
|
||||
# Uses a marker file to avoid reinstalling on every container restart
|
||||
INSTALLED_MARKER="/datastore/.extra_packages_installed"
|
||||
CURRENT_PACKAGES="$EXTRA_PACKAGES"
|
||||
|
||||
if [ -n "$EXTRA_PACKAGES" ]; then
|
||||
# Check if we need to install/update packages
|
||||
if [ ! -f "$INSTALLED_MARKER" ] || [ "$(cat $INSTALLED_MARKER 2>/dev/null)" != "$CURRENT_PACKAGES" ]; then
|
||||
echo "Installing extra packages: $EXTRA_PACKAGES"
|
||||
pip3 install --no-cache-dir $EXTRA_PACKAGES
|
||||
|
||||
if [ $? -eq 0 ]; then
|
||||
echo "$CURRENT_PACKAGES" > "$INSTALLED_MARKER"
|
||||
echo "Extra packages installed successfully"
|
||||
else
|
||||
echo "ERROR: Failed to install extra packages"
|
||||
exit 1
|
||||
fi
|
||||
else
|
||||
echo "Extra packages already installed: $EXTRA_PACKAGES"
|
||||
fi
|
||||
fi
|
||||
|
||||
# Execute the main command
|
||||
exec "$@"
|
||||
@@ -51,9 +51,9 @@ linkify-it-py
|
||||
|
||||
# - Needed for apprise/spush, and maybe others? hopefully doesnt trigger a rust compile.
|
||||
# - Requires extra wheel for rPi, adds build time for arm/v8 which is not in piwheels
|
||||
# Pinned to 43.0.1 for ARM compatibility (45.x may not have pre-built ARM wheels)
|
||||
# Pinned to 44.x for ARM compatibility and sslyze compatibility (sslyze requires <45)
|
||||
# Also pinned because dependabot wants specific versions
|
||||
cryptography==46.0.3
|
||||
cryptography==44.0.0
|
||||
|
||||
# apprise mqtt https://github.com/dgtlmoon/changedetection.io/issues/315
|
||||
# use any version other than 2.0.x due to https://github.com/eclipse/paho.mqtt.python/issues/814
|
||||
|
||||
Reference in New Issue
Block a user