Compare commits

..

6 Commits

Author SHA1 Message Date
dgtlmoon
e1b048f961 oops 2026-02-18 17:52:57 +01:00
dgtlmoon
9ba645d4cc Do it threaded 2026-02-18 17:39:09 +01:00
dgtlmoon
e6c0d538e6 oops forgot module 2026-02-18 17:34:01 +01:00
dgtlmoon
e2fffc36e4 Small tidy 2026-02-18 17:32:16 +01:00
dgtlmoon
b9a2f781ac Backups was missing tags 2026-02-18 17:29:45 +01:00
dgtlmoon
76abb4ab71 WIP 2026-02-18 17:26:44 +01:00
27 changed files with 71 additions and 2051 deletions

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.53.7'
__version__ = '0.53.4'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -45,7 +45,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
extra_notification_tokens=datastore.get_unique_notification_tokens_available()
)
# Remove the last option 'System default'
form.application.form.notification_format.choices.pop()
@@ -130,12 +129,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Instantiate plugin form with POST data
plugin_form = form_class(formdata=request.form)
# Save plugin settings — use plugin's own save_fn if provided
# (allows plugins to strip ephemeral staging fields etc.)
save_fn = tab.get('save_fn')
if save_fn:
save_fn(datastore, plugin_form)
elif plugin_form.data:
# Save plugin settings (validation is optional for plugins)
if plugin_form.data:
save_plugin_settings(datastore.datastore_path, plugin_id, plugin_form.data)
flash(gettext("Settings updated."))

View File

@@ -27,7 +27,6 @@
<li class="tab"><a href="#rss">{{ _('RSS') }}</a></li>
<li class="tab"><a href="{{ url_for('backups.create') }}">{{ _('Backups') }}</a></li>
<li class="tab"><a href="#timedate">{{ _('Time & Date') }}</a></li>
<li class="tab"><a href="#proxies">{{ _('CAPTCHA & Proxies') }}</a></li>
{% if plugin_tabs %}
{% for tab in plugin_tabs %}
@@ -309,7 +308,6 @@ nav
</div>
</div>
<div class="tab-pane-inner" id="proxies">
<div id="recommended-proxy">
<div>

View File

@@ -116,11 +116,11 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
for uuid in uuids:
watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=None):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData, llm_summary_q=llm_summary_q)
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData)
ui_blueprint.register_blueprint(edit_blueprint)
# Register the notification blueprint

View File

@@ -11,7 +11,7 @@ from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio.time_handler import is_within_schedule
from changedetectionio import worker_pool
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, llm_summary_q=None):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
def _watch_has_tag_options_set(watch):
@@ -404,47 +404,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
download_name=filename,
mimetype='application/zip')
@edit_blueprint.route("/edit/<string:uuid>/regenerate-llm-summaries", methods=['GET'])
@login_optionally_required
def watch_regenerate_llm_summaries(uuid):
"""Queue LLM summary generation for all history entries that don't yet have one."""
from flask import flash
from changedetectionio.llm.tokens import is_llm_data_ready
watch = datastore.data['watching'].get(uuid)
if not watch:
abort(404)
if not llm_summary_q:
flash(gettext("LLM summarisation is not configured."), 'error')
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid))
history = watch.history
history_keys = list(history.keys())
queued = 0
# Skip the first entry — there is no prior snapshot to diff against
for timestamp in history_keys[1:]:
snapshot_fname = history[timestamp]
snapshot_id = os.path.basename(snapshot_fname).split('.')[0] # always 32-char MD5
# Skip entries that already have a summary
if is_llm_data_ready(watch.data_dir, snapshot_id):
continue
llm_summary_q.put({
'uuid': uuid,
'snapshot_id': snapshot_id,
'attempts': 0,
})
queued += 1
if queued:
flash(gettext("Queued %(count)d LLM summaries for generation.", count=queued), 'success')
else:
flash(gettext("All history entries already have LLM summaries."), 'notice')
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid) + '#info')
# Ajax callback
@edit_blueprint.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])
@login_optionally_required

View File

@@ -489,9 +489,6 @@ Math: {{ 1 + 1 }}") }}
<p>
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Download watch data package') }}</a>
{% if watch.history_n > 1 %}
<a href="{{url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid)}}" class="pure-button button-small">{{ _('Regenerate LLM summaries') }}</a>
{% endif %}
</p>
{% endif %}

View File

@@ -15,7 +15,6 @@ from changedetectionio.strtobool import strtobool
from threading import Event
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
from changedetectionio import worker_pool
import changedetectionio.llm as llm
from flask import (
Flask,
@@ -28,6 +27,7 @@ from flask import (
session,
url_for,
)
from flask_compress import Compress as FlaskCompress
from flask_restful import abort, Api
from flask_cors import CORS
@@ -57,7 +57,6 @@ extra_stylesheets = []
# Use bulletproof janus-based queues for sync/async reliability
update_q = RecheckPriorityQueue()
notification_q = NotificationQueue()
llm_summary_q = llm.create_queue()
MAX_QUEUE_SIZE = 5000
app = Flask(__name__,
@@ -75,14 +74,14 @@ CORS(app)
# There's also a bug between flask compress and socketio that causes some kind of slow memory leak
# It's better to use compression on your reverse proxy (nginx etc) instead.
if strtobool(os.getenv("FLASK_ENABLE_COMPRESSION")):
from flask_compress import Compress as FlaskCompress
app.config['COMPRESS_MIN_SIZE'] = 2096
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml']
# Use gzip only - smaller memory footprint than zstd/brotli (4-8KB vs 200-500KB contexts)
app.config['COMPRESS_ALGORITHM'] = ['gzip']
compress = FlaskCompress()
compress.init_app(app)
compress = FlaskCompress()
compress.init_app(app)
app.config['TEMPLATES_AUTO_RELOAD'] = False
@@ -853,7 +852,7 @@ def changedetection_app(config=None, datastore_o=None):
# watchlist UI buttons etc
import changedetectionio.blueprint.ui as ui
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=llm_summary_q))
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
import changedetectionio.blueprint.watchlist as watchlist
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
@@ -976,17 +975,6 @@ def changedetection_app(config=None, datastore_o=None):
).start()
logger.info(f"Started {notification_workers} notification worker(s)")
llm.start_workers(app=app, datastore=datastore, llm_q=llm_summary_q,
n_workers=int(os.getenv("LLM_WORKERS", "1")))
# Register the LLM queue plugin so changes trigger summary jobs
from changedetectionio.llm.plugin import LLMQueuePlugin
from changedetectionio.pluggy_interface import plugin_manager
plugin_manager.register(LLMQueuePlugin(llm_summary_q), 'llm_queue_plugin')
# Re-run template path configuration now that all plugins (including LLM) are registered
_configure_plugin_templates()
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
@@ -1041,65 +1029,19 @@ def notification_runner(worker_id=0):
else:
# ── LLM deferred-send gate ─────────────────────────────────────────
# If the notification was re-queued to wait for LLM data, honour the
# scheduled retry time before doing any further processing.
_llm_next_retry = n_object.get('_llm_next_retry_at', 0)
if _llm_next_retry and _llm_next_retry > time.time():
notification_q.put(n_object)
app.config.exit.wait(min(_llm_next_retry - time.time(), 2))
continue
# Apply system-config fallbacks first so we can scan the final body/title.
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
# If the body or title references llm_* tokens, wait until LLM data is ready.
import re as _re
_llm_scan = (n_object.get('notification_body') or '') + ' ' + (n_object.get('notification_title') or '')
if _re.search(r'\bllm_(?:summary|headline|importance|sentiment|one_liner)\b', _llm_scan):
from changedetectionio.llm.tokens import (
is_llm_data_ready, read_llm_tokens,
LLM_NOTIFICATION_RETRY_DELAY_SECONDS, LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS,
)
_llm_uuid = n_object.get('uuid')
_llm_watch = datastore.data['watching'].get(_llm_uuid) if _llm_uuid else None
_llm_snap_id = n_object.get('_llm_snapshot_id')
if _llm_watch and _llm_snap_id and not is_llm_data_ready(_llm_watch.data_dir, _llm_snap_id):
_llm_attempts = n_object.get('_llm_wait_attempts', 0)
if _llm_attempts < LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS:
n_object['_llm_wait_attempts'] = _llm_attempts + 1
n_object['_llm_next_retry_at'] = time.time() + LLM_NOTIFICATION_RETRY_DELAY_SECONDS
notification_q.put(n_object)
logger.debug(
f"Notification gate: LLM data pending for {_llm_uuid} "
f"(attempt {n_object['_llm_wait_attempts']}/{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS})"
)
continue
else:
logger.warning(
f"Notification: LLM data never arrived for {_llm_uuid} after "
f"{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS} attempts — sending without LLM tokens"
)
elif _llm_watch and _llm_snap_id:
# Data is ready — populate the LLM tokens into n_object
_llm_data = read_llm_tokens(_llm_watch.data_dir, _llm_snap_id)
n_object['llm_summary'] = _llm_data.get('summary', '')
n_object['llm_headline'] = _llm_data.get('headline', '')
n_object['llm_importance'] = _llm_data.get('importance')
n_object['llm_sentiment'] = _llm_data.get('sentiment', '')
n_object['llm_one_liner'] = _llm_data.get('one_liner', '')
# ── end LLM gate ───────────────────────────────────────────────────
now = datetime.now()
sent_obj = None
try:
from changedetectionio.notification.handler import process_notification
# Fallback to system config if not set
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
if not n_object.get('notification_format') and datastore.data['settings']['application'].get('notification_format'):
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
if n_object.get('notification_urls', {}):

View File

@@ -561,33 +561,31 @@ def html_to_text(html_content: str, render_anchor_tag_content=False, is_rss=Fals
)
else:
parser_config = None
if is_rss:
html_content = re.sub(r'<title([\s>])', r'<h1\1', html_content)
html_content = re.sub(r'</title>', r'</h1>', html_content)
else:
# Use BS4 html.parser to strip bloat — SPA's often dump 10MB+ of CSS/JS into <head>,
# causing inscriptis to silently give up. Regex-based stripping is unsafe because tags
# can appear inside JSON data attributes with JS-escaped closing tags (e.g. <\/script>),
# causing the regex to scan past the intended close and eat real page content.
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, 'html.parser')
# Strip tags that inscriptis cannot render as meaningful text and which can be very large.
# svg/math: produce path-data/MathML garbage; canvas/iframe/template: no inscriptis handlers.
# video/audio/picture are kept — they may contain meaningful fallback text or captions.
for tag in soup.find_all(['head', 'script', 'style', 'noscript', 'svg',
'math', 'canvas', 'iframe', 'template']):
tag.decompose()
# Strip bloat in one pass, SPA's often dump 10Mb+ into the <head> for styles, which is not needed
# Causing inscriptis to silently exit when more than ~10MB is found.
# All we are doing here is converting the HTML to text, no CSS layout etc
# Use backreference (\1) to ensure opening/closing tags match (prevents <style> matching </svg> in CSS data URIs)
html_content = re.sub(r'<(style|script|svg|noscript)[^>]*>.*?</\1>|<(?:link|meta)[^>]*/?>|<!--.*?-->',
'', html_content, flags=re.DOTALL | re.IGNORECASE)
# SPAs often use <body style="display:none"> to hide content until JS loads.
# inscriptis respects CSS display rules, so strip hiding styles from the body tag.
body_tag = soup.find('body')
if body_tag and body_tag.get('style'):
style = body_tag['style']
if re.search(r'\b(?:display\s*:\s*none|visibility\s*:\s*hidden)\b', style, re.IGNORECASE):
logger.debug(f"html_to_text: Removing hiding styles from body tag (found: '{style}')")
del body_tag['style']
# SPAs often use <body style="display:none"> to hide content until JS loads
# inscriptis respects CSS display rules, so we need to remove these hiding styles
# to extract the actual page content
body_style_pattern = r'(<body[^>]*)\s+style\s*=\s*["\']([^"\']*\b(?:display\s*:\s*none|visibility\s*:\s*hidden)\b[^"\']*)["\']'
# Check if body has hiding styles that need to be fixed
body_match = re.search(body_style_pattern, html_content, flags=re.IGNORECASE)
if body_match:
from loguru import logger
logger.debug(f"html_to_text: Removing hiding styles from body tag (found: '{body_match.group(2)}')")
html_content = re.sub(body_style_pattern, r'\1', html_content, flags=re.IGNORECASE)
html_content = str(soup)
text_content = get_text(html_content, config=parser_config)
return text_content

View File

@@ -1,64 +0,0 @@
"""
changedetectionio.llm
~~~~~~~~~~~~~~~~~~~~~
LLM summary queue and workers.
Usage in flask_app.py
---------------------
import changedetectionio.llm as llm
# At module level alongside notification_q:
llm_summary_q = llm.create_queue()
# Inside changedetection_app(), after datastore is ready:
llm.start_workers(
app=app,
datastore=datastore,
llm_q=llm_summary_q,
n_workers=int(os.getenv("LLM_WORKERS", "1")),
)
Enqueueing a summary job (e.g. from the pluggy update_finalize hook)
---------------------------------------------------------------------
if changed_detected and not processing_exception:
llm_summary_q.put({
'uuid': watch_uuid,
'snapshot_id': snapshot_id,
'attempts': 0,
})
"""
import queue
import threading
from loguru import logger
def create_queue() -> queue.Queue:
"""Return a plain Queue for LLM summary jobs. No maxsize — jobs are small dicts."""
return queue.Queue()
def start_workers(app, datastore, llm_q: queue.Queue, n_workers: int = 1) -> None:
"""
Start N LLM summary worker threads.
Args:
app: Flask application instance (for app_context and exit event)
datastore: Application datastore
llm_q: Queue returned by create_queue()
n_workers: Number of parallel workers (default 1; increase for local Ollama)
"""
from changedetectionio.llm.queue_worker import llm_summary_runner
for i in range(n_workers):
threading.Thread(
target=llm_summary_runner,
args=(i, app, datastore, llm_q),
daemon=True,
name=f"LLMSummaryWorker-{i}",
).start()
logger.info(f"Started {n_workers} LLM summary worker(s)")

View File

@@ -1,104 +0,0 @@
"""
LLM plugin — provides settings tab and enqueues summary jobs on change detection.
Registered with the pluggy plugin manager at startup (flask_app.py).
The worker (llm/queue_worker.py) drains the queue asynchronously.
"""
from loguru import logger
from changedetectionio.pluggy_interface import hookimpl
def get_llm_settings(datastore):
"""Load LLM plugin settings with fallback to legacy datastore settings.
Tries the plugin settings file (llm.json) first.
Falls back to the old storage location in datastore.data['settings']['application']
for users upgrading from a version before LLM became a first-class plugin.
"""
from changedetectionio.pluggy_interface import load_plugin_settings
settings = load_plugin_settings(datastore.datastore_path, 'llm')
if settings.get('llm_connection') is not None:
return settings
# Legacy fallback: settings were stored in datastore application settings
app_settings = datastore.data['settings']['application']
connections_dict = app_settings.get('llm_connections') or {}
connections_list = [
{
'connection_id': k,
'name': v.get('name', ''),
'model': v.get('model', ''),
'api_key': v.get('api_key', ''),
'api_base': v.get('api_base', ''),
'tokens_per_minute': int(v.get('tokens_per_minute', 0) or 0),
'is_default': bool(v.get('is_default', False)),
}
for k, v in connections_dict.items()
]
return {
'llm_connection': connections_list,
'llm_summary_prompt': app_settings.get('llm_summary_prompt', ''),
}
def save_llm_settings(datastore, plugin_form):
"""Custom save handler — strips the ephemeral new_connection staging fields
so they are never persisted to llm.json."""
from changedetectionio.pluggy_interface import save_plugin_settings
data = {
'llm_connection': plugin_form.llm_connection.data,
'llm_summary_prompt': plugin_form.llm_summary_prompt.data or '',
'llm_diff_context_lines': plugin_form.llm_diff_context_lines.data or 2,
}
save_plugin_settings(datastore.datastore_path, 'llm', data)
class LLMQueuePlugin:
"""Enqueues LLM summary jobs on successful change detection and provides settings tab."""
def __init__(self, llm_q):
self.llm_q = llm_q
@hookimpl
def plugin_settings_tab(self):
from changedetectionio.llm.settings_form import LLMSettingsForm
return {
'plugin_id': 'llm',
'tab_label': 'LLM',
'form_class': LLMSettingsForm,
'template_path': 'settings-llm.html',
'save_fn': save_llm_settings,
}
@hookimpl
def update_finalize(self, update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
"""Queue an LLM summary job when a change was successfully detected."""
if not changed_detected or processing_exception or not snapshot_id:
return
if watch is None:
return
# Need ≥2 history entries — first entry has nothing to diff against
if watch.history_n < 2:
return
# Only queue when at least one LLM connection is configured
llm_settings = get_llm_settings(datastore)
has_connection = bool(
llm_settings.get('llm_connection')
or datastore.data['settings']['application'].get('llm_api_key') # legacy
or datastore.data['settings']['application'].get('llm_model') # legacy
or watch.get('llm_api_key')
or watch.get('llm_model')
)
if not has_connection:
return
uuid = watch.get('uuid')
self.llm_q.put({'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0})
logger.debug(f"LLM: queued summary for uuid={uuid} snapshot={snapshot_id}")

View File

@@ -1,544 +0,0 @@
import fcntl
import os
import queue
import re
import threading
import time
from datetime import datetime, timezone
from loguru import logger
from changedetectionio.llm.tokens import (
STRUCTURED_OUTPUT_INSTRUCTION,
parse_llm_response,
write_llm_data,
)
MAX_RETRIES = 5
RETRY_BACKOFF_BASE_SECONDS = 60 # 1m, 2m, 4m, 8m, 16m
# Token thresholds that control which summarisation strategy is used.
# Small diffs: single-pass summarise.
# Larger diffs: two-pass (enumerate all changes first, then compress).
# Very large diffs: map-reduce (chunk → enumerate per chunk → final synthesis).
TOKEN_SINGLE_PASS_THRESHOLD = 5000 # below this: one call
TOKEN_TWO_PASS_THRESHOLD = 15000 # below this: enumerate then summarise
TOKEN_CHUNK_SIZE = 5000 # tokens per map-reduce chunk
# ---------------------------------------------------------------------------
# Proactive token-bucket rate limiter — shared across all workers in process
# ---------------------------------------------------------------------------
class _RateLimitWait(Exception):
"""Raised when the bucket is empty; worker re-queues without incrementing attempts."""
def __init__(self, wait_seconds):
self.wait_seconds = wait_seconds
super().__init__(f"Rate limit: wait {wait_seconds:.1f}s")
class _TokenBucket:
"""Thread-safe continuous token bucket. tpm=0 means unlimited."""
def __init__(self, tpm):
self._lock = threading.Lock()
self._tpm = tpm
self._tokens = float(tpm) # start full
self._last_ts = time.monotonic()
def try_consume(self, n):
"""Consume n tokens. Returns (True, 0.0) on success or (False, wait_secs) if dry."""
if self._tpm == 0:
return True, 0.0
with self._lock:
now = time.monotonic()
elapsed = now - self._last_ts
self._tokens = min(self._tpm, self._tokens + elapsed * (self._tpm / 60.0))
self._last_ts = now
if self._tokens >= n:
self._tokens -= n
return True, 0.0
deficit = n - self._tokens
return False, deficit / (self._tpm / 60.0)
_rate_buckets = {}
_rate_buckets_lock = threading.Lock()
def _get_rate_bucket(conn_id, tpm):
"""Return (or lazily create) the shared _TokenBucket for this connection."""
with _rate_buckets_lock:
if conn_id not in _rate_buckets:
_rate_buckets[conn_id] = _TokenBucket(int(tpm or 0))
return _rate_buckets[conn_id]
def _parse_retry_after(exc):
"""Extract a retry-after delay (seconds) from a litellm RateLimitError."""
if hasattr(exc, 'retry_after') and exc.retry_after:
try:
return float(exc.retry_after)
except (TypeError, ValueError):
pass
m = re.search(r'(?:try again in|retry after)\s*([\d.]+)\s*s', str(exc), re.IGNORECASE)
return float(m.group(1)) + 1.0 if m else 60.0
def _read_snapshot(watch, snapshot_fname):
"""Read a snapshot file from disk, handling plain text and brotli compression."""
path = os.path.join(watch.data_dir, snapshot_fname)
if snapshot_fname.endswith('.br'):
import brotli
with open(path, 'rb') as f:
return brotli.decompress(f.read()).decode('utf-8', errors='replace')
else:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
return f.read()
def _append_llm_log(log_path, model, sent_tokens, recv_tokens, elapsed_ms):
"""Append one line to the datastore-level LLM activity log.
Line format (tab-separated, LF terminated):
ISO-8601-UTC fetched LLM via <model> sent=<N> recv=<N> ms=<N>
The file is flock-locked for the duration of the write so concurrent
workers don't interleave lines.
"""
ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] # ms precision
line = f"{ts}\tfetched LLM via {model}\tsent={sent_tokens}\trecv={recv_tokens}\tms={elapsed_ms}\n"
try:
with open(log_path, 'a', encoding='utf-8', newline='\n') as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
f.write(line)
f.flush()
finally:
fcntl.flock(f, fcntl.LOCK_UN)
except Exception as exc:
logger.warning(f"LLM log write failed: {exc}")
def _call_llm(model, messages, api_key=None, api_base=None, max_tokens=600, conn_id=None, tpm=0, log_path=None):
"""
Thin wrapper around litellm.completion.
Isolated as a named function so tests can mock.patch it without importing litellm.
Determinism settings
--------------------
temperature=0 — greedy decoding; same input produces the same output consistently.
seed=0 — passed through to providers that support it (OpenAI, some others)
for near-bit-identical reproducibility across calls.
Deliberately NOT set
--------------------
top_p — redundant at temperature=0 and can interact badly with some providers.
frequency_penalty / presence_penalty — would penalise the model for repeating specific
values (e.g. "$10 → $10") which is exactly wrong for change detection.
max_tokens — caller sets this based on the pass type:
enumerate pass needs more room than the final summary pass.
conn_id / tpm — optional rate limiting; when both are set, a proactive token-bucket
check is performed before calling the API. Raises _RateLimitWait if
the bucket is empty so the worker can re-queue without retrying.
log_path — when set, each call is appended to the datastore LLM activity log.
Returns the response text string.
"""
import litellm
# Proactive rate check (skipped when tpm=0 or conn_id is None)
if conn_id and tpm:
prompt_tokens = litellm.token_counter(model=model, messages=messages)
total_est = prompt_tokens + max_tokens
bucket = _get_rate_bucket(conn_id, tpm)
ok, wait = bucket.try_consume(total_est)
if not ok:
raise _RateLimitWait(wait)
kwargs = dict(
model=model,
messages=messages,
temperature=0,
seed=0,
max_tokens=max_tokens,
)
if api_key:
kwargs['api_key'] = api_key
if api_base:
kwargs['api_base'] = api_base
t0 = time.monotonic()
response = litellm.completion(**kwargs)
elapsed_ms = round((time.monotonic() - t0) * 1000)
if log_path:
usage = getattr(response, 'usage', None)
sent_tok = getattr(usage, 'prompt_tokens', 0) or 0
recv_tok = getattr(usage, 'completion_tokens', 0) or 0
_append_llm_log(log_path, model, sent_tok, recv_tok, elapsed_ms)
return response.choices[0].message.content.strip()
def _resolve_llm_connection(watch, datastore):
"""Return (model, api_key, api_base, conn_id, tpm) for the given watch.
Resolution order:
1. Watch-level connection_id pointing to a named entry in plugin settings.
2. The default entry in plugin settings (is_default=True).
3. Legacy flat fields on the watch or in global settings — backward compat.
4. Hard-coded fallback: gpt-4o-mini with no key / base.
"""
from changedetectionio.llm.plugin import get_llm_settings
from changedetectionio.llm.settings_form import sanitised_conn_id
llm_settings = get_llm_settings(datastore)
connections = llm_settings.get('llm_connection') or []
# 1. Watch-level override by explicit connection_id
watch_conn_id = watch.get('llm_connection_id')
if watch_conn_id:
for c in connections:
if c.get('connection_id') == watch_conn_id:
cid = sanitised_conn_id(c.get('connection_id', ''))
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
cid, int(c.get('tokens_per_minute', 0) or 0))
# 2. Global default connection
for c in connections:
if c.get('is_default'):
cid = sanitised_conn_id(c.get('connection_id', ''))
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
cid, int(c.get('tokens_per_minute', 0) or 0))
# 3. Legacy flat fields (backward compat)
app_settings = datastore.data['settings']['application']
model = watch.get('llm_model') or app_settings.get('llm_model', 'gpt-4o-mini')
api_key = watch.get('llm_api_key') or app_settings.get('llm_api_key', '')
api_base = watch.get('llm_api_base') or app_settings.get('llm_api_base', '')
return model, api_key, api_base, 'legacy', 0
SYSTEM_PROMPT = (
'You are a change detection assistant. '
'Be precise and factual. Never speculate. '
'Always use exact numbers, values, and quoted text when present in the diff. '
'If nothing meaningful changed, say so explicitly.'
)
def _build_context_header(watch, datastore):
"""Return a short multi-line string describing what this watch monitors.
Included lines (only when non-empty / non-redundant):
URL: <url>
Monitor: <user title or fetched page title> (omitted when same as URL)
Tags: <comma-separated tag titles> (omitted when none)
"""
url = watch.get('url', '')
title = watch.get('title', '') or watch.get('page_title', '')
lines = [f"URL: {url}"]
if title and title != url:
lines.append(f"Monitor: {title}")
tag_titles = []
for tag_uuid in (watch.get('tags') or []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid, {})
t = tag.get('title', '').strip()
if t:
tag_titles.append(t)
if tag_titles:
lines.append(f"Tags: {', '.join(tag_titles)}")
return '\n'.join(lines)
def _chunk_lines(lines, model, chunk_token_size):
"""Split lines into chunks that each fit within chunk_token_size tokens."""
import litellm
chunks, current, current_tokens = [], [], 0
for line in lines:
line_tokens = litellm.token_counter(model=model, text=line)
if current and current_tokens + line_tokens > chunk_token_size:
chunks.append('\n'.join(current))
current, current_tokens = [], 0
current.append(line)
current_tokens += line_tokens
if current:
chunks.append('\n'.join(current))
return chunks
def _enumerate_changes(diff_text, context_header, model, llm_kwargs):
"""
Pass 1 — ask the model to list every distinct change exhaustively, one per line.
Returns a plain-text list string.
This avoids compression decisions: the model just lists, it does not prioritise.
"""
messages = [
{'role': 'system', 'content': SYSTEM_PROMPT},
{
'role': 'user',
'content': (
f"{context_header}\n"
f"Diff:\n{diff_text}\n\n"
"List every distinct change you see, one item per line. "
"Be exhaustive — do not filter or prioritise. "
"Use exact values from the diff (prices, dates, counts, quoted text)."
),
},
]
# Enumerate pass needs more output room than the final summary
return _call_llm(model=model, messages=messages, max_tokens=1200, **llm_kwargs)
def _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction=None):
"""
Pass 2 — compress the exhaustive enumeration into the final output.
Operates on a small, structured input so nothing is lost that wasn't already listed.
summary_instruction overrides the default STRUCTURED_OUTPUT_INSTRUCTION when set.
"""
instruction = summary_instruction or (
"Now produce the final structured output for all of these changes.\n\n"
+ STRUCTURED_OUTPUT_INSTRUCTION
)
messages = [
{'role': 'system', 'content': SYSTEM_PROMPT},
{
'role': 'user',
'content': (
f"{context_header}\n"
f"All changes detected:\n{enumerated}\n\n"
+ instruction
),
},
]
return _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
def process_llm_summary(item, datastore):
    """
    Generate an LLM summary for a detected change and persist it via
    write_llm_data() (llm/{snapshot_id}-llm.json + .txt).

    item keys:
        uuid        - watch UUID
        snapshot_id - the newer snapshot ID (md5 hex), maps to {snapshot_id}.txt[.br]
        attempts    - retry counter

    Summarisation strategy (chosen by diff token count):
        Small  (< TOKEN_SINGLE_PASS_THRESHOLD): one call — enumerate + summarise together.
        Medium (< TOKEN_TWO_PASS_THRESHOLD):    two calls — enumerate all changes, then compress.
        Large  (≥ TOKEN_TWO_PASS_THRESHOLD):    map-reduce — chunk → enumerate per chunk →
                                                synthesise chunk enumerations → final summary.

    The two-pass / map-reduce approach prevents lossiness: temperature=0 causes the model
    to greedily commit to the most prominent change and drop the rest in a single pass.
    Enumerating first forces comprehensive coverage before any compression happens.

    _call_llm and write_llm_data are separate steps so each is independently
    patchable in tests.

    Raises:
        ValueError: unknown watch, snapshot_id not present in history, or the
            snapshot is the first history entry (nothing to diff against).
    """
    import difflib
    import litellm

    uuid = item['uuid']
    snapshot_id = item['snapshot_id']
    watch = datastore.data['watching'].get(uuid)
    if not watch:
        raise ValueError(f"Watch {uuid} not found")

    # Find this snapshot and the one before it in history.
    # history maps keys to snapshot file paths; the basename minus its
    # extension is the snapshot's md5 id.
    history = watch.history
    history_keys = list(history.keys())
    try:
        idx = next(
            i for i, k in enumerate(history_keys)
            if os.path.basename(history[k]).split('.')[0] == snapshot_id
        )
    except StopIteration:
        raise ValueError(f"snapshot_id {snapshot_id} not found in history for watch {uuid}")
    if idx == 0:
        raise ValueError(f"snapshot_id {snapshot_id} is the first history entry — no prior to diff against")

    before_text = _read_snapshot(watch, history[history_keys[idx - 1]])
    current_text = _read_snapshot(watch, history[history_keys[idx]])

    # Resolve model / credentials via connections table (with legacy flat-field fallback)
    model, api_key, api_base, conn_id, tpm = _resolve_llm_connection(watch, datastore)
    context_header = _build_context_header(watch, datastore)

    llm_kwargs = {
        'log_path': os.path.join(datastore.datastore_path, 'llm-log.txt'),
    }
    # Only forward optional connection details that are actually configured.
    if api_key:
        llm_kwargs['api_key'] = api_key
    if api_base:
        llm_kwargs['api_base'] = api_base
    if conn_id:
        llm_kwargs['conn_id'] = conn_id
    if tpm:
        llm_kwargs['tpm'] = tpm

    # Use custom prompt / context-line setting if configured
    from changedetectionio.llm.plugin import get_llm_settings
    llm_settings = get_llm_settings(datastore)
    custom_prompt = (llm_settings.get('llm_summary_prompt') or '').strip()
    summary_instruction = custom_prompt if custom_prompt else (
        "Analyse all changes in this diff.\n\n" + STRUCTURED_OUTPUT_INSTRUCTION
    )
    context_n = int(llm_settings.get('llm_diff_context_lines') or 2)

    diff_lines = list(difflib.unified_diff(
        before_text.splitlines(),
        current_text.splitlines(),
        lineterm='',
        n=context_n,
    ))
    diff_text = '\n'.join(diff_lines)
    if not diff_text.strip():
        logger.debug(f"LLM: no diff content for {uuid}/{snapshot_id}, skipping")
        return

    diff_tokens = litellm.token_counter(model=model, text=diff_text)
    logger.debug(f"LLM: diff is {diff_tokens} tokens for {uuid}/{snapshot_id}")

    if diff_tokens < TOKEN_SINGLE_PASS_THRESHOLD:
        # Small diff — single call, model can see everything at once
        messages = [
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {
                'role': 'user',
                'content': (
                    f"{context_header}\n"
                    f"Diff:\n{diff_text}\n\n"
                    + summary_instruction
                ),
            },
        ]
        raw = _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
        strategy = 'single'
    elif diff_tokens < TOKEN_TWO_PASS_THRESHOLD:
        # Medium diff — two-pass: enumerate exhaustively, then compress
        enumerated = _enumerate_changes(diff_text, context_header, model, llm_kwargs)
        raw = _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction)
        strategy = 'two-pass'
    else:
        # Large diff — map-reduce: chunk → enumerate per chunk → synthesise
        chunks = _chunk_lines(diff_lines, model, TOKEN_CHUNK_SIZE)
        logger.debug(f"LLM: map-reduce over {len(chunks)} chunks for {uuid}/{snapshot_id}")
        chunk_enumerations = []
        for i, chunk in enumerate(chunks):
            logger.debug(f"LLM: enumerating chunk {i+1}/{len(chunks)}")
            chunk_enumerations.append(
                _enumerate_changes(chunk, context_header, model, llm_kwargs)
            )
        combined = '\n'.join(chunk_enumerations)
        raw = _summarise_enumeration(combined, context_header, model, llm_kwargs, summary_instruction)
        strategy = 'map-reduce'

    llm_data = parse_llm_response(raw)
    write_llm_data(watch.data_dir, snapshot_id, llm_data)
    logger.info(f"LLM tokens written for {uuid}/{snapshot_id} (strategy: {strategy}, tokens: {diff_tokens})")
def llm_summary_runner(worker_id, app, datastore, llm_q):
    """
    Sync LLM summary worker — mirrors the notification_runner pattern.

    One worker is the right default (LLM API rate limits constrain throughput
    more than parallelism helps). Increase via LLM_WORKERS env var if using
    a local Ollama endpoint with no rate limits.

    Failed items are re-queued with exponential backoff (see MAX_RETRIES /
    RETRY_BACKOFF_BASE_SECONDS). After MAX_RETRIES the item is dropped and
    the failure is recorded on the watch.

    Args:
        worker_id: identifier used only in log messages.
        app: Flask app — provides the app context and the shared
             app.config.exit event used for shutdown.
        datastore: application datastore; on permanent failure the watch's
             'last_error' is updated so the UI surfaces it.
        llm_q: work queue of dicts with 'uuid', 'snapshot_id', and optional
             'attempts' / 'next_retry_at' bookkeeping keys.
    """
    with app.app_context():
        while not app.config.exit.is_set():
            try:
                item = llm_q.get(block=False)
            except queue.Empty:
                # Idle — poll roughly once a second without blocking shutdown.
                app.config.exit.wait(1)
                continue

            # Honour retry delay — if the item isn't due yet, put it back
            # and sleep briefly rather than spinning.
            next_retry_at = item.get('next_retry_at', 0)
            if next_retry_at > time.time():
                llm_q.put(item)
                app.config.exit.wait(min(next_retry_at - time.time(), 5))
                continue

            uuid = item.get('uuid')
            snapshot_id = item.get('snapshot_id')
            attempts = item.get('attempts', 0)
            logger.debug(f"LLM worker {worker_id} processing uuid={uuid} snapshot={snapshot_id} attempt={attempts}")
            try:
                process_llm_summary(item, datastore)
                logger.info(f"LLM worker {worker_id} completed summary for uuid={uuid} snapshot={snapshot_id}")
            except NotImplementedError:
                # Silently drop until the processor is implemented
                logger.debug(f"LLM worker {worker_id} skipping — processor not yet implemented")
            except _RateLimitWait as rw:
                # Proactive bucket empty — re-queue without counting as a failure
                item['next_retry_at'] = time.time() + rw.wait_seconds
                llm_q.put(item)
                logger.info(
                    f"LLM worker {worker_id} rate-limited (proactive) for {rw.wait_seconds:.1f}s "
                    f"uuid={uuid}"
                )
            except Exception as e:
                # Reactive: check if the API itself returned a rate-limit error
                try:
                    import litellm as _litellm
                    if isinstance(e, _litellm.RateLimitError):
                        # Respect the server-advised delay; not a failed attempt.
                        wait = _parse_retry_after(e)
                        item['next_retry_at'] = time.time() + wait
                        llm_q.put(item)
                        logger.warning(
                            f"LLM worker {worker_id} API rate limit for uuid={uuid}, "
                            f"retry in {wait:.1f}s"
                        )
                        continue
                except ImportError:
                    # litellm not installed — fall through to generic retry handling.
                    pass
                logger.error(f"LLM worker {worker_id} error for uuid={uuid} snapshot={snapshot_id}: {e}")
                if attempts < MAX_RETRIES:
                    # Exponential backoff: base * 2^attempts seconds.
                    backoff = RETRY_BACKOFF_BASE_SECONDS * (2 ** attempts)
                    item['attempts'] = attempts + 1
                    item['next_retry_at'] = time.time() + backoff
                    llm_q.put(item)
                    logger.info(
                        f"LLM worker {worker_id} re-queued uuid={uuid} "
                        f"attempt={item['attempts']}/{MAX_RETRIES} retry_in={backoff}s"
                    )
                else:
                    logger.error(
                        f"LLM worker {worker_id} gave up on uuid={uuid} snapshot={snapshot_id} "
                        f"after {MAX_RETRIES} attempts"
                    )
                    if uuid and uuid in datastore.data['watching']:
                        # Surface the permanent failure on the watch itself.
                        datastore.update_watch(
                            uuid=uuid,
                            update_obj={'last_error': f"LLM summary failed after {MAX_RETRIES} attempts: {e}"}
                        )

View File

@@ -1,139 +0,0 @@
import re
import uuid as _uuid
from flask_babel import lazy_gettext as _l
from wtforms import (
BooleanField,
FieldList,
Form,
FormField,
HiddenField,
IntegerField,
PasswordField,
SelectField,
StringField,
TextAreaField,
)
from wtforms.validators import Length, NumberRange, Optional
from changedetectionio.llm.tokens import STRUCTURED_OUTPUT_INSTRUCTION
# The built-in instruction appended after the diff — shown as placeholder text.
DEFAULT_SUMMARY_PROMPT = (
"Analyse all changes in this diff.\n\n"
+ STRUCTURED_OUTPUT_INSTRUCTION
)
# Connection IDs arrive from the browser — only this safe charset is accepted.
_CONN_ID_RE = re.compile(r'^[a-zA-Z0-9_-]{1,64}$')


def sanitised_conn_id(raw):
    """Return *raw* when it is a safe identifier, otherwise a freshly generated UUID string."""
    candidate = (raw or '').strip()
    if _CONN_ID_RE.match(candidate):
        return candidate
    return str(_uuid.uuid4())
class LLMConnectionEntryForm(Form):
    """Schema for a single LLM connection.

    Declaring every field here is what prevents arbitrary key injection:
    only these fields can ever reach the datastore from this form.
    """
    # Stable identifier for this connection row (emitted by llm.js on submit).
    connection_id = HiddenField()
    name = StringField(_l('Name'), validators=[Optional(), Length(max=100)])
    # Model identifier string, e.g. 'gpt-4o-mini' (see the preset list in llm.js).
    model = StringField(_l('Model string'), validators=[Optional(), Length(max=200)])
    api_key = StringField(_l('API Key'), validators=[Optional(), Length(max=500)])
    # Optional endpoint override — used for local backends such as Ollama.
    api_base = StringField(_l('API Endpoint'), validators=[Optional(), Length(max=500)])
    # 0 means unlimited (no tokens-per-minute throttling).
    tokens_per_minute = IntegerField(_l('Tokens/min'), validators=[Optional(), NumberRange(min=0, max=10_000_000)], default=0)
    is_default = BooleanField(_l('Default'), validators=[Optional()])
class LLMNewConnectionForm(Form):
    """Staging fields for the 'Add a connection' UI.

    These are read client-side by llm.js to build a new FieldList entry on click.
    They are never used server-side — render_kw sets the id attributes llm.js
    looks up with $('#llm-add-name') etc.
    """
    # Provider dropdown; choosing an entry pre-fills the fields below via llm.js.
    # validate_choice=False because the value is consumed client-side only.
    preset = SelectField(
        _l('Provider template'),
        validate_choice=False,
        # WTForms 3.x uses a dict for optgroups (has_groups() checks isinstance(choices, dict)).
        # An empty-string key renders as <optgroup label=""> which browsers treat as ungrouped.
        choices={
            '': [('', '')],
            _l('Cloud'): [
                ('openai-mini', 'OpenAI — gpt-4o-mini'),
                ('openai-4o', 'OpenAI — gpt-4o'),
                ('anthropic-haiku', 'Anthropic — claude-3-haiku'),
                ('anthropic-sonnet', 'Anthropic — claude-3-5-sonnet'),
                ('groq-8b', 'Groq — llama-3.1-8b-instant'),
                ('groq-70b', 'Groq — llama-3.3-70b-versatile'),
                ('gemini-flash', 'Google — gemini-1.5-flash'),
                ('mistral-small', 'Mistral — mistral-small'),
                ('deepseek', 'DeepSeek — deepseek-chat'),
                ('openrouter', 'OpenRouter (custom model)'),
            ],
            _l('Local'): [
                ('ollama-llama', 'Ollama — llama3.1'),
                ('ollama-mistral', 'Ollama — mistral'),
                ('lmstudio', 'LM Studio'),
            ],
            _l('Custom'): [
                ('custom', _l('Manual entry')),
            ],
        },
        render_kw={'id': 'llm-preset'},
    )
    name = StringField(_l('Name'),
                       render_kw={'id': 'llm-add-name', 'size': 30,
                                  'autocomplete': 'off'})
    model = StringField(_l('Model string'),
                        render_kw={'id': 'llm-add-model', 'size': 40,
                                   'placeholder': 'gpt-4o-mini', 'autocomplete': 'off'})
    # PasswordField so the browser masks the key while typing; llm.js adds a
    # show/hide toggle next to it.
    api_key = PasswordField(_l('API Key'),
                            render_kw={'id': 'llm-add-key', 'size': 40,
                                       'placeholder': 'sk-…', 'autocomplete': 'off'})
    api_base = StringField(_l('API Endpoint'),
                           render_kw={'id': 'llm-add-base', 'size': 40,
                                      'placeholder': 'http://localhost:11434', 'autocomplete': 'off'})
    # 0 = unlimited (no tokens-per-minute throttling).
    tokens_per_minute = IntegerField(_l('Tokens/min'), default=0,
                                     render_kw={'id': 'llm-add-tpm', 'style': 'width: 8em;',
                                                'min': '0', 'step': '1000'})
class LLMSettingsForm(Form):
    """WTForms form for the LLM settings tab.

    llm_connection is a FieldList of LLMConnectionEntryForm entries.
    llm.js emits individual hidden inputs (llm_connection-N-fieldname) on submit
    instead of a JSON blob, so WTForms processes them through the declared schema.
    """
    llm_connection = FieldList(FormField(LLMConnectionEntryForm), min_entries=0)
    # Client-side staging fields only — never persisted (see LLMNewConnectionForm).
    new_connection = FormField(LLMNewConnectionForm)
    # Unified-diff context size passed to the summariser; bounded 0-20.
    llm_diff_context_lines = IntegerField(
        _l('Diff context lines'),
        validators=[Optional(), NumberRange(min=0, max=20)],
        default=2,
        description=_l(
            'Number of unchanged lines shown around each change in the diff. '
            'More lines give the LLM more context but increase token usage. (default: 2)'
        ),
        render_kw={'style': 'width: 5em;', 'min': '0', 'max': '20'},
    )
    # Free-text override of the instruction appended after the diff; the
    # built-in default is shown as the placeholder text.
    llm_summary_prompt = TextAreaField(
        _l('Summary prompt'),
        validators=[Optional()],
        description=_l(
            'Override the instruction sent to the LLM after the diff. '
            'Leave blank to use the built-in default (structured JSON output).'
        ),
        render_kw={
            'rows': 8,
            'placeholder': DEFAULT_SUMMARY_PROMPT,
            'class': 'pure-input-1',
        },
    )

View File

@@ -1,103 +0,0 @@
<script src="{{url_for('static_content', group='js', filename='llm.js')}}" defer></script>
<script>
    var LLM_CONNECTIONS = (function () {
        var list = {{ plugin_form.llm_connection.data|tojson }};
        var out = {};
        (list || []).forEach(function (c) { if (c && c.connection_id) out[c.connection_id] = c; });
        return out;
    }());
    {# |tojson renders each translation as a quoted, escaped JS string literal,
       so a translation containing quotes/backslashes cannot break this script
       block (the previous hand-quoted '{{ _(...) }}' form could). #}
    var LLM_I18N = {
        noConnections: {{ _("No connections configured yet.")|tojson }},
        setDefault: {{ _("Set as default")|tojson }},
        remove: {{ _("Remove")|tojson }},
        show: {{ _("show")|tojson }},
        hide: {{ _("hide")|tojson }},
        nameModelRequired: {{ _("Name and Model string are required.")|tojson }}
    };
</script>

{# ── Configured connections table ──────────────────── #}
<fieldset>
    <legend>{{ _('LLM Connections') }}</legend>
    <table class="pure-table pure-table-horizontal llm-connections">
        <thead>
            <tr>
                <th class="llm-col-def" title="{{ _('Default') }}">{{ _('Default') }}</th>
                <th class="llm-col-name">{{ _('Name') }}</th>
                <th class="llm-col-model">{{ _('Model') }}</th>
                <th class="llm-col-key">{{ _('API Key') }}</th>
                <th class="llm-col-tpm" title="{{ _('Tokens per minute limit (0 = unlimited)') }}">{{ _('TPM') }}</th>
                <th class="llm-col-del"></th>
            </tr>
        </thead>
        {# Rows are rendered client-side by llm.js (renderTable) from LLM_CONNECTIONS #}
        <tbody id="llm-connections-tbody">
        </tbody>
    </table>
</fieldset>

{# ── Add connection ─────────────────────────────────── #}
{% set nf = plugin_form.new_connection.form %}
<fieldset>
    <legend>{{ _('Add a connection') }}</legend>
    <div class="pure-control-group">
        {{ nf.preset.label }}
        {{ nf.preset() }}
    </div>
    <div class="pure-control-group">
        {{ nf.name.label }}
        {{ nf.name(placeholder=_('e.g. My OpenAI')) }}
    </div>
    <div class="pure-control-group">
        {{ nf.model.label }}
        {{ nf.model() }}
    </div>
    <div class="pure-control-group">
        <label for="llm-add-key">
            {{ _('API Key') }}
            <span class="pure-form-message-inline">({{ _('leave blank for local') }})</span>
        </label>
        <div class="llm-key-wrap">
            {{ nf.api_key() }}
            <button type="button" id="llm-key-toggle" class="pure-button">{{ _('show') }}</button>
        </div>
    </div>
    {# Hidden until a preset with an endpoint (or manual entry) is chosen — see llm.js #}
    <div class="pure-control-group" id="llm-base-group" style="display:none">
        <label for="llm-add-base">
            {{ _('API Endpoint') }}
            <span class="pure-form-message-inline">({{ _('optional') }})</span>
        </label>
        {{ nf.api_base() }}
    </div>
    <div class="pure-control-group">
        <label for="llm-add-tpm">
            {{ _('Tokens/min limit') }}
            <span class="pure-form-message-inline">({{ _('0 = unlimited') }})</span>
        </label>
        {{ nf.tokens_per_minute() }}
    </div>
    <div class="pure-controls">
        <button type="button" id="llm-btn-add" class="pure-button pure-button-primary">{{ _('+ Add connection') }}</button>
    </div>
</fieldset>

{# ── Prompt configuration ────────────────────────────────── #}
<fieldset>
    <legend>{{ _('Summary Prompt') }}</legend>
    <div class="pure-control-group">
        {{ plugin_form.llm_diff_context_lines.label }}
        {{ plugin_form.llm_diff_context_lines() }}
        <span class="pure-form-message-inline">
            {{ _('Unchanged lines shown around each change in the diff sent to the LLM. More lines = more context but higher token cost. (default: 2)') }}
        </span>
    </div>
    <div class="pure-control-group">
        {{ render_field(plugin_form.llm_summary_prompt) }}
        <span class="pure-form-message-inline">
            {{ _('Instruction appended after the diff in every LLM call. Leave blank to use the built-in default (structured JSON output).') }}
        </span>
    </div>
</fieldset>

View File

@@ -1,197 +0,0 @@
"""
LLM notification token definitions and file I/O helpers.
All LLM data for a snapshot is stored under a dedicated subdirectory:
{data_dir}/llm/{snapshot_id}-llm.json
A plain-text {snapshot_id}-llm.txt is also written containing just the
summary field, for backward compatibility with any code that already reads it.
Token catalogue
---------------
llm_summary 1-3 sentence description of all changes, exact values.
llm_headline 5-8 word punchy title — ideal for the notification subject line.
llm_importance Numeric 1-10 significance score; enables routing rules like
"only escalate if llm_importance >= 8".
llm_sentiment Machine-readable: "positive", "negative", or "neutral".
Useful for trend tracking and coloured alert styling.
llm_one_liner Shortest useful summary — one sentence for SMS, Pushover,
and other character-limited channels.
"""
import os
import json
from loguru import logger
# ── Constants ──────────────────────────────────────────────────────────────

# Canonical names of the notification tokens this module produces
# (see the module docstring for what each one means).
LLM_TOKEN_NAMES = (
    'llm_summary',
    'llm_headline',
    'llm_importance',
    'llm_sentiment',
    'llm_one_liner',
)

# How long the notification runner waits for LLM data before giving up:
# delay per poll (seconds) × number of polls.
LLM_NOTIFICATION_RETRY_DELAY_SECONDS = int(os.getenv('LLM_NOTIFICATION_RETRY_DELAY', '10'))
LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS = int(os.getenv('LLM_NOTIFICATION_MAX_WAIT', '18'))  # 18 × 10s = 3 min

# JSON prompt fragment — embedded in the final summarisation call.
# parse_llm_response() expects the model to answer with exactly these keys.
STRUCTURED_OUTPUT_INSTRUCTION = (
    'Return ONLY a valid JSON object — no markdown fences, no extra text — using exactly these keys:\n'
    '{"summary":"1-3 sentences covering ALL changes; use exact values from the diff.","headline":"5-8 word punchy title for this specific change","importance":7,"sentiment":"positive","one_liner":"One sentence for SMS/push character limits."}\n'
    'importance: 1=trivial whitespace, 5=moderate content change, 10=critical price/availability change.\n'
    'sentiment: "positive" (desirable for the user), "negative" (undesirable), or "neutral" (informational only).'
)
# ── File I/O ───────────────────────────────────────────────────────────────
def llm_subdir(data_dir: str) -> str:
    """Return the llm/ subdirectory path (does not create it)."""
    return os.path.join(data_dir, 'llm')


def llm_json_path(data_dir: str, snapshot_id: str) -> str:
    """Full path of the structured JSON token file for a snapshot."""
    filename = snapshot_id + '-llm.json'
    return os.path.join(llm_subdir(data_dir), filename)


def llm_txt_path(data_dir: str, snapshot_id: str) -> str:
    """Full path of the plain-text summary file for a snapshot."""
    filename = snapshot_id + '-llm.txt'
    return os.path.join(llm_subdir(data_dir), filename)


def is_llm_data_ready(data_dir: str, snapshot_id: str) -> bool:
    """Return True if LLM data has been written for this snapshot (either format)."""
    candidates = (
        llm_json_path(data_dir, snapshot_id),
        llm_txt_path(data_dir, snapshot_id),
    )
    return any(os.path.exists(p) for p in candidates)
def read_llm_tokens(data_dir: str, snapshot_id: str) -> dict:
    """
    Read LLM token data for a snapshot.

    The structured JSON file (new format) takes priority; the plain .txt
    summary (old format) is the fallback. Returns an empty dict when no
    data is available yet.
    """
    json_path = llm_json_path(data_dir, snapshot_id)
    if os.path.exists(json_path):
        try:
            with open(json_path, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
            if isinstance(payload, dict):
                return _normalise(payload)
        except Exception as exc:
            logger.warning(f"LLM tokens: failed to read {json_path}: {exc}")
    txt_path = llm_txt_path(data_dir, snapshot_id)
    if os.path.exists(txt_path):
        try:
            with open(txt_path, 'r', encoding='utf-8') as fh:
                summary_text = fh.read().strip()
            return _normalise({'summary': summary_text, 'one_liner': summary_text[:200]})
        except Exception as exc:
            logger.warning(f"LLM tokens: failed to read {txt_path}: {exc}")
    return {}
def write_llm_data(data_dir: str, snapshot_id: str, data: dict) -> str:
    """
    Atomically persist LLM data under the llm/ subdirectory.

    Two files are produced:
        llm/{snapshot_id}-llm.json — full structured data (all tokens)
        llm/{snapshot_id}-llm.txt  — plain summary text (backward compat)

    Returns the path of the JSON file.
    """
    clean = _normalise(data)
    os.makedirs(llm_subdir(data_dir), exist_ok=True)
    json_file = llm_json_path(data_dir, snapshot_id)
    txt_file = llm_txt_path(data_dir, snapshot_id)
    _atomic_write_text(json_file, json.dumps(clean, ensure_ascii=False))
    _atomic_write_text(txt_file, clean.get('summary', ''))
    return json_file
def parse_llm_response(response: str) -> dict:
    """
    Parse a structured JSON response from the LLM into a normalised token dict.

    Candidates are tried in order: the whole response as strict JSON, a
    markdown-fenced ```json block, then any bare {...} object found in the
    text. When none of them parses to a dict, the entire response becomes
    the 'summary' token instead.
    """
    import re
    text = response.strip()

    # Collect parse candidates in priority order.
    candidates = [text]
    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if fenced:
        candidates.append(fenced.group(1))
    bare = re.search(r'\{[^{}]*\}', text, re.DOTALL)
    if bare:
        candidates.append(bare.group(0))

    for candidate in candidates:
        try:
            obj = json.loads(candidate)
        except (json.JSONDecodeError, ValueError):
            continue
        if isinstance(obj, dict):
            return _normalise(obj)

    # Fallback — treat entire response as summary
    logger.debug("LLM response was not valid JSON — using raw text as summary")
    return _normalise({'summary': text, 'one_liner': text[:200] if len(text) > 200 else text})
# ── Internal helpers ───────────────────────────────────────────────────────
def _normalise(data: dict) -> dict:
"""Return a clean token dict with all expected keys present."""
importance = data.get('importance')
if importance is not None:
try:
importance = max(1, min(10, int(float(importance))))
except (TypeError, ValueError):
importance = None
sentiment = str(data.get('sentiment', '')).lower().strip()
if sentiment not in ('positive', 'negative', 'neutral'):
sentiment = ''
return {
'summary': str(data.get('summary', '') or '').strip(),
'headline': str(data.get('headline', '') or '').strip(),
'importance': importance,
'sentiment': sentiment,
'one_liner': str(data.get('one_liner', '') or '').strip(),
}
def _atomic_write_text(path: str, text: str) -> None:
tmp = path + '.tmp'
with open(tmp, 'w', encoding='utf-8') as f:
f.write(text)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)

View File

@@ -1009,31 +1009,14 @@ class model(EntityPersistenceMixin, watch_base):
def extra_notification_token_values(self):
from changedetectionio.llm.tokens import read_llm_tokens
history = self.history
if not history:
return {}
latest_fname = history[list(history.keys())[-1]]
snapshot_id = os.path.basename(latest_fname).split('.')[0] # always 32-char MD5
data = read_llm_tokens(self.data_dir, snapshot_id)
if not data:
return {}
return {
'llm_summary': data.get('summary', ''),
'llm_headline': data.get('headline', ''),
'llm_importance': data.get('importance'),
'llm_sentiment': data.get('sentiment', ''),
'llm_one_liner': data.get('one_liner', ''),
}
# Used for providing extra tokens
# return {'widget': 555}
return {}
def extra_notification_token_placeholder_info(self):
return [
('llm_summary', "LLM: 1-3 sentence summary of all changes with exact values"),
('llm_headline', "LLM: 5-8 word punchy title for this specific change"),
('llm_importance', "LLM: Significance score 1-10 (1=trivial, 10=critical)"),
('llm_sentiment', "LLM: Change sentiment — positive, negative, or neutral"),
('llm_one_liner', "LLM: One sentence for SMS/push character limits"),
]
# Used for providing extra tokens
# return [('widget', "Get widget amounts")]
return []
def extract_regex_from_all_history(self, regex):

View File

@@ -6,7 +6,6 @@ Extracted from update_worker.py to provide standalone notification functionality
for both sync and async workers
"""
import datetime
import os
import pytz
from loguru import logger
@@ -83,12 +82,6 @@ class NotificationContextData(dict):
'watch_tag': None,
'watch_title': None,
'watch_url': 'https://WATCH-PLACE-HOLDER/',
# LLM-generated tokens (populated by notification_runner once LLM data is ready)
'llm_headline': None,
'llm_importance': None,
'llm_one_liner': None,
'llm_sentiment': None,
'llm_summary': None,
})
# Apply any initial data passed in
@@ -281,11 +274,6 @@ class NotificationService:
timestamp_changed=dates[date_index_to]))
if self.notification_q:
# Store snapshot_id hint so notification_runner can gate on LLM data readiness
if watch and len(dates) > 0:
latest_fname = watch.history.get(dates[date_index_to], '')
if latest_fname:
n_object['_llm_snapshot_id'] = os.path.basename(latest_fname).split('.')[0]
logger.debug("Queued notification for sending")
self.notification_q.put(n_object)
else:

View File

@@ -151,8 +151,7 @@ class ChangeDetectionSpec:
pass
@hookspec
def update_finalize(update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
def update_finalize(update_handler, watch, datastore, processing_exception):
"""Called after watch processing completes (success or failure).
This hook is called in the finally block after all processing is complete,
@@ -169,10 +168,6 @@ class ChangeDetectionSpec:
processing_exception: The exception from the main processing block, or None if successful.
This does NOT include cleanup exceptions - only exceptions from
the actual watch processing (fetch, diff, etc).
changed_detected: True when the processor detected a content change (default False).
snapshot_id: MD5 hex string of the new snapshot, matches the prefix of the history
filename (e.g. 'abc123…''abc123….txt[.br]'). None when no snapshot
was saved (first run, error, same content).
Returns:
None: This hook doesn't return a value
@@ -585,8 +580,7 @@ def apply_update_handler_alter(update_handler, watch, datastore):
return current_handler
def apply_update_finalize(update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
def apply_update_finalize(update_handler, watch, datastore, processing_exception):
"""Apply update_finalize hooks from all plugins.
Called in the finally block after watch processing completes, allowing plugins
@@ -597,8 +591,6 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
watch: The watch dict that was processed (may be None)
datastore: The application datastore
processing_exception: The exception from processing, or None if successful
changed_detected: True when the processor detected a content change.
snapshot_id: MD5 hex string of the new snapshot, or None.
Returns:
None
@@ -609,9 +601,7 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
update_handler=update_handler,
watch=watch,
datastore=datastore,
processing_exception=processing_exception,
changed_detected=changed_detected,
snapshot_id=snapshot_id,
processing_exception=processing_exception
)
except Exception as e:
# Don't let plugin errors crash the worker

View File

@@ -198,6 +198,7 @@ def handle_watch_update(socketio, **kwargs):
except Exception as e:
logger.error(f"Socket.IO error in handle_watch_update: {str(e)}")
def init_socketio(app, datastore):
"""Initialize SocketIO with the main Flask app"""
import platform

View File

@@ -1,187 +0,0 @@
/* llm.js — LLM Connections management (settings page)
 * Depends on: jQuery (global), LLM_CONNECTIONS + LLM_I18N injected by Jinja2 template.
 */
(function ($) {
    'use strict';

    // Provider presets: [value, label, model, api_base, tpm]
    // tpm = tokens-per-minute limit (0 = unlimited / local).
    // Defaults reflect free-tier or conservative tier-1 limits.
    // The `value` keys mirror the preset <select> options rendered server-side.
    var LLM_PRESETS = [
        ['openai-mini', 'OpenAI — gpt-4o-mini', 'gpt-4o-mini', '', 200000],
        ['openai-4o', 'OpenAI — gpt-4o', 'gpt-4o', '', 30000],
        ['anthropic-haiku', 'Anthropic — claude-3-haiku', 'anthropic/claude-3-haiku-20240307', '', 100000],
        ['anthropic-sonnet', 'Anthropic — claude-3-5-sonnet', 'anthropic/claude-3-5-sonnet-20241022', '', 40000],
        ['groq-8b', 'Groq — llama-3.1-8b-instant', 'groq/llama-3.1-8b-instant', '', 6000],
        ['groq-70b', 'Groq — llama-3.3-70b-versatile', 'groq/llama-3.3-70b-versatile', '', 6000],
        ['gemini-flash', 'Google — gemini-1.5-flash', 'gemini/gemini-1.5-flash', '', 1000000],
        ['mistral-small', 'Mistral — mistral-small', 'mistral/mistral-small-latest', '', 500000],
        ['deepseek', 'DeepSeek — deepseek-chat', 'deepseek/deepseek-chat', '', 50000],
        ['openrouter', 'OpenRouter (custom model)', 'openrouter/', '', 20000],
        ['ollama-llama', 'Ollama — llama3.1 (local)', 'ollama/llama3.1', 'http://localhost:11434', 0],
        ['ollama-mistral', 'Ollama — mistral (local)', 'ollama/mistral', 'http://localhost:11434', 0],
        ['lmstudio', 'LM Studio (local)', 'openai/local', 'http://localhost:1234/v1', 0],
    ];

    // Index presets by their value key for O(1) lookup on dropdown change.
    var presetMap = {};
    $.each(LLM_PRESETS, function (_, p) { presetMap[p[0]] = p; });
function escHtml(s) {
return $('<div>').text(String(s)).html();
}
function maskKey(k) {
if (!k) return '<span style="color:var(--color-grey-700)">—</span>';
return escHtml(k.substring(0, 4)) + '••••';
}
// Emit WTForms FieldList hidden inputs (llm_connection-N-fieldname) so the
// server processes connections through the declared schema — no arbitrary keys.
function serialise() {
var $form = $('form.settings');
$form.find('input[data-llm-gen]').remove();
var ids = Object.keys(LLM_CONNECTIONS);
$.each(ids, function (i, id) {
var c = LLM_CONNECTIONS[id];
var prefix = 'llm_connection-' + i + '-';
var fields = {
connection_id: id,
name: c.name || '',
model: c.model || '',
api_key: c.api_key || '',
api_base: c.api_base || '',
tokens_per_minute: parseInt(c.tokens_per_minute || 0, 10)
};
$.each(fields, function (field, value) {
$('<input>').attr({ type: 'hidden', name: prefix + field, value: value, 'data-llm-gen': '1' }).appendTo($form);
});
// BooleanField: only emit when true (absence == false in WTForms)
if (c.is_default) {
$('<input>').attr({ type: 'hidden', name: prefix + 'is_default', value: 'y', 'data-llm-gen': '1' }).appendTo($form);
}
});
}
function renderTable() {
var $tbody = $('#llm-connections-tbody');
$tbody.empty();
var ids = Object.keys(LLM_CONNECTIONS);
if (!ids.length) {
$tbody.html('<tr class="llm-empty"><td colspan="6">' + escHtml(LLM_I18N.noConnections) + '</td></tr>');
return;
}
$.each(ids, function (_, id) {
var c = LLM_CONNECTIONS[id];
var tpm = parseInt(c.tokens_per_minute || 0, 10);
var tpmLabel = tpm ? tpm.toLocaleString() : '<span style="color:var(--color-grey-700)">∞</span>';
$tbody.append(
'<tr>' +
'<td class="llm-col-def">' +
'<input type="radio" class="llm-default-radio" name="llm_default_radio"' +
' title="' + escHtml(LLM_I18N.setDefault) + '"' +
(c.is_default ? ' checked' : '') +
' data-id="' + escHtml(id) + '">' +
'</td>' +
'<td class="llm-col-name">' + escHtml(c.name) + '</td>' +
'<td class="llm-col-model">' + escHtml(c.model) + '</td>' +
'<td class="llm-col-key">' + maskKey(c.api_key) + '</td>' +
'<td class="llm-col-tpm">' + tpmLabel + '</td>' +
'<td class="llm-col-del">' +
'<button type="button" class="llm-del"' +
' title="' + escHtml(LLM_I18N.remove) + '"' +
' data-id="' + escHtml(id) + '">×</button>' +
'</td>' +
'</tr>'
);
});
}
    $(function () {
        // Event delegation on tbody — survives re-renders
        $('#llm-connections-tbody')
            .on('change', '.llm-default-radio', function () {
                var chosen = String($(this).data('id'));
                // Exactly one connection may be the default at a time.
                $.each(LLM_CONNECTIONS, function (k) {
                    LLM_CONNECTIONS[k].is_default = (k === chosen);
                });
                serialise();
            })
            .on('click', '.llm-del', function () {
                var id = String($(this).data('id'));
                delete LLM_CONNECTIONS[id];
                // If the deleted row was the default, promote the first survivor.
                var remaining = Object.keys(LLM_CONNECTIONS);
                if (remaining.length && !remaining.some(function (k) { return LLM_CONNECTIONS[k].is_default; })) {
                    LLM_CONNECTIONS[remaining[0]].is_default = true;
                }
                renderTable();
                serialise();
            });

        // Show the endpoint field only for presets that ship one, or manual entry.
        function updateBaseVisibility() {
            var val = $('#llm-preset').val();
            var preset = presetMap[val];
            var hasBase = preset ? !!preset[3] : (val === 'custom');
            var show = (val === 'custom') || hasBase;
            $('#llm-base-group').toggle(show);
        }

        // Preset dropdown pre-fills add form
        $('#llm-preset').on('change', function () {
            var val = $(this).val();
            var p = presetMap[val];
            if (p) {
                // Display name = the preset label up to its em-dash separator.
                $('#llm-add-name').val(p[1].replace(/\s*—.*/, '').trim());
                $('#llm-add-model').val(p[2]);
                $('#llm-add-base').val(p[3]);
                $('#llm-add-tpm').val(p[4] !== undefined ? p[4] : 0);
                $('#llm-add-key').val('');
            }
            updateBaseVisibility();
        });

        // Add connection
        $('#llm-btn-add').on('click', function () {
            var name = $.trim($('#llm-add-name').val());
            var model = $.trim($('#llm-add-model').val());
            var key = $.trim($('#llm-add-key').val());
            var base = $.trim($('#llm-add-base').val());
            var tpm = parseInt($('#llm-add-tpm').val(), 10) || 0;
            if (!name || !model) {
                alert(LLM_I18N.nameModelRequired);
                return;
            }
            // Timestamp-based id is unique enough within one browser session.
            var id = 'llm-' + Date.now();
            var isFirst = !Object.keys(LLM_CONNECTIONS).length;
            LLM_CONNECTIONS[id] = {
                name: name, model: model, api_key: key, api_base: base,
                tokens_per_minute: tpm, is_default: isFirst
            };
            // Reset the staging form for the next entry.
            $('#llm-preset, #llm-add-name, #llm-add-model, #llm-add-key, #llm-add-base').val('');
            $('#llm-add-tpm').val('0');
            $('#llm-base-group').hide();
            renderTable();
            serialise();
        });

        // Show/hide API key visibility
        $('#llm-key-toggle').on('click', function () {
            var $inp = $('#llm-add-key');
            if ($inp.attr('type') === 'password') {
                $inp.attr('type', 'text');
                $(this).text(LLM_I18N.hide);
            } else {
                $inp.attr('type', 'password');
                $(this).text(LLM_I18N.show);
            }
        });

        // Serialise connections to hidden field before form submit
        $('form.settings').on('submit', serialise);

        // Init
        renderTable();
        serialise();
    });
}(jQuery));

View File

@@ -1,57 +0,0 @@
#llm {
    // ── Key field wrapper — input + show/hide toggle inline ───────────────
    .llm-key-wrap {
        display: flex;
        gap: 0.3em;
        align-items: center;
        // Input takes all remaining width; toggle button keeps natural size.
        input { flex: 1; min-width: 0; }
        button {
            flex: 0 0 auto;
        }
    }

    // ── Pure-grid column padding consistency ──────────────────────────────
    .pure-u-md-1-2 {
        .pure-control-group {
            padding-right: 1em;
        }
    }

    // ── Connections table ─────────────────────────────────────────────────
    table.llm-connections {
        width: 100%;
        .llm-col-def { width: 3em; text-align: center; }
        .llm-col-name { font-weight: 500; }
        .llm-col-model { font-family: monospace; font-size: 0.85em; color: var(--color-grey-400); }
        // Long API keys are clipped with an ellipsis rather than widening the column.
        .llm-col-key {
            font-family: monospace; font-size: 0.82em; color: var(--color-grey-600);
            max-width: 140px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;
        }
        .llm-col-del { width: 2.5em; text-align: center; }
        // The × delete button — plain until hovered.
        .llm-del {
            background: none;
            border: none;
            cursor: pointer;
            color: var(--color-grey-600);
            padding: 0.15em 0.4em;
            border-radius: 3px;
            font-size: 1.1em;
            line-height: 1;
            &:hover { color: var(--color-dark-red); background: #ffeaea; }
        }
        // Placeholder row shown while no connections exist.
        .llm-empty td {
            text-align: center;
            color: var(--color-grey-600);
            padding: 1.8em;
            font-style: italic;
            font-size: 0.92em;
        }
    }

    .llm-default-radio { cursor: pointer; }
}

View File

@@ -32,7 +32,6 @@
@use "parts/toast";
@use "parts/login_form";
@use "parts/tabs";
@use "parts/llm";
// Smooth transitions for theme switching
body,

File diff suppressed because one or more lines are too long

View File

@@ -249,14 +249,13 @@ def prepare_test_function(live_server, datastore_path):
# CRITICAL: Get datastore and stop it from writing stale data
datastore = live_server.app.config.get('DATASTORE')
# Clear the queues before starting the test to prevent state leakage
from changedetectionio.flask_app import update_q, llm_summary_q
for q in (update_q, llm_summary_q):
while not q.empty():
try:
q.get_nowait()
except:
break
# Clear the queue before starting the test to prevent state leakage
from changedetectionio.flask_app import update_q
while not update_q.empty():
try:
update_q.get_nowait()
except:
break
# Add test helper methods to the app for worker management
def set_workers(count):

View File

@@ -1,260 +0,0 @@
"""
Tests for LLM summary queue, worker, and regenerate route.
Mocking strategy
----------------
- `_call_llm` is patched at the module level so no real LiteLLM/API calls are made.
- `_write_summary` is left un-patched so we can assert the file was actually written.
- `process_llm_summary` is called directly in unit tests (no worker thread needed).
"""
import os
import queue
import time
from unittest.mock import patch, MagicMock
import pytest
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, wait_for_all_checks
# ---------------------------------------------------------------------------
# Unit tests — process_llm_summary directly, no HTTP, no worker thread
# ---------------------------------------------------------------------------
class TestProcessLlmSummary:
    """Unit tests that drive process_llm_summary directly — no HTTP, no worker thread."""

    def _make_watch_with_two_snapshots(self, client, datastore_path):
        """Helper: returns (datastore, uuid, snapshot_id) with 2 history entries."""
        set_original_response(datastore_path=datastore_path)
        datastore = client.application.config['DATASTORE']
        test_url = url_for('test_endpoint', _external=True)
        uuid = datastore.add_watch(url=test_url)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        set_modified_response(datastore_path=datastore_path)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        watch = datastore.data['watching'][uuid]
        ordered_keys = list(watch.history.keys())
        # Snapshot id = basename of the second history file, without its extension.
        snapshot_id = os.path.basename(watch.history[ordered_keys[1]]).split('.')[0]
        return datastore, uuid, snapshot_id

    def test_writes_summary_file(self, client, live_server, datastore_path):
        """process_llm_summary writes {snapshot_id}-llm.txt when _call_llm succeeds."""
        datastore, uuid, snapshot_id = self._make_watch_with_two_snapshots(client, datastore_path)
        watch = datastore.data['watching'][uuid]
        item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
        from changedetectionio.llm.queue_worker import process_llm_summary
        with patch('changedetectionio.llm.queue_worker._call_llm',
                   return_value='Price dropped from $10 to $8.') as mock_llm:
            process_llm_summary(item, datastore)
            assert mock_llm.called
        # _write_summary is deliberately left un-patched: verify the file landed on disk.
        summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
        assert os.path.exists(summary_path), "Summary file was not written"
        assert open(summary_path).read() == 'Price dropped from $10 to $8.'

    def test_call_llm_uses_temperature_zero_and_seed(self, client, live_server, datastore_path):
        """_call_llm always passes temperature=0 and seed=0 to litellm for determinism."""
        import litellm
        from changedetectionio.llm.queue_worker import _call_llm
        messages = [{'role': 'user', 'content': 'hello'}]
        mock_response = MagicMock()
        mock_response.choices[0].message.content = 'ok'
        with patch('litellm.completion', return_value=mock_response) as mock_completion:
            _call_llm(model='gpt-4o-mini', messages=messages)
            call_kwargs = mock_completion.call_args.kwargs
        # Determinism contract: temperature/seed pinned, redundant sampling knobs absent.
        assert call_kwargs['temperature'] == 0, "temperature must be 0"
        assert call_kwargs['seed'] == 0, "seed must be 0 for reproducibility"
        assert 'top_p' not in call_kwargs, "top_p must not be set (redundant at temp=0)"
        assert 'frequency_penalty' not in call_kwargs, "frequency_penalty must not be set"
        assert 'presence_penalty' not in call_kwargs, "presence_penalty must not be set"

    def test_skips_first_history_entry(self, client, live_server, datastore_path):
        """process_llm_summary raises ValueError for the first history entry (no prior to diff)."""
        set_original_response(datastore_path=datastore_path)
        datastore = client.application.config['DATASTORE']
        test_url = url_for('test_endpoint', _external=True)
        uuid = datastore.add_watch(url=test_url)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        watch = datastore.data['watching'][uuid]
        ordered_keys = list(watch.history.keys())
        first_fname = watch.history[ordered_keys[0]]
        snapshot_id = os.path.basename(first_fname).split('.')[0]
        item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
        from changedetectionio.llm.queue_worker import process_llm_summary
        with pytest.raises(ValueError, match="first history entry"):
            process_llm_summary(item, datastore)

    def test_raises_for_unknown_watch(self, client, live_server, datastore_path):
        """process_llm_summary raises ValueError if the watch UUID doesn't exist."""
        datastore = client.application.config['DATASTORE']
        item = {'uuid': 'does-not-exist', 'snapshot_id': 'abc123', 'attempts': 0}
        from changedetectionio.llm.queue_worker import process_llm_summary
        with pytest.raises(ValueError, match="not found"):
            process_llm_summary(item, datastore)
# ---------------------------------------------------------------------------
# Unit tests — worker retry logic, no HTTP
# ---------------------------------------------------------------------------
class TestWorkerRetry:
    """Unit tests for the worker's retry/backoff bookkeeping — queue only, no HTTP."""

    def test_requeues_on_failure_with_backoff(self, client, live_server, datastore_path):
        """Worker re-queues a failed item with incremented attempts and future next_retry_at."""
        from changedetectionio.llm.queue_worker import MAX_RETRIES, RETRY_BACKOFF_BASE_SECONDS
        llm_q = queue.Queue()
        app = client.application
        datastore = client.application.config['DATASTORE']
        job = {'uuid': 'fake-uuid', 'snapshot_id': 'abc123', 'attempts': 0}
        llm_q.put(job)
        from changedetectionio.llm.queue_worker import process_llm_summary
        with patch('changedetectionio.llm.queue_worker.process_llm_summary',
                   side_effect=RuntimeError("API down")):
            # Run a single worker iteration by hand (no runner thread started).
            from changedetectionio.llm import queue_worker
            picked = llm_q.get(block=False)
            try:
                queue_worker.process_llm_summary(picked, datastore)
            except Exception:
                # Exponential backoff: base * 2^(attempts-1), then back onto the queue.
                picked['attempts'] += 1
                backoff = RETRY_BACKOFF_BASE_SECONDS * (2 ** (picked['attempts'] - 1))
                picked['next_retry_at'] = time.time() + backoff
                llm_q.put(picked)
        assert llm_q.qsize() == 1
        requeued = llm_q.get_nowait()
        assert requeued['attempts'] == 1
        assert requeued['next_retry_at'] > time.time()

    def test_drops_after_max_retries(self, client, live_server, datastore_path):
        """Worker drops item and records last_error after MAX_RETRIES exhausted."""
        set_original_response(datastore_path=datastore_path)
        datastore = client.application.config['DATASTORE']
        test_url = url_for('test_endpoint', _external=True)
        uuid = datastore.add_watch(url=test_url)
        from changedetectionio.llm.queue_worker import MAX_RETRIES
        job = {'uuid': uuid, 'snapshot_id': 'abc123', 'attempts': MAX_RETRIES}
        llm_q = queue.Queue()
        llm_q.put(job)
        with patch('changedetectionio.llm.queue_worker.process_llm_summary',
                   side_effect=RuntimeError("still down")):
            from changedetectionio.llm import queue_worker
            picked = llm_q.get(block=False)
            try:
                queue_worker.process_llm_summary(picked, datastore)
            except Exception as e:
                if picked['attempts'] < MAX_RETRIES:
                    llm_q.put(picked)
                else:
                    # Budget exhausted: record the failure on the watch instead of re-queuing.
                    datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
        # Queue should be empty — item was dropped
        assert llm_q.empty()
        watch = datastore.data['watching'][uuid]
        assert 'still down' in (watch.get('last_error') or '')
# ---------------------------------------------------------------------------
# Route tests — GET /edit/<uuid>/regenerate-llm-summaries
# ---------------------------------------------------------------------------
class TestRegenerateLlmSummariesRoute:
    """Route tests for GET /edit/<uuid>/regenerate-llm-summaries."""

    def test_queues_missing_summaries(self, client, live_server, datastore_path):
        """Route queues one item per history entry that lacks a -llm.txt file."""
        set_original_response(datastore_path=datastore_path)
        datastore = client.application.config['DATASTORE']
        test_url = url_for('test_endpoint', _external=True)
        uuid = datastore.add_watch(url=test_url)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        set_modified_response(datastore_path=datastore_path)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        watch = datastore.data['watching'][uuid]
        assert watch.history_n >= 2
        from changedetectionio.flask_app import llm_summary_q
        res = client.get(
            url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
            follow_redirects=True,
        )
        assert res.status_code == 200
        # history_n - 1 items queued (first entry skipped, no prior to diff)
        expected = watch.history_n - 1
        assert llm_summary_q.qsize() == expected
        # Drain the queue and sanity-check the shape of every queued item.
        queued = []
        while not llm_summary_q.empty():
            queued.append(llm_summary_q.get_nowait())
        for entry in queued:
            assert entry['uuid'] == uuid
            assert entry['attempts'] == 0
            assert len(entry['snapshot_id']) == 32  # MD5 hex

    def test_skips_already_summarised_entries(self, client, live_server, datastore_path):
        """Route skips entries where {snapshot_id}-llm.txt already exists."""
        set_original_response(datastore_path=datastore_path)
        datastore = client.application.config['DATASTORE']
        test_url = url_for('test_endpoint', _external=True)
        uuid = datastore.add_watch(url=test_url)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        set_modified_response(datastore_path=datastore_path)
        client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
        wait_for_all_checks(client)
        watch = datastore.data['watching'][uuid]
        ordered_keys = list(watch.history.keys())
        second_fname = watch.history[ordered_keys[1]]
        snapshot_id = os.path.basename(second_fname).split('.')[0]
        # Pre-write a summary file so the route sees this entry as already done.
        summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
        with open(summary_path, 'w') as f:
            f.write('already done')
        from changedetectionio.flask_app import llm_summary_q
        client.get(
            url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
            follow_redirects=True,
        )
        # That entry should have been skipped — queue should be empty
        assert llm_summary_q.empty()

    def test_404_for_unknown_watch(self, client, live_server, datastore_path):
        res = client.get(
            url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid='does-not-exist'),
            follow_redirects=False,
        )
        assert res.status_code == 404

View File

@@ -453,175 +453,6 @@ class TestHtmlToText(unittest.TestCase):
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_script_with_closing_tag_in_string_does_not_eat_content(self):
"""
Script tag containing </script> inside a JS string must not prematurely end the block.
This is the classic regex failure mode: the old pattern would find the first </script>
inside the JS string literal and stop there, leaving the tail of the script block
(plus any following content) exposed as raw text. BS4 parses the HTML correctly.
"""
html = '''<html><body>
<p>Before script</p>
<script>
var html = "<div>foo<\\/script><p>bar</p>";
var also = 1;
</script>
<p>AFTER SCRIPT</p>
</body></html>'''
text = html_to_text(html)
# Visible content on both sides of the script block must survive.
assert 'Before script' in text
assert 'AFTER SCRIPT' in text
# Script internals must not leak
assert 'var html' not in text
assert 'var also' not in text
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_content_sandwiched_between_multiple_body_scripts(self):
"""Content between multiple script/style blocks in the body must all survive."""
html = '''<html><body>
<script>var a = 1;</script>
<p>CONTENT A</p>
<style>.x { color: red; }</style>
<p>CONTENT B</p>
<script>var b = 2;</script>
<p>CONTENT C</p>
<style>.y { color: blue; }</style>
<p>CONTENT D</p>
</body></html>'''
text = html_to_text(html)
# Every paragraph between the stripped blocks must remain.
for label in ['CONTENT A', 'CONTENT B', 'CONTENT C', 'CONTENT D']:
assert label in text, f"'{label}' was eaten by script/style stripping"
# No script source or CSS rules may leak into the text output.
assert 'var a' not in text
assert 'var b' not in text
assert 'color: red' not in text
assert 'color: blue' not in text
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_unicode_and_international_content_preserved(self):
"""Non-ASCII content (umlauts, CJK, soft hyphens) must survive stripping."""
html = '''<html><body>
<style>.x{color:red}</style>
<p>German: Aus\xadge\xadbucht! — ANMELDUNG — Fan\xadday 2026</p>
<p>Chinese: \u6ce8\u518c</p>
<p>Japanese: \u767b\u9332</p>
<p>Korean: \ub4f1\ub85d</p>
<p>Emoji: \U0001f4e2</p>
<script>var x = 1;</script>
</body></html>'''
text = html_to_text(html)
assert 'ANMELDUNG' in text
assert '\u6ce8\u518c' in text  # Chinese
assert '\u767b\u9332' in text  # Japanese
assert '\ub4f1\ub85d' in text  # Korean
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_style_with_type_attribute_is_stripped(self):
"""<style type="text/css"> (with type attribute) must be stripped just like bare <style>."""
html = '''<html><body>
<style type="text/css">.important { display: none; }</style>
<p>VISIBLE CONTENT</p>
</body></html>'''
text = html_to_text(html)
assert 'VISIBLE CONTENT' in text
assert '.important' not in text
assert 'display: none' not in text
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_ldjson_script_is_stripped(self):
"""<script type="application/ld+json"> must be stripped — raw JSON must not appear as text."""
html = '''<html><body>
<script type="application/ld+json">
{"@type": "Product", "name": "Widget", "price": "9.99"}
</script>
<p>PRODUCT PAGE</p>
</body></html>'''
text = html_to_text(html)
assert 'PRODUCT PAGE' in text
# Structured-data JSON keys must not surface in the extracted text.
assert '@type' not in text
assert '"price"' not in text
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_inline_svg_is_stripped_entirely(self):
"""
Inline SVG elements in the body are stripped by BS4 before passing to inscriptis.
SVGs can be huge (icon libraries, data visualisations) and produce garbage path-data
text. The old regex code explicitly stripped <svg>; the BS4 path must do the same.
"""
html = '''<html><body>
<p>Before SVG</p>
<svg xmlns="http://www.w3.org/2000/svg" viewBox="0 0 24 24">
<path d="M14 5L7 12L14 19Z" fill="none"/>
<circle cx="12" cy="12" r="10"/>
</svg>
<p>After SVG</p>
</body></html>'''
text = html_to_text(html)
assert 'Before SVG' in text
assert 'After SVG' in text
assert 'M14 5L7' not in text, "SVG path data should not appear in text output"
assert 'viewBox' not in text, "SVG attributes should not appear in text output"
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_tag_inside_json_data_attribute_does_not_eat_content(self):
"""
Tags inside JSON data attributes with JS-escaped closing tags must not eat real content.
Real-world case: Elementor/JetEngine WordPress widgets embed HTML (including SVG icons)
inside JSON data attributes like data-slider-atts. The HTML inside is JS-escaped, so
closing tags appear as <\\/svg> rather than </svg>.
The old regex approach would find <svg> inside the attribute value, then fail to find
<\/svg> as a matching close tag, and scan forward to the next real </svg> in the DOM —
eating tens of kilobytes of actual page content in the process.
"""
html = '''<!DOCTYPE html>
<html>
<head><title>Test</title></head>
<body>
<div class="slider" data-slider-atts="{&quot;prevArrow&quot;:&quot;<i class=\\&quot;icon\\&quot;><svg width=\\&quot;24\\&quot; height=\\&quot;24\\&quot; viewBox=\\&quot;0 0 24 24\\&quot; xmlns=\\&quot;http:\\/\\/www.w3.org\\/2000\\/svg\\&quot;><path d=\\&quot;M14 5L7 12L14 19\\&quot;\\/><\\/svg><\\/i>&quot;}">
</div>
<div class="content">
<h1>IMPORTANT CONTENT</h1>
<p>This text must not be eaten by the tag-stripping logic.</p>
</div>
<svg><circle cx="50" cy="50" r="40"/></svg>
</body>
</html>'''
text = html_to_text(html)
assert 'IMPORTANT CONTENT' in text, (
"Content after a JS-escaped tag in a data attribute was incorrectly stripped. "
"The tag-stripping logic is matching <tag> inside attribute values and scanning "
"forward to the next real closing tag in the DOM."
)
assert 'This text must not be eaten' in text
# NOTE(review): leading indentation was lost in this extraction — re-indent before running.
def test_script_inside_json_data_attribute_does_not_eat_content(self):
"""Same issue as above but with <script> embedded in a data attribute with JS-escaped closing tag."""
html = '''<!DOCTYPE html>
<html>
<head><title>Test</title></head>
<body>
<div data-config="{&quot;template&quot;:&quot;<script type=\\&quot;text\\/javascript\\&quot;>var x=1;<\\/script>&quot;}">
</div>
<div>
<h1>MUST SURVIVE</h1>
<p>Real content after the data attribute with embedded script tag.</p>
</div>
<script>var real = 1;</script>
</body>
</html>'''
text = html_to_text(html)
assert 'MUST SURVIVE' in text, (
"Content after a JS-escaped <script> in a data attribute was incorrectly stripped."
)
assert 'Real content after the data attribute' in text
if __name__ == '__main__':
# Allow running this test module directly (python test_file.py) for quick iteration
unittest.main()

View File

@@ -518,8 +518,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
# (cleanup may delete these variables, but plugins need the original references)
finalize_handler = update_handler # Capture now, before cleanup deletes it
finalize_watch = watch # Capture now, before any modifications
finalize_changed_detected = locals().get('changed_detected', False)
finalize_snapshot_id = (locals().get('update_obj') or {}).get('previous_md5') or ''
# Call quit() as backup (Puppeteer/Playwright have internal cleanup, but this acts as safety net)
try:
@@ -560,9 +558,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
update_handler=finalize_handler,
watch=finalize_watch,
datastore=datastore,
processing_exception=processing_exception,
changed_detected=finalize_changed_detected,
snapshot_id=finalize_snapshot_id,
processing_exception=processing_exception
)
except Exception as finalize_error:
logger.error(f"Worker {worker_id} error in finalize hook: {finalize_error}")

View File

@@ -1,21 +1,22 @@
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
feedgen~=1.0
feedparser~=6.0 # For parsing RSS/Atom feeds
flask-compress
# 0.6.3 included a compatibility fix for werkzeug 3.x (2.x deprecated the url handlers)
flask-login>=0.6.3
flask-paginate
flask-socketio>=5.6.1,<6 # Re #3910
flask>=3.1,<4
flask_cors # For the Chrome extension to operate
flask_restful
flask_cors # For the Chrome extension to operate
# janus # No longer needed - using pure threading.Queue for multi-loop support
flask_wtf~=1.2
flask~=3.1
flask-socketio~=5.6.0
python-socketio~=5.16.1
python-engineio~=4.13.1
inscriptis~=2.2
python-engineio>=4.9.0,<5
python-socketio>=5.11.0,<6
pytz
timeago~=1.0
validators~=0.35
werkzeug==3.1.6
# Set these versions together to avoid a RequestsDependencyWarning
# >= 2.26 also adds Brotli support if brotli is installed
@@ -97,8 +98,12 @@ pytest ~=9.0
pytest-flask ~=1.3
pytest-mock ~=3.15
# Anything 4.0 and up but not 5.0
jsonschema ~= 4.26
# OpenAPI validation support
openapi-core[flask] ~= 0.22
openapi-core[flask] >= 0.19.0
loguru
@@ -120,7 +125,8 @@ greenlet >= 3.0.3
# Default SOCKETIO_MODE=threading is recommended for better compatibility
gevent
referencing # Don't pin — jsonschema-path (required by openapi-core>=0.18) caps referencing<0.37.0, so pinning 0.37.0 forces openapi-core back to 0.17.2. Revisit once jsonschema-path>=0.3.5 relaxes the cap.
# Previously pinned for flask_expects_json (removed 2026-02). Unpinning for now.
referencing
# For conditions
panzi-json-logic
@@ -151,10 +157,3 @@ blinker
pytest-xdist
litellm
# pydantic-core >=2.41 imports typing_extensions.Sentinel, which is absent in the
# system-installed typing_extensions on many Linux distros (e.g. Ubuntu 22/24).
# When the system path leaks into sys.path before the venv, the system copy is
# cached first and the import fails at runtime inside the LLM worker thread.
pydantic-core<2.41
pydantic<2.12