Compare commits

...

2 Commits

Author SHA1 Message Date
dgtlmoon e75e28dac7 Adding missing file 2026-05-21 14:10:39 +02:00
dgtlmoon 6765125206 LLM - Plugin for altering queries and recording query result/token stats etc 2026-05-21 14:06:37 +02:00
8 changed files with 505 additions and 10 deletions
+60
View File
@@ -38,6 +38,66 @@ def ui_edit_stats_extras(watch):
3. The HTML you return will be included in the Stats tab.
## LLM Query Hooks
External packages can observe and modify every LiteLLM call (intent evaluation,
change summaries, restock extraction, connection tests, etc.).
### `llm_query_alter` — before the request
Return a dict of keys to merge into the call context (`messages`, `model`,
`max_tokens`, `api_key`, `api_base`, `extra_body`, …).
```python
from changedetectionio.pluggy_interface import hookimpl
@hookimpl
def llm_query_alter(llm_context):
# llm_context includes:
# purpose, watch, datastore, app_guid, watch_uuid, timestamp_utc,
# settings (full application settings copy), model, messages, ...
if llm_context.get('purpose') != 'evaluate_change':
return None
messages = list(llm_context['messages'])
messages.append({'role': 'user', 'content': 'Extra auditing instruction.'})
return {'messages': messages}
```
### `llm_query_finalize` — after success or failure
Use for token/cost accounting (MySQL, Prometheus, billing exports, etc.).
```python
@hookimpl
def llm_query_finalize(llm_context, result, error):
if error:
log_failure(llm_context['app_guid'], llm_context['watch_uuid'], error)
return
# result keys: text, total_tokens, input_tokens, output_tokens,
# cost_usd, litellm_response_cost_usd, model, finish_reason, duration_seconds
record_usage(
app_guid=llm_context['app_guid'],
watch_uuid=llm_context['watch_uuid'],
purpose=llm_context['purpose'],
tokens=result['total_tokens'],
cost_usd=result['cost_usd'],
at=llm_context['timestamp_utc'],
)
```
Register via setuptools entry point (namespace `changedetectionio`), same as other plugins:
```python
entry_points={
'changedetectionio': [
'llm_accounting = my_package.llm_plugin',
],
},
```
**Purpose values** (call-site identifiers): `evaluate_change`, `summarise_change`,
`run_setup`, `preview_extract`, `restock_extract`, `connection_test`.
## Plugin Loading
Plugins can be loaded from:
+5 -2
View File
@@ -134,7 +134,7 @@ def construct_llm_blueprint(datastore: ChangeDetectionStore):
@login_optionally_required
def llm_test():
from flask import request
from changedetectionio.llm.client import completion
from changedetectionio.llm.invocation import llm_completion
from changedetectionio.validate_url import is_llm_api_base_safe
# Pull stored config as the fallback, then override with anything the
@@ -194,7 +194,10 @@ def construct_llm_blueprint(datastore: ChangeDetectionStore):
# cloud reasoning models (e.g. ollama.com hosting qwen3.5:397b takes ~60s on
# first hit) even though the same call succeeds in production.
from changedetectionio.llm.evaluator import apply_local_token_multiplier
text, total_tokens, input_tokens, output_tokens = completion(
text, total_tokens, input_tokens, output_tokens = llm_completion(
'connection_test',
watch=None,
datastore=datastore,
model=model,
messages=[{'role': 'user', 'content':
'Respond with just the word: ready'}],
+21 -1
View File
@@ -54,12 +54,26 @@ def _install_litellm_debug():
logger.info("LLM client: litellm debug logging routed through loguru")
def _litellm_response_cost_usd(response) -> float | None:
"""Extract provider/litellm-reported cost from a completion response, if present."""
try:
from litellm.cost_calculator import get_response_cost_from_hidden_params
hidden = getattr(response, '_hidden_params', None) or {}
cost = get_response_cost_from_hidden_params(hidden)
if cost is not None:
return float(cost)
except Exception:
pass
return None
def completion(model: str, messages: list, api_key: str = None,
api_base: str = None, timeout: int = DEFAULT_TIMEOUT,
max_tokens: int = None, extra_body: dict = None,
debug: bool = False) -> tuple[str, int, int, int]:
debug: bool = False, return_metadata: bool = False):
"""
Call the LLM and return (response_text, total_tokens, input_tokens, output_tokens).
When return_metadata=True, appends a dict with finish_reason and litellm cost fields.
Retries up to DEFAULT_RETRIES times on timeout or connection errors.
Token counts are 0 if the provider doesn't return usage data.
Raises on network/auth errors — callers handle gracefully.
@@ -134,6 +148,12 @@ def completion(model: str, messages: list, api_key: str = None,
f"tokens={total_tokens} (in={input_tokens} out={output_tokens}) "
f"text_len={len(text)}"
)
if return_metadata:
metadata = {'finish_reason': finish}
litellm_cost = _litellm_response_cost_usd(response)
if litellm_cost is not None:
metadata['litellm_response_cost_usd'] = litellm_cost
return text, total_tokens, input_tokens, output_tokens, metadata
return text, total_tokens, input_tokens, output_tokens
except _retryable as e:
+17 -5
View File
@@ -22,7 +22,7 @@ from loguru import logger
from changedetectionio.strtobool import strtobool
from . import client as llm_client
from .invocation import llm_completion
from .prompt_builder import (
build_change_summary_prompt, build_change_summary_system_prompt,
build_eval_prompt, build_eval_system_prompt,
@@ -425,7 +425,10 @@ def run_setup(watch, datastore, snapshot_text: str) -> None:
user_prompt = build_setup_prompt(intent, snapshot_text, url=url)
try:
raw, tokens, *_ = llm_client.completion(
raw, tokens, *_ = llm_completion(
'run_setup',
watch=watch,
datastore=datastore,
model=cfg['model'],
messages=[
_cached_system(system_prompt, model=cfg['model']),
@@ -577,7 +580,10 @@ def summarise_change(watch, datastore, diff: str, current_snapshot: str = '') ->
_extra_body = _thinking_extra_body(cfg['model'], _thinking_budget)
try:
_resp = llm_client.completion(
_resp = llm_completion(
'summarise_change',
watch=watch,
datastore=datastore,
model=cfg['model'],
messages=[
_cached_system(system_prompt, model=cfg['model']),
@@ -646,7 +652,10 @@ def preview_extract(watch, datastore, content: str) -> dict | None:
user_prompt = build_preview_prompt(intent, content, url=url, title=title)
try:
raw, tokens, *_ = llm_client.completion(
raw, tokens, *_ = llm_completion(
'preview_extract',
watch=watch,
datastore=datastore,
model=cfg['model'],
messages=[
_cached_system(system_prompt, model=cfg['model']),
@@ -731,7 +740,10 @@ def evaluate_change(watch, datastore, diff: str, current_snapshot: str = '') ->
)
try:
_resp = llm_client.completion(
_resp = llm_completion(
'evaluate_change',
watch=watch,
datastore=datastore,
model=cfg['model'],
messages=[
_cached_system(system_prompt, model=cfg['model']),
+151
View File
@@ -0,0 +1,151 @@
"""
Central LLM invocation path with pluggy hooks.
All production litellm calls should go through llm_completion() so external plugins
can alter requests (llm_query_alter) and record usage afterward (llm_query_finalize).
"""
import time
from copy import deepcopy
from datetime import datetime, timezone
from loguru import logger
from changedetectionio.pluggy_interface import apply_llm_query_alter, apply_llm_query_finalize
from . import client as llm_client
def build_llm_context(
purpose: str,
*,
watch=None,
datastore=None,
model: str,
messages: list,
api_key: str = None,
api_base: str = None,
timeout: int = None,
max_tokens: int = None,
extra_body: dict = None,
debug: bool = False,
) -> dict:
"""Build the context dict for llm_query_alter / llm_query_finalize.
See ChangeDetectionSpec.llm_query_finalize in pluggy_interface.py for the
full field reference (purpose, app_guid, watch_uuid, settings, result keys, ).
"""
app_guid = None
settings = None
if datastore is not None:
try:
app_guid = datastore.data.get('app_guid')
settings = deepcopy(datastore.data.get('settings') or {})
except Exception:
pass
watch_uuid = None
if watch is not None:
watch_uuid = watch.get('uuid') if isinstance(watch, dict) else getattr(watch, 'uuid', None)
return {
'purpose': purpose,
'watch': watch,
'datastore': datastore,
'app_guid': app_guid,
'watch_uuid': watch_uuid,
'timestamp_utc': datetime.now(timezone.utc).isoformat(),
'settings': settings,
'model': model,
'messages': messages,
'api_key': api_key,
'api_base': api_base,
'timeout': timeout,
'max_tokens': max_tokens,
'extra_body': extra_body,
'debug': debug,
}
def _completion_cost_usd(model: str, input_tokens: int, output_tokens: int, metadata: dict) -> float:
"""Prefer litellm's response cost when present, else use the app's pricing estimate."""
litellm_cost = (metadata or {}).get('litellm_response_cost_usd')
if litellm_cost is not None:
try:
return float(litellm_cost)
except (TypeError, ValueError):
pass
from changedetectionio.llm.evaluator import _estimate_cost_usd
return _estimate_cost_usd(model, input_tokens, output_tokens)
def llm_completion(
purpose: str,
*,
watch=None,
datastore=None,
model: str,
messages: list,
api_key: str = None,
api_base: str = None,
timeout: int = None,
max_tokens: int = None,
extra_body: dict = None,
debug: bool = False,
) -> tuple[str, int, int, int]:
"""
Run litellm.completion with pluggy alter/finalize hooks.
Returns (response_text, total_tokens, input_tokens, output_tokens) same as
llm.client.completion for drop-in replacement at call sites.
"""
llm_context = build_llm_context(
purpose,
watch=watch,
datastore=datastore,
model=model,
messages=messages,
api_key=api_key,
api_base=api_base,
timeout=timeout,
max_tokens=max_tokens,
extra_body=extra_body,
debug=debug,
)
llm_context = apply_llm_query_alter(llm_context)
started = time.monotonic()
result = None
error = None
try:
text, total_tokens, input_tokens, output_tokens, metadata = llm_client.completion(
model=llm_context['model'],
messages=llm_context['messages'],
api_key=llm_context.get('api_key'),
api_base=llm_context.get('api_base'),
timeout=llm_context.get('timeout'),
max_tokens=llm_context.get('max_tokens'),
extra_body=llm_context.get('extra_body'),
debug=bool(llm_context.get('debug')),
return_metadata=True,
)
cost_usd = _completion_cost_usd(
llm_context['model'], input_tokens, output_tokens, metadata,
)
result = {
'text': text,
'total_tokens': total_tokens,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'cost_usd': cost_usd,
'litellm_response_cost_usd': (metadata or {}).get('litellm_response_cost_usd'),
'model': llm_context['model'],
'finish_reason': (metadata or {}).get('finish_reason'),
'duration_seconds': time.monotonic() - started,
}
return text, total_tokens, input_tokens, output_tokens
except Exception as e:
error = e
raise
finally:
apply_llm_query_finalize(llm_context, result, error)
+110
View File
@@ -175,6 +175,75 @@ class ChangeDetectionSpec:
"""
pass
@hookspec
def llm_query_alter(llm_context):
"""Modify an LLM request before litellm.completion is called.
Called for every LLM invocation (intent evaluation, change summaries,
restock extraction, connection tests, etc.). Plugins can adjust messages,
model, max_tokens, or other completion kwargs.
Args:
llm_context: dict describing the call. Common keys:
purpose (str): call-site id, e.g. 'evaluate_change', 'summarise_change'
watch (dict|None): watch being processed, when applicable
datastore: ChangeDetectionStore instance, when available
app_guid (str|None): application GUID from datastore
watch_uuid (str|None): watch UUID
timestamp_utc (str): ISO-8601 UTC time when the call started
settings (dict): copy of datastore.data['settings'] when datastore set
model, messages, api_key, api_base, timeout, max_tokens, extra_body, debug
Returns:
dict or None: Keys to merge into llm_context (later plugins see merged state).
Return None to leave the context unchanged.
"""
pass
@hookspec
def llm_query_finalize(llm_context, result, error):
"""Called after each litellm.completion attempt finishes (success or failure).
Use for external accounting (MySQL, Prometheus, billing exports, etc.).
Args:
llm_context: dict describing the call (same object passed to llm_query_alter,
after any plugin merges). Keys always present when built by the app:
purpose (str): call-site id one of:
'evaluate_change', 'summarise_change', 'run_setup',
'preview_extract', 'restock_extract', 'connection_test'
app_guid (str|None): stable application GUID (datastore.data['app_guid'])
watch_uuid (str|None): watch UUID, or None when no watch (e.g. connection test)
timestamp_utc (str): ISO-8601 UTC time when the request started
settings (dict|None): deep copy of datastore.data['settings'] (application,
tags, notification profiles, llm config, etc.)
watch (dict|None): watch dict under processing, when applicable
datastore: ChangeDetectionStore instance, when available
model (str): model string sent to litellm (after alter hooks)
messages (list): chat messages sent to litellm (after alter hooks)
api_key, api_base, timeout, max_tokens, extra_body, debug: completion kwargs
result: dict on success, None on failure:
{
'text': str, # model response body
'total_tokens': int,
'input_tokens': int,
'output_tokens': int,
'cost_usd': float, # litellm response cost if reported,
# else litellm cost_per_token estimate
'litellm_response_cost_usd': float|None, # provider-reported only
'model': str,
'finish_reason': str|None, # e.g. 'stop', 'length'
'duration_seconds': float, # wall time for the completion call
}
error: Exception instance if the call failed, else None
Returns:
None
"""
pass
@hookspec
def get_html_head_extras():
"""Return HTML to inject into the <head> of every page via base.html.
@@ -691,6 +760,47 @@ def apply_update_finalize(update_handler, watch, datastore, processing_exception
logger.exception(f"update_finalize hook exception details:")
_LLM_CONTEXT_KEYS = frozenset({
'model', 'messages', 'api_key', 'api_base', 'timeout', 'max_tokens', 'extra_body', 'debug',
})
def apply_llm_query_alter(llm_context: dict) -> dict:
"""Apply llm_query_alter hooks; merge plugin overrides into the call context."""
current = dict(llm_context)
try:
results = plugin_manager.hook.llm_query_alter(llm_context=current)
except Exception as e:
logger.error(f"Error in llm_query_alter hook: {e}")
logger.exception("llm_query_alter hook exception details:")
return current
if results:
for result in results:
if result and isinstance(result, dict):
for key, value in result.items():
if key in _LLM_CONTEXT_KEYS or key in current:
current[key] = value
logger.debug(
f"LLM query altered by plugin (purpose={current.get('purpose')!r} "
f"watch={current.get('watch_uuid')!r})"
)
return current
def apply_llm_query_finalize(llm_context: dict, result: dict | None, error: Exception | None) -> None:
"""Apply llm_query_finalize hooks from all plugins."""
try:
plugin_manager.hook.llm_query_finalize(
llm_context=llm_context,
result=result,
error=error,
)
except Exception as e:
logger.error(f"Error in llm_query_finalize hook: {e}")
logger.exception("llm_query_finalize hook exception details:")
def collect_html_head_extras():
"""Collect and combine HTML head extras from all plugins.
@@ -204,7 +204,7 @@ def get_itemprop_availability_override(content, fetcher_name, fetcher_instance,
try:
from changedetectionio.llm.evaluator import _runtime_llm_config, accumulate_global_tokens
from changedetectionio.llm import client as llm_client
from changedetectionio.llm.invocation import llm_completion
except ImportError as e:
logger.debug(f"LLM restock fallback: LLM libraries not available ({e})")
return None
@@ -229,7 +229,10 @@ def get_itemprop_availability_override(content, fetcher_name, fetcher_instance,
user_prompt += f'\n\nUser notification intent: {llm_intent}'
try:
raw, tokens, input_tokens, output_tokens = llm_client.completion(
raw, tokens, input_tokens, output_tokens = llm_completion(
'restock_extract',
watch=None,
datastore=datastore,
model=llm_cfg['model'],
messages=[
{'role': 'system', 'content': SYSTEM_PROMPT},
@@ -0,0 +1,136 @@
"""Tests for llm_query_alter and llm_query_finalize pluggy hooks."""
import pytest
from changedetectionio.pluggy_interface import hookimpl, plugin_manager
class _AlterPlugin:
@hookimpl
def llm_query_alter(self, llm_context):
messages = list(llm_context.get('messages') or [])
if messages:
messages[-1] = dict(messages[-1])
messages[-1]['content'] = (messages[-1].get('content') or '') + ' [altered]'
return {'messages': messages, 'max_tokens': 99}
class _FinalizePlugin:
def __init__(self):
self.calls = []
@hookimpl
def llm_query_finalize(self, llm_context, result, error):
self.calls.append({
'purpose': llm_context.get('purpose'),
'app_guid': llm_context.get('app_guid'),
'watch_uuid': llm_context.get('watch_uuid'),
'result': result,
'error': error,
})
@pytest.fixture
def alter_plugin():
plugin_manager.register(_AlterPlugin(), name='test_llm_alter')
yield
plugin_manager.unregister(name='test_llm_alter')
@pytest.fixture
def finalize_plugin():
plugin = _FinalizePlugin()
plugin_manager.register(plugin, name='test_llm_finalize')
yield plugin
plugin_manager.unregister(name='test_llm_finalize')
def test_llm_query_alter_modifies_messages(client, live_server, measure_memory_usage, datastore_path, alter_plugin, monkeypatch):
from changedetectionio.llm import invocation as inv
captured = {}
def fake_completion(**kwargs):
captured.update(kwargs)
return 'ok', 10, 6, 4, {'finish_reason': 'stop'}
monkeypatch.setattr(inv.llm_client, 'completion', fake_completion)
ds = client.application.config.get('DATASTORE')
uuid = ds.add_watch(url='http://example.com', extras={'title': 'Hook test'})
watch = ds.data['watching'][uuid]
text, total, inp, out = inv.llm_completion(
'test_purpose',
watch=watch,
datastore=ds,
model='gpt-4o-mini',
messages=[{'role': 'user', 'content': 'hello'}],
)
assert text == 'ok'
assert total == 10
assert '[altered]' in captured['messages'][-1]['content']
assert captured['max_tokens'] == 99
def test_llm_query_finalize_receives_context_and_result(
client, live_server, measure_memory_usage, datastore_path, finalize_plugin, monkeypatch):
from changedetectionio.llm import invocation as inv
def fake_completion(**kwargs):
return 'done', 42, 30, 12, {
'finish_reason': 'stop',
'litellm_response_cost_usd': 0.00123,
}
monkeypatch.setattr(inv.llm_client, 'completion', fake_completion)
ds = client.application.config.get('DATASTORE')
uuid = ds.add_watch(url='http://example.com', extras={'title': 'Finalize test'})
watch = ds.data['watching'][uuid]
app_guid = ds.data.get('app_guid')
inv.llm_completion(
'evaluate_change',
watch=watch,
datastore=ds,
model='gpt-4o-mini',
messages=[{'role': 'user', 'content': 'ping'}],
)
assert len(finalize_plugin.calls) == 1
call = finalize_plugin.calls[0]
assert call['purpose'] == 'evaluate_change'
assert call['app_guid'] == app_guid
assert call['watch_uuid'] == uuid
assert call['error'] is None
assert call['result']['total_tokens'] == 42
assert call['result']['input_tokens'] == 30
assert call['result']['output_tokens'] == 12
assert call['result']['cost_usd'] > 0
assert call['result']['litellm_response_cost_usd'] == 0.00123
def test_llm_query_finalize_on_error(
client, live_server, measure_memory_usage, datastore_path, finalize_plugin, monkeypatch):
from changedetectionio.llm import invocation as inv
def fake_completion(**kwargs):
raise RuntimeError('provider down')
monkeypatch.setattr(inv.llm_client, 'completion', fake_completion)
ds = client.application.config.get('DATASTORE')
with pytest.raises(RuntimeError, match='provider down'):
inv.llm_completion(
'connection_test',
watch=None,
datastore=ds,
model='gpt-4o-mini',
messages=[{'role': 'user', 'content': 'x'}],
)
assert len(finalize_plugin.calls) == 1
assert finalize_plugin.calls[0]['result'] is None
assert str(finalize_plugin.calls[0]['error']) == 'provider down'