mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-23 22:56:17 +00:00
Compare commits
1 Commits
llm
...
flask-sock
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9009d0a906 |
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
# Semver means never use .01, or 00. Should be .1.
|
||||
__version__ = '0.53.7'
|
||||
__version__ = '0.53.6'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -45,7 +45,6 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
extra_notification_tokens=datastore.get_unique_notification_tokens_available()
|
||||
)
|
||||
|
||||
|
||||
# Remove the last option 'System default'
|
||||
form.application.form.notification_format.choices.pop()
|
||||
|
||||
@@ -130,12 +129,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
# Instantiate plugin form with POST data
|
||||
plugin_form = form_class(formdata=request.form)
|
||||
|
||||
# Save plugin settings — use plugin's own save_fn if provided
|
||||
# (allows plugins to strip ephemeral staging fields etc.)
|
||||
save_fn = tab.get('save_fn')
|
||||
if save_fn:
|
||||
save_fn(datastore, plugin_form)
|
||||
elif plugin_form.data:
|
||||
# Save plugin settings (validation is optional for plugins)
|
||||
if plugin_form.data:
|
||||
save_plugin_settings(datastore.datastore_path, plugin_id, plugin_form.data)
|
||||
|
||||
flash(gettext("Settings updated."))
|
||||
|
||||
@@ -27,7 +27,6 @@
|
||||
<li class="tab"><a href="#rss">{{ _('RSS') }}</a></li>
|
||||
<li class="tab"><a href="{{ url_for('backups.create') }}">{{ _('Backups') }}</a></li>
|
||||
<li class="tab"><a href="#timedate">{{ _('Time & Date') }}</a></li>
|
||||
|
||||
<li class="tab"><a href="#proxies">{{ _('CAPTCHA & Proxies') }}</a></li>
|
||||
{% if plugin_tabs %}
|
||||
{% for tab in plugin_tabs %}
|
||||
@@ -309,7 +308,6 @@ nav
|
||||
</div>
|
||||
|
||||
</div>
|
||||
|
||||
<div class="tab-pane-inner" id="proxies">
|
||||
<div id="recommended-proxy">
|
||||
<div>
|
||||
|
||||
@@ -116,11 +116,11 @@ def _handle_operations(op, uuids, datastore, worker_pool, update_q, queuedWatchM
|
||||
for uuid in uuids:
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=None):
|
||||
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_pool, queuedWatchMetaData, watch_check_update):
|
||||
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
|
||||
|
||||
# Register the edit blueprint
|
||||
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData, llm_summary_q=llm_summary_q)
|
||||
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData)
|
||||
ui_blueprint.register_blueprint(edit_blueprint)
|
||||
|
||||
# Register the notification blueprint
|
||||
|
||||
@@ -11,7 +11,7 @@ from changedetectionio.auth_decorator import login_optionally_required
|
||||
from changedetectionio.time_handler import is_within_schedule
|
||||
from changedetectionio import worker_pool
|
||||
|
||||
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData, llm_summary_q=None):
|
||||
def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMetaData):
|
||||
edit_blueprint = Blueprint('ui_edit', __name__, template_folder="../ui/templates")
|
||||
|
||||
def _watch_has_tag_options_set(watch):
|
||||
@@ -404,47 +404,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
download_name=filename,
|
||||
mimetype='application/zip')
|
||||
|
||||
@edit_blueprint.route("/edit/<string:uuid>/regenerate-llm-summaries", methods=['GET'])
|
||||
@login_optionally_required
|
||||
def watch_regenerate_llm_summaries(uuid):
|
||||
"""Queue LLM summary generation for all history entries that don't yet have one."""
|
||||
from flask import flash
|
||||
from changedetectionio.llm.tokens import is_llm_data_ready
|
||||
watch = datastore.data['watching'].get(uuid)
|
||||
if not watch:
|
||||
abort(404)
|
||||
|
||||
if not llm_summary_q:
|
||||
flash(gettext("LLM summarisation is not configured."), 'error')
|
||||
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid))
|
||||
|
||||
history = watch.history
|
||||
history_keys = list(history.keys())
|
||||
|
||||
queued = 0
|
||||
# Skip the first entry — there is no prior snapshot to diff against
|
||||
for timestamp in history_keys[1:]:
|
||||
snapshot_fname = history[timestamp]
|
||||
snapshot_id = os.path.basename(snapshot_fname).split('.')[0] # always 32-char MD5
|
||||
|
||||
# Skip entries that already have a summary
|
||||
if is_llm_data_ready(watch.data_dir, snapshot_id):
|
||||
continue
|
||||
|
||||
llm_summary_q.put({
|
||||
'uuid': uuid,
|
||||
'snapshot_id': snapshot_id,
|
||||
'attempts': 0,
|
||||
})
|
||||
queued += 1
|
||||
|
||||
if queued:
|
||||
flash(gettext("Queued %(count)d LLM summaries for generation.", count=queued), 'success')
|
||||
else:
|
||||
flash(gettext("All history entries already have LLM summaries."), 'notice')
|
||||
|
||||
return redirect(url_for('ui.ui_edit.edit_page', uuid=uuid) + '#info')
|
||||
|
||||
# Ajax callback
|
||||
@edit_blueprint.route("/edit/<string:uuid>/preview-rendered", methods=['POST'])
|
||||
@login_optionally_required
|
||||
|
||||
@@ -489,9 +489,6 @@ Math: {{ 1 + 1 }}") }}
|
||||
<p>
|
||||
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
|
||||
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Download watch data package') }}</a>
|
||||
{% if watch.history_n > 1 %}
|
||||
<a href="{{url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid)}}" class="pure-button button-small">{{ _('Regenerate LLM summaries') }}</a>
|
||||
{% endif %}
|
||||
</p>
|
||||
{% endif %}
|
||||
|
||||
|
||||
@@ -15,7 +15,6 @@ from changedetectionio.strtobool import strtobool
|
||||
from threading import Event
|
||||
from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue
|
||||
from changedetectionio import worker_pool
|
||||
import changedetectionio.llm as llm
|
||||
|
||||
from flask import (
|
||||
Flask,
|
||||
@@ -57,7 +56,6 @@ extra_stylesheets = []
|
||||
# Use bulletproof janus-based queues for sync/async reliability
|
||||
update_q = RecheckPriorityQueue()
|
||||
notification_q = NotificationQueue()
|
||||
llm_summary_q = llm.create_queue()
|
||||
MAX_QUEUE_SIZE = 5000
|
||||
|
||||
app = Flask(__name__,
|
||||
@@ -853,7 +851,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
# watchlist UI buttons etc
|
||||
import changedetectionio.blueprint.ui as ui
|
||||
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update, llm_summary_q=llm_summary_q))
|
||||
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_pool, queuedWatchMetaData, watch_check_update))
|
||||
|
||||
import changedetectionio.blueprint.watchlist as watchlist
|
||||
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')
|
||||
@@ -976,17 +974,6 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
).start()
|
||||
logger.info(f"Started {notification_workers} notification worker(s)")
|
||||
|
||||
llm.start_workers(app=app, datastore=datastore, llm_q=llm_summary_q,
|
||||
n_workers=int(os.getenv("LLM_WORKERS", "1")))
|
||||
|
||||
# Register the LLM queue plugin so changes trigger summary jobs
|
||||
from changedetectionio.llm.plugin import LLMQueuePlugin
|
||||
from changedetectionio.pluggy_interface import plugin_manager
|
||||
plugin_manager.register(LLMQueuePlugin(llm_summary_q), 'llm_queue_plugin')
|
||||
|
||||
# Re-run template path configuration now that all plugins (including LLM) are registered
|
||||
_configure_plugin_templates()
|
||||
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest:
|
||||
@@ -1041,65 +1028,19 @@ def notification_runner(worker_id=0):
|
||||
|
||||
else:
|
||||
|
||||
# ── LLM deferred-send gate ─────────────────────────────────────────
|
||||
# If the notification was re-queued to wait for LLM data, honour the
|
||||
# scheduled retry time before doing any further processing.
|
||||
_llm_next_retry = n_object.get('_llm_next_retry_at', 0)
|
||||
if _llm_next_retry and _llm_next_retry > time.time():
|
||||
notification_q.put(n_object)
|
||||
app.config.exit.wait(min(_llm_next_retry - time.time(), 2))
|
||||
continue
|
||||
|
||||
# Apply system-config fallbacks first so we can scan the final body/title.
|
||||
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
|
||||
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
|
||||
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
|
||||
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
|
||||
|
||||
# If the body or title references llm_* tokens, wait until LLM data is ready.
|
||||
import re as _re
|
||||
_llm_scan = (n_object.get('notification_body') or '') + ' ' + (n_object.get('notification_title') or '')
|
||||
if _re.search(r'\bllm_(?:summary|headline|importance|sentiment|one_liner)\b', _llm_scan):
|
||||
from changedetectionio.llm.tokens import (
|
||||
is_llm_data_ready, read_llm_tokens,
|
||||
LLM_NOTIFICATION_RETRY_DELAY_SECONDS, LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS,
|
||||
)
|
||||
_llm_uuid = n_object.get('uuid')
|
||||
_llm_watch = datastore.data['watching'].get(_llm_uuid) if _llm_uuid else None
|
||||
_llm_snap_id = n_object.get('_llm_snapshot_id')
|
||||
|
||||
if _llm_watch and _llm_snap_id and not is_llm_data_ready(_llm_watch.data_dir, _llm_snap_id):
|
||||
_llm_attempts = n_object.get('_llm_wait_attempts', 0)
|
||||
if _llm_attempts < LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS:
|
||||
n_object['_llm_wait_attempts'] = _llm_attempts + 1
|
||||
n_object['_llm_next_retry_at'] = time.time() + LLM_NOTIFICATION_RETRY_DELAY_SECONDS
|
||||
notification_q.put(n_object)
|
||||
logger.debug(
|
||||
f"Notification gate: LLM data pending for {_llm_uuid} "
|
||||
f"(attempt {n_object['_llm_wait_attempts']}/{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS})"
|
||||
)
|
||||
continue
|
||||
else:
|
||||
logger.warning(
|
||||
f"Notification: LLM data never arrived for {_llm_uuid} after "
|
||||
f"{LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS} attempts — sending without LLM tokens"
|
||||
)
|
||||
elif _llm_watch and _llm_snap_id:
|
||||
# Data is ready — populate the LLM tokens into n_object
|
||||
_llm_data = read_llm_tokens(_llm_watch.data_dir, _llm_snap_id)
|
||||
n_object['llm_summary'] = _llm_data.get('summary', '')
|
||||
n_object['llm_headline'] = _llm_data.get('headline', '')
|
||||
n_object['llm_importance'] = _llm_data.get('importance')
|
||||
n_object['llm_sentiment'] = _llm_data.get('sentiment', '')
|
||||
n_object['llm_one_liner'] = _llm_data.get('one_liner', '')
|
||||
# ── end LLM gate ───────────────────────────────────────────────────
|
||||
|
||||
now = datetime.now()
|
||||
sent_obj = None
|
||||
|
||||
try:
|
||||
from changedetectionio.notification.handler import process_notification
|
||||
|
||||
# Fallback to system config if not set
|
||||
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
|
||||
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
|
||||
|
||||
if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'):
|
||||
n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title')
|
||||
|
||||
if not n_object.get('notification_format') and datastore.data['settings']['application'].get('notification_format'):
|
||||
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
|
||||
if n_object.get('notification_urls', {}):
|
||||
|
||||
@@ -1,64 +0,0 @@
|
||||
"""
|
||||
changedetectionio.llm
|
||||
~~~~~~~~~~~~~~~~~~~~~
|
||||
|
||||
LLM summary queue and workers.
|
||||
|
||||
Usage in flask_app.py
|
||||
---------------------
|
||||
|
||||
import changedetectionio.llm as llm
|
||||
|
||||
# At module level alongside notification_q:
|
||||
llm_summary_q = llm.create_queue()
|
||||
|
||||
# Inside changedetection_app(), after datastore is ready:
|
||||
llm.start_workers(
|
||||
app=app,
|
||||
datastore=datastore,
|
||||
llm_q=llm_summary_q,
|
||||
n_workers=int(os.getenv("LLM_WORKERS", "1")),
|
||||
)
|
||||
|
||||
Enqueueing a summary job (e.g. from the pluggy update_finalize hook)
|
||||
---------------------------------------------------------------------
|
||||
|
||||
if changed_detected and not processing_exception:
|
||||
llm_summary_q.put({
|
||||
'uuid': watch_uuid,
|
||||
'snapshot_id': snapshot_id,
|
||||
'attempts': 0,
|
||||
})
|
||||
"""
|
||||
|
||||
import queue
|
||||
import threading
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def create_queue() -> queue.Queue:
|
||||
"""Return a plain Queue for LLM summary jobs. No maxsize — jobs are small dicts."""
|
||||
return queue.Queue()
|
||||
|
||||
|
||||
def start_workers(app, datastore, llm_q: queue.Queue, n_workers: int = 1) -> None:
|
||||
"""
|
||||
Start N LLM summary worker threads.
|
||||
|
||||
Args:
|
||||
app: Flask application instance (for app_context and exit event)
|
||||
datastore: Application datastore
|
||||
llm_q: Queue returned by create_queue()
|
||||
n_workers: Number of parallel workers (default 1; increase for local Ollama)
|
||||
"""
|
||||
from changedetectionio.llm.queue_worker import llm_summary_runner
|
||||
|
||||
for i in range(n_workers):
|
||||
threading.Thread(
|
||||
target=llm_summary_runner,
|
||||
args=(i, app, datastore, llm_q),
|
||||
daemon=True,
|
||||
name=f"LLMSummaryWorker-{i}",
|
||||
).start()
|
||||
|
||||
logger.info(f"Started {n_workers} LLM summary worker(s)")
|
||||
@@ -1,104 +0,0 @@
|
||||
"""
|
||||
LLM plugin — provides settings tab and enqueues summary jobs on change detection.
|
||||
|
||||
Registered with the pluggy plugin manager at startup (flask_app.py).
|
||||
The worker (llm/queue_worker.py) drains the queue asynchronously.
|
||||
"""
|
||||
from loguru import logger
|
||||
from changedetectionio.pluggy_interface import hookimpl
|
||||
|
||||
|
||||
def get_llm_settings(datastore):
|
||||
"""Load LLM plugin settings with fallback to legacy datastore settings.
|
||||
|
||||
Tries the plugin settings file (llm.json) first.
|
||||
Falls back to the old storage location in datastore.data['settings']['application']
|
||||
for users upgrading from a version before LLM became a first-class plugin.
|
||||
"""
|
||||
from changedetectionio.pluggy_interface import load_plugin_settings
|
||||
settings = load_plugin_settings(datastore.datastore_path, 'llm')
|
||||
|
||||
if settings.get('llm_connection') is not None:
|
||||
return settings
|
||||
|
||||
# Legacy fallback: settings were stored in datastore application settings
|
||||
app_settings = datastore.data['settings']['application']
|
||||
connections_dict = app_settings.get('llm_connections') or {}
|
||||
connections_list = [
|
||||
{
|
||||
'connection_id': k,
|
||||
'name': v.get('name', ''),
|
||||
'model': v.get('model', ''),
|
||||
'api_key': v.get('api_key', ''),
|
||||
'api_base': v.get('api_base', ''),
|
||||
'tokens_per_minute': int(v.get('tokens_per_minute', 0) or 0),
|
||||
'is_default': bool(v.get('is_default', False)),
|
||||
}
|
||||
for k, v in connections_dict.items()
|
||||
]
|
||||
|
||||
return {
|
||||
'llm_connection': connections_list,
|
||||
'llm_summary_prompt': app_settings.get('llm_summary_prompt', ''),
|
||||
}
|
||||
|
||||
|
||||
def save_llm_settings(datastore, plugin_form):
|
||||
"""Custom save handler — strips the ephemeral new_connection staging fields
|
||||
so they are never persisted to llm.json."""
|
||||
from changedetectionio.pluggy_interface import save_plugin_settings
|
||||
data = {
|
||||
'llm_connection': plugin_form.llm_connection.data,
|
||||
'llm_summary_prompt': plugin_form.llm_summary_prompt.data or '',
|
||||
'llm_diff_context_lines': plugin_form.llm_diff_context_lines.data or 2,
|
||||
}
|
||||
save_plugin_settings(datastore.datastore_path, 'llm', data)
|
||||
|
||||
|
||||
class LLMQueuePlugin:
|
||||
"""Enqueues LLM summary jobs on successful change detection and provides settings tab."""
|
||||
|
||||
def __init__(self, llm_q):
|
||||
self.llm_q = llm_q
|
||||
|
||||
@hookimpl
|
||||
def plugin_settings_tab(self):
|
||||
from changedetectionio.llm.settings_form import LLMSettingsForm
|
||||
return {
|
||||
'plugin_id': 'llm',
|
||||
'tab_label': 'LLM',
|
||||
'form_class': LLMSettingsForm,
|
||||
'template_path': 'settings-llm.html',
|
||||
'save_fn': save_llm_settings,
|
||||
}
|
||||
|
||||
@hookimpl
|
||||
def update_finalize(self, update_handler, watch, datastore, processing_exception,
|
||||
changed_detected=False, snapshot_id=None):
|
||||
"""Queue an LLM summary job when a change was successfully detected."""
|
||||
|
||||
if not changed_detected or processing_exception or not snapshot_id:
|
||||
return
|
||||
|
||||
if watch is None:
|
||||
return
|
||||
|
||||
# Need ≥2 history entries — first entry has nothing to diff against
|
||||
if watch.history_n < 2:
|
||||
return
|
||||
|
||||
# Only queue when at least one LLM connection is configured
|
||||
llm_settings = get_llm_settings(datastore)
|
||||
has_connection = bool(
|
||||
llm_settings.get('llm_connection')
|
||||
or datastore.data['settings']['application'].get('llm_api_key') # legacy
|
||||
or datastore.data['settings']['application'].get('llm_model') # legacy
|
||||
or watch.get('llm_api_key')
|
||||
or watch.get('llm_model')
|
||||
)
|
||||
if not has_connection:
|
||||
return
|
||||
|
||||
uuid = watch.get('uuid')
|
||||
self.llm_q.put({'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0})
|
||||
logger.debug(f"LLM: queued summary for uuid={uuid} snapshot={snapshot_id}")
|
||||
@@ -1,544 +0,0 @@
|
||||
import fcntl
|
||||
import os
|
||||
import queue
|
||||
import re
|
||||
import threading
|
||||
import time
|
||||
from datetime import datetime, timezone
|
||||
from loguru import logger
|
||||
|
||||
from changedetectionio.llm.tokens import (
|
||||
STRUCTURED_OUTPUT_INSTRUCTION,
|
||||
parse_llm_response,
|
||||
write_llm_data,
|
||||
)
|
||||
|
||||
MAX_RETRIES = 5
|
||||
RETRY_BACKOFF_BASE_SECONDS = 60 # 1m, 2m, 4m, 8m, 16m
|
||||
|
||||
# Token thresholds that control which summarisation strategy is used.
|
||||
# Small diffs: single-pass summarise.
|
||||
# Larger diffs: two-pass (enumerate all changes first, then compress).
|
||||
# Very large diffs: map-reduce (chunk → enumerate per chunk → final synthesis).
|
||||
TOKEN_SINGLE_PASS_THRESHOLD = 5000 # below this: one call
|
||||
TOKEN_TWO_PASS_THRESHOLD = 15000 # below this: enumerate then summarise
|
||||
TOKEN_CHUNK_SIZE = 5000 # tokens per map-reduce chunk
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Proactive token-bucket rate limiter — shared across all workers in process
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class _RateLimitWait(Exception):
|
||||
"""Raised when the bucket is empty; worker re-queues without incrementing attempts."""
|
||||
def __init__(self, wait_seconds):
|
||||
self.wait_seconds = wait_seconds
|
||||
super().__init__(f"Rate limit: wait {wait_seconds:.1f}s")
|
||||
|
||||
|
||||
class _TokenBucket:
|
||||
"""Thread-safe continuous token bucket. tpm=0 means unlimited."""
|
||||
|
||||
def __init__(self, tpm):
|
||||
self._lock = threading.Lock()
|
||||
self._tpm = tpm
|
||||
self._tokens = float(tpm) # start full
|
||||
self._last_ts = time.monotonic()
|
||||
|
||||
def try_consume(self, n):
|
||||
"""Consume n tokens. Returns (True, 0.0) on success or (False, wait_secs) if dry."""
|
||||
if self._tpm == 0:
|
||||
return True, 0.0
|
||||
with self._lock:
|
||||
now = time.monotonic()
|
||||
elapsed = now - self._last_ts
|
||||
self._tokens = min(self._tpm, self._tokens + elapsed * (self._tpm / 60.0))
|
||||
self._last_ts = now
|
||||
if self._tokens >= n:
|
||||
self._tokens -= n
|
||||
return True, 0.0
|
||||
deficit = n - self._tokens
|
||||
return False, deficit / (self._tpm / 60.0)
|
||||
|
||||
|
||||
_rate_buckets = {}
|
||||
_rate_buckets_lock = threading.Lock()
|
||||
|
||||
|
||||
def _get_rate_bucket(conn_id, tpm):
|
||||
"""Return (or lazily create) the shared _TokenBucket for this connection."""
|
||||
with _rate_buckets_lock:
|
||||
if conn_id not in _rate_buckets:
|
||||
_rate_buckets[conn_id] = _TokenBucket(int(tpm or 0))
|
||||
return _rate_buckets[conn_id]
|
||||
|
||||
|
||||
def _parse_retry_after(exc):
|
||||
"""Extract a retry-after delay (seconds) from a litellm RateLimitError."""
|
||||
if hasattr(exc, 'retry_after') and exc.retry_after:
|
||||
try:
|
||||
return float(exc.retry_after)
|
||||
except (TypeError, ValueError):
|
||||
pass
|
||||
m = re.search(r'(?:try again in|retry after)\s*([\d.]+)\s*s', str(exc), re.IGNORECASE)
|
||||
return float(m.group(1)) + 1.0 if m else 60.0
|
||||
|
||||
|
||||
def _read_snapshot(watch, snapshot_fname):
|
||||
"""Read a snapshot file from disk, handling plain text and brotli compression."""
|
||||
path = os.path.join(watch.data_dir, snapshot_fname)
|
||||
if snapshot_fname.endswith('.br'):
|
||||
import brotli
|
||||
with open(path, 'rb') as f:
|
||||
return brotli.decompress(f.read()).decode('utf-8', errors='replace')
|
||||
else:
|
||||
with open(path, 'r', encoding='utf-8', errors='replace') as f:
|
||||
return f.read()
|
||||
|
||||
|
||||
def _append_llm_log(log_path, model, sent_tokens, recv_tokens, elapsed_ms):
|
||||
"""Append one line to the datastore-level LLM activity log.
|
||||
|
||||
Line format (tab-separated, LF terminated):
|
||||
ISO-8601-UTC fetched LLM via <model> sent=<N> recv=<N> ms=<N>
|
||||
|
||||
The file is flock-locked for the duration of the write so concurrent
|
||||
workers don't interleave lines.
|
||||
"""
|
||||
ts = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] # ms precision
|
||||
line = f"{ts}\tfetched LLM via {model}\tsent={sent_tokens}\trecv={recv_tokens}\tms={elapsed_ms}\n"
|
||||
try:
|
||||
with open(log_path, 'a', encoding='utf-8', newline='\n') as f:
|
||||
fcntl.flock(f, fcntl.LOCK_EX)
|
||||
try:
|
||||
f.write(line)
|
||||
f.flush()
|
||||
finally:
|
||||
fcntl.flock(f, fcntl.LOCK_UN)
|
||||
except Exception as exc:
|
||||
logger.warning(f"LLM log write failed: {exc}")
|
||||
|
||||
|
||||
def _call_llm(model, messages, api_key=None, api_base=None, max_tokens=600, conn_id=None, tpm=0, log_path=None):
|
||||
"""
|
||||
Thin wrapper around litellm.completion.
|
||||
Isolated as a named function so tests can mock.patch it without importing litellm.
|
||||
|
||||
Determinism settings
|
||||
--------------------
|
||||
temperature=0 — greedy decoding; same input produces the same output consistently.
|
||||
seed=0 — passed through to providers that support it (OpenAI, some others)
|
||||
for near-bit-identical reproducibility across calls.
|
||||
|
||||
Deliberately NOT set
|
||||
--------------------
|
||||
top_p — redundant at temperature=0 and can interact badly with some providers.
|
||||
frequency_penalty / presence_penalty — would penalise the model for repeating specific
|
||||
values (e.g. "$10 → $10") which is exactly wrong for change detection.
|
||||
|
||||
max_tokens — caller sets this based on the pass type:
|
||||
enumerate pass needs more room than the final summary pass.
|
||||
|
||||
conn_id / tpm — optional rate limiting; when both are set, a proactive token-bucket
|
||||
check is performed before calling the API. Raises _RateLimitWait if
|
||||
the bucket is empty so the worker can re-queue without retrying.
|
||||
|
||||
log_path — when set, each call is appended to the datastore LLM activity log.
|
||||
|
||||
Returns the response text string.
|
||||
"""
|
||||
import litellm
|
||||
|
||||
# Proactive rate check (skipped when tpm=0 or conn_id is None)
|
||||
if conn_id and tpm:
|
||||
prompt_tokens = litellm.token_counter(model=model, messages=messages)
|
||||
total_est = prompt_tokens + max_tokens
|
||||
bucket = _get_rate_bucket(conn_id, tpm)
|
||||
ok, wait = bucket.try_consume(total_est)
|
||||
if not ok:
|
||||
raise _RateLimitWait(wait)
|
||||
|
||||
kwargs = dict(
|
||||
model=model,
|
||||
messages=messages,
|
||||
temperature=0,
|
||||
seed=0,
|
||||
max_tokens=max_tokens,
|
||||
)
|
||||
if api_key:
|
||||
kwargs['api_key'] = api_key
|
||||
if api_base:
|
||||
kwargs['api_base'] = api_base
|
||||
|
||||
t0 = time.monotonic()
|
||||
response = litellm.completion(**kwargs)
|
||||
elapsed_ms = round((time.monotonic() - t0) * 1000)
|
||||
|
||||
if log_path:
|
||||
usage = getattr(response, 'usage', None)
|
||||
sent_tok = getattr(usage, 'prompt_tokens', 0) or 0
|
||||
recv_tok = getattr(usage, 'completion_tokens', 0) or 0
|
||||
_append_llm_log(log_path, model, sent_tok, recv_tok, elapsed_ms)
|
||||
|
||||
return response.choices[0].message.content.strip()
|
||||
|
||||
|
||||
|
||||
def _resolve_llm_connection(watch, datastore):
|
||||
"""Return (model, api_key, api_base, conn_id, tpm) for the given watch.
|
||||
|
||||
Resolution order:
|
||||
1. Watch-level connection_id pointing to a named entry in plugin settings.
|
||||
2. The default entry in plugin settings (is_default=True).
|
||||
3. Legacy flat fields on the watch or in global settings — backward compat.
|
||||
4. Hard-coded fallback: gpt-4o-mini with no key / base.
|
||||
"""
|
||||
from changedetectionio.llm.plugin import get_llm_settings
|
||||
from changedetectionio.llm.settings_form import sanitised_conn_id
|
||||
|
||||
llm_settings = get_llm_settings(datastore)
|
||||
connections = llm_settings.get('llm_connection') or []
|
||||
|
||||
# 1. Watch-level override by explicit connection_id
|
||||
watch_conn_id = watch.get('llm_connection_id')
|
||||
if watch_conn_id:
|
||||
for c in connections:
|
||||
if c.get('connection_id') == watch_conn_id:
|
||||
cid = sanitised_conn_id(c.get('connection_id', ''))
|
||||
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
|
||||
cid, int(c.get('tokens_per_minute', 0) or 0))
|
||||
|
||||
# 2. Global default connection
|
||||
for c in connections:
|
||||
if c.get('is_default'):
|
||||
cid = sanitised_conn_id(c.get('connection_id', ''))
|
||||
return (c.get('model', 'gpt-4o-mini'), c.get('api_key', ''), c.get('api_base', ''),
|
||||
cid, int(c.get('tokens_per_minute', 0) or 0))
|
||||
|
||||
# 3. Legacy flat fields (backward compat)
|
||||
app_settings = datastore.data['settings']['application']
|
||||
model = watch.get('llm_model') or app_settings.get('llm_model', 'gpt-4o-mini')
|
||||
api_key = watch.get('llm_api_key') or app_settings.get('llm_api_key', '')
|
||||
api_base = watch.get('llm_api_base') or app_settings.get('llm_api_base', '')
|
||||
return model, api_key, api_base, 'legacy', 0
|
||||
|
||||
|
||||
SYSTEM_PROMPT = (
|
||||
'You are a change detection assistant. '
|
||||
'Be precise and factual. Never speculate. '
|
||||
'Always use exact numbers, values, and quoted text when present in the diff. '
|
||||
'If nothing meaningful changed, say so explicitly.'
|
||||
)
|
||||
|
||||
|
||||
def _build_context_header(watch, datastore):
|
||||
"""Return a short multi-line string describing what this watch monitors.
|
||||
|
||||
Included lines (only when non-empty / non-redundant):
|
||||
URL: <url>
|
||||
Monitor: <user title or fetched page title> (omitted when same as URL)
|
||||
Tags: <comma-separated tag titles> (omitted when none)
|
||||
"""
|
||||
url = watch.get('url', '')
|
||||
title = watch.get('title', '') or watch.get('page_title', '')
|
||||
|
||||
lines = [f"URL: {url}"]
|
||||
if title and title != url:
|
||||
lines.append(f"Monitor: {title}")
|
||||
|
||||
tag_titles = []
|
||||
for tag_uuid in (watch.get('tags') or []):
|
||||
tag = datastore.data['settings']['application'].get('tags', {}).get(tag_uuid, {})
|
||||
t = tag.get('title', '').strip()
|
||||
if t:
|
||||
tag_titles.append(t)
|
||||
if tag_titles:
|
||||
lines.append(f"Tags: {', '.join(tag_titles)}")
|
||||
|
||||
return '\n'.join(lines)
|
||||
|
||||
|
||||
def _chunk_lines(lines, model, chunk_token_size):
|
||||
"""Split lines into chunks that each fit within chunk_token_size tokens."""
|
||||
import litellm
|
||||
chunks, current, current_tokens = [], [], 0
|
||||
for line in lines:
|
||||
line_tokens = litellm.token_counter(model=model, text=line)
|
||||
if current and current_tokens + line_tokens > chunk_token_size:
|
||||
chunks.append('\n'.join(current))
|
||||
current, current_tokens = [], 0
|
||||
current.append(line)
|
||||
current_tokens += line_tokens
|
||||
if current:
|
||||
chunks.append('\n'.join(current))
|
||||
return chunks
|
||||
|
||||
|
||||
def _enumerate_changes(diff_text, context_header, model, llm_kwargs):
|
||||
"""
|
||||
Pass 1 — ask the model to list every distinct change exhaustively, one per line.
|
||||
Returns a plain-text list string.
|
||||
This avoids compression decisions: the model just lists, it does not prioritise.
|
||||
"""
|
||||
messages = [
|
||||
{'role': 'system', 'content': SYSTEM_PROMPT},
|
||||
{
|
||||
'role': 'user',
|
||||
'content': (
|
||||
f"{context_header}\n"
|
||||
f"Diff:\n{diff_text}\n\n"
|
||||
"List every distinct change you see, one item per line. "
|
||||
"Be exhaustive — do not filter or prioritise. "
|
||||
"Use exact values from the diff (prices, dates, counts, quoted text)."
|
||||
),
|
||||
},
|
||||
]
|
||||
# Enumerate pass needs more output room than the final summary
|
||||
return _call_llm(model=model, messages=messages, max_tokens=1200, **llm_kwargs)
|
||||
|
||||
|
||||
def _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction=None):
|
||||
"""
|
||||
Pass 2 — compress the exhaustive enumeration into the final output.
|
||||
Operates on a small, structured input so nothing is lost that wasn't already listed.
|
||||
summary_instruction overrides the default STRUCTURED_OUTPUT_INSTRUCTION when set.
|
||||
"""
|
||||
instruction = summary_instruction or (
|
||||
"Now produce the final structured output for all of these changes.\n\n"
|
||||
+ STRUCTURED_OUTPUT_INSTRUCTION
|
||||
)
|
||||
messages = [
|
||||
{'role': 'system', 'content': SYSTEM_PROMPT},
|
||||
{
|
||||
'role': 'user',
|
||||
'content': (
|
||||
f"{context_header}\n"
|
||||
f"All changes detected:\n{enumerated}\n\n"
|
||||
+ instruction
|
||||
),
|
||||
},
|
||||
]
|
||||
return _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
|
||||
|
||||
|
||||
def process_llm_summary(item, datastore):
    """
    Generate an LLM summary for a detected change and write the snapshot's
    LLM data files (via write_llm_data).

    Args (item keys):
        uuid        - watch UUID
        snapshot_id - the newer snapshot ID (md5 hex), maps to {snapshot_id}.txt[.br]
        attempts    - retry counter

    Raises:
        ValueError - when the watch is unknown, snapshot_id is missing from
                     history, or snapshot_id is the first history entry
                     (nothing earlier to diff against).

    Summarisation strategy (chosen by diff token count):
        Small  (< TOKEN_SINGLE_PASS_THRESHOLD): one call — enumerate + summarise together.
        Medium (< TOKEN_TWO_PASS_THRESHOLD):    two calls — enumerate all changes, then compress.
        Large  (>= TOKEN_TWO_PASS_THRESHOLD):   map-reduce — chunk → enumerate per chunk →
                                                synthesise chunk enumerations → final summary.

    The two-pass / map-reduce approach prevents lossiness: temperature=0 causes the model
    to greedily commit to the most prominent change and drop the rest in a single pass.
    Enumerating first forces comprehensive coverage before any compression happens.

    Split into _call_llm / write_llm_data so each step is independently patchable in tests.
    """
    import difflib
    import litellm

    uuid = item['uuid']
    snapshot_id = item['snapshot_id']

    watch = datastore.data['watching'].get(uuid)
    if not watch:
        raise ValueError(f"Watch {uuid} not found")

    # Find this snapshot and the one before it in history.
    # History filenames carry the snapshot md5 as their prefix, so matching
    # on the basename's first dot-separated component locates the entry.
    history = watch.history
    history_keys = list(history.keys())

    try:
        idx = next(
            i for i, k in enumerate(history_keys)
            if os.path.basename(history[k]).split('.')[0] == snapshot_id
        )
    except StopIteration:
        raise ValueError(f"snapshot_id {snapshot_id} not found in history for watch {uuid}")

    if idx == 0:
        raise ValueError(f"snapshot_id {snapshot_id} is the first history entry — no prior to diff against")

    before_text = _read_snapshot(watch, history[history_keys[idx - 1]])
    current_text = _read_snapshot(watch, history[history_keys[idx]])

    # Resolve model / credentials via connections table (with legacy flat-field fallback)
    model, api_key, api_base, conn_id, tpm = _resolve_llm_connection(watch, datastore)
    url = watch.get('url', '')  # NOTE(review): unused below — candidate for removal
    context_header = _build_context_header(watch, datastore)

    # Forwarded to _call_llm; only set keys that actually have a value so
    # _call_llm's own defaults apply otherwise.
    llm_kwargs = {
        'log_path': os.path.join(datastore.datastore_path, 'llm-log.txt'),
    }
    if api_key:
        llm_kwargs['api_key'] = api_key
    if api_base:
        llm_kwargs['api_base'] = api_base
    if conn_id:
        llm_kwargs['conn_id'] = conn_id
    if tpm:
        llm_kwargs['tpm'] = tpm

    # Use custom prompt / context-line setting if configured
    from changedetectionio.llm.plugin import get_llm_settings
    llm_settings = get_llm_settings(datastore)
    custom_prompt = (llm_settings.get('llm_summary_prompt') or '').strip()
    summary_instruction = custom_prompt if custom_prompt else (
        "Analyse all changes in this diff.\n\n" + STRUCTURED_OUTPUT_INSTRUCTION
    )
    # Unchanged context lines around each hunk; 2 when unset/zero/blank.
    context_n = int(llm_settings.get('llm_diff_context_lines') or 2)

    diff_lines = list(difflib.unified_diff(
        before_text.splitlines(),
        current_text.splitlines(),
        lineterm='',
        n=context_n,
    ))
    diff_text = '\n'.join(diff_lines)

    if not diff_text.strip():
        logger.debug(f"LLM: no diff content for {uuid}/{snapshot_id}, skipping")
        return

    # Token count decides which of the three strategies below is used.
    diff_tokens = litellm.token_counter(model=model, text=diff_text)
    logger.debug(f"LLM: diff is {diff_tokens} tokens for {uuid}/{snapshot_id}")

    if diff_tokens < TOKEN_SINGLE_PASS_THRESHOLD:
        # Small diff — single call, model can see everything at once
        messages = [
            {'role': 'system', 'content': SYSTEM_PROMPT},
            {
                'role': 'user',
                'content': (
                    f"{context_header}\n"
                    f"Diff:\n{diff_text}\n\n"
                    + summary_instruction
                ),
            },
        ]
        raw = _call_llm(model=model, messages=messages, max_tokens=500, **llm_kwargs)
        strategy = 'single'

    elif diff_tokens < TOKEN_TWO_PASS_THRESHOLD:
        # Medium diff — two-pass: enumerate exhaustively, then compress
        enumerated = _enumerate_changes(diff_text, context_header, model, llm_kwargs)
        raw = _summarise_enumeration(enumerated, context_header, model, llm_kwargs, summary_instruction)
        strategy = 'two-pass'

    else:
        # Large diff — map-reduce: chunk → enumerate per chunk → synthesise
        chunks = _chunk_lines(diff_lines, model, TOKEN_CHUNK_SIZE)
        logger.debug(f"LLM: map-reduce over {len(chunks)} chunks for {uuid}/{snapshot_id}")

        chunk_enumerations = []
        for i, chunk in enumerate(chunks):
            logger.debug(f"LLM: enumerating chunk {i+1}/{len(chunks)}")
            chunk_enumerations.append(
                _enumerate_changes(chunk, context_header, model, llm_kwargs)
            )

        combined = '\n'.join(chunk_enumerations)
        raw = _summarise_enumeration(combined, context_header, model, llm_kwargs, summary_instruction)
        strategy = 'map-reduce'

    # Normalise/parse the model's reply, then persist it next to the snapshot.
    llm_data = parse_llm_response(raw)
    write_llm_data(watch.data_dir, snapshot_id, llm_data)
    logger.info(f"LLM tokens written for {uuid}/{snapshot_id} (strategy: {strategy}, tokens: {diff_tokens})")
|
||||
|
||||
|
||||
def llm_summary_runner(worker_id, app, datastore, llm_q):
    """
    Sync LLM summary worker — mirrors the notification_runner pattern.

    Args:
        worker_id: label used only in log messages.
        app:       Flask app; app.config.exit is the shared shutdown Event.
        datastore: application datastore (watch lookup + failure recording).
        llm_q:     work queue of items with 'uuid', 'snapshot_id', 'attempts'.

    One worker is the right default (LLM API rate limits constrain throughput
    more than parallelism helps). Increase via LLM_WORKERS env var if using
    a local Ollama endpoint with no rate limits.

    Failed items are re-queued with exponential backoff (see MAX_RETRIES /
    RETRY_BACKOFF_BASE_SECONDS). After MAX_RETRIES the item is dropped and
    the failure is recorded on the watch.
    """
    with app.app_context():
        while not app.config.exit.is_set():
            try:
                item = llm_q.get(block=False)
            except queue.Empty:
                # Idle poll — wake at least once per second to notice shutdown.
                app.config.exit.wait(1)
                continue

            # Honour retry delay — if the item isn't due yet, put it back
            # and sleep briefly rather than spinning.
            next_retry_at = item.get('next_retry_at', 0)
            if next_retry_at > time.time():
                llm_q.put(item)
                app.config.exit.wait(min(next_retry_at - time.time(), 5))
                continue

            uuid = item.get('uuid')
            snapshot_id = item.get('snapshot_id')
            attempts = item.get('attempts', 0)

            logger.debug(f"LLM worker {worker_id} processing uuid={uuid} snapshot={snapshot_id} attempt={attempts}")

            try:
                process_llm_summary(item, datastore)
                logger.info(f"LLM worker {worker_id} completed summary for uuid={uuid} snapshot={snapshot_id}")

            except NotImplementedError:
                # Silently drop until the processor is implemented
                logger.debug(f"LLM worker {worker_id} skipping — processor not yet implemented")

            except _RateLimitWait as rw:
                # Proactive bucket empty — re-queue without counting as a failure
                item['next_retry_at'] = time.time() + rw.wait_seconds
                llm_q.put(item)
                logger.info(
                    f"LLM worker {worker_id} rate-limited (proactive) for {rw.wait_seconds:.1f}s "
                    f"uuid={uuid}"
                )

            except Exception as e:
                # Reactive: check if the API itself returned a rate-limit error
                # (litellm import guarded so a missing dependency degrades to
                # the generic retry path below instead of crashing the worker).
                try:
                    import litellm as _litellm
                    if isinstance(e, _litellm.RateLimitError):
                        wait = _parse_retry_after(e)
                        item['next_retry_at'] = time.time() + wait
                        llm_q.put(item)
                        logger.warning(
                            f"LLM worker {worker_id} API rate limit for uuid={uuid}, "
                            f"retry in {wait:.1f}s"
                        )
                        continue
                except ImportError:
                    pass

                logger.error(f"LLM worker {worker_id} error for uuid={uuid} snapshot={snapshot_id}: {e}")

                if attempts < MAX_RETRIES:
                    # Exponential backoff: base * 2^attempts before the next try.
                    backoff = RETRY_BACKOFF_BASE_SECONDS * (2 ** attempts)
                    item['attempts'] = attempts + 1
                    item['next_retry_at'] = time.time() + backoff
                    llm_q.put(item)
                    logger.info(
                        f"LLM worker {worker_id} re-queued uuid={uuid} "
                        f"attempt={item['attempts']}/{MAX_RETRIES} retry_in={backoff}s"
                    )
                else:
                    logger.error(
                        f"LLM worker {worker_id} gave up on uuid={uuid} snapshot={snapshot_id} "
                        f"after {MAX_RETRIES} attempts"
                    )
                    # Surface the permanent failure on the watch itself so the
                    # UI can show it; guard against the watch having been deleted.
                    if uuid and uuid in datastore.data['watching']:
                        datastore.update_watch(
                            uuid=uuid,
                            update_obj={'last_error': f"LLM summary failed after {MAX_RETRIES} attempts: {e}"}
                        )
|
||||
@@ -1,139 +0,0 @@
|
||||
import re
|
||||
import uuid as _uuid
|
||||
|
||||
from flask_babel import lazy_gettext as _l
|
||||
from wtforms import (
|
||||
BooleanField,
|
||||
FieldList,
|
||||
Form,
|
||||
FormField,
|
||||
HiddenField,
|
||||
IntegerField,
|
||||
PasswordField,
|
||||
SelectField,
|
||||
StringField,
|
||||
TextAreaField,
|
||||
)
|
||||
from wtforms.validators import Length, NumberRange, Optional
|
||||
|
||||
from changedetectionio.llm.tokens import STRUCTURED_OUTPUT_INSTRUCTION
|
||||
|
||||
# The built-in instruction appended after the diff — shown as placeholder text.
|
||||
DEFAULT_SUMMARY_PROMPT = (
|
||||
"Analyse all changes in this diff.\n\n"
|
||||
+ STRUCTURED_OUTPUT_INSTRUCTION
|
||||
)
|
||||
|
||||
# Connection IDs arrive from the browser — accept only a conservative
# identifier alphabet, bounded to 64 characters.
_CONN_ID_RE = re.compile(r'^[a-zA-Z0-9_-]{1,64}$')


def sanitised_conn_id(raw):
    """Return *raw* when it is a safe identifier; otherwise a freshly generated UUID."""
    candidate = (raw or '').strip()
    if _CONN_ID_RE.match(candidate):
        return candidate
    return str(_uuid.uuid4())
|
||||
|
||||
|
||||
class LLMConnectionEntryForm(Form):
    """Schema for a single LLM connection.

    Declaring every field here is what prevents arbitrary key injection:
    only these fields can ever reach the datastore from this form.
    """
    # Stable identifier round-tripped through the browser; sanitised server-side.
    connection_id = HiddenField()
    # Human-readable label shown in the connections table.
    name = StringField(_l('Name'), validators=[Optional(), Length(max=100)])
    # litellm-style model string, e.g. "gpt-4o-mini" or "ollama/llama3.1".
    model = StringField(_l('Model string'), validators=[Optional(), Length(max=200)])
    api_key = StringField(_l('API Key'), validators=[Optional(), Length(max=500)])
    # Optional custom endpoint — used for local servers (Ollama, LM Studio).
    api_base = StringField(_l('API Endpoint'), validators=[Optional(), Length(max=500)])
    # Tokens-per-minute rate cap; 0 means unlimited.
    tokens_per_minute = IntegerField(_l('Tokens/min'), validators=[Optional(), NumberRange(min=0, max=10_000_000)], default=0)
    # Exactly one connection is expected to be the default (enforced client-side).
    is_default = BooleanField(_l('Default'), validators=[Optional()])
|
||||
|
||||
|
||||
class LLMNewConnectionForm(Form):
    """Staging fields for the 'Add a connection' UI.

    These are read client-side by llm.js to build a new FieldList entry on click.
    They are never used server-side — render_kw sets the id attributes llm.js
    looks up with $('#llm-add-name') etc.
    """
    preset = SelectField(
        _l('Provider template'),
        # Values never validated server-side — the field is purely a client helper.
        validate_choice=False,
        # WTForms 3.x uses a dict for optgroups (has_groups() checks isinstance(choices, dict)).
        # An empty-string key renders as <optgroup label=""> which browsers treat as ungrouped.
        choices={
            '': [('', '')],
            _l('Cloud'): [
                ('openai-mini', 'OpenAI — gpt-4o-mini'),
                ('openai-4o', 'OpenAI — gpt-4o'),
                ('anthropic-haiku', 'Anthropic — claude-3-haiku'),
                ('anthropic-sonnet', 'Anthropic — claude-3-5-sonnet'),
                ('groq-8b', 'Groq — llama-3.1-8b-instant'),
                ('groq-70b', 'Groq — llama-3.3-70b-versatile'),
                ('gemini-flash', 'Google — gemini-1.5-flash'),
                ('mistral-small', 'Mistral — mistral-small'),
                ('deepseek', 'DeepSeek — deepseek-chat'),
                ('openrouter', 'OpenRouter (custom model)'),
            ],
            _l('Local'): [
                ('ollama-llama', 'Ollama — llama3.1'),
                ('ollama-mistral', 'Ollama — mistral'),
                ('lmstudio', 'LM Studio'),
            ],
            _l('Custom'): [
                ('custom', _l('Manual entry')),
            ],
        },
        render_kw={'id': 'llm-preset'},
    )
    # The id attributes below are contract points with llm.js ($('#llm-add-*')).
    name = StringField(_l('Name'),
                       render_kw={'id': 'llm-add-name', 'size': 30,
                                  'autocomplete': 'off'})
    model = StringField(_l('Model string'),
                        render_kw={'id': 'llm-add-model', 'size': 40,
                                   'placeholder': 'gpt-4o-mini', 'autocomplete': 'off'})
    api_key = PasswordField(_l('API Key'),
                            render_kw={'id': 'llm-add-key', 'size': 40,
                                       'placeholder': 'sk-…', 'autocomplete': 'off'})
    api_base = StringField(_l('API Endpoint'),
                           render_kw={'id': 'llm-add-base', 'size': 40,
                                      'placeholder': 'http://localhost:11434', 'autocomplete': 'off'})
    tokens_per_minute = IntegerField(_l('Tokens/min'), default=0,
                                     render_kw={'id': 'llm-add-tpm', 'style': 'width: 8em;',
                                                'min': '0', 'step': '1000'})
|
||||
|
||||
|
||||
class LLMSettingsForm(Form):
    """WTForms form for the LLM settings tab.

    llm_connection is a FieldList of LLMConnectionEntryForm entries.
    llm.js emits individual hidden inputs (llm_connection-N-fieldname) on submit
    instead of a JSON blob, so WTForms processes them through the declared schema.
    """
    # Persisted connections (populated from the datastore on GET, from the
    # llm.js-generated hidden inputs on POST).
    llm_connection = FieldList(FormField(LLMConnectionEntryForm), min_entries=0)
    # Client-only staging fields for the "Add a connection" panel.
    new_connection = FormField(LLMNewConnectionForm)

    llm_diff_context_lines = IntegerField(
        _l('Diff context lines'),
        validators=[Optional(), NumberRange(min=0, max=20)],
        default=2,
        description=_l(
            'Number of unchanged lines shown around each change in the diff. '
            'More lines give the LLM more context but increase token usage. (default: 2)'
        ),
        render_kw={'style': 'width: 5em;', 'min': '0', 'max': '20'},
    )

    llm_summary_prompt = TextAreaField(
        _l('Summary prompt'),
        validators=[Optional()],
        description=_l(
            'Override the instruction sent to the LLM after the diff. '
            'Leave blank to use the built-in default (structured JSON output).'
        ),
        render_kw={
            'rows': 8,
            # Show the built-in default as placeholder so users can start from it.
            'placeholder': DEFAULT_SUMMARY_PROMPT,
            'class': 'pure-input-1',
        },
    )
|
||||
@@ -1,103 +0,0 @@
|
||||
<script src="{{url_for('static_content', group='js', filename='llm.js')}}" defer></script>
|
||||
<script>
|
||||
var LLM_CONNECTIONS = (function () {
|
||||
var list = {{ plugin_form.llm_connection.data|tojson }};
|
||||
var out = {};
|
||||
(list || []).forEach(function (c) { if (c && c.connection_id) out[c.connection_id] = c; });
|
||||
return out;
|
||||
}());
|
||||
var LLM_I18N = {
|
||||
noConnections: '{{ _("No connections configured yet.") }}',
|
||||
setDefault: '{{ _("Set as default") }}',
|
||||
remove: '{{ _("Remove") }}',
|
||||
show: '{{ _("show") }}',
|
||||
hide: '{{ _("hide") }}',
|
||||
nameModelRequired: '{{ _("Name and Model string are required.") }}'
|
||||
};
|
||||
</script>
|
||||
|
||||
{# ── Configured connections table ──────────────────── #}
|
||||
<fieldset>
|
||||
<legend>{{ _('LLM Connections') }}</legend>
|
||||
|
||||
<table class="pure-table pure-table-horizontal llm-connections">
|
||||
<thead>
|
||||
<tr>
|
||||
<th class="llm-col-def" title="{{ _('Default') }}">{{ _('Default') }}</th>
|
||||
<th class="llm-col-name">{{ _('Name') }}</th>
|
||||
<th class="llm-col-model">{{ _('Model') }}</th>
|
||||
<th class="llm-col-key">{{ _('API Key') }}</th>
|
||||
<th class="llm-col-tpm" title="{{ _('Tokens per minute limit (0 = unlimited)') }}">{{ _('TPM') }}</th>
|
||||
<th class="llm-col-del"></th>
|
||||
</tr>
|
||||
</thead>
|
||||
<tbody id="llm-connections-tbody">
|
||||
</tbody>
|
||||
</table>
|
||||
</fieldset>
|
||||
|
||||
{# ── Add connection ─────────────────────────────────── #}
|
||||
{% set nf = plugin_form.new_connection.form %}
|
||||
<fieldset>
|
||||
<legend>{{ _('Add a connection') }}</legend>
|
||||
|
||||
<div class="pure-control-group">
|
||||
{{ nf.preset.label }}
|
||||
{{ nf.preset() }}
|
||||
</div>
|
||||
|
||||
<div class="pure-control-group">
|
||||
{{ nf.name.label }}
|
||||
{{ nf.name(placeholder=_('e.g. My OpenAI')) }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ nf.model.label }}
|
||||
{{ nf.model() }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
<label for="llm-add-key">
|
||||
{{ _('API Key') }}
|
||||
<span class="pure-form-message-inline">({{ _('leave blank for local') }})</span>
|
||||
</label>
|
||||
<div class="llm-key-wrap">
|
||||
{{ nf.api_key() }}
|
||||
<button type="button" id="llm-key-toggle" class="pure-button">{{ _('show') }}</button>
|
||||
</div>
|
||||
</div>
|
||||
<div class="pure-control-group" id="llm-base-group" style="display:none">
|
||||
<label for="llm-add-base">
|
||||
{{ _('API Endpoint') }}
|
||||
<span class="pure-form-message-inline">({{ _('optional') }})</span>
|
||||
</label>
|
||||
{{ nf.api_base() }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
<label for="llm-add-tpm">
|
||||
{{ _('Tokens/min limit') }}
|
||||
<span class="pure-form-message-inline">({{ _('0 = unlimited') }})</span>
|
||||
</label>
|
||||
{{ nf.tokens_per_minute() }}
|
||||
</div>
|
||||
|
||||
<div class="pure-controls">
|
||||
<button type="button" id="llm-btn-add" class="pure-button pure-button-primary">{{ _('+ Add connection') }}</button>
|
||||
</div>
|
||||
</fieldset>
|
||||
|
||||
{# ── Prompt configuration ────────────────────────────────── #}
|
||||
<fieldset>
|
||||
<legend>{{ _('Summary Prompt') }}</legend>
|
||||
<div class="pure-control-group">
|
||||
{{ plugin_form.llm_diff_context_lines.label }}
|
||||
{{ plugin_form.llm_diff_context_lines() }}
|
||||
<span class="pure-form-message-inline">
|
||||
{{ _('Unchanged lines shown around each change in the diff sent to the LLM. More lines = more context but higher token cost. (default: 2)') }}
|
||||
</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(plugin_form.llm_summary_prompt) }}
|
||||
<span class="pure-form-message-inline">
|
||||
{{ _('Instruction appended after the diff in every LLM call. Leave blank to use the built-in default (structured JSON output).') }}
|
||||
</span>
|
||||
</div>
|
||||
</fieldset>
|
||||
@@ -1,197 +0,0 @@
|
||||
"""
|
||||
LLM notification token definitions and file I/O helpers.
|
||||
|
||||
All LLM data for a snapshot is stored under a dedicated subdirectory:
|
||||
{data_dir}/llm/{snapshot_id}-llm.json
|
||||
|
||||
A plain-text {snapshot_id}-llm.txt is also written containing just the
|
||||
summary field, for backward compatibility with any code that already reads it.
|
||||
|
||||
Token catalogue
|
||||
---------------
|
||||
llm_summary 1-3 sentence description of all changes, exact values.
|
||||
llm_headline 5-8 word punchy title — ideal for the notification subject line.
|
||||
llm_importance Numeric 1-10 significance score; enables routing rules like
|
||||
"only escalate if llm_importance >= 8".
|
||||
llm_sentiment Machine-readable: "positive", "negative", or "neutral".
|
||||
Useful for trend tracking and coloured alert styling.
|
||||
llm_one_liner Shortest useful summary — one sentence for SMS, Pushover,
|
||||
and other character-limited channels.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
from loguru import logger
|
||||
|
||||
# ── Constants ──────────────────────────────────────────────────────────────
|
||||
|
||||
LLM_TOKEN_NAMES = (
|
||||
'llm_summary',
|
||||
'llm_headline',
|
||||
'llm_importance',
|
||||
'llm_sentiment',
|
||||
'llm_one_liner',
|
||||
)
|
||||
|
||||
# How long the notification runner waits for LLM data before giving up.
|
||||
LLM_NOTIFICATION_RETRY_DELAY_SECONDS = int(os.getenv('LLM_NOTIFICATION_RETRY_DELAY', '10'))
|
||||
LLM_NOTIFICATION_MAX_WAIT_ATTEMPTS = int(os.getenv('LLM_NOTIFICATION_MAX_WAIT', '18')) # 18 × 10s = 3 min
|
||||
|
||||
# JSON prompt fragment — embedded in the final summarisation call.
|
||||
STRUCTURED_OUTPUT_INSTRUCTION = (
|
||||
'Return ONLY a valid JSON object — no markdown fences, no extra text — using exactly these keys:\n'
|
||||
'{"summary":"1-3 sentences covering ALL changes; use exact values from the diff.","headline":"5-8 word punchy title for this specific change","importance":7,"sentiment":"positive","one_liner":"One sentence for SMS/push character limits."}\n'
|
||||
'importance: 1=trivial whitespace, 5=moderate content change, 10=critical price/availability change.\n'
|
||||
'sentiment: "positive" (desirable for the user), "negative" (undesirable), or "neutral" (informational only).'
|
||||
)
|
||||
|
||||
|
||||
# ── File I/O ───────────────────────────────────────────────────────────────
|
||||
|
||||
def llm_subdir(data_dir: str) -> str:
    """Return the path of the ``llm/`` subdirectory inside *data_dir*.

    The directory is not created by this helper.
    """
    return os.path.join(data_dir, 'llm')
|
||||
|
||||
|
||||
def llm_json_path(data_dir: str, snapshot_id: str) -> str:
    """Full path of the structured JSON token file for *snapshot_id*."""
    filename = f"{snapshot_id}-llm.json"
    return os.path.join(data_dir, 'llm', filename)
|
||||
|
||||
|
||||
def llm_txt_path(data_dir: str, snapshot_id: str) -> str:
    """Full path of the plain-text summary file for *snapshot_id* (legacy format)."""
    filename = f"{snapshot_id}-llm.txt"
    return os.path.join(data_dir, 'llm', filename)
|
||||
|
||||
|
||||
def is_llm_data_ready(data_dir: str, snapshot_id: str) -> bool:
    """True once either LLM output file (JSON or legacy txt) exists for this snapshot."""
    candidates = (
        llm_json_path(data_dir, snapshot_id),
        llm_txt_path(data_dir, snapshot_id),
    )
    return any(os.path.exists(path) for path in candidates)
|
||||
|
||||
|
||||
def read_llm_tokens(data_dir: str, snapshot_id: str) -> dict:
    """Load LLM token data for a snapshot.

    Prefers the structured JSON file (current format); falls back to the
    plain ``.txt`` summary (legacy format). Returns an empty dict when no
    data exists yet or nothing could be read.
    """
    json_file = llm_json_path(data_dir, snapshot_id)
    if os.path.exists(json_file):
        try:
            with open(json_file, 'r', encoding='utf-8') as fh:
                payload = json.load(fh)
            if isinstance(payload, dict):
                return _normalise(payload)
        except Exception as exc:
            # Unreadable/corrupt JSON is non-fatal — fall through to the txt file.
            logger.warning(f"LLM tokens: failed to read {json_file}: {exc}")

    txt_file = llm_txt_path(data_dir, snapshot_id)
    if os.path.exists(txt_file):
        try:
            with open(txt_file, 'r', encoding='utf-8') as fh:
                summary = fh.read().strip()
            # Legacy files carry only the summary; derive a capped one-liner.
            return _normalise({'summary': summary, 'one_liner': summary[:200]})
        except Exception as exc:
            logger.warning(f"LLM tokens: failed to read {txt_file}: {exc}")

    return {}
|
||||
|
||||
|
||||
def write_llm_data(data_dir: str, snapshot_id: str, data: dict) -> str:
    """Persist LLM data atomically under the llm/ subdirectory.

    Two files are produced:
      llm/{snapshot_id}-llm.json — full structured token data
      llm/{snapshot_id}-llm.txt  — summary text only (backward compat)

    Returns the path of the JSON file.
    """
    clean = _normalise(data)

    # Ensure the llm/ directory exists before any write.
    os.makedirs(llm_subdir(data_dir), exist_ok=True)

    json_file = llm_json_path(data_dir, snapshot_id)
    _atomic_write_text(json_file, json.dumps(clean, ensure_ascii=False))

    txt_file = llm_txt_path(data_dir, snapshot_id)
    _atomic_write_text(txt_file, clean.get('summary', ''))

    return json_file
|
||||
|
||||
|
||||
def parse_llm_response(response: str) -> dict:
    """Parse the LLM's structured JSON reply into a normalised token dict.

    Attempts, in order: a strict JSON parse of the whole reply, a JSON object
    inside a markdown code fence, then any bare JSON object in the text.
    When nothing parses, the whole response becomes the 'summary' field.
    """
    import re
    text = response.strip()

    def _as_dict(blob):
        # Returns a dict on successful parse, None otherwise.
        try:
            parsed = json.loads(blob)
        except (json.JSONDecodeError, ValueError):
            return None
        return parsed if isinstance(parsed, dict) else None

    # Candidate extractions, strongest first.
    candidates = [text]

    fenced = re.search(r'```(?:json)?\s*(\{.*?\})\s*```', text, re.DOTALL)
    if fenced:
        candidates.append(fenced.group(1))

    bare = re.search(r'\{[^{}]*\}', text, re.DOTALL)
    if bare:
        candidates.append(bare.group(0))

    for blob in candidates:
        obj = _as_dict(blob)
        if obj is not None:
            return _normalise(obj)

    # Fallback — treat entire response as summary
    logger.debug("LLM response was not valid JSON — using raw text as summary")
    return _normalise({'summary': text, 'one_liner': text[:200] if len(text) > 200 else text})
|
||||
|
||||
|
||||
# ── Internal helpers ───────────────────────────────────────────────────────
|
||||
|
||||
def _normalise(data: dict) -> dict:
    """Coerce raw token data into a dict with every expected key present."""
    raw_importance = data.get('importance')
    importance = None
    if raw_importance is not None:
        try:
            # Accept "7", 7.9, etc.; clamp into the documented 1-10 range.
            importance = min(10, max(1, int(float(raw_importance))))
        except (TypeError, ValueError):
            importance = None

    sentiment = str(data.get('sentiment', '')).lower().strip()
    if sentiment not in ('positive', 'negative', 'neutral'):
        sentiment = ''

    def _text(key):
        # Treat missing / None / falsy values as the empty string.
        return str(data.get(key, '') or '').strip()

    return {
        'summary': _text('summary'),
        'headline': _text('headline'),
        'importance': importance,
        'sentiment': sentiment,
        'one_liner': _text('one_liner'),
    }
|
||||
|
||||
|
||||
def _atomic_write_text(path: str, text: str) -> None:
    """Atomically write *text* to *path* (write to a temp file, then rename)."""
    staging = path + '.tmp'
    with open(staging, 'w', encoding='utf-8') as fh:
        fh.write(text)
        fh.flush()
        # Force data to disk before the rename makes the file visible.
        os.fsync(fh.fileno())
    os.replace(staging, path)
|
||||
@@ -1009,31 +1009,14 @@ class model(EntityPersistenceMixin, watch_base):
|
||||
|
||||
|
||||
def extra_notification_token_values(self):
    """Provide llm_* notification token values for the latest snapshot.

    Reads the LLM data written by the LLM worker (via read_llm_tokens) for
    the newest history entry and maps it onto the llm_* tokens.

    Returns:
        dict: llm_summary / llm_headline / llm_importance / llm_sentiment /
              llm_one_liner, or {} when there is no history yet or no LLM
              data exists for the newest snapshot.
    """
    history = self.history
    if not history:
        return {}
    # Import deferred until we know there is work to do.
    from changedetectionio.llm.tokens import read_llm_tokens
    latest_fname = history[list(history.keys())[-1]]
    # Snapshot id is the 32-char MD5 prefix of the history filename.
    snapshot_id = os.path.basename(latest_fname).split('.')[0]
    data = read_llm_tokens(self.data_dir, snapshot_id)
    if not data:
        return {}
    return {
        'llm_summary': data.get('summary', ''),
        'llm_headline': data.get('headline', ''),
        'llm_importance': data.get('importance'),
        'llm_sentiment': data.get('sentiment', ''),
        'llm_one_liner': data.get('one_liner', ''),
    }
|
||||
|
||||
def extra_notification_token_placeholder_info(self):
    """Describe the llm_* notification tokens for the settings UI.

    Returns:
        list[tuple[str, str]]: (token_name, human description) pairs.
    """
    return [
        ('llm_summary', "LLM: 1-3 sentence summary of all changes with exact values"),
        ('llm_headline', "LLM: 5-8 word punchy title for this specific change"),
        ('llm_importance', "LLM: Significance score 1-10 (1=trivial, 10=critical)"),
        ('llm_sentiment', "LLM: Change sentiment — positive, negative, or neutral"),
        ('llm_one_liner', "LLM: One sentence for SMS/push character limits"),
    ]
|
||||
|
||||
|
||||
def extract_regex_from_all_history(self, regex):
|
||||
|
||||
@@ -6,7 +6,6 @@ Extracted from update_worker.py to provide standalone notification functionality
|
||||
for both sync and async workers
|
||||
"""
|
||||
import datetime
|
||||
import os
|
||||
|
||||
import pytz
|
||||
from loguru import logger
|
||||
@@ -83,12 +82,6 @@ class NotificationContextData(dict):
|
||||
'watch_tag': None,
|
||||
'watch_title': None,
|
||||
'watch_url': 'https://WATCH-PLACE-HOLDER/',
|
||||
# LLM-generated tokens (populated by notification_runner once LLM data is ready)
|
||||
'llm_headline': None,
|
||||
'llm_importance': None,
|
||||
'llm_one_liner': None,
|
||||
'llm_sentiment': None,
|
||||
'llm_summary': None,
|
||||
})
|
||||
|
||||
# Apply any initial data passed in
|
||||
@@ -281,11 +274,6 @@ class NotificationService:
|
||||
timestamp_changed=dates[date_index_to]))
|
||||
|
||||
if self.notification_q:
|
||||
# Store snapshot_id hint so notification_runner can gate on LLM data readiness
|
||||
if watch and len(dates) > 0:
|
||||
latest_fname = watch.history.get(dates[date_index_to], '')
|
||||
if latest_fname:
|
||||
n_object['_llm_snapshot_id'] = os.path.basename(latest_fname).split('.')[0]
|
||||
logger.debug("Queued notification for sending")
|
||||
self.notification_q.put(n_object)
|
||||
else:
|
||||
|
||||
@@ -151,8 +151,7 @@ class ChangeDetectionSpec:
|
||||
pass
|
||||
|
||||
@hookspec
|
||||
def update_finalize(update_handler, watch, datastore, processing_exception,
|
||||
changed_detected=False, snapshot_id=None):
|
||||
def update_finalize(update_handler, watch, datastore, processing_exception):
|
||||
"""Called after watch processing completes (success or failure).
|
||||
|
||||
This hook is called in the finally block after all processing is complete,
|
||||
@@ -169,10 +168,6 @@ class ChangeDetectionSpec:
|
||||
processing_exception: The exception from the main processing block, or None if successful.
|
||||
This does NOT include cleanup exceptions - only exceptions from
|
||||
the actual watch processing (fetch, diff, etc).
|
||||
changed_detected: True when the processor detected a content change (default False).
|
||||
snapshot_id: MD5 hex string of the new snapshot, matches the prefix of the history
|
||||
filename (e.g. 'abc123…' → 'abc123….txt[.br]'). None when no snapshot
|
||||
was saved (first run, error, same content).
|
||||
|
||||
Returns:
|
||||
None: This hook doesn't return a value
|
||||
@@ -585,8 +580,7 @@ def apply_update_handler_alter(update_handler, watch, datastore):
|
||||
return current_handler
|
||||
|
||||
|
||||
def apply_update_finalize(update_handler, watch, datastore, processing_exception,
|
||||
changed_detected=False, snapshot_id=None):
|
||||
def apply_update_finalize(update_handler, watch, datastore, processing_exception):
|
||||
"""Apply update_finalize hooks from all plugins.
|
||||
|
||||
Called in the finally block after watch processing completes, allowing plugins
|
||||
@@ -597,8 +591,6 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
|
||||
watch: The watch dict that was processed (may be None)
|
||||
datastore: The application datastore
|
||||
processing_exception: The exception from processing, or None if successful
|
||||
changed_detected: True when the processor detected a content change.
|
||||
snapshot_id: MD5 hex string of the new snapshot, or None.
|
||||
|
||||
Returns:
|
||||
None
|
||||
@@ -609,9 +601,7 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
|
||||
update_handler=update_handler,
|
||||
watch=watch,
|
||||
datastore=datastore,
|
||||
processing_exception=processing_exception,
|
||||
changed_detected=changed_detected,
|
||||
snapshot_id=snapshot_id,
|
||||
processing_exception=processing_exception
|
||||
)
|
||||
except Exception as e:
|
||||
# Don't let plugin errors crash the worker
|
||||
|
||||
@@ -1,187 +0,0 @@
|
||||
/* llm.js — LLM Connections management (settings page)
|
||||
* Depends on: jQuery (global), LLM_CONNECTIONS + LLM_I18N injected by Jinja2 template.
|
||||
*/
|
||||
(function ($) {
|
||||
'use strict';
|
||||
|
||||
// Provider presets: [value, label, model, api_base, tpm]
|
||||
// tpm = tokens-per-minute limit (0 = unlimited / local).
|
||||
// Defaults reflect free-tier or conservative tier-1 limits.
|
||||
var LLM_PRESETS = [
|
||||
['openai-mini', 'OpenAI — gpt-4o-mini', 'gpt-4o-mini', '', 200000],
|
||||
['openai-4o', 'OpenAI — gpt-4o', 'gpt-4o', '', 30000],
|
||||
['anthropic-haiku', 'Anthropic — claude-3-haiku', 'anthropic/claude-3-haiku-20240307', '', 100000],
|
||||
['anthropic-sonnet', 'Anthropic — claude-3-5-sonnet', 'anthropic/claude-3-5-sonnet-20241022', '', 40000],
|
||||
['groq-8b', 'Groq — llama-3.1-8b-instant', 'groq/llama-3.1-8b-instant', '', 6000],
|
||||
['groq-70b', 'Groq — llama-3.3-70b-versatile', 'groq/llama-3.3-70b-versatile', '', 6000],
|
||||
['gemini-flash', 'Google — gemini-1.5-flash', 'gemini/gemini-1.5-flash', '', 1000000],
|
||||
['mistral-small', 'Mistral — mistral-small', 'mistral/mistral-small-latest', '', 500000],
|
||||
['deepseek', 'DeepSeek — deepseek-chat', 'deepseek/deepseek-chat', '', 50000],
|
||||
['openrouter', 'OpenRouter (custom model)', 'openrouter/', '', 20000],
|
||||
['ollama-llama', 'Ollama — llama3.1 (local)', 'ollama/llama3.1', 'http://localhost:11434', 0],
|
||||
['ollama-mistral', 'Ollama — mistral (local)', 'ollama/mistral', 'http://localhost:11434', 0],
|
||||
['lmstudio', 'LM Studio (local)', 'openai/local', 'http://localhost:1234/v1', 0],
|
||||
];
|
||||
|
||||
var presetMap = {};
|
||||
$.each(LLM_PRESETS, function (_, p) { presetMap[p[0]] = p; });
|
||||
|
||||
function escHtml(s) {
|
||||
return $('<div>').text(String(s)).html();
|
||||
}
|
||||
|
||||
function maskKey(k) {
|
||||
if (!k) return '<span style="color:var(--color-grey-700)">—</span>';
|
||||
return escHtml(k.substring(0, 4)) + '••••';
|
||||
}
|
||||
|
||||
// Emit WTForms FieldList hidden inputs (llm_connection-N-fieldname) so the
|
||||
// server processes connections through the declared schema — no arbitrary keys.
|
||||
function serialise() {
|
||||
var $form = $('form.settings');
|
||||
$form.find('input[data-llm-gen]').remove();
|
||||
|
||||
var ids = Object.keys(LLM_CONNECTIONS);
|
||||
$.each(ids, function (i, id) {
|
||||
var c = LLM_CONNECTIONS[id];
|
||||
var prefix = 'llm_connection-' + i + '-';
|
||||
var fields = {
|
||||
connection_id: id,
|
||||
name: c.name || '',
|
||||
model: c.model || '',
|
||||
api_key: c.api_key || '',
|
||||
api_base: c.api_base || '',
|
||||
tokens_per_minute: parseInt(c.tokens_per_minute || 0, 10)
|
||||
};
|
||||
$.each(fields, function (field, value) {
|
||||
$('<input>').attr({ type: 'hidden', name: prefix + field, value: value, 'data-llm-gen': '1' }).appendTo($form);
|
||||
});
|
||||
// BooleanField: only emit when true (absence == false in WTForms)
|
||||
if (c.is_default) {
|
||||
$('<input>').attr({ type: 'hidden', name: prefix + 'is_default', value: 'y', 'data-llm-gen': '1' }).appendTo($form);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
function renderTable() {
|
||||
var $tbody = $('#llm-connections-tbody');
|
||||
$tbody.empty();
|
||||
var ids = Object.keys(LLM_CONNECTIONS);
|
||||
if (!ids.length) {
|
||||
$tbody.html('<tr class="llm-empty"><td colspan="6">' + escHtml(LLM_I18N.noConnections) + '</td></tr>');
|
||||
return;
|
||||
}
|
||||
$.each(ids, function (_, id) {
|
||||
var c = LLM_CONNECTIONS[id];
|
||||
var tpm = parseInt(c.tokens_per_minute || 0, 10);
|
||||
var tpmLabel = tpm ? tpm.toLocaleString() : '<span style="color:var(--color-grey-700)">∞</span>';
|
||||
$tbody.append(
|
||||
'<tr>' +
|
||||
'<td class="llm-col-def">' +
|
||||
'<input type="radio" class="llm-default-radio" name="llm_default_radio"' +
|
||||
' title="' + escHtml(LLM_I18N.setDefault) + '"' +
|
||||
(c.is_default ? ' checked' : '') +
|
||||
' data-id="' + escHtml(id) + '">' +
|
||||
'</td>' +
|
||||
'<td class="llm-col-name">' + escHtml(c.name) + '</td>' +
|
||||
'<td class="llm-col-model">' + escHtml(c.model) + '</td>' +
|
||||
'<td class="llm-col-key">' + maskKey(c.api_key) + '</td>' +
|
||||
'<td class="llm-col-tpm">' + tpmLabel + '</td>' +
|
||||
'<td class="llm-col-del">' +
|
||||
'<button type="button" class="llm-del"' +
|
||||
' title="' + escHtml(LLM_I18N.remove) + '"' +
|
||||
' data-id="' + escHtml(id) + '">×</button>' +
|
||||
'</td>' +
|
||||
'</tr>'
|
||||
);
|
||||
});
|
||||
}
|
||||
|
||||
$(function () {
|
||||
// Event delegation on tbody — survives re-renders
|
||||
$('#llm-connections-tbody')
|
||||
.on('change', '.llm-default-radio', function () {
|
||||
var chosen = String($(this).data('id'));
|
||||
$.each(LLM_CONNECTIONS, function (k) {
|
||||
LLM_CONNECTIONS[k].is_default = (k === chosen);
|
||||
});
|
||||
serialise();
|
||||
})
|
||||
.on('click', '.llm-del', function () {
|
||||
var id = String($(this).data('id'));
|
||||
delete LLM_CONNECTIONS[id];
|
||||
var remaining = Object.keys(LLM_CONNECTIONS);
|
||||
if (remaining.length && !remaining.some(function (k) { return LLM_CONNECTIONS[k].is_default; })) {
|
||||
LLM_CONNECTIONS[remaining[0]].is_default = true;
|
||||
}
|
||||
renderTable();
|
||||
serialise();
|
||||
});
|
||||
|
||||
function updateBaseVisibility() {
|
||||
var val = $('#llm-preset').val();
|
||||
var preset = presetMap[val];
|
||||
var hasBase = preset ? !!preset[3] : (val === 'custom');
|
||||
var show = (val === 'custom') || hasBase;
|
||||
$('#llm-base-group').toggle(show);
|
||||
}
|
||||
|
||||
// Preset dropdown pre-fills add form
|
||||
$('#llm-preset').on('change', function () {
|
||||
var val = $(this).val();
|
||||
var p = presetMap[val];
|
||||
if (p) {
|
||||
$('#llm-add-name').val(p[1].replace(/\s*—.*/, '').trim());
|
||||
$('#llm-add-model').val(p[2]);
|
||||
$('#llm-add-base').val(p[3]);
|
||||
$('#llm-add-tpm').val(p[4] !== undefined ? p[4] : 0);
|
||||
$('#llm-add-key').val('');
|
||||
}
|
||||
updateBaseVisibility();
|
||||
});
|
||||
|
||||
// Add connection
|
||||
$('#llm-btn-add').on('click', function () {
|
||||
var name = $.trim($('#llm-add-name').val());
|
||||
var model = $.trim($('#llm-add-model').val());
|
||||
var key = $.trim($('#llm-add-key').val());
|
||||
var base = $.trim($('#llm-add-base').val());
|
||||
var tpm = parseInt($('#llm-add-tpm').val(), 10) || 0;
|
||||
if (!name || !model) {
|
||||
alert(LLM_I18N.nameModelRequired);
|
||||
return;
|
||||
}
|
||||
var id = 'llm-' + Date.now();
|
||||
var isFirst = !Object.keys(LLM_CONNECTIONS).length;
|
||||
LLM_CONNECTIONS[id] = {
|
||||
name: name, model: model, api_key: key, api_base: base,
|
||||
tokens_per_minute: tpm, is_default: isFirst
|
||||
};
|
||||
$('#llm-preset, #llm-add-name, #llm-add-model, #llm-add-key, #llm-add-base').val('');
|
||||
$('#llm-add-tpm').val('0');
|
||||
$('#llm-base-group').hide();
|
||||
renderTable();
|
||||
serialise();
|
||||
});
|
||||
|
||||
// Show/hide API key visibility
|
||||
$('#llm-key-toggle').on('click', function () {
|
||||
var $inp = $('#llm-add-key');
|
||||
if ($inp.attr('type') === 'password') {
|
||||
$inp.attr('type', 'text');
|
||||
$(this).text(LLM_I18N.hide);
|
||||
} else {
|
||||
$inp.attr('type', 'password');
|
||||
$(this).text(LLM_I18N.show);
|
||||
}
|
||||
});
|
||||
|
||||
// Serialise connections to hidden field before form submit
|
||||
$('form.settings').on('submit', serialise);
|
||||
|
||||
// Init
|
||||
renderTable();
|
||||
serialise();
|
||||
});
|
||||
|
||||
}(jQuery));
|
||||
@@ -1,57 +0,0 @@
|
||||
#llm {
|
||||
// ── Key field wrapper — input + show/hide toggle inline ───────────────
|
||||
.llm-key-wrap {
|
||||
display: flex;
|
||||
gap: 0.3em;
|
||||
align-items: center;
|
||||
|
||||
input { flex: 1; min-width: 0; }
|
||||
|
||||
button {
|
||||
flex: 0 0 auto;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Pure-grid column padding consistency ──────────────────────────────
|
||||
.pure-u-md-1-2 {
|
||||
.pure-control-group {
|
||||
padding-right: 1em;
|
||||
}
|
||||
}
|
||||
|
||||
// ── Connections table ─────────────────────────────────────────────────
|
||||
table.llm-connections {
|
||||
width: 100%;
|
||||
|
||||
.llm-col-def { width: 3em; text-align: center; }
|
||||
.llm-col-name { font-weight: 500; }
|
||||
.llm-col-model { font-family: monospace; font-size: 0.85em; color: var(--color-grey-400); }
|
||||
.llm-col-key {
|
||||
font-family: monospace; font-size: 0.82em; color: var(--color-grey-600);
|
||||
max-width: 140px; overflow: hidden; text-overflow: ellipsis; white-space: nowrap;
|
||||
}
|
||||
.llm-col-del { width: 2.5em; text-align: center; }
|
||||
|
||||
.llm-del {
|
||||
background: none;
|
||||
border: none;
|
||||
cursor: pointer;
|
||||
color: var(--color-grey-600);
|
||||
padding: 0.15em 0.4em;
|
||||
border-radius: 3px;
|
||||
font-size: 1.1em;
|
||||
line-height: 1;
|
||||
&:hover { color: var(--color-dark-red); background: #ffeaea; }
|
||||
}
|
||||
|
||||
.llm-empty td {
|
||||
text-align: center;
|
||||
color: var(--color-grey-600);
|
||||
padding: 1.8em;
|
||||
font-style: italic;
|
||||
font-size: 0.92em;
|
||||
}
|
||||
}
|
||||
|
||||
.llm-default-radio { cursor: pointer; }
|
||||
}
|
||||
@@ -32,7 +32,6 @@
|
||||
@use "parts/toast";
|
||||
@use "parts/login_form";
|
||||
@use "parts/tabs";
|
||||
@use "parts/llm";
|
||||
|
||||
// Smooth transitions for theme switching
|
||||
body,
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -249,14 +249,13 @@ def prepare_test_function(live_server, datastore_path):
|
||||
# CRITICAL: Get datastore and stop it from writing stale data
|
||||
datastore = live_server.app.config.get('DATASTORE')
|
||||
|
||||
# Clear the queues before starting the test to prevent state leakage
|
||||
from changedetectionio.flask_app import update_q, llm_summary_q
|
||||
for q in (update_q, llm_summary_q):
|
||||
while not q.empty():
|
||||
try:
|
||||
q.get_nowait()
|
||||
except:
|
||||
break
|
||||
# Clear the queue before starting the test to prevent state leakage
|
||||
from changedetectionio.flask_app import update_q
|
||||
while not update_q.empty():
|
||||
try:
|
||||
update_q.get_nowait()
|
||||
except:
|
||||
break
|
||||
|
||||
# Add test helper methods to the app for worker management
|
||||
def set_workers(count):
|
||||
|
||||
@@ -1,260 +0,0 @@
|
||||
"""
|
||||
Tests for LLM summary queue, worker, and regenerate route.
|
||||
|
||||
Mocking strategy
|
||||
----------------
|
||||
- `_call_llm` is patched at the module level so no real LiteLLM/API calls are made.
|
||||
- `_write_summary` is left un-patched so we can assert the file was actually written.
|
||||
- `process_llm_summary` is called directly in unit tests (no worker thread needed).
|
||||
"""
|
||||
|
||||
import os
|
||||
import queue
|
||||
import time
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
import pytest
|
||||
from flask import url_for
|
||||
|
||||
from changedetectionio.tests.util import set_original_response, set_modified_response, wait_for_all_checks
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit tests — process_llm_summary directly, no HTTP, no worker thread
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestProcessLlmSummary:
|
||||
|
||||
def _make_watch_with_two_snapshots(self, client, datastore_path):
|
||||
"""Helper: returns (datastore, uuid, snapshot_id) with 2 history entries."""
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
datastore = client.application.config['DATASTORE']
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
uuid = datastore.add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watch = datastore.data['watching'][uuid]
|
||||
history_keys = list(watch.history.keys())
|
||||
snapshot_id = os.path.basename(watch.history[history_keys[1]]).split('.')[0]
|
||||
return datastore, uuid, snapshot_id
|
||||
|
||||
def test_writes_summary_file(self, client, live_server, datastore_path):
|
||||
"""process_llm_summary writes {snapshot_id}-llm.txt when _call_llm succeeds."""
|
||||
datastore, uuid, snapshot_id = self._make_watch_with_two_snapshots(client, datastore_path)
|
||||
watch = datastore.data['watching'][uuid]
|
||||
item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
|
||||
|
||||
from changedetectionio.llm.queue_worker import process_llm_summary
|
||||
with patch('changedetectionio.llm.queue_worker._call_llm', return_value='Price dropped from $10 to $8.') as mock_llm:
|
||||
process_llm_summary(item, datastore)
|
||||
|
||||
assert mock_llm.called
|
||||
summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
|
||||
assert os.path.exists(summary_path), "Summary file was not written"
|
||||
assert open(summary_path).read() == 'Price dropped from $10 to $8.'
|
||||
|
||||
def test_call_llm_uses_temperature_zero_and_seed(self, client, live_server, datastore_path):
|
||||
"""_call_llm always passes temperature=0 and seed=0 to litellm for determinism."""
|
||||
import litellm
|
||||
from changedetectionio.llm.queue_worker import _call_llm
|
||||
|
||||
messages = [{'role': 'user', 'content': 'hello'}]
|
||||
mock_response = MagicMock()
|
||||
mock_response.choices[0].message.content = 'ok'
|
||||
|
||||
with patch('litellm.completion', return_value=mock_response) as mock_completion:
|
||||
_call_llm(model='gpt-4o-mini', messages=messages)
|
||||
|
||||
call_kwargs = mock_completion.call_args.kwargs
|
||||
assert call_kwargs['temperature'] == 0, "temperature must be 0"
|
||||
assert call_kwargs['seed'] == 0, "seed must be 0 for reproducibility"
|
||||
assert 'top_p' not in call_kwargs, "top_p must not be set (redundant at temp=0)"
|
||||
assert 'frequency_penalty' not in call_kwargs, "frequency_penalty must not be set"
|
||||
assert 'presence_penalty' not in call_kwargs, "presence_penalty must not be set"
|
||||
|
||||
def test_skips_first_history_entry(self, client, live_server, datastore_path):
|
||||
"""process_llm_summary raises ValueError for the first history entry (no prior to diff)."""
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
datastore = client.application.config['DATASTORE']
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
uuid = datastore.add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watch = datastore.data['watching'][uuid]
|
||||
history_keys = list(watch.history.keys())
|
||||
first_fname = watch.history[history_keys[0]]
|
||||
snapshot_id = os.path.basename(first_fname).split('.')[0]
|
||||
|
||||
item = {'uuid': uuid, 'snapshot_id': snapshot_id, 'attempts': 0}
|
||||
|
||||
from changedetectionio.llm.queue_worker import process_llm_summary
|
||||
with pytest.raises(ValueError, match="first history entry"):
|
||||
process_llm_summary(item, datastore)
|
||||
|
||||
def test_raises_for_unknown_watch(self, client, live_server, datastore_path):
|
||||
"""process_llm_summary raises ValueError if the watch UUID doesn't exist."""
|
||||
datastore = client.application.config['DATASTORE']
|
||||
item = {'uuid': 'does-not-exist', 'snapshot_id': 'abc123', 'attempts': 0}
|
||||
|
||||
from changedetectionio.llm.queue_worker import process_llm_summary
|
||||
with pytest.raises(ValueError, match="not found"):
|
||||
process_llm_summary(item, datastore)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Unit tests — worker retry logic, no HTTP
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWorkerRetry:
|
||||
|
||||
def test_requeues_on_failure_with_backoff(self, client, live_server, datastore_path):
|
||||
"""Worker re-queues a failed item with incremented attempts and future next_retry_at."""
|
||||
from changedetectionio.llm.queue_worker import MAX_RETRIES, RETRY_BACKOFF_BASE_SECONDS
|
||||
|
||||
llm_q = queue.Queue()
|
||||
app = client.application
|
||||
datastore = client.application.config['DATASTORE']
|
||||
|
||||
item = {'uuid': 'fake-uuid', 'snapshot_id': 'abc123', 'attempts': 0}
|
||||
llm_q.put(item)
|
||||
|
||||
from changedetectionio.llm.queue_worker import process_llm_summary
|
||||
with patch('changedetectionio.llm.queue_worker.process_llm_summary', side_effect=RuntimeError("API down")):
|
||||
# Run one iteration manually (don't start the full runner thread)
|
||||
from changedetectionio.llm import queue_worker
|
||||
got = llm_q.get(block=False)
|
||||
try:
|
||||
queue_worker.process_llm_summary(got, datastore)
|
||||
except Exception as e:
|
||||
got['attempts'] += 1
|
||||
got['next_retry_at'] = time.time() + RETRY_BACKOFF_BASE_SECONDS * (2 ** (got['attempts'] - 1))
|
||||
llm_q.put(got)
|
||||
|
||||
assert llm_q.qsize() == 1
|
||||
requeued = llm_q.get_nowait()
|
||||
assert requeued['attempts'] == 1
|
||||
assert requeued['next_retry_at'] > time.time()
|
||||
|
||||
def test_drops_after_max_retries(self, client, live_server, datastore_path):
|
||||
"""Worker drops item and records last_error after MAX_RETRIES exhausted."""
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
datastore = client.application.config['DATASTORE']
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = datastore.add_watch(url=test_url)
|
||||
|
||||
from changedetectionio.llm.queue_worker import MAX_RETRIES
|
||||
item = {'uuid': uuid, 'snapshot_id': 'abc123', 'attempts': MAX_RETRIES}
|
||||
|
||||
llm_q = queue.Queue()
|
||||
llm_q.put(item)
|
||||
|
||||
with patch('changedetectionio.llm.queue_worker.process_llm_summary', side_effect=RuntimeError("still down")):
|
||||
from changedetectionio.llm import queue_worker
|
||||
got = llm_q.get(block=False)
|
||||
try:
|
||||
queue_worker.process_llm_summary(got, datastore)
|
||||
except Exception as e:
|
||||
if got['attempts'] < MAX_RETRIES:
|
||||
llm_q.put(got)
|
||||
else:
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
|
||||
|
||||
# Queue should be empty — item was dropped
|
||||
assert llm_q.empty()
|
||||
watch = datastore.data['watching'][uuid]
|
||||
assert 'still down' in (watch.get('last_error') or '')
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Route tests — GET /edit/<uuid>/regenerate-llm-summaries
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRegenerateLlmSummariesRoute:
|
||||
|
||||
def test_queues_missing_summaries(self, client, live_server, datastore_path):
|
||||
"""Route queues one item per history entry that lacks a -llm.txt file."""
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
datastore = client.application.config['DATASTORE']
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
uuid = datastore.add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watch = datastore.data['watching'][uuid]
|
||||
assert watch.history_n >= 2
|
||||
|
||||
from changedetectionio.flask_app import llm_summary_q
|
||||
|
||||
res = client.get(
|
||||
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
|
||||
follow_redirects=True,
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
# history_n - 1 items queued (first entry skipped, no prior to diff)
|
||||
expected = watch.history_n - 1
|
||||
assert llm_summary_q.qsize() == expected
|
||||
|
||||
# Each item has the right shape
|
||||
items = []
|
||||
while not llm_summary_q.empty():
|
||||
items.append(llm_summary_q.get_nowait())
|
||||
|
||||
for item in items:
|
||||
assert item['uuid'] == uuid
|
||||
assert item['attempts'] == 0
|
||||
assert len(item['snapshot_id']) == 32 # MD5 hex
|
||||
|
||||
def test_skips_already_summarised_entries(self, client, live_server, datastore_path):
|
||||
"""Route skips entries where {snapshot_id}-llm.txt already exists."""
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
datastore = client.application.config['DATASTORE']
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
uuid = datastore.add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
set_modified_response(datastore_path=datastore_path)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watch = datastore.data['watching'][uuid]
|
||||
history_keys = list(watch.history.keys())
|
||||
second_fname = watch.history[history_keys[1]]
|
||||
snapshot_id = os.path.basename(second_fname).split('.')[0]
|
||||
|
||||
# Pre-write a summary file
|
||||
summary_path = os.path.join(watch.data_dir, f"{snapshot_id}-llm.txt")
|
||||
with open(summary_path, 'w') as f:
|
||||
f.write('already done')
|
||||
|
||||
from changedetectionio.flask_app import llm_summary_q
|
||||
|
||||
client.get(
|
||||
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid=uuid),
|
||||
follow_redirects=True,
|
||||
)
|
||||
|
||||
# That entry should have been skipped — queue should be empty
|
||||
assert llm_summary_q.empty()
|
||||
|
||||
def test_404_for_unknown_watch(self, client, live_server, datastore_path):
|
||||
res = client.get(
|
||||
url_for('ui.ui_edit.watch_regenerate_llm_summaries', uuid='does-not-exist'),
|
||||
follow_redirects=False,
|
||||
)
|
||||
assert res.status_code == 404
|
||||
@@ -518,8 +518,6 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
# (cleanup may delete these variables, but plugins need the original references)
|
||||
finalize_handler = update_handler # Capture now, before cleanup deletes it
|
||||
finalize_watch = watch # Capture now, before any modifications
|
||||
finalize_changed_detected = locals().get('changed_detected', False)
|
||||
finalize_snapshot_id = (locals().get('update_obj') or {}).get('previous_md5') or ''
|
||||
|
||||
# Call quit() as backup (Puppeteer/Playwright have internal cleanup, but this acts as safety net)
|
||||
try:
|
||||
@@ -560,9 +558,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
update_handler=finalize_handler,
|
||||
watch=finalize_watch,
|
||||
datastore=datastore,
|
||||
processing_exception=processing_exception,
|
||||
changed_detected=finalize_changed_detected,
|
||||
snapshot_id=finalize_snapshot_id,
|
||||
processing_exception=processing_exception
|
||||
)
|
||||
except Exception as finalize_error:
|
||||
logger.error(f"Worker {worker_id} error in finalize hook: {finalize_error}")
|
||||
|
||||
@@ -97,8 +97,12 @@ pytest ~=9.0
|
||||
pytest-flask ~=1.3
|
||||
pytest-mock ~=3.15
|
||||
|
||||
# Anything 4.0 and up but not 5.0
|
||||
jsonschema ~= 4.26
|
||||
|
||||
# OpenAPI validation support
|
||||
openapi-core[flask] ~= 0.22
|
||||
openapi-core[flask] >= 0.19.0
|
||||
|
||||
|
||||
loguru
|
||||
|
||||
@@ -120,7 +124,8 @@ greenlet >= 3.0.3
|
||||
# Default SOCKETIO_MODE=threading is recommended for better compatibility
|
||||
gevent
|
||||
|
||||
referencing # Don't pin — jsonschema-path (required by openapi-core>=0.18) caps referencing<0.37.0, so pinning 0.37.0 forces openapi-core back to 0.17.2. Revisit once jsonschema-path>=0.3.5 relaxes the cap.
|
||||
# Previously pinned for flask_expects_json (removed 2026-02). Unpinning for now.
|
||||
referencing
|
||||
|
||||
# For conditions
|
||||
panzi-json-logic
|
||||
@@ -151,10 +156,3 @@ blinker
|
||||
pytest-xdist
|
||||
|
||||
|
||||
litellm
|
||||
# pydantic-core >=2.41 imports typing_extensions.Sentinel, which is absent in the
|
||||
# system-installed typing_extensions on many Linux distros (e.g. Ubuntu 22/24).
|
||||
# When the system path leaks into sys.path before the venv, the system copy is
|
||||
# cached first and the import fails at runtime inside the LLM worker thread.
|
||||
pydantic-core<2.41
|
||||
pydantic<2.12
|
||||
|
||||
Reference in New Issue
Block a user