mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-05-25 11:01:23 +00:00
664 lines
27 KiB
Python
664 lines
27 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Security tests: LLM API key must never appear in any API response.
|
|
|
|
The LLM API key is a secret credential stored in
|
|
datastore.data['settings']['application']['llm']['api_key'].
|
|
It must never be leaked through any API endpoint — watch GET/list,
|
|
tag GET/list, system-info, notifications — even when the calling client
|
|
has a valid API token (which is a different kind of credential).
|
|
|
|
These tests set a recognisable fake key and then exhaustively check every
|
|
API endpoint's response body for the key string.
|
|
"""
|
|
|
|
import json
|
|
|
|
from flask import url_for
|
|
|
|
from changedetectionio.tests.util import live_server_setup, delete_all_watches
|
|
|
|
CANARY_KEY = 'sk-CANARY-SECRET-DO-NOT-EXPOSE-12345'
|
|
|
|
|
|
def _configure_llm(datastore, api_key=CANARY_KEY):
|
|
"""Inject a recognisable API key into the datastore LLM settings."""
|
|
app = datastore.data['settings']['application']
|
|
if 'llm' not in app:
|
|
app['llm'] = {}
|
|
app['llm'].update({
|
|
'model': 'gpt-4o-mini',
|
|
'api_key': api_key,
|
|
})
|
|
|
|
|
|
def _api_token(client):
|
|
return client.application.config.get('DATASTORE').data['settings']['application'].get('api_access_token')
|
|
|
|
|
|
def _key_in_response(response, key=CANARY_KEY) -> bool:
|
|
"""Return True if the canary key appears anywhere in the response body."""
|
|
body = response.data.decode('utf-8', errors='replace')
|
|
return key in body
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Watch endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_watch_get_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/watch/<uuid> must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
'/api/v1/watch',
|
|
data=json.dumps({'url': test_url}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_token},
|
|
follow_redirects=True,
|
|
)
|
|
assert res.status_code == 201
|
|
uuid = res.json.get('uuid')
|
|
|
|
res = client.get(
|
|
f'/api/v1/watch/{uuid}',
|
|
headers={'x-api-key': api_token},
|
|
)
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/watch/<uuid> response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_watch_list_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/watches must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
client.post(
|
|
'/api/v1/watch',
|
|
data=json.dumps({'url': test_url}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_token},
|
|
follow_redirects=True,
|
|
)
|
|
|
|
res = client.get('/api/v1/watch', headers={'x-api-key': api_token})
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/watch (list) response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_watch_put_response_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""PUT /api/v1/watch/<uuid> response must not echo back the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
'/api/v1/watch',
|
|
data=json.dumps({'url': test_url}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_token},
|
|
follow_redirects=True,
|
|
)
|
|
assert res.status_code == 201
|
|
uuid = res.json.get('uuid')
|
|
|
|
res = client.put(
|
|
f'/api/v1/watch/{uuid}',
|
|
headers={'x-api-key': api_token, 'content-type': 'application/json'},
|
|
data=json.dumps({'url': test_url, 'title': 'updated'}),
|
|
)
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in PUT /api/v1/watch/<uuid> response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Tag endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_tag_get_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/tag/<uuid> must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
tag_uuid = ds.add_tag('security-test-tag')
|
|
|
|
res = client.get(
|
|
f'/api/v1/tag/{tag_uuid}',
|
|
headers={'x-api-key': api_token},
|
|
)
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/tag/<uuid> response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_tag_list_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/tags must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
res = client.get('/api/v1/tags', headers={'x-api-key': api_token})
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/tags response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# System / global endpoints
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_system_info_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/systeminfo must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
res = client.get('/api/v1/systeminfo', headers={'x-api-key': api_token})
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/systeminfo response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_notifications_api_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET/POST/PUT /api/v1/notifications must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
# GET
|
|
res = client.get('/api/v1/notifications', headers={'x-api-key': api_token})
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/notifications response"
|
|
|
|
# POST — add a notification URL; response must not echo back LLM config
|
|
res = client.post(
|
|
'/api/v1/notifications',
|
|
headers={'x-api-key': api_token, 'content-type': 'application/json'},
|
|
data=json.dumps({'notification_urls': ['json://localhost/']}),
|
|
)
|
|
assert res.status_code in (200, 201, 400) # 400 if URL invalid on server; still no key
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in POST /api/v1/notifications response"
|
|
|
|
# PUT — replace notification URLs; response must not include LLM config
|
|
res = client.put(
|
|
'/api/v1/notifications',
|
|
headers={'x-api-key': api_token, 'content-type': 'application/json'},
|
|
data=json.dumps({'notification_urls': ['json://localhost/']}),
|
|
)
|
|
assert res.status_code in (200, 201, 400)
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in PUT /api/v1/notifications response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_search_api_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""GET /api/v1/search must not contain the LLM API key."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
client.post(
|
|
'/api/v1/watch',
|
|
data=json.dumps({'url': test_url}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_token},
|
|
follow_redirects=True,
|
|
)
|
|
|
|
res = client.get('/api/v1/search?q=endpoint', headers={'x-api-key': api_token})
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/search response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_openapi_spec_does_not_expose_llm_api_key(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
GET /api/v1/full-spec returns the static OpenAPI schema YAML.
|
|
It must not embed any runtime secrets (LLM API key).
|
|
"""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
# Spec endpoint has no auth requirement, but test with and without key
|
|
res = client.get('/api/v1/full-spec')
|
|
assert res.status_code == 200
|
|
assert not _key_in_response(res), \
|
|
"LLM API key leaked in GET /api/v1/full-spec response"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_no_api_settings_endpoint_exists(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
There is currently no /api/v1/settings endpoint.
|
|
If one is added in the future it must be covered by its own
|
|
security tests before reaching production.
|
|
This test acts as a canary — it should FAIL if a settings endpoint
|
|
is accidentally wired up without review.
|
|
"""
|
|
api_token = _api_token(client)
|
|
|
|
# GET and POST to /api/v1/settings must not succeed — no settings endpoint exists.
|
|
# 404 = route not found; 405 = route exists for some methods but not this one.
|
|
# Either means there is no working read/write settings endpoint.
|
|
# A 200/201/400 would indicate a real endpoint was wired up.
|
|
res_get = client.get('/api/v1/settings', headers={'x-api-key': api_token})
|
|
assert res_get.status_code in (404, 405), \
|
|
(f"Unexpected /api/v1/settings GET returned {res_get.status_code}. "
|
|
"A settings endpoint must have explicit LLM key security tests before shipping.")
|
|
|
|
res_post = client.post(
|
|
'/api/v1/settings',
|
|
headers={'x-api-key': api_token, 'content-type': 'application/json'},
|
|
data=json.dumps({}),
|
|
)
|
|
assert res_post.status_code in (404, 405), \
|
|
(f"Unexpected /api/v1/settings POST returned {res_post.status_code}. "
|
|
"A settings endpoint must have explicit LLM key security tests before shipping.")
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Settings HTML page — key must not appear in the form source HTML
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_settings_page_does_not_render_llm_api_key_in_plaintext(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
The settings page renders the API key form. Because the field uses
|
|
PasswordField, WTForms must NOT embed the current key value in the HTML
|
|
(PasswordField intentionally omits the value attribute for security).
|
|
"""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
|
|
res = client.get(url_for('settings.settings_page'))
|
|
assert res.status_code == 200
|
|
body = res.data.decode('utf-8', errors='replace')
|
|
assert CANARY_KEY not in body, \
|
|
"LLM API key appeared in plaintext in the settings page HTML source. " \
|
|
"The llm_api_key field must be a PasswordField so the value is never rendered."
|
|
|
|
|
|
def test_settings_form_preserves_api_key_when_submitted_blank(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""
|
|
When the settings form is saved with an empty llm_api_key (which happens
|
|
every time because PasswordField never pre-populates), the existing key
|
|
must be preserved rather than cleared.
|
|
"""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds, api_key='sk-should-be-kept')
|
|
|
|
res = client.post(
|
|
url_for('settings.settings_page'),
|
|
data={
|
|
'llm-llm_model': 'gpt-4o',
|
|
'llm-llm_api_key': '', # blank — PasswordField behaviour
|
|
'llm-llm_api_base': '',
|
|
'application-pager_size': '50',
|
|
'application-notification_format': 'System default',
|
|
'requests-time_between_check-days': '0',
|
|
'requests-time_between_check-hours': '0',
|
|
'requests-time_between_check-minutes': '5',
|
|
'requests-time_between_check-seconds': '0',
|
|
'requests-time_between_check-weeks': '0',
|
|
'requests-workers': '10',
|
|
'requests-timeout': '60',
|
|
},
|
|
follow_redirects=True,
|
|
)
|
|
assert res.status_code == 200
|
|
|
|
saved_key = ds.data['settings']['application'].get('llm', {}).get('api_key', '')
|
|
assert saved_key == 'sk-should-be-kept', \
|
|
f"Blank PasswordField submission must not clear the existing API key (got '{saved_key}')"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# SSRF — api_base must reject private/loopback/reserved hosts (GHSA-jrxm-qjfh-g54f)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
# Hosts that is_private_hostname() must classify as restricted.
|
|
# 169.254.169.254 is the cloud metadata service (AWS/GCP IMDSv1).
|
|
_SSRF_PRIVATE_HOSTS = [
|
|
'http://127.0.0.1:6379',
|
|
'http://localhost:11434',
|
|
'http://10.0.0.5:8080',
|
|
'http://192.168.1.1',
|
|
'http://169.254.169.254',
|
|
]
|
|
|
|
|
|
def test_llm_models_endpoint_blocks_private_api_base(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""GET /settings/llm/models must refuse api_base pointing at private/loopback
|
|
hosts and must never reach litellm."""
|
|
# Default state — protection ON
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
|
|
for bad in _SSRF_PRIVATE_HOSTS:
|
|
res = client.get(
|
|
url_for('settings.llm.llm_get_models'),
|
|
query_string={'provider': 'openai_compatible', 'api_base': bad},
|
|
)
|
|
assert res.status_code == 400, \
|
|
f"api_base={bad!r} should have been rejected by SSRF guard"
|
|
body = res.get_json()
|
|
assert body['models'] == []
|
|
assert 'ALLOW_IANA_RESTRICTED_ADDRESSES' in body['error'], \
|
|
f"Error message should mention the env-var bypass: {body['error']!r}"
|
|
# The raw attacker-controlled api_base must never be reflected back
|
|
# (avoids XSS when JS renders the error into the DOM).
|
|
assert bad not in body['error']
|
|
|
|
|
|
def test_llm_test_endpoint_blocks_private_api_base(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""GET /settings/llm/test must refuse api_base pointing at private/loopback
|
|
hosts and must never reach litellm.completion()."""
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
|
|
for bad in _SSRF_PRIVATE_HOSTS:
|
|
res = client.get(
|
|
url_for('settings.llm.llm_test'),
|
|
query_string={'model': 'openai/gpt-4', 'api_base': bad},
|
|
)
|
|
assert res.status_code == 400, \
|
|
f"api_base={bad!r} should have been rejected by SSRF guard"
|
|
body = res.get_json()
|
|
assert body['ok'] is False
|
|
assert 'ALLOW_IANA_RESTRICTED_ADDRESSES' in body['error']
|
|
assert bad not in body['error']
|
|
|
|
|
|
def test_llm_endpoints_allow_api_base_when_iana_bypass_enabled(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""When ALLOW_IANA_RESTRICTED_ADDRESSES=true the SSRF guard is bypassed so
|
|
operators can intentionally point at a local Ollama / vLLM endpoint.
|
|
We patch litellm so the test doesn't actually need a live model server —
|
|
we only need to confirm the guard didn't short-circuit."""
|
|
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'true')
|
|
|
|
# Stub get_valid_models so the call returns successfully without network.
|
|
import litellm
|
|
monkeypatch.setattr(litellm, 'get_valid_models',
|
|
lambda **kwargs: ['llama3.2'])
|
|
|
|
# Supply api_key explicitly so we aren't tripped by the credential-exfil
|
|
# guard (which refuses to substitute the stored key for a non-stored api_base).
|
|
res = client.get(
|
|
url_for('settings.llm.llm_get_models'),
|
|
query_string={'provider': 'openai_compatible',
|
|
'api_base': 'http://127.0.0.1:11434',
|
|
'api_key': 'sk-test-explicit'},
|
|
)
|
|
assert res.status_code == 200, \
|
|
"With ALLOW_IANA_RESTRICTED_ADDRESSES=true, private api_base must be allowed"
|
|
body = res.get_json()
|
|
assert body['error'] is None
|
|
assert body['models'], "Stubbed model list should be returned"
|
|
|
|
|
|
def test_settings_form_rejects_private_api_base(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""The globalSettingsLLMForm validator must block private api_base values
|
|
when ALLOW_IANA_RESTRICTED_ADDRESSES is not set, and must NOT persist them
|
|
to the datastore."""
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
|
|
ds = client.application.config.get('DATASTORE')
|
|
# Make sure no stale api_base exists from previous tests.
|
|
ds.data['settings']['application'].pop('llm', None)
|
|
|
|
res = client.post(
|
|
url_for('settings.settings_page'),
|
|
data={
|
|
'llm-llm_model': 'gpt-4o',
|
|
'llm-llm_api_key': '',
|
|
'llm-llm_api_base': 'http://127.0.0.1:11434',
|
|
'application-pager_size': '50',
|
|
'application-notification_format': 'System default',
|
|
'requests-time_between_check-days': '0',
|
|
'requests-time_between_check-hours': '0',
|
|
'requests-time_between_check-minutes': '5',
|
|
'requests-time_between_check-seconds': '0',
|
|
'requests-time_between_check-weeks': '0',
|
|
'requests-workers': '10',
|
|
'requests-timeout': '60',
|
|
},
|
|
follow_redirects=True,
|
|
)
|
|
# Form re-renders with the validation error — page itself returns 200.
|
|
assert res.status_code == 200
|
|
body = res.data.decode('utf-8', errors='replace')
|
|
assert 'ALLOW_IANA_RESTRICTED_ADDRESSES' in body, \
|
|
"Settings page should surface the SSRF guard's bypass-env-var hint"
|
|
|
|
saved = ds.data['settings']['application'].get('llm', {}).get('api_base', '')
|
|
assert saved != 'http://127.0.0.1:11434', \
|
|
f"Private api_base must not have been persisted (got {saved!r})"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Credential exfiltration — stored api_key must NOT be auto-substituted when
|
|
# the caller points api_base at a different (potentially attacker-controlled)
|
|
# endpoint. GHSA-g36r-fm2p-87xm.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_llm_models_refuses_to_leak_stored_key_to_different_api_base(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""If the request supplies an api_base that differs from the saved one but
|
|
omits api_key, the endpoint must refuse — otherwise CSRF can ship the
|
|
stored Authorization: Bearer <key> to an attacker-controlled URL."""
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds) # stores CANARY_KEY, leaves api_base unset
|
|
|
|
# Patch litellm.get_valid_models so that if the guard ever lets us through
|
|
# we'd see it called — and we can assert it wasn't.
|
|
import litellm
|
|
calls = []
|
|
monkeypatch.setattr(litellm, 'get_valid_models',
|
|
lambda **kwargs: calls.append(kwargs) or [])
|
|
|
|
res = client.get(
|
|
url_for('settings.llm.llm_get_models'),
|
|
query_string={
|
|
'provider': 'openai',
|
|
'api_base': 'https://attacker.example/v1',
|
|
# api_key intentionally omitted — this is the CSRF case
|
|
},
|
|
)
|
|
assert res.status_code == 400, \
|
|
"Endpoint should refuse to substitute stored key to a mismatched api_base"
|
|
body = res.get_json()
|
|
assert 'api_key' in body['error'], \
|
|
f"Error should call out that api_key is required: {body['error']!r}"
|
|
assert calls == [], "litellm must not have been invoked at all"
|
|
|
|
|
|
def test_llm_test_refuses_to_leak_stored_key_to_different_api_base(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""Same guard on /settings/llm/test — attacker-supplied api_base + missing
|
|
api_key must not result in the stored key being sent to that URL."""
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds) # stores CANARY_KEY, no stored api_base
|
|
|
|
calls = []
|
|
# Patch the completion wrapper so we'd notice if litellm were invoked.
|
|
import changedetectionio.llm.client as llm_client
|
|
monkeypatch.setattr(llm_client, 'completion',
|
|
lambda **kw: calls.append(kw) or ('', 0, 0, 0))
|
|
|
|
res = client.get(
|
|
url_for('settings.llm.llm_test'),
|
|
query_string={
|
|
'model': 'gpt-4o-mini',
|
|
'api_base': 'https://attacker.example/v1',
|
|
# api_key intentionally omitted
|
|
},
|
|
)
|
|
assert res.status_code == 400
|
|
body = res.get_json()
|
|
assert body['ok'] is False
|
|
assert 'api_key' in body['error']
|
|
assert calls == [], "completion() must not have been invoked"
|
|
|
|
|
|
def test_llm_models_allows_stored_key_when_api_base_matches_saved(
|
|
client, live_server, measure_memory_usage, datastore_path, monkeypatch):
|
|
"""Regression: the legit UI flow (test saved config without retyping the key)
|
|
must still work — i.e. when request api_base matches the stored api_base,
|
|
the stored key IS substituted."""
|
|
monkeypatch.delenv('ALLOW_IANA_RESTRICTED_ADDRESSES', raising=False)
|
|
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'true') # so localhost passes SSRF
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
ds.data['settings']['application']['llm']['api_base'] = 'http://localhost:11434'
|
|
|
|
received = []
|
|
import litellm
|
|
monkeypatch.setattr(litellm, 'get_valid_models',
|
|
lambda **kwargs: (received.append(kwargs), ['llama3.2'])[1])
|
|
|
|
res = client.get(
|
|
url_for('settings.llm.llm_get_models'),
|
|
query_string={
|
|
'provider': 'openai_compatible',
|
|
'api_base': 'http://localhost:11434', # matches saved
|
|
# api_key omitted — should fall back to stored CANARY_KEY
|
|
},
|
|
)
|
|
assert res.status_code == 200, res.get_json()
|
|
assert received and received[0].get('api_key') == CANARY_KEY, \
|
|
"When api_base matches saved, the stored api_key should be used"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CSRF — /clear and /clear-summary-cache must not mutate state on GET
|
|
# (GHSA-g36r-fm2p-87xm). The <img src=...> CSRF vector relies on GET firing the
|
|
# mutation; the production guard is "POST only + Flask-WTF CSRF token". The
|
|
# test config disables WTF_CSRF_ENABLED, so we verify the GET vector by
|
|
# asserting the mutation didn't happen, and verify POST routing by exercising
|
|
# the legit confirm-then-POST flow.
|
|
#
|
|
# NB: the app registers a catch-all '/<path:filename>' static route, which
|
|
# intercepts any GET that isn't claimed by a method-matching rule and returns
|
|
# 404 — so we can't simply assert on status code. The behaviour test below is
|
|
# the actual security property.
|
|
# ---------------------------------------------------------------------------
|
|
|
|
def test_llm_clear_get_does_not_wipe_config(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""The CSRF surface is GET → mutation. After this fix the endpoint is
|
|
POST-only, so a GET must leave LLM config intact."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
assert ds.data['settings']['application'].get('llm', {}).get('api_key') == CANARY_KEY
|
|
|
|
client.get(url_for('settings.llm.llm_clear'))
|
|
|
|
# Mutation must not have happened — that's what defeats <img src=...> CSRF.
|
|
assert ds.data['settings']['application'].get('llm', {}).get('api_key') == CANARY_KEY, \
|
|
"GET /settings/llm/clear must not wipe LLM config (CSRF guard)"
|
|
|
|
|
|
def test_llm_clear_summary_cache_get_does_not_wipe_cache(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""Same property for the cache wipe endpoint — GET must not delete the
|
|
change-summary-*.txt files the endpoint targets. To exercise the actual
|
|
deletion path we have to create a real watch (so a real data_dir exists)
|
|
and drop a real change-summary-*.txt inside it. POST should remove it;
|
|
GET must not."""
|
|
import os
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
api_token = _api_token(client)
|
|
|
|
# Create a real watch — required to exercise llm_clear_summary_cache's
|
|
# iteration over datastore.data['watching'].values().
|
|
test_url = url_for('test_endpoint', _external=True)
|
|
res = client.post(
|
|
'/api/v1/watch',
|
|
data=json.dumps({'url': test_url}),
|
|
headers={'content-type': 'application/json', 'x-api-key': api_token},
|
|
follow_redirects=True,
|
|
)
|
|
assert res.status_code == 201
|
|
uuid = res.json.get('uuid')
|
|
|
|
watch = ds.data['watching'][uuid]
|
|
data_dir = watch.data_dir
|
|
assert data_dir, "Watch must have a data_dir for this test to be meaningful"
|
|
os.makedirs(data_dir, exist_ok=True)
|
|
|
|
summary_file = os.path.join(data_dir, 'change-summary-csrf-canary.txt')
|
|
with open(summary_file, 'w') as f:
|
|
f.write('do-not-delete-via-GET')
|
|
|
|
# GET must NOT trigger the wipe — this is the CSRF surface that was open
|
|
# via <img src="/settings/llm/clear-summary-cache">.
|
|
client.get(url_for('settings.llm.llm_clear_summary_cache'))
|
|
assert os.path.exists(summary_file), \
|
|
"GET on /settings/llm/clear-summary-cache must not invoke the cache wipe"
|
|
|
|
# Sanity check: POST does remove it — confirms our test actually exercises
|
|
# the deletion path the GET test is guarding against.
|
|
client.post(url_for('settings.llm.llm_clear_summary_cache'))
|
|
assert not os.path.exists(summary_file), \
|
|
"POST on /settings/llm/clear-summary-cache should remove change-summary-*.txt"
|
|
|
|
delete_all_watches(client)
|
|
|
|
|
|
def test_llm_clear_via_post_still_works(
|
|
client, live_server, measure_memory_usage, datastore_path):
|
|
"""Confirm the legit confirm-then-POST flow still wipes LLM config."""
|
|
ds = client.application.config.get('DATASTORE')
|
|
_configure_llm(ds)
|
|
assert ds.data['settings']['application'].get('llm', {}).get('api_key') == CANARY_KEY
|
|
|
|
res = client.post(url_for('settings.llm.llm_clear'), follow_redirects=True)
|
|
assert res.status_code == 200
|
|
assert 'llm' not in ds.data['settings']['application']
|