Compare commits


1 commit

Author      SHA1        Message                            Date
dgtlmoon    9009d0a906  Upgrading flask-socketio (#3910)   2026-02-23 05:00:48 +01:00
24 changed files with 39 additions and 1851 deletions

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.53.7'
__version__ = '0.53.6'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -45,7 +45,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
extra_notification_tokens=datastore.get_unique_notification_tokens_available()
)
# Remove the last option 'System default'
form.application.form.notification_format.choices.pop()
@@ -130,12 +129,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Instantiate plugin form with POST data
plugin_form = form_class(formdata=request.form)
# Save plugin settings — use plugin's own save_fn if provided
# (allows plugins to strip ephemeral staging fields etc.)
save_fn = tab.get('save_fn')
if save_fn:
save_fn(datastore, plugin_form)
elif plugin_form.data:
# Save plugin settings (validation is optional for plugins)
if plugin_form.data:
save_plugin_settings(datastore.datastore_path, plugin_id, plugin_form.data)
flash(gettext("Settings updated."))
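The save_fn hook referenced in the comment above is just a callable taking (datastore, plugin_form). A minimal sketch of such a handler for a hypothetical plugin (the plugin id here is made up; save_plugin_settings is the helper already used in this hunk, and the real LLM implementation appears later in this diff):

    def my_plugin_save_fn(datastore, plugin_form):
        from changedetectionio.pluggy_interface import save_plugin_settings
        data = dict(plugin_form.data)
        # Drop ephemeral staging fields that should never be persisted
        # (the LLM plugin strips its 'new_connection' sub-form the same way)
        data.pop('new_connection', None)
        save_plugin_settings(datastore.datastore_path, 'my_plugin', data)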

View File

@@ -27,7 +27,6 @@
<li class="tab"><a href="#rss">{{ _('RSS') }}</a></li>
<li class="tab"><a href="{{ url_for('backups.create') }}">{{ _('Backups') }}</a></li>
<li class="tab"><a href="#timedate">{{ _('Time & Date') }}</a></li>
<li class="tab"><a href="#proxies">{{ _('CAPTCHA & Proxies') }}</a></li>
{% if plugin_tabs %}
{% for tab in plugin_tabs %}
@@ -309,7 +308,6 @@ nav
</div>
</div>
<div class="tab-pane-inner" id="proxies">
<div id="recommended-proxy">
<div>

View File

@@ -116,11 +116,11 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
for uuid in uuids:
watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=None):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData, llm_summary_q=llm_summary_q)
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData)
ui_blueprint.register_blueprint(edit_blueprint)
# Register the notification blueprint

View File

@@ -11,7 +11,7 @@ from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio.time_handler import is_within_schedule
from changedetectionio import worker_pool
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, llm_summary_q=None):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
def _watch_has_tag_options_set(watch):
@@ -404,47 +404,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
download_name=filename,
mimetype='application/zip')
@edit_blueprint.route("/edit/<string:uuid>/regenerate-llm-summaries", methods=['GET'])
@login_optionally_required
def watch_regenerate_llm_summaries(uuid):
"""Queue LLM summary generation for all history entries that don't yet have one."""
from flask import flash
from changedetectionio.llm.tokens import is_llm_data_ready
watch = datastore.data['watching'].get(uuid)
if not watch:
abort(404)
if not llm_summary_q:
flash(gettext("LLM summarisation is not configured."), 'error')
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid))
history = watch.history
history_keys = list(history.keys())
queued = 0
# Skip the first entry — there is no prior snapshot to diff against
for timestamp in history_keys[1:]:
snapshot_fname = history[timestamp]
snapshot_id = os.path.basename(snapshot_fname).split('.')[0] # always 32-char MD5
# Skip entries that already have a summary
if is_llm_data_ready(watch.data_dir, snapshot_id):
continue
llm_summary_q.put({
'uuid': uuid,
'snapshot_id': snapshot_id,
'attempts': 0,
})
queued += 1
if queued:
flash(gettext("Queued %(count)d LLM summaries for generation.", count=queued), 'success')
else:
flash(gettext("All history entries already have LLM summaries."), 'notice')
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid) + '#info')
# Ajax callback
@edit_blueprint.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])
@login_optionally_required

View File

@@ -489,9 +489,6 @@ Math: {{ 1 + 1 }}") }}
<p>
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Download watch data package') }}</a>
{% if watch.history_n > 1 %}
<a href="{{url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid)}}" class="pure-button button-small">{{ _('Regenerate LLM summaries') }}</a>
{% endif %}
</p>
{% endif %}

View File

@@ -15,7 +15,6 @@ from changedetectionio.strtobool import strtobool
from threading import Event
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
from changedetectionio import worker_pool
import changedetectionio.llm as llm
from flask import (
Flask,
@@ -57,7 +56,6 @@ extra_stylesheets = []
# Use bulletproof janus-based queues for sync/async reliability
update_q = RecheckPriorityQueue()
notification_q = NotificationQueue()
llm_summary_q = llm.create_queue()
MAX_QUEUE_SIZE = 5000
app = Flask(__name__,
@@ -853,7 +851,7 @@ def changedetection_app(config=None, datastore_o=None):
# watchlist UI buttons etc
import changedetectionio.blueprint.ui as ui
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=llm_summary_q))
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
import changedetectionio.blueprint.watchlist as watchlist
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
@@ -976,17 +974,6 @@ def changedetection_app(config=None, datastore_o=None):
).start()
logger.info(f"Started {notification_workers} notification worker(s)")
llm.start_workers(app=app, datastore=datastore, llm_q=llm_summary_q,
n_workers=int(os.getenv("LLM_WORKERS", "1")))
# Register the LLM queue plugin so changes trigger summary jobs
from changedetectionio.llm.plugin import LLMQueuePlugin
from changedetectionio.pluggy_interface import plugin_manager
plugin_manager.register(LLMQueuePlugin(llm_summary_q), 'llm_queue_plugin')
# Re-run template path configuration now that all plugins (including LLM) are registered
_configure_plugin_templates()
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
@@ -1041,65 +1028,19 @@ def notification_runner(worker_id=0):
else:
# ── LLM deferred-send gate ─────────────────────────────────────────
# If the notification was re-queued to wait for LLM data, honour the
# scheduled retry time before doing any further processing.
_llm_next_retry = n_object.get('_llm_next_retry_at', 0)
if _llm_next_retry and _llm_next_retry > time.time():
notification_q.put(n_object)
app.config.exit.wait(min(_llm_next_retry - time.time(), 2))
continue
# Apply system-config fallbacks first so we can scan the final body/title.
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
# If the body or title references llm_* tokens, wait until LLM data is ready.
import re as _re
_llm_scan = (n_object.get('notification_body') or '') + ' ' + (n_object.get('notification_title') or '')
if _re.search(r'\bllm_(?:summary|headline|importance|sentiment|one_liner)\b', _llm_scan):
from changedetectionio.llm.tokens import (
is_llm_data_ready, read_llm_tokens,
LLM_NOTIFICATION_RETRY_DELAY_SECONDS, LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS,
)
_llm_uuid = n_object.get('uuid')
_llm_watch = datastore.data['watching'].get(_llm_uuid) if _llm_uuid else None
_llm_snap_id = n_object.get('_llm_snapshot_id')
if _llm_watch and _llm_snap_id and not is_llm_data_ready(_llm_watch.data_dir, _llm_snap_id):
_llm_attempts = n_object.get('_llm_wait_attempts', 0)
if _llm_attempts < LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS:
n_object['_llm_wait_attempts'] = _llm_attempts + 1
n_object['_llm_next_retry_at'] = time.time() + LLM_NOTIFICATION_RETRY_DELAY_SECONDS
notification_q.put(n_object)
logger.debug(
f"Notification gate: LLM data pending for {_llm_uuid} "
f"(attempt {n_object['_llm_wait_attempts']}/{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS})"
)
continue
else:
logger.warning(
f"Notification: LLM data never arrived for {_llm_uuid} after "
f"{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS} attempts — sending without LLM tokens"
)
elif _llm_watch and _llm_snap_id:
# Data is ready — populate the LLM tokens into n_object
_llm_data = read_llm_tokens(_llm_watch.data_dir, _llm_snap_id)
n_object['llm_summary'] = _llm_data.get('summary', '')
n_object['llm_headline'] = _llm_data.get('headline', '')
n_object['llm_importance'] = _llm_data.get('importance')
n_object['llm_sentiment'] = _llm_data.get('sentiment', '')
n_object['llm_one_liner'] = _llm_data.get('one_liner', '')
# ── end LLM gate ───────────────────────────────────────────────────
now = datetime.now()
sent_obj = None
try:
from changedetectionio.notification.handler import process_notification
# Fallback to system config if not set
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
if not n_object.get('notification_format') and datastore.data['settings']['application'].get('notification_format'):
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
if n_object.get('notification_urls', {}):
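The removed block above is a deferred-send gate: when the notification body or title references llm_* tokens but the summary file is not yet on disk, the item is re-queued with a retry timestamp and an attempt counter, and after the maximum number of attempts it is sent without the tokens. A condensed sketch of that decision, assuming is_llm_data_ready from changedetectionio.llm.tokens and the default 10-second / 18-attempt constants shown further down in this diff:

    import time

    def should_send_now(n_object, watch, notification_q, is_llm_data_ready,
                        retry_delay=10, max_attempts=18):
        """Return True to send now, False if the item was re-queued to wait for LLM data."""
        snapshot_id = n_object.get('_llm_snapshot_id')
        if not (watch and snapshot_id) or is_llm_data_ready(watch.data_dir, snapshot_id):
            return True                                  # nothing to wait for, or tokens already written
        attempts = n_object.get('_llm_wait_attempts', 0)
        if attempts >= max_attempts:
            return True                                  # give up and send without llm_* tokens
        n_object['_llm_wait_attempts'] = attempts + 1
        n_object['_llm_next_retry_at'] = time.time() + retry_delay
        notification_q.put(n_object)                     # checked again at the top of the runner loop
        return False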

View File

@@ -1,64 +0,0 @@
"""
changedetectionio.llm
~~~~~~~~~~~~~~~~~~~~~
LLM summary queue and workers.
Usage in flask_app.py
---------------------
import changedetectionio.llm as llm
# At module level alongside notification_q:
llm_summary_q = llm.create_queue()
# Inside changedetection_app(), after datastore is ready:
llm.start_workers(
app=app,
datastore=datastore,
llm_q=llm_summary_q,
n_workers=int(os.getenv("LLM_WORKERS", "1")),
)
Enqueueing a summary job (e.g. from the pluggy update_finalize hook)
---------------------------------------------------------------------
if changed_detected and not processing_exception:
llm_summary_q.put({
'uuid': watch_uuid,
'snapshot_id': snapshot_id,
'attempts': 0,
})
"""
import queue
import threading
from loguru import logger
def create_queue() -> queue.Queue:
"""Return a plain Queue for LLM summary jobs. No maxsize — jobs are small dicts."""
return queue.Queue()
def start_workers(app, datastore, llm_q: queue.Queue, n_workers: int = 1) -> None:
"""
Start N LLM summary worker threads.
Args:
app: Flask application instance (for app_context and exit event)
datastore: Application datastore
llm_q: Queue returned by create_queue()
n_workers: Number of parallel workers (default 1; increase for local Ollama)
"""
from changedetectionio.llm.queue_worker import llm_summary_runner
for i in range(n_workers):
threading.Thread(
target=llm_summary_runner,
args=(i, app, datastore, llm_q),
daemon=True,
name=f"LLMSummaryWorker-{i}",
).start()
logger.info(f"Started {n_workers} LLM summary worker(s)")

View File

@@ -1,104 +0,0 @@
"""
LLM plugin — provides settings tab and enqueues summary jobs on change detection.
Registered with the pluggy plugin manager at startup (flask_app.py).
The worker (llm/queue_worker.py) drains the queue asynchronously.
"""
from loguru import logger
from changedetectionio.pluggy_interface import hookimpl
def get_llm_settings(datastore):
"""Load LLM plugin settings with fallback to legacy datastore settings.
Tries the plugin settings file (llm.json) first.
Falls back to the old storage location in datastore.data['settings']['application']
for users upgrading from a version before LLM became a first-class plugin.
"""
from changedetectionio.pluggy_interface import load_plugin_settings
settings = load_plugin_settings(datastore.datastore_path, 'llm')
if settings.get('llm_connection') is not None:
return settings
# Legacy fallback: settings were stored in datastore application settings
app_settings = datastore.data['settings']['application']
connections_dict = app_settings.get('llm_connections') or {}
connections_list = [
{
'connection_id': k,
'name': v.get('name', ''),
'model': v.get('model', ''),
'api_key': v.get('api_key', ''),
'api_base': v.get('api_base', ''),
'tokens_per_minute': int(v.get('tokens_per_minute', 0) or 0),
'is_default': bool(v.get('is_default', False)),
}
for k, v in connections_dict.items()
]
return {
'llm_connection': connections_list,
'llm_summary_prompt': app_settings.get('llm_summary_prompt', ''),
}
def save_llm_settings(datastore, plugin_form):
"""Custom save handler — strips the ephemeral new_connection staging fields
so they are never persisted to llm.json."""
from changedetectionio.pluggy_interface import save_plugin_settings
data = {
'llm_connection': plugin_form.llm_connection.data,
'llm_summary_prompt': plugin_form.llm_summary_prompt.data or '',
'llm_diff_context_lines': plugin_form.llm_diff_context_lines.data or 2,
}
save_plugin_settings(datastore.datastore_path, 'llm', data)
class LLMQueuePlugin:
"""Enqueues LLM summary jobs on successful change detection and provides settings tab."""
def __init__(self, llm_q):
self.llm_q = llm_q
@hookimpl
def plugin_settings_tab(self):
from changedetectionio.llm.settings_form import LLMSettingsForm
return {
'plugin_id': 'llm',
'tab_label': 'LLM',
'form_class': LLMSettingsForm,
'template_path': 'settings-llm.html',
'save_fn': save_llm_settings,
}
@hookimpl
def update_finalize(self, update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
"""Queue an LLM summary job when a change was successfully detected."""
if not changed_detected or processing_exception or not snapshot_id:
return
if watch is None:
return
# Need ≥2 history entries — first entry has nothing to diff against
if watch.history_n < 2:
return
# Only queue when at least one LLM connection is configured
llm_settings = get_llm_settings(datastore)
has_connection = bool(
llm_settings.get('llm_connection')
or datastore.data['settings']['application'].get('llm_api_key') # legacy
or datastore.data['settings']['application'].get('llm_model') # legacy
or watch.get('llm_api_key')
or watch.get('llm_model')
)
if not has_connection:
return
uuid = watch.get('uuid')
self.llm_q.put({'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0})
logger.debug(f"LLM: queued summary for uuid={uuid} snapshot={snapshot_id}")

View File

@@ -1,544 +0,0 @@
import fcntl
import os
import queue
import re
import threading
import time
from datetime import datetime, timezone
from loguru import logger
from changedetectionio.llm.tokens import (
STRUCTURED_OUTPUT_INSTRUCTION,
parse_llm_response,
write_llm_data,
)
MAX_RETRIES = 5
RETRY_BACKOFF_BASE_SECONDS = 60 # 1m, 2m, 4m, 8m, 16m
# Token thresholds that control which summarisation strategy is used.
# Small diffs: single-pass summarise.
# Larger diffs: two-pass (enumerate all changes first, then compress).
# Very large diffs: map-reduce (chunk → enumerate per chunk → final synthesis).
TOKEN_SINGLE_PASS_THRESHOLD = 5000 # below this: one call
TOKEN_TWO_PASS_THRESHOLD = 15000 # below this: enumerate then summarise
TOKEN_CHUNK_SIZE = 5000 # tokens per map-reduce chunk
# ---------------------------------------------------------------------------
# Proactive token-bucket rate limiter — shared across all workers in process
# ---------------------------------------------------------------------------
class _RateLimitWait(Exception):
"""Raised when the bucket is empty; worker re-queues without incrementing attempts."""
def __init__(self, wait_seconds):
self.wait_seconds = wait_seconds
super().__init__(f"Rate limit: wait {wait_seconds:.1f}s")
class _TokenBucket:
"""Thread-safe continuous token bucket. tpm=0 means unlimited."""
def __init__(self, tpm):
self._lock = threading.Lock()
self._tpm = tpm
self._tokens = float(tpm) # start full
self._last_ts = time.monotonic()
def try_consume(self, n):
"""Consume n tokens. Returns (True, 0.0) on success or (False, wait_secs) if dry."""
if self._tpm == 0:
return True, 0.0
with self._lock:
now = time.monotonic()
elapsed = now - self._last_ts
self._tokens = min(self._tpm, self._tokens + elapsed * (self._tpm / 60.0))
self._last_ts = now
if self._tokens >= n:
self._tokens -= n
return True, 0.0
deficit = n - self._tokens
return False, deficit / (self._tpm / 60.0)
_rate_buckets = {}
_rate_buckets_lock = threading.Lock()
def _get_rate_bucket(conn_id, tpm):
"""Return (or lazily create) the shared _TokenBucket for this connection."""
with _rate_buckets_lock:
if conn_id not in _rate_buckets:
_rate_buckets[conn_id] = _TokenBucket(int(tpm or 0))
return _rate_buckets[conn_id]
def _parse_retry_after(exc):
"""Extract a retry-after delay (seconds) from a litellm RateLimitError."""
if hasattr(exc, 'retry_after') and exc.retry_after:
try:
return float(exc.retry_after)
except (TypeError, ValueError):
pass
m = re.search(r'(?:try again in|retry after)\s*([\d.]+)\s*s', str(exc), re.IGNORECASE)
return float(m.group(1)) + 1.0 if m else 60.0
def _read_snapshot(watch, snapshot_fname):
"""Read a snapshot file from disk, handling plain text and brotli compression."""
path = os.path.join(watch.data_dir, snapshot_fname)
if snapshot_fname.endswith('.br'):
import brotli
with open(path, 'rb') as f:
return brotli.decompress(f.read()).decode('utf-8', errors='replace')
else:
with open(path, 'r', encoding='utf-8', errors='replace') as f:
return f.read()
def _append_llm_log(log_path, model, sent_tokens, recv_tokens, elapsed_ms):
"""Append one line to the datastore-level LLM activity log.
Line format (tab-separated, LF terminated):
ISO-8601-UTC fetched LLM via <model> sent=<N> recv=<N> ms=<N>
The file is flock-locked for the duration of the write so concurrent
workers don't interleave lines.
"""
ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] # ms precision
line = f"{ts}\tfetched LLM via {model}\tsent={sent_tokens}\trecv={recv_tokens}\tms={elapsed_ms}\n"
try:
with open(log_path, 'a', encoding='utf-8', newline='\n') as f:
fcntl.flock(f, fcntl.LOCK_EX)
try:
f.write(line)
f.flush()
finally:
fcntl.flock(f, fcntl.LOCK_UN)
except Exception as exc:
logger.warning(f"LLM log write failed: {exc}")
def _call_llm(model, messages, api_key=None, api_base=None, max_tokens=600, conn_id=None, tpm=0, log_path=None):
"""
Thin wrapper around litellm.completion.
Isolated as a named function so tests can mock.patch it without importing litellm.
Determinism settings
--------------------
temperature=0 — greedy decoding; same input produces the same output consistently.
seed=0 — passed through to providers that support it (OpenAI, some others)
for near-bit-identical reproducibility across calls.
Deliberately NOT set
--------------------
top_p — redundant at temperature=0 and can interact badly with some providers.
frequency_penalty / presence_penalty — would penalise the model for repeating specific
values (e.g. "$10 → $10") which is exactly wrong for change detection.
max_tokens — caller sets this based on the pass type:
enumerate pass needs more room than the final summary pass.
conn_id / tpm — optional rate limiting; when both are set, a proactive token-bucket
check is performed before calling the API. Raises _RateLimitWait if
the bucket is empty so the worker can re-queue without retrying.
log_path — when set, each call is appended to the datastore LLM activity log.
Returns the response text string.
"""
import litellm
# Proactive rate check (skipped when tpm=0 or conn_id is None)
if conn_id and tpm:
prompt_tokens = litellm.token_counter(model=model, messages=messages)
total_est = prompt_tokens + max_tokens
bucket = _get_rate_bucket(conn_id, tpm)
ok, wait = bucket.try_consume(total_est)
if not ok:
raise _RateLimitWait(wait)
kwargs = dict(
model=model,
messages=messages,
temperature=0,
seed=0,
max_tokens=max_tokens,
)
if api_key:
kwargs['api_key'] = api_key
if api_base:
kwargs['api_base'] = api_base
t0 = time.monotonic()
response = litellm.completion(**kwargs)
elapsed_ms = round((time.monotonic() - t0) * 1000)
if log_path:
usage = getattr(response, 'usage', None)
sent_tok = getattr(usage, 'prompt_tokens', 0) or 0
recv_tok = getattr(usage, 'completion_tokens', 0) or 0
_append_llm_log(log_path, model, sent_tok, recv_tok, elapsed_ms)
return response.choices[0].message.content.strip()
def _resolve_llm_connection(watch, datastore):
"""Return (model, api_key, api_base, conn_id, tpm) for the given watch.
Resolution order:
1. Watch-level connection_id pointing to a named entry in plugin settings.
2. The default entry in plugin settings (is_default=True).
3. Legacy flat fields on the watch or in global settings — backward compat.
4. Hard-coded fallback: gpt-4o-mini with no key / base.
"""
from changedetectionio.llm.plugin import get_llm_settings
from changedetectionio.llm.settings_form import sanitised_conn_id
llm_settings = get_llm_settings(datastore)
connections = llm_settings.get('llm_connection') or []
# 1. Watch-level override by explicit connection_id
watch_conn_id = watch.get('llm_connection_id')
if watch_conn_id:
for c in connections:
if c.get('connection_id') == watch_conn_id:
cid = sanitised_conn_id(c.get('connection_id', ''))
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
cid, int(c.get('tokens_per_minute', 0) or 0))
# 2. Global default connection
for c in connections:
if c.get('is_default'):
cid = sanitised_conn_id(c.get('connection_id', ''))
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
cid, int(c.get('tokens_per_minute', 0) or 0))
# 3. Legacy flat fields (backward compat)
app_settings = datastore.data['settings']['application']
model = watch.get('llm_model') or app_settings.get('llm_model', 'gpt-4o-mini')
api_key = watch.get('llm_api_key') or app_settings.get('llm_api_key', '')
api_base = watch.get('llm_api_base') or app_settings.get('llm_api_base', '')
return model, api_key, api_base, 'legacy', 0
SYSTEM_PROMPT = (
'You are a change detection assistant. '
'Be precise and factual. Never speculate. '
'Always use exact numbers, values, and quoted text when present in the diff. '
'If nothing meaningful changed, say so explicitly.'
)
def _build_context_header(watch, datastore):
"""Return a short multi-line string describing what this watch monitors.
Included lines (only when non-empty / non-redundant):
URL: <url>
Monitor: <user title or fetched page title> (omitted when same as URL)
Tags: <comma-separated tag titles> (omitted when none)
"""
url = watch.get('url', '')
title = watch.get('title', '') or watch.get('page_title', '')
lines = [f"URL: {url}"]
if title and title != url:
lines.append(f"Monitor: {title}")
tag_titles = []
for tag_uuid in (watch.get('tags') or []):
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid, {})
t = tag.get('title', '').strip()
if t:
tag_titles.append(t)
if tag_titles:
lines.append(f"Tags: {', '.join(tag_titles)}")
return '\n'.join(lines)
def _chunk_lines(lines, model, chunk_token_size):
"""Split lines into chunks that each fit within chunk_token_size tokens."""
import litellm
chunks, current, current_tokens = [], [], 0
for line in lines:
line_tokens = litellm.token_counter(model=model, text=line)
if current and current_tokens + line_tokens > chunk_token_size:
chunks.append('\n'.join(current))
current, current_tokens = [], 0
current.append(line)
current_tokens += line_tokens
if current:
chunks.append('\n'.join(current))
return chunks
def _enumerate_changes(diff_text, context_header, model, llm_kwargs):
"""
Pass 1 — ask the model to list every distinct change exhaustively, one per line.
Returns a plain-text list string.
This avoids compression decisions: the model just lists, it does not prioritise.
"""
messages = [
{'role': 'system', 'content': SYSTEM_PROMPT},
{
'role': 'user',
'content': (
f"{context_header}\n"
f"Diff:\n{diff_text}\n\n"
"List every distinct change you see, one item per line. "
"Be exhaustive — do not filter or prioritise. "
"Use exact values from the diff (prices, dates, counts, quoted text)."
),
},
]
# Enumerate pass needs more output room than the final summary
return _call_llm(model=model, messages=messages, max_tokens=1200, **llm_kwargs)
def _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction=None):
"""
Pass 2 — compress the exhaustive enumeration into the final output.
Operates on a small, structured input so nothing is lost that wasn't already listed.
summary_instruction overrides the default STRUCTURED_OUTPUT_INSTRUCTION when set.
"""
instruction = summary_instruction or (
"Now produce the final structured output for all of these changes.\n\n"
+ STRUCTURED_OUTPUT_INSTRUCTION
)
messages = [
{'role': 'system', 'content': SYSTEM_PROMPT},
{
'role': 'user',
'content': (
f"{context_header}\n"
f"All changes detected:\n{enumerated}\n\n"
+ instruction
),
},
]
return _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
def process_llm_summary(item, datastore):
"""
Generate an LLM summary for a detected change and write {snapshot_id}-llm.txt.
item keys:
uuid - watch UUID
snapshot_id - the newer snapshot ID (md5 hex), maps to {snapshot_id}.txt[.br]
attempts - retry counter
Summarisation strategy (chosen by diff token count):
Small (< TOKEN_SINGLE_PASS_THRESHOLD): one call — enumerate + summarise together.
Medium (< TOKEN_TWO_PASS_THRESHOLD): two calls — enumerate all changes, then compress.
Large (≥ TOKEN_TWO_PASS_THRESHOLD): map-reduce — chunk → enumerate per chunk →
synthesise chunk enumerations → final summary.
The two-pass / map-reduce approach prevents lossiness: temperature=0 causes the model
to greedily commit to the most prominent change and drop the rest in a single pass.
Enumerating first forces comprehensive coverage before any compression happens.
Split into _call_llm / _write_summary so each step is independently patchable in tests.
"""
import difflib
import litellm
uuid = item['uuid']
snapshot_id = item['snapshot_id']
watch = datastore.data['watching'].get(uuid)
if not watch:
raise ValueError(f"Watch {uuid} not found")
# Find this snapshot and the one before it in history
history = watch.history
history_keys = list(history.keys())
try:
idx = next(
i for i, k in enumerate(history_keys)
if os.path.basename(history[k]).split('.')[0] == snapshot_id
)
except StopIteration:
raise ValueError(f"snapshot_id {snapshot_id} not found in history for watch {uuid}")
if idx == 0:
raise ValueError(f"snapshot_id {snapshot_id} is the first history entry — no prior to diff against")
before_text = _read_snapshot(watch, history[history_keys[idx - 1]])
current_text = _read_snapshot(watch, history[history_keys[idx]])
# Resolve model / credentials via connections table (with legacy flat-field fallback)
model, api_key, api_base, conn_id, tpm = _resolve_llm_connection(watch, datastore)
url = watch.get('url', '')
context_header = _build_context_header(watch, datastore)
llm_kwargs = {
'log_path': os.path.join(datastore.datastore_path, 'llm-log.txt'),
}
if api_key:
llm_kwargs['api_key'] = api_key
if api_base:
llm_kwargs['api_base'] = api_base
if conn_id:
llm_kwargs['conn_id'] = conn_id
if tpm:
llm_kwargs['tpm'] = tpm
# Use custom prompt / context-line setting if configured
from changedetectionio.llm.plugin import get_llm_settings
llm_settings = get_llm_settings(datastore)
custom_prompt = (llm_settings.get('llm_summary_prompt') or '').strip()
summary_instruction = custom_prompt if custom_prompt else (
"Analyse all changes in this diff.\n\n" + STRUCTURED_OUTPUT_INSTRUCTION
)
context_n = int(llm_settings.get('llm_diff_context_lines') or 2)
diff_lines = list(difflib.unified_diff(
before_text.splitlines(),
current_text.splitlines(),
lineterm='',
n=context_n,
))
diff_text = '\n'.join(diff_lines)
if not diff_text.strip():
logger.debug(f"LLM: no diff content for {uuid}/{snapshot_id}, skipping")
return
diff_tokens = litellm.token_counter(model=model, text=diff_text)
logger.debug(f"LLM: diff is {diff_tokens} tokens for {uuid}/{snapshot_id}")
if diff_tokens < TOKEN_SINGLE_PASS_THRESHOLD:
# Small diff — single call, model can see everything at once
messages = [
{'role': 'system', 'content': SYSTEM_PROMPT},
{
'role': 'user',
'content': (
f"{context_header}\n"
f"Diff:\n{diff_text}\n\n"
+ summary_instruction
),
},
]
raw = _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
strategy = 'single'
elif diff_tokens < TOKEN_TWO_PASS_THRESHOLD:
# Medium diff — two-pass: enumerate exhaustively, then compress
enumerated = _enumerate_changes(diff_text, context_header, model, llm_kwargs)
raw = _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction)
strategy = 'two-pass'
else:
# Large diff — map-reduce: chunk → enumerate per chunk → synthesise
chunks = _chunk_lines(diff_lines, model, TOKEN_CHUNK_SIZE)
logger.debug(f"LLM: map-reduce over {len(chunks)} chunks for {uuid}/{snapshot_id}")
chunk_enumerations = []
for i, chunk in enumerate(chunks):
logger.debug(f"LLM: enumerating chunk {i+1}/{len(chunks)}")
chunk_enumerations.append(
_enumerate_changes(chunk, context_header, model, llm_kwargs)
)
combined = '\n'.join(chunk_enumerations)
raw = _summarise_enumeration(combined, context_header, model, llm_kwargs, summary_instruction)
strategy = 'map-reduce'
llm_data = parse_llm_response(raw)
write_llm_data(watch.data_dir, snapshot_id, llm_data)
logger.info(f"LLM tokens written for {uuid}/{snapshot_id} (strategy: {strategy}, tokens: {diff_tokens})")
def llm_summary_runner(worker_id, app, datastore, llm_q):
"""
Sync LLM summary worker — mirrors the notification_runner pattern.
One worker is the right default (LLM API rate limits constrain throughput
more than parallelism helps). Increase via LLM_WORKERS env var if using
a local Ollama endpoint with no rate limits.
Failed items are re-queued with exponential backoff (see MAX_RETRIES /
RETRY_BACKOFF_BASE_SECONDS). After MAX_RETRIES the item is dropped and
the failure is recorded on the watch.
"""
with app.app_context():
while not app.config.exit.is_set():
try:
item = llm_q.get(block=False)
except queue.Empty:
app.config.exit.wait(1)
continue
# Honour retry delay — if the item isn't due yet, put it back
# and sleep briefly rather than spinning.
next_retry_at = item.get('next_retry_at', 0)
if next_retry_at > time.time():
llm_q.put(item)
app.config.exit.wait(min(next_retry_at - time.time(), 5))
continue
uuid = item.get('uuid')
snapshot_id = item.get('snapshot_id')
attempts = item.get('attempts', 0)
logger.debug(f"LLM worker {worker_id} processing uuid={uuid} snapshot={snapshot_id} attempt={attempts}")
try:
process_llm_summary(item, datastore)
logger.info(f"LLM worker {worker_id} completed summary for uuid={uuid} snapshot={snapshot_id}")
except NotImplementedError:
# Silently drop until the processor is implemented
logger.debug(f"LLM worker {worker_id} skipping — processor not yet implemented")
except _RateLimitWait as rw:
# Proactive bucket empty — re-queue without counting as a failure
item['next_retry_at'] = time.time() + rw.wait_seconds
llm_q.put(item)
logger.info(
f"LLM worker {worker_id} rate-limited (proactive) for {rw.wait_seconds:.1f}s "
f"uuid={uuid}"
)
except Exception as e:
# Reactive: check if the API itself returned a rate-limit error
try:
import litellm as _litellm
if isinstance(e, _litellm.RateLimitError):
wait = _parse_retry_after(e)
item['next_retry_at'] = time.time() + wait
llm_q.put(item)
logger.warning(
f"LLM worker {worker_id} API rate limit for uuid={uuid}, "
f"retry in {wait:.1f}s"
)
continue
except ImportError:
pass
logger.error(f"LLM worker {worker_id} error for uuid={uuid} snapshot={snapshot_id}: {e}")
if attempts < MAX_RETRIES:
backoff = RETRY_BACKOFF_BASE_SECONDS * (2 ** attempts)
item['attempts'] = attempts + 1
item['next_retry_at'] = time.time() + backoff
llm_q.put(item)
logger.info(
f"LLM worker {worker_id} re-queued uuid={uuid} "
f"attempt={item['attempts']}/{MAX_RETRIES} retry_in={backoff}s"
)
else:
logger.error(
f"LLM worker {worker_id} gave up on uuid={uuid} snapshot={snapshot_id} "
f"after {MAX_RETRIES} attempts"
)
if uuid and uuid in datastore.data['watching']:
datastore.update_watch(
uuid=uuid,
update_obj={'last_error': f"LLM summary failed after {MAX_RETRIES} attempts: {e}"}
)
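The proactive limiter above is a continuous token bucket shared per connection: it refills at tpm/60 tokens per second, never blocks, and a caller that would overdraw receives the wait time and re-queues via _RateLimitWait. A standalone restatement of that arithmetic (the 6,000 tokens-per-minute figure matches the Groq preset later in this diff; this class is a simplified copy for illustration, not the removed _TokenBucket itself):

    import time

    class TokenBucket:
        def __init__(self, tpm):
            self.tpm = tpm                  # tokens per minute; 0 means unlimited
            self.tokens = float(tpm)        # bucket starts full
            self.last = time.monotonic()

        def try_consume(self, n):
            if self.tpm == 0:
                return True, 0.0
            now = time.monotonic()
            # Refill continuously at tpm/60 tokens per second, capped at tpm
            self.tokens = min(self.tpm, self.tokens + (now - self.last) * self.tpm / 60.0)
            self.last = now
            if self.tokens >= n:
                self.tokens -= n
                return True, 0.0
            # Not enough budget: report how long until the deficit refills
            return False, (n - self.tokens) / (self.tpm / 60.0)

    bucket = TokenBucket(tpm=6000)
    ok, wait = bucket.try_consume(5500)     # fits, the bucket starts full
    ok, wait = bucket.try_consume(5500)     # short by ~5000 tokens, wait is roughly 50 seconds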

View File

@@ -1,139 +0,0 @@
import re
import uuid as _uuid
from flask_babel import lazy_gettext as _l
from wtforms import (
BooleanField,
FieldList,
Form,
FormField,
HiddenField,
IntegerField,
PasswordField,
SelectField,
StringField,
TextAreaField,
)
from wtforms.validators import Length, NumberRange, Optional
from changedetectionio.llm.tokens import STRUCTURED_OUTPUT_INSTRUCTION
# The built-in instruction appended after the diff — shown as placeholder text.
DEFAULT_SUMMARY_PROMPT = (
"Analyse all changes in this diff.\n\n"
+ STRUCTURED_OUTPUT_INSTRUCTION
)
# Allowed characters for a connection ID coming from the browser.
_CONN_ID_RE = re.compile(r'^[a-zA-Z0-9_-]{1,64}$')
def sanitised_conn_id(raw):
"""Return raw if it looks like a safe identifier, otherwise a fresh UUID."""
s = (raw or '').strip()
return s if _CONN_ID_RE.match(s) else str(_uuid.uuid4())
class LLMConnectionEntryForm(Form):
"""Schema for a single LLM connection.
Declaring every field here is what prevents arbitrary key injection:
only these fields can ever reach the datastore from this form.
"""
connection_id = HiddenField()
name = StringField(_l('Name'), validators=[Optional(), Length(max=100)])
model = StringField(_l('Model string'), validators=[Optional(), Length(max=200)])
api_key = StringField(_l('API Key'), validators=[Optional(), Length(max=500)])
api_base = StringField(_l('API Endpoint'), validators=[Optional(), Length(max=500)])
tokens_per_minute = IntegerField(_l('Tokens/min'), validators=[Optional(), NumberRange(min=0, max=10_000_000)], default=0)
is_default = BooleanField(_l('Default'), validators=[Optional()])
class LLMNewConnectionForm(Form):
"""Staging fields for the 'Add a connection' UI.
These are read client-side by llm.js to build a new FieldList entry on click.
They are never used server-side — render_kw sets the id attributes llm.js
looks up with $('#llm-add-name') etc.
"""
preset = SelectField(
_l('Provider template'),
validate_choice=False,
# WTForms 3.x uses a dict for optgroups (has_groups() checks isinstance(choices, dict)).
# An empty-string key renders as <optgroup label=""> which browsers treat as ungrouped.
choices={
'': [('', '')],
_l('Cloud'): [
('openai-mini', 'OpenAI — gpt-4o-mini'),
('openai-4o', 'OpenAI — gpt-4o'),
('anthropic-haiku', 'Anthropic — claude-3-haiku'),
('anthropic-sonnet', 'Anthropic — claude-3-5-sonnet'),
('groq-8b', 'Groq — llama-3.1-8b-instant'),
('groq-70b', 'Groq — llama-3.3-70b-versatile'),
('gemini-flash', 'Google — gemini-1.5-flash'),
('mistral-small', 'Mistral — mistral-small'),
('deepseek', 'DeepSeek — deepseek-chat'),
('openrouter', 'OpenRouter (custom model)'),
],
_l('Local'): [
('ollama-llama', 'Ollama — llama3.1'),
('ollama-mistral', 'Ollama — mistral'),
('lmstudio', 'LM Studio'),
],
_l('Custom'): [
('custom', _l('Manual entry')),
],
},
render_kw={'id': 'llm-preset'},
)
name = StringField(_l('Name'),
render_kw={'id': 'llm-add-name', 'size': 30,
'autocomplete': 'off'})
model = StringField(_l('Model string'),
render_kw={'id': 'llm-add-model', 'size': 40,
'placeholder': 'gpt-4o-mini', 'autocomplete': 'off'})
api_key = PasswordField(_l('API Key'),
render_kw={'id': 'llm-add-key', 'size': 40,
'placeholder': 'sk-…', 'autocomplete': 'off'})
api_base = StringField(_l('API Endpoint'),
render_kw={'id': 'llm-add-base', 'size': 40,
'placeholder': 'http://localhost:11434', 'autocomplete': 'off'})
tokens_per_minute = IntegerField(_l('Tokens/min'), default=0,
render_kw={'id': 'llm-add-tpm', 'style': 'width: 8em;',
'min': '0', 'step': '1000'})
class LLMSettingsForm(Form):
"""WTForms form for the LLM settings tab.
llm_connection is a FieldList of LLMConnectionEntryForm entries.
llm.js emits individual hidden inputs (llm_connection-N-fieldname) on submit
instead of a JSON blob, so WTForms processes them through the declared schema.
"""
llm_connection = FieldList(FormField(LLMConnectionEntryForm), min_entries=0)
new_connection = FormField(LLMNewConnectionForm)
llm_diff_context_lines = IntegerField(
_l('Diff context lines'),
validators=[Optional(), NumberRange(min=0, max=20)],
default=2,
description=_l(
'Number of unchanged lines shown around each change in the diff. '
'More lines give the LLM more context but increase token usage. (default: 2)'
),
render_kw={'style': 'width: 5em;', 'min': '0', 'max': '20'},
)
llm_summary_prompt = TextAreaField(
_l('Summary prompt'),
validators=[Optional()],
description=_l(
'Override the instruction sent to the LLM after the diff. '
'Leave blank to use the built-in default (structured JSON output).'
),
render_kw={
'rows': 8,
'placeholder': DEFAULT_SUMMARY_PROMPT,
'class': 'pure-input-1',
},
)
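Because llm.js (later in this diff) posts each connection as individual llm_connection-N-fieldname inputs, WTForms rebuilds the FieldList through the schema declared above, so only declared fields can reach the datastore. A rough sketch of that round trip using WTForms' default FieldList/FormField naming; the values are illustrative:

    from werkzeug.datastructures import MultiDict

    formdata = MultiDict([
        ('llm_connection-0-connection_id', 'llm-1700000000000'),
        ('llm_connection-0-name', 'My OpenAI'),
        ('llm_connection-0-model', 'gpt-4o-mini'),
        ('llm_connection-0-api_key', 'sk-example'),
        ('llm_connection-0-tokens_per_minute', '200000'),
        ('llm_connection-0-is_default', 'y'),
    ])
    form = LLMSettingsForm(formdata=formdata)
    form.validate()
    print(form.llm_connection.data[0]['model'])   # 'gpt-4o-mini'
    # Keys not declared on LLMConnectionEntryForm (e.g. llm_connection-0-evil)
    # are simply ignored, which is the key-injection protection described above.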

View File

@@ -1,103 +0,0 @@
<script src="{{url_for('static_content', group='js', filename='llm.js')}}" defer></script>
<script>
var LLM_CONNECTIONS = (function () {
var list = {{ plugin_form.llm_connection.data|tojson }};
var out = {};
(list || []).forEach(function (c) { if (c && c.connection_id) out[c.connection_id] = c; });
return out;
}());
var LLM_I18N = {
noConnections: '{{ _("No connections configured yet.") }}',
setDefault: '{{ _("Set as default") }}',
remove: '{{ _("Remove") }}',
show: '{{ _("show") }}',
hide: '{{ _("hide") }}',
nameModelRequired: '{{ _("Name and Model string are required.") }}'
};
</script>
{# ── Configured connections table ──────────────────── #}
<fieldset>
<legend>{{ _('LLM Connections') }}</legend>
<table class="pure-table pure-table-horizontal llm-connections">
<thead>
<tr>
<th class="llm-col-def" title="{{ _('Default') }}">{{ _('Default') }}</th>
<th class="llm-col-name">{{ _('Name') }}</th>
<th class="llm-col-model">{{ _('Model') }}</th>
<th class="llm-col-key">{{ _('API Key') }}</th>
<th class="llm-col-tpm" title="{{ _('Tokens per minute limit (0 = unlimited)') }}">{{ _('TPM') }}</th>
<th class="llm-col-del"></th>
</tr>
</thead>
<tbody id="llm-connections-tbody">
</tbody>
</table>
</fieldset>
{# ── Add connection ─────────────────────────────────── #}
{% set nf = plugin_form.new_connection.form %}
<fieldset>
<legend>{{ _('Add a connection') }}</legend>
<div class="pure-control-group">
{{ nf.preset.label }}
{{ nf.preset() }}
</div>
<div class="pure-control-group">
{{ nf.name.label }}
{{ nf.name(placeholder=_('e.g. My OpenAI')) }}
</div>
<div class="pure-control-group">
{{ nf.model.label }}
{{ nf.model() }}
</div>
<div class="pure-control-group">
<label for="llm-add-key">
{{ _('API Key') }}
<span class="pure-form-message-inline">({{ _('leave blank for local') }})</span>
</label>
<div class="llm-key-wrap">
{{ nf.api_key() }}
<button type="button" id="llm-key-toggle" class="pure-button">{{ _('show') }}</button>
</div>
</div>
<div class="pure-control-group" id="llm-base-group" style="display:none">
<label for="llm-add-base">
{{ _('API Endpoint') }}
<span class="pure-form-message-inline">({{ _('optional') }})</span>
</label>
{{ nf.api_base() }}
</div>
<div class="pure-control-group">
<label for="llm-add-tpm">
{{ _('Tokens/min limit') }}
<span class="pure-form-message-inline">({{ _('0 = unlimited') }})</span>
</label>
{{ nf.tokens_per_minute() }}
</div>
<div class="pure-controls">
<button type="button" id="llm-btn-add" class="pure-button pure-button-primary">{{ _('+ Add connection') }}</button>
</div>
</fieldset>
{# ── Prompt configuration ────────────────────────────────── #}
<fieldset>
<legend>{{ _('Summary Prompt') }}</legend>
<div class="pure-control-group">
{{ plugin_form.llm_diff_context_lines.label }}
{{ plugin_form.llm_diff_context_lines() }}
<span class="pure-form-message-inline">
{{ _('Unchanged lines shown around each change in the diff sent to the LLM. More lines = more context but higher token cost. (default: 2)') }}
</span>
</div>
<div class="pure-control-group">
{{ render_field(plugin_form.llm_summary_prompt) }}
<span class="pure-form-message-inline">
{{ _('Instruction appended after the diff in every LLM call. Leave blank to use the built-in default (structured JSON output).') }}
</span>
</div>
</fieldset>

View File

@@ -1,197 +0,0 @@
"""
LLM notification token definitions and file I/O helpers.
All LLM data for a snapshot is stored under a dedicated subdirectory:
{data_dir}/llm/{snapshot_id}-llm.json
A plain-text {snapshot_id}-llm.txt is also written containing just the
summary field, for backward compatibility with any code that already reads it.
Token catalogue
---------------
llm_summary 1-3 sentence description of all changes, exact values.
llm_headline 5-8 word punchy title — ideal for the notification subject line.
llm_importance Numeric 1-10 significance score; enables routing rules like
"only escalate if llm_importance >= 8".
llm_sentiment Machine-readable: "positive", "negative", or "neutral".
Useful for trend tracking and coloured alert styling.
llm_one_liner Shortest useful summary — one sentence for SMS, Pushover,
and other character-limited channels.
"""
import os
import json
from loguru import logger
# ── Constants ──────────────────────────────────────────────────────────────
LLM_TOKEN_NAMES = (
'llm_summary',
'llm_headline',
'llm_importance',
'llm_sentiment',
'llm_one_liner',
)
# How long the notification runner waits for LLM data before giving up.
LLM_NOTIFICATION_RETRY_DELAY_SECONDS = int(os.getenv('LLM_NOTIFICATION_RETRY_DELAY', '10'))
LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS = int(os.getenv('LLM_NOTIFICATION_MAX_WAIT', '18')) # 18 × 10s = 3 min
# JSON prompt fragment — embedded in the final summarisation call.
STRUCTURED_OUTPUT_INSTRUCTION = (
'Return ONLY a valid JSON object — no markdown fences, no extra text — using exactly these keys:\n'
'{"summary":"1-3 sentences covering ALL changes; use exact values from the diff.","headline":"5-8 word punchy title for this specific change","importance":7,"sentiment":"positive","one_liner":"One sentence for SMS/push character limits."}\n'
'importance: 1=trivial whitespace, 5=moderate content change, 10=critical price/availability change.\n'
'sentiment: "positive" (desirable for the user), "negative" (undesirable), or "neutral" (informational only).'
)
# ── File I/O ───────────────────────────────────────────────────────────────
def llm_subdir(data_dir: str) -> str:
"""Return the llm/ subdirectory path (does not create it)."""
return os.path.join(data_dir, 'llm')
def llm_json_path(data_dir: str, snapshot_id: str) -> str:
return os.path.join(llm_subdir(data_dir), f"{snapshot_id}-llm.json")
def llm_txt_path(data_dir: str, snapshot_id: str) -> str:
return os.path.join(llm_subdir(data_dir), f"{snapshot_id}-llm.txt")
def is_llm_data_ready(data_dir: str, snapshot_id: str) -> bool:
"""Return True if LLM data has been written for this snapshot."""
return os.path.exists(llm_json_path(data_dir, snapshot_id)) or \
os.path.exists(llm_txt_path(data_dir, snapshot_id))
def read_llm_tokens(data_dir: str, snapshot_id: str) -> dict:
"""
Read LLM token data for a snapshot.
Tries JSON first (new format), falls back to plain .txt (old format).
Returns an empty dict if no data is available yet.
"""
json_file = llm_json_path(data_dir, snapshot_id)
if os.path.exists(json_file):
try:
with open(json_file, 'r', encoding='utf-8') as f:
data = json.load(f)
if isinstance(data, dict):
return _normalise(data)
except Exception as exc:
logger.warning(f"LLM tokens: failed to read {json_file}: {exc}")
txt_file = llm_txt_path(data_dir, snapshot_id)
if os.path.exists(txt_file):
try:
with open(txt_file, 'r', encoding='utf-8') as f:
summary = f.read().strip()
return _normalise({'summary': summary, 'one_liner': summary[:200]})
except Exception as exc:
logger.warning(f"LLM tokens: failed to read {txt_file}: {exc}")
return {}
def write_llm_data(data_dir: str, snapshot_id: str, data: dict) -> str:
"""
Atomically write LLM data to the llm/ subdirectory.
Writes:
llm/{snapshot_id}-llm.json — full structured data (all tokens)
llm/{snapshot_id}-llm.txt — plain summary text (backward compat)
Returns the path of the JSON file.
"""
normalised = _normalise(data)
subdir = llm_subdir(data_dir)
os.makedirs(subdir, exist_ok=True)
json_file = llm_json_path(data_dir, snapshot_id)
_atomic_write_text(json_file, json.dumps(normalised, ensure_ascii=False))
txt_file = llm_txt_path(data_dir, snapshot_id)
_atomic_write_text(txt_file, normalised.get('summary', ''))
return json_file
def parse_llm_response(response: str) -> dict:
"""
Parse a structured JSON response from the LLM.
Tries strict JSON parse, then extracts from markdown code fences,
then a bare object search. Falls back to treating the whole response
as the 'summary' field if nothing parses.
"""
import re
text = response.strip()
# 1. Direct JSON parse
try:
obj = json.loads(text)
if isinstance(obj, dict):
return _normalise(obj)
except (json.JSONDecodeError, ValueError):
pass
# 2. Markdown code fence: ```json { ... } ```
m = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
if m:
try:
obj = json.loads(m.group(1))
if isinstance(obj, dict):
return _normalise(obj)
except (json.JSONDecodeError, ValueError):
pass
# 3. Bare JSON object anywhere in the response
m = re.search(r'\{[^{}]*\}', text, re.DOTALL)
if m:
try:
obj = json.loads(m.group(0))
if isinstance(obj, dict):
return _normalise(obj)
except (json.JSONDecodeError, ValueError):
pass
# 4. Fallback — treat entire response as summary
logger.debug("LLM response was not valid JSON — using raw text as summary")
return _normalise({'summary': text, 'one_liner': text[:200] if len(text) > 200 else text})
# ── Internal helpers ───────────────────────────────────────────────────────
def _normalise(data: dict) -> dict:
"""Return a clean token dict with all expected keys present."""
importance = data.get('importance')
if importance is not None:
try:
importance = max(1, min(10, int(float(importance))))
except (TypeError, ValueError):
importance = None
sentiment = str(data.get('sentiment', '')).lower().strip()
if sentiment not in ('positive', 'negative', 'neutral'):
sentiment = ''
return {
'summary': str(data.get('summary', '') or '').strip(),
'headline': str(data.get('headline', '') or '').strip(),
'importance': importance,
'sentiment': sentiment,
'one_liner': str(data.get('one_liner', '') or '').strip(),
}
def _atomic_write_text(path: str, text: str) -> None:
tmp = path + '.tmp'
with open(tmp, 'w', encoding='utf-8') as f:
f.write(text)
f.flush()
os.fsync(f.fileno())
os.replace(tmp, path)
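Since the whole tokens module is removed here, the sketch below only records how its public helpers were called elsewhere in this diff; the data directory and token values are illustrative:

    from changedetectionio.llm.tokens import write_llm_data, read_llm_tokens, is_llm_data_ready

    data_dir = '/datastore/example-watch'                 # hypothetical watch data directory
    snapshot_id = 'd41d8cd98f00b204e9800998ecf8427e'      # 32-char MD5, as used elsewhere in this diff

    write_llm_data(data_dir, snapshot_id, {
        'summary': 'Price dropped from $12.99 to $10.49.',
        'headline': 'Price cut on monitored product',
        'importance': 8,
        'sentiment': 'positive',
        'one_liner': 'Price is now $10.49 (was $12.99).',
    })
    # Writes llm/<snapshot_id>-llm.json (all tokens) and llm/<snapshot_id>-llm.txt (summary only)

    assert is_llm_data_ready(data_dir, snapshot_id)
    tokens = read_llm_tokens(data_dir, snapshot_id)       # normalised dict with all five llm_* fields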

View File

@@ -1009,31 +1009,14 @@ class model(EntityPersistenceMixin, watch_base):
def extra_notification_token_values(self):
from changedetectionio.llm.tokens import read_llm_tokens
history = self.history
if not history:
return {}
latest_fname = history[list(history.keys())[-1]]
snapshot_id = os.path.basename(latest_fname).split('.')[0] # always 32-char MD5
data = read_llm_tokens(self.data_dir, snapshot_id)
if not data:
return {}
return {
'llm_summary': data.get('summary', ''),
'llm_headline': data.get('headline', ''),
'llm_importance': data.get('importance'),
'llm_sentiment': data.get('sentiment', ''),
'llm_one_liner': data.get('one_liner', ''),
}
# Used for providing extra tokens
# return {'widget': 555}
return {}
def extra_notification_token_placeholder_info(self):
return [
('llm_summary', "LLM: 1-3 sentence summary of all changes with exact values"),
('llm_headline', "LLM: 5-8 word punchy title for this specific change"),
('llm_importance', "LLM: Significance score 1-10 (1=trivial, 10=critical)"),
('llm_sentiment', "LLM: Change sentiment — positive, negative, or neutral"),
('llm_one_liner', "LLM: One sentence for SMS/push character limits"),
]
# Used for providing extra tokens
# return [('widget', "Get widget amounts")]
return []
def extract_regex_from_all_history(self, regex):

View File

@@ -6,7 +6,6 @@ Extracted from update_worker.py to provide standalone notification functionality
for both sync and async workers
"""
import datetime
import os
import pytz
from loguru import logger
@@ -83,12 +82,6 @@ class NotificationContextData(dict):
'watch_tag': None,
'watch_title': None,
'watch_url': 'https://WATCH-PLACE-HOLDER/',
# LLM-generated tokens (populated by notification_runner once LLM data is ready)
'llm_headline': None,
'llm_importance': None,
'llm_one_liner': None,
'llm_sentiment': None,
'llm_summary': None,
})
# Apply any initial data passed in
@@ -281,11 +274,6 @@ class NotificationService:
timestamp_changed=dates[date_index_to]))
if self.notification_q:
# Store snapshot_id hint so notification_runner can gate on LLM data readiness
if watch and len(dates) > 0:
latest_fname = watch.history.get(dates[date_index_to], '')
if latest_fname:
n_object['_llm_snapshot_id'] = os.path.basename(latest_fname).split('.')[0]
logger.debug("Queued notification for sending")
self.notification_q.put(n_object)
else:

View File

@@ -151,8 +151,7 @@ class ChangeDetectionSpec:
pass
@hookspec
def update_finalize(update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
def update_finalize(update_handler, watch, datastore, processing_exception):
"""Called after watch processing completes (success or failure).
This hook is called in the finally block after all processing is complete,
@@ -169,10 +168,6 @@ class ChangeDetectionSpec:
processing_exception: The exception from the main processing block, or None if successful.
This does NOT include cleanup exceptions - only exceptions from
the actual watch processing (fetch, diff, etc).
changed_detected: True when the processor detected a content change (default False).
snapshot_id: MD5 hex string of the new snapshot, matches the prefix of the history
filename (e.g. 'abc123…' maps to 'abc123….txt[.br]'). None when no snapshot
was saved (first run, error, same content).
Returns:
None: This hook doesn't return a value
@@ -585,8 +580,7 @@ def apply_update_handler_alter(update_handler, watch, datastore):
return current_handler
def apply_update_finalize(update_handler, watch, datastore, processing_exception,
changed_detected=False, snapshot_id=None):
def apply_update_finalize(update_handler, watch, datastore, processing_exception):
"""Apply update_finalize hooks from all plugins.
Called in the finally block after watch processing completes, allowing plugins
@@ -597,8 +591,6 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
watch: The watch dict that was processed (may be None)
datastore: The application datastore
processing_exception: The exception from processing, or None if successful
changed_detected: True when the processor detected a content change.
snapshot_id: MD5 hex string of the new snapshot, or None.
Returns:
None
@@ -609,9 +601,7 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
update_handler=update_handler,
watch=watch,
datastore=datastore,
processing_exception=processing_exception,
changed_detected=changed_detected,
snapshot_id=snapshot_id,
processing_exception=processing_exception
)
except Exception as e:
# Don't let plugin errors crash the worker
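With changed_detected and snapshot_id dropped from the hookspec, implementations revert to the four-argument form. A minimal hypothetical plugin written against the reduced signature, registered the same way the removed LLM plugin was registered in flask_app.py above:

    from loguru import logger
    from changedetectionio.pluggy_interface import hookimpl, plugin_manager

    class AuditPlugin:
        @hookimpl
        def update_finalize(self, update_handler, watch, datastore, processing_exception):
            # Runs in the worker's finally block after every watch check; must never raise
            if processing_exception and watch is not None:
                logger.debug(f"watch {watch.get('uuid')} finished with error: {processing_exception}")

    plugin_manager.register(AuditPlugin(), 'audit_plugin')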

View File

@@ -1,187 +0,0 @@
/* llm.js — LLM Connections management (settings page)
* Depends on: jQuery (global), LLM_CONNECTIONS + LLM_I18N injected by Jinja2 template.
*/
(function ($) {
'use strict';
// Provider presets: [value, label, model, api_base, tpm]
// tpm = tokens-per-minute limit (0 = unlimited / local).
// Defaults reflect free-tier or conservative tier-1 limits.
var LLM_PRESETS = [
['openai-mini', 'OpenAI — gpt-4o-mini', 'gpt-4o-mini', '', 200000],
['openai-4o', 'OpenAI — gpt-4o', 'gpt-4o', '', 30000],
['anthropic-haiku', 'Anthropic — claude-3-haiku', 'anthropic/claude-3-haiku-20240307', '', 100000],
['anthropic-sonnet', 'Anthropic — claude-3-5-sonnet', 'anthropic/claude-3-5-sonnet-20241022', '', 40000],
['groq-8b', 'Groq — llama-3.1-8b-instant', 'groq/llama-3.1-8b-instant', '', 6000],
['groq-70b', 'Groq — llama-3.3-70b-versatile', 'groq/llama-3.3-70b-versatile', '', 6000],
['gemini-flash', 'Google — gemini-1.5-flash', 'gemini/gemini-1.5-flash', '', 1000000],
['mistral-small', 'Mistral — mistral-small', 'mistral/mistral-small-latest', '', 500000],
['deepseek', 'DeepSeek — deepseek-chat', 'deepseek/deepseek-chat', '', 50000],
['openrouter', 'OpenRouter (custom model)', 'openrouter/', '', 20000],
['ollama-llama', 'Ollama — llama3.1 (local)', 'ollama/llama3.1', 'http://localhost:11434', 0],
['ollama-mistral', 'Ollama — mistral (local)', 'ollama/mistral', 'http://localhost:11434', 0],
['lmstudio', 'LM Studio (local)', 'openai/local', 'http://localhost:1234/v1', 0],
];
var presetMap = {};
$.each(LLM_PRESETS, function (_, p) { presetMap[p[0]] = p; });
function escHtml(s) {
return $('<div>').text(String(s)).html();
}
function maskKey(k) {
if (!k) return '<span style="color:var(--color-grey-700)">—</span>';
return escHtml(k.substring(0, 4)) + '••••';
}
// Emit WTForms FieldList hidden inputs (llm_connection-N-fieldname) so the
// server processes connections through the declared schema — no arbitrary keys.
function serialise() {
var $form = $('form.settings');
$form.find('input[data-llm-gen]').remove();
var ids = Object.keys(LLM_CONNECTIONS);
$.each(ids, function (i, id) {
var c = LLM_CONNECTIONS[id];
var prefix = 'llm_connection-' + i + '-';
var fields = {
connection_id: id,
name: c.name || '',
model: c.model || '',
api_key: c.api_key || '',
api_base: c.api_base || '',
tokens_per_minute: parseInt(c.tokens_per_minute || 0, 10)
};
$.each(fields, function (field, value) {
$('<input>').attr({ type: 'hidden', name: prefix + field, value: value, 'data-llm-gen': '1' }).appendTo($form);
});
// BooleanField: only emit when true (absence == false in WTForms)
if (c.is_default) {
$('<input>').attr({ type: 'hidden', name: prefix + 'is_default', value: 'y', 'data-llm-gen': '1' }).appendTo($form);
}
});
}
function renderTable() {
var $tbody = $('#llm-connections-tbody');
$tbody.empty();
var ids = Object.keys(LLM_CONNECTIONS);
if (!ids.length) {
$tbody.html('<tr class="llm-empty"><td colspan="6">' + escHtml(LLM_I18N.noConnections) + '</td></tr>');
return;
}
$.each(ids, function (_, id) {
var c = LLM_CONNECTIONS[id];
var tpm = parseInt(c.tokens_per_minute || 0, 10);
var tpmLabel = tpm ? tpm.toLocaleString() : '<span style="color:var(--color-grey-700)">∞</span>';
$tbody.append(
'<tr>' +
'<td class="llm-col-def">' +
'<input type="radio" class="llm-default-radio" name="llm_default_radio"' +
' title="' + escHtml(LLM_I18N.setDefault) + '"' +
(c.is_default ? ' checked' : '') +
' data-id="' + escHtml(id) + '">' +
'</td>' +
'<td class="llm-col-name">' + escHtml(c.name) + '</td>' +
'<td class="llm-col-model">' + escHtml(c.model) + '</td>' +
'<td class="llm-col-key">' + maskKey(c.api_key) + '</td>' +
'<td class="llm-col-tpm">' + tpmLabel + '</td>' +
'<td class="llm-col-del">' +
'<button type="button" class="llm-del"' +
' title="' + escHtml(LLM_I18N.remove) + '"' +
' data-id="' + escHtml(id) + '">×</button>' +
'</td>' +
'</tr>'
);
});
}
$(function () {
// Event delegation on tbody — survives re-renders
$('#llm-connections-tbody')
.on('change', '.llm-default-radio', function () {
var chosen = String($(this).data('id'));
$.each(LLM_CONNECTIONS, function (k) {
LLM_CONNECTIONS[k].is_default = (k === chosen);
});
serialise();
})
.on('click', '.llm-del', function () {
var id = String($(this).data('id'));
delete LLM_CONNECTIONS[id];
var remaining = Object.keys(LLM_CONNECTIONS);
if (remaining.length && !remaining.some(function (k) { return LLM_CONNECTIONS[k].is_default; })) {
LLM_CONNECTIONS[remaining[0]].is_default = true;
}
renderTable();
serialise();
});
function updateBaseVisibility() {
var val = $('#llm-preset').val();
var preset = presetMap[val];
var hasBase = preset ? !!preset[3] : (val === 'custom');
var show = (val === 'custom') || hasBase;
$('#llm-base-group').toggle(show);
}
// Preset dropdown pre-fills add form
$('#llm-preset').on('change', function () {
var val = $(this).val();
var p = presetMap[val];
if (p) {
$('#llm-add-name').val(p[1].replace(/\s*—.*/, '').trim());
$('#llm-add-model').val(p[2]);
$('#llm-add-base').val(p[3]);
$('#llm-add-tpm').val(p[4] !== undefined ? p[4] : 0);
$('#llm-add-key').val('');
}
updateBaseVisibility();
});
// Add connection
$('#llm-btn-add').on('click', function () {
var name = $.trim($('#llm-add-name').val());
var model = $.trim($('#llm-add-model').val());
var key = $.trim($('#llm-add-key').val());
var base = $.trim($('#llm-add-base').val());
var tpm = parseInt($('#llm-add-tpm').val(), 10) || 0;
if (!name || !model) {
alert(LLM_I18N.nameModelRequired);
return;
}
var id = 'llm-' + Date.now();
var isFirst = !Object.keys(LLM_CONNECTIONS).length;
LLM_CONNECTIONS[id] = {
name: name, model: model, api_key: key, api_base: base,
tokens_per_minute: tpm, is_default: isFirst
};
$('#llm-preset, #llm-add-name, #llm-add-model, #llm-add-key, #llm-add-base').val('');
$('#llm-add-tpm').val('0');
$('#llm-base-group').hide();
renderTable();
serialise();
});
// Show/hide API key visibility
$('#llm-key-toggle').on('click', function () {
var $inp = $('#llm-add-key');
if ($inp.attr('type') === 'password') {
$inp.attr('type', 'text');
$(this).text(LLM_I18N.hide);
} else {
$inp.attr('type', 'password');
$(this).text(LLM_I18N.show);
}
});
// Serialise connections to hidden field before form submit
$('form.settings').on('submit', serialise);
// Init
renderTable();
serialise();
});
}(jQuery));
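The hidden inputs that serialise() writes are shaped for a WTForms FieldList of FormFields on the server side. Below is a minimal sketch under that assumption; LLMConnectionForm and SettingsForm are illustrative names, and only the field names and the llm_connection-<N>-<field> input naming are taken from the script above.

from wtforms import Form, FormField, FieldList, StringField, IntegerField, BooleanField

class LLMConnectionForm(Form):
    # Field names mirror the hidden inputs emitted by serialise()
    connection_id = StringField()
    name = StringField()
    model = StringField()
    api_key = StringField()
    api_base = StringField()
    tokens_per_minute = IntegerField(default=0)
    # A missing input deserialises as False, which is why the script only emits
    # llm_connection-<N>-is_default for the connection marked as default
    is_default = BooleanField(default=False)

class SettingsForm(Form):
    # WTForms names the entries llm_connection-0, llm_connection-1, ... and the
    # nested fields llm_connection-0-name and so on (default '-' separator)
    llm_connection = FieldList(FormField(LLMConnectionForm))

Binding request.form against a form like this rebuilds the connection list through declared fields only, so no arbitrary keys reach the datastore.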

View File

@@ -1,57 +0,0 @@
#llm {
// ── Key field wrapper — input + show/hide toggle inline ───────────────
.llm-key-wrap {
display: flex;
gap: 0.3em;
align-items: center;
input { flex: 1; min-width: 0; }
button {
flex: 0 0 auto;
}
}
// ── Pure-grid column padding consistency ──────────────────────────────
.pure-u-md-1-2 {
.pure-control-group {
padding-right: 1em;
}
}
// ── Connections table ─────────────────────────────────────────────────
table.llm-connections {
width: 100%;
.llm-col-def { width: 3em; text-align: center; }
.llm-col-name { font-weight: 500; }
.llm-col-model { font-family: monospace; font-size: 0.85em; color: var(--color-grey-400); }
.llm-col-key {
font-family: monospace; font-size: 0.82em; color: var(--color-grey-600);
max-width: 140px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;
}
.llm-col-del { width: 2.5em; text-align: center; }
.llm-del {
background: none;
border: none;
cursor: pointer;
color: var(--color-grey-600);
padding: 0.15em 0.4em;
border-radius: 3px;
font-size: 1.1em;
line-height: 1;
&:hover { color: var(--color-dark-red); background: #ffeaea; }
}
.llm-empty td {
text-align: center;
color: var(--color-grey-600);
padding: 1.8em;
font-style: italic;
font-size: 0.92em;
}
}
.llm-default-radio { cursor: pointer; }
}

View File

@@ -32,7 +32,6 @@
@use "parts/toast";
@use "parts/login_form";
@use "parts/tabs";
@use "parts/llm";
// Smooth transitions for theme switching
body,

File diff suppressed because one or more lines are too long

View File

@@ -249,14 +249,13 @@ def prepare_test_function(live_server, datastore_path):
# CRITICAL: Get datastore and stop it from writing stale data
datastore = live_server.app.config.get('DATASTORE')
# Clear the queues before starting the test to prevent state leakage
from changedetectionio.flask_app import update_q, llm_summary_q
for q in (update_q, llm_summary_q):
while not q.empty():
try:
q.get_nowait()
except:
break
# Clear the queue before starting the test to prevent state leakage
from changedetectionio.flask_app import update_q
while not update_q.empty():
try:
update_q.get_nowait()
except:
break
# Add test helper methods to the app for worker management
def set_workers(count):

View File

@@ -1,260 +0,0 @@
"""
Tests for LLM summary queue, worker, and regenerate route.
Mocking strategy
----------------
- `_call_llm` is patched at the module level so no real LiteLLM/API calls are made.
- `_write_summary` is left un-patched so we can assert the file was actually written.
- `process_llm_summary` is called directly in unit tests (no worker thread needed).
"""
import os
import queue
import time
from unittest.mock import patch, MagicMock
import pytest
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, wait_for_all_checks
# ---------------------------------------------------------------------------
# Unit tests — process_llm_summary directly, no HTTP, no worker thread
# ---------------------------------------------------------------------------
class TestProcessLlmSummary:
def _make_watch_with_two_snapshots(self, client, datastore_path):
"""Helper: returns (datastore, uuid, snapshot_id) with 2 history entries."""
set_original_response(datastore_path=datastore_path)
datastore = client.application.config['DATASTORE']
test_url = url_for('test_endpoint', _external=True)
uuid = datastore.add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = datastore.data['watching'][uuid]
history_keys = list(watch.history.keys())
snapshot_id = os.path.basename(watch.history[history_keys[1]]).split('.')[0]
return datastore, uuid, snapshot_id
def test_writes_summary_file(self, client, live_server, datastore_path):
"""process_llm_summary writes {snapshot_id}-llm.txt when _call_llm succeeds."""
datastore, uuid, snapshot_id = self._make_watch_with_two_snapshots(client, datastore_path)
watch = datastore.data['watching'][uuid]
item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
from changedetectionio.llm.queue_worker import process_llm_summary
with patch('changedetectionio.llm.queue_worker._call_llm', return_value='Price dropped from $10 to $8.') as mock_llm:
process_llm_summary(item, datastore)
assert mock_llm.called
summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
assert os.path.exists(summary_path), "Summary file was not written"
assert open(summary_path).read() == 'Price dropped from $10 to $8.'
def test_call_llm_uses_temperature_zero_and_seed(self, client, live_server, datastore_path):
"""_call_llm always passes temperature=0 and seed=0 to litellm for determinism."""
import litellm
from changedetectionio.llm.queue_worker import _call_llm
messages = [{'role': 'user', 'content': 'hello'}]
mock_response = MagicMock()
mock_response.choices[0].message.content = 'ok'
with patch('litellm.completion', return_value=mock_response) as mock_completion:
_call_llm(model='gpt-4o-mini', messages=messages)
call_kwargs = mock_completion.call_args.kwargs
assert call_kwargs['temperature'] == 0, "temperature must be 0"
assert call_kwargs['seed'] == 0, "seed must be 0 for reproducibility"
assert 'top_p' not in call_kwargs, "top_p must not be set (redundant at temp=0)"
assert 'frequency_penalty' not in call_kwargs, "frequency_penalty must not be set"
assert 'presence_penalty' not in call_kwargs, "presence_penalty must not be set"
def test_skips_first_history_entry(self, client, live_server, datastore_path):
"""process_llm_summary raises ValueError for the first history entry (no prior to diff)."""
set_original_response(datastore_path=datastore_path)
datastore = client.application.config['DATASTORE']
test_url = url_for('test_endpoint', _external=True)
uuid = datastore.add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = datastore.data['watching'][uuid]
history_keys = list(watch.history.keys())
first_fname = watch.history[history_keys[0]]
snapshot_id = os.path.basename(first_fname).split('.')[0]
item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
from changedetectionio.llm.queue_worker import process_llm_summary
with pytest.raises(ValueError, match="first history entry"):
process_llm_summary(item, datastore)
def test_raises_for_unknown_watch(self, client, live_server, datastore_path):
"""process_llm_summary raises ValueError if the watch UUID doesn't exist."""
datastore = client.application.config['DATASTORE']
item = {'uuid': 'does-not-exist', 'snapshot_id': 'abc123', 'attempts': 0}
from changedetectionio.llm.queue_worker import process_llm_summary
with pytest.raises(ValueError, match="not found"):
process_llm_summary(item, datastore)
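# For reference, a sketch of the litellm call shape that
# test_call_llm_uses_temperature_zero_and_seed pins down: temperature=0 and seed=0,
# and none of the extra sampling knobs. The function name and the model/messages
# handling are illustrative only, not the project's actual _call_llm.
import litellm

def call_llm_deterministically(model, messages):
    """Illustrative helper mirroring the kwargs asserted in the determinism test."""
    response = litellm.completion(
        model=model,
        messages=messages,
        temperature=0,  # deterministic sampling
        seed=0,         # fixed seed for reproducible summaries
    )
    return response.choices[0].message.content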
# ---------------------------------------------------------------------------
# Unit tests — worker retry logic, no HTTP
# ---------------------------------------------------------------------------
class TestWorkerRetry:
def test_requeues_on_failure_with_backoff(self, client, live_server, datastore_path):
"""Worker re-queues a failed item with incremented attempts and future next_retry_at."""
from changedetectionio.llm.queue_worker import MAX_RETRIES, RETRY_BACKOFF_BASE_SECONDS
llm_q = queue.Queue()
app = client.application
datastore = client.application.config['DATASTORE']
item = {'uuid': 'fake-uuid', 'snapshot_id': 'abc123', 'attempts': 0}
llm_q.put(item)
from changedetectionio.llm.queue_worker import process_llm_summary
with patch('changedetectionio.llm.queue_worker.process_llm_summary', side_effect=RuntimeError("API down")):
# Run one iteration manually (don't start the full runner thread)
from changedetectionio.llm import queue_worker
got = llm_q.get(block=False)
try:
queue_worker.process_llm_summary(got, datastore)
except Exception as e:
got['attempts'] += 1
got['next_retry_at'] = time.time() + RETRY_BACKOFF_BASE_SECONDS * (2 ** (got['attempts'] - 1))
llm_q.put(got)
assert llm_q.qsize() == 1
requeued = llm_q.get_nowait()
assert requeued['attempts'] == 1
assert requeued['next_retry_at'] > time.time()
def test_drops_after_max_retries(self, client, live_server, datastore_path):
"""Worker drops item and records last_error after MAX_RETRIES exhausted."""
set_original_response(datastore_path=datastore_path)
datastore = client.application.config['DATASTORE']
test_url = url_for('test_endpoint', _external=True)
uuid = datastore.add_watch(url=test_url)
from changedetectionio.llm.queue_worker import MAX_RETRIES
item = {'uuid': uuid, 'snapshot_id': 'abc123', 'attempts': MAX_RETRIES}
llm_q = queue.Queue()
llm_q.put(item)
with patch('changedetectionio.llm.queue_worker.process_llm_summary', side_effect=RuntimeError("still down")):
from changedetectionio.llm import queue_worker
got = llm_q.get(block=False)
try:
queue_worker.process_llm_summary(got, datastore)
except Exception as e:
if got['attempts'] < MAX_RETRIES:
llm_q.put(got)
else:
datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
# Queue should be empty — item was dropped
assert llm_q.empty()
watch = datastore.data['watching'][uuid]
assert 'still down' in (watch.get('last_error') or '')
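# The retry arithmetic both tests in this class encode, pulled out as a standalone
# sketch. The two constants below are illustrative placeholders; the real values are
# MAX_RETRIES and RETRY_BACKOFF_BASE_SECONDS from changedetectionio.llm.queue_worker.
import time

SKETCH_MAX_RETRIES = 3
SKETCH_BACKOFF_BASE_SECONDS = 60

def handle_failed_item(item, exc, llm_q, datastore):
    """Sketch of the worker failure path: exponential backoff, then give up."""
    if item['attempts'] < SKETCH_MAX_RETRIES:
        item['attempts'] += 1
        # Delay grows 1x, 2x, 4x ... of the base, the same formula asserted above
        item['next_retry_at'] = time.time() + SKETCH_BACKOFF_BASE_SECONDS * (2 ** (item['attempts'] - 1))
        llm_q.put(item)
    else:
        # Exhausted: drop the item and surface the error on the watch
        datastore.update_watch(uuid=item['uuid'], update_obj={'last_error': str(exc)})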
# ---------------------------------------------------------------------------
# Route tests — GET /edit/<uuid>/regenerate-llm-summaries
# ---------------------------------------------------------------------------
class TestRegenerateLlmSummariesRoute:
def test_queues_missing_summaries(self, client, live_server, datastore_path):
"""Route queues one item per history entry that lacks a -llm.txt file."""
set_original_response(datastore_path=datastore_path)
datastore = client.application.config['DATASTORE']
test_url = url_for('test_endpoint', _external=True)
uuid = datastore.add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = datastore.data['watching'][uuid]
assert watch.history_n >= 2
from changedetectionio.flask_app import llm_summary_q
res = client.get(
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
follow_redirects=True,
)
assert res.status_code == 200
# history_n - 1 items queued (first entry skipped, no prior to diff)
expected = watch.history_n - 1
assert llm_summary_q.qsize() == expected
# Each item has the right shape
items = []
while not llm_summary_q.empty():
items.append(llm_summary_q.get_nowait())
for item in items:
assert item['uuid'] == uuid
assert item['attempts'] == 0
assert len(item['snapshot_id']) == 32 # MD5 hex
def test_skips_already_summarised_entries(self, client, live_server, datastore_path):
"""Route skips entries where {snapshot_id}-llm.txt already exists."""
set_original_response(datastore_path=datastore_path)
datastore = client.application.config['DATASTORE']
test_url = url_for('test_endpoint', _external=True)
uuid = datastore.add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response(datastore_path=datastore_path)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = datastore.data['watching'][uuid]
history_keys = list(watch.history.keys())
second_fname = watch.history[history_keys[1]]
snapshot_id = os.path.basename(second_fname).split('.')[0]
# Pre-write a summary file
summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
with open(summary_path, 'w') as f:
f.write('already done')
from changedetectionio.flask_app import llm_summary_q
client.get(
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
follow_redirects=True,
)
# That entry should have been skipped — queue should be empty
assert llm_summary_q.empty()
def test_404_for_unknown_watch(self, client, live_server, datastore_path):
res = client.get(
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid='does-not-exist'),
follow_redirects=False,
)
assert res.status_code == 404
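# What the three route tests above imply the queueing loop in the removed
# watch_regenerate_llm_summaries view looks like; reconstructed from the assertions,
# not copied from the route itself.
import os

def queue_missing_summaries(watch, uuid, llm_summary_q):
    for index, fname in enumerate(watch.history.values()):
        if index == 0:
            continue  # the first history entry has no prior snapshot to diff against
        snapshot_id = os.path.basename(fname).split('.')[0]  # 32-char MD5 hex
        summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
        if os.path.exists(summary_path):
            continue  # already summarised, so the route skips it
        llm_summary_q.put({'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0})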

View File

@@ -518,8 +518,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
# (cleanup may delete these variables, but plugins need the original references)
finalize_handler = update_handler # Capture now, before cleanup deletes it
finalize_watch = watch # Capture now, before any modifications
finalize_changed_detected = locals().get('changed_detected', False)
finalize_snapshot_id = (locals().get('update_obj') or {}).get('previous_md5') or ''
# Call quit() as backup (Puppeteer/Playwright have internal cleanup, but this acts as safety net)
try:
@@ -560,9 +558,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
update_handler=finalize_handler,
watch=finalize_watch,
datastore=datastore,
processing_exception=processing_exception,
changed_detected=finalize_changed_detected,
snapshot_id=finalize_snapshot_id,
processing_exception=processing_exception
)
except Exception as finalize_error:
logger.error(f"Worker {worker_id} error in finalize hook: {finalize_error}")
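The finalize-capture comments above exist because names like changed_detected may never have been bound when an exception fires early in the worker body, so a bare reference in the finalize path could raise NameError. A minimal illustration of the locals().get() pattern shown in the hunk above; worker_iteration and its fetch_fn argument are invented for the example:

def worker_iteration(fetch_fn):
    """Return whatever changed_detected ended up as, defaulting to False."""
    try:
        result = fetch_fn()
        changed_detected = result['changed']
    except Exception:
        pass  # changed_detected was never bound on this path
    # Falls back to False when the assignment above was never reached
    return locals().get('changed_detected', False)

# worker_iteration(lambda: {'changed': True})  -> True
# worker_iteration(lambda: 1 / 0)              -> False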

View File

@@ -97,8 +97,12 @@ pytest ~=9.0
pytest-flask ~=1.3
pytest-mock ~=3.15
# Anything 4.0 and up but not 5.0
jsonschema ~= 4.26
# OpenAPI validation support
openapi-core[flask] ~= 0.22
openapi-core[flask] >= 0.19.0
loguru
@@ -120,7 +124,8 @@ greenlet >= 3.0.3
# Default SOCKETIO_MODE=threading is recommended for better compatibility
gevent
referencing # Don't pin — jsonschema-path (required by openapi-core>=0.18) caps referencing<0.37.0, so pinning 0.37.0 forces openapi-core back to 0.17.2. Revisit once jsonschema-path>=0.3.5 relaxes the cap.
# Previously pinned for flask_expects_json (removed 2026-02). Unpinning for now.
referencing
# For conditions
panzi-json-logic
@@ -151,10 +156,3 @@ blinker
pytest-xdist
litellm
# pydantic-core >=2.41 imports typing_extensions.Sentinel, which is absent in the
# system-installed typing_extensions on many Linux distros (e.g. Ubuntu 22/24).
# When the system path leaks into sys.path before the venv, the system copy is
# cached first and the import fails at runtime inside the LLM worker thread.
pydantic-core<2.41
pydantic<2.12
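Given the import-order failure mode described in the comment above, a quick diagnostic is to check which typing_extensions copy actually wins and whether it ships Sentinel. This is a standalone sketch, not part of the project:

import typing_extensions

print("typing_extensions loaded from:", typing_extensions.__file__)
print("has Sentinel:", hasattr(typing_extensions, "Sentinel"))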