mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-20 06:00:20 +00:00
Compare commits
2 Commits
API-valida
...
3779-autod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c1687633d | ||
|
|
87e686ff27 |
@@ -68,17 +68,13 @@ class Watch(Resource):
|
||||
import time
|
||||
from copy import deepcopy
|
||||
watch = None
|
||||
# Retry up to 20 times if dict is being modified
|
||||
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
|
||||
for attempt in range(20):
|
||||
for _ in range(20):
|
||||
try:
|
||||
watch = deepcopy(self.datastore.data['watching'].get(uuid))
|
||||
break
|
||||
except RuntimeError:
|
||||
# Dict changed during deepcopy, retry after yielding to scheduler
|
||||
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
|
||||
if attempt < 19: # Don't yield on last attempt
|
||||
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
|
||||
# Incase dict changed, try again
|
||||
time.sleep(0.01)
|
||||
|
||||
if not watch:
|
||||
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
|
||||
@@ -130,31 +126,17 @@ class Watch(Resource):
|
||||
|
||||
if request.json.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not plist or request.json.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
if not request.json.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(request.json)
|
||||
if validation_error:
|
||||
return validation_error, 400
|
||||
|
||||
# XSS etc protection - validate URL if it's being updated
|
||||
if 'url' in request.json:
|
||||
new_url = request.json.get('url')
|
||||
|
||||
# URL must be a non-empty string
|
||||
if new_url is None:
|
||||
return "URL cannot be null", 400
|
||||
|
||||
if not isinstance(new_url, str):
|
||||
return "URL must be a string", 400
|
||||
|
||||
if not new_url.strip():
|
||||
return "URL cannot be empty or whitespace only", 400
|
||||
|
||||
if not is_safe_valid_url(new_url.strip()):
|
||||
return "Invalid or unsupported URL format. URL must use http://, https://, or ftp:// protocol", 400
|
||||
# XSS etc protection
|
||||
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
|
||||
return "Invalid URL", 400
|
||||
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
from changedetectionio import processors
|
||||
@@ -250,10 +232,6 @@ class WatchSingleHistory(Resource):
|
||||
if timestamp == 'latest':
|
||||
timestamp = list(watch.history.keys())[-1]
|
||||
|
||||
# Validate that the timestamp exists in history
|
||||
if timestamp not in watch.history:
|
||||
abort(404, message=f"No history snapshot found for timestamp '{timestamp}'")
|
||||
|
||||
if request.args.get('html'):
|
||||
content = watch.get_fetched_html(timestamp)
|
||||
if content:
|
||||
@@ -441,9 +419,8 @@ class CreateWatch(Resource):
|
||||
|
||||
if json_data.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not plist or json_data.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
if not json_data.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(json_data)
|
||||
|
||||
@@ -400,11 +400,27 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
language_codes = get_language_codes()
|
||||
|
||||
def get_locale():
|
||||
# Locale aliases: map browser language codes to translation directory names
|
||||
# This handles cases where browsers send standard codes (e.g., zh-TW)
|
||||
# but our translations use more specific codes (e.g., zh_Hant_TW)
|
||||
locale_aliases = {
|
||||
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
|
||||
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
|
||||
}
|
||||
|
||||
# 1. Try to get locale from session (user explicitly selected)
|
||||
if 'locale' in session:
|
||||
return session['locale']
|
||||
|
||||
# 2. Fall back to Accept-Language header
|
||||
return request.accept_languages.best_match(language_codes)
|
||||
# Get the best match from browser's Accept-Language header
|
||||
browser_locale = request.accept_languages.best_match(language_codes + list(locale_aliases.keys()))
|
||||
|
||||
# 3. Check if we need to map the browser locale to our internal locale
|
||||
if browser_locale in locale_aliases:
|
||||
return locale_aliases[browser_locale]
|
||||
|
||||
return browser_locale
|
||||
|
||||
# Initialize Babel with locale selector
|
||||
babel = Babel(app, locale_selector=get_locale)
|
||||
|
||||
@@ -348,8 +348,7 @@ class ChangeDetectionStore:
|
||||
r = requests.request(method="GET",
|
||||
url=url,
|
||||
# So we know to return the JSON instead of the human-friendly "help" page
|
||||
headers={'App-Guid': self.__data['app_guid']},
|
||||
timeout=5.0) # 5 second timeout to prevent blocking
|
||||
headers={'App-Guid': self.__data['app_guid']})
|
||||
res = r.json()
|
||||
|
||||
# List of permissible attributes we accept from the wild internet
|
||||
|
||||
@@ -58,7 +58,7 @@ def is_valid_uuid(val):
|
||||
|
||||
|
||||
def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
@@ -506,7 +506,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Enable password check and diff page access bypass
|
||||
@@ -548,172 +548,3 @@ def test_api_conflict_UI_password(client, live_server, measure_memory_usage, dat
|
||||
assert len(res.json)
|
||||
|
||||
|
||||
def test_api_url_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test URL validation for edge cases in both CREATE and UPDATE endpoints.
|
||||
Addresses security issues where empty/null/invalid URLs could bypass validation.
|
||||
|
||||
This test ensures that:
|
||||
- CREATE endpoint rejects null, empty, and invalid URLs
|
||||
- UPDATE endpoint rejects attempts to change URL to null, empty, or invalid
|
||||
- UPDATE endpoint allows updating other fields without touching URL
|
||||
- URL validation properly checks protocol, format, and safety
|
||||
"""
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: CREATE with null URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": None}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with null URL should fail"
|
||||
|
||||
# Test 2: CREATE with empty string URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with empty string URL should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'required' in res.data.lower()
|
||||
|
||||
# Test 3: CREATE with whitespace-only URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": " "}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with whitespace-only URL should fail"
|
||||
|
||||
# Test 4: CREATE with invalid protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "javascript:alert(1)"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with javascript: protocol should fail"
|
||||
|
||||
# Test 5: CREATE with missing protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "example.com"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch without protocol should fail"
|
||||
|
||||
# Test 6: CREATE with valid URL should succeed (baseline)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid URL test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 201, "Creating watch with valid URL should succeed"
|
||||
assert is_valid_uuid(res.json.get('uuid'))
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 7: UPDATE to null URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": None}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to null should fail"
|
||||
# Accept either OpenAPI validation error or our custom validation error
|
||||
assert b'URL cannot be null' in res.data or b'OpenAPI validation failed' in res.data or b'validation error' in res.data.lower()
|
||||
|
||||
# Test 8: UPDATE to empty string URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": ""}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to empty string should fail"
|
||||
# Accept either our custom validation error or OpenAPI/schema validation error
|
||||
assert b'URL cannot be empty' in res.data or b'OpenAPI validation' in res.data or b'Invalid or unsupported URL' in res.data
|
||||
|
||||
# Test 9: UPDATE to whitespace-only URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": " \t\n "}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to whitespace should fail"
|
||||
# Accept either our custom validation error or generic validation error
|
||||
assert b'URL cannot be empty' in res.data or b'Invalid or unsupported URL' in res.data or b'validation' in res.data.lower()
|
||||
|
||||
# Test 10: UPDATE to invalid protocol should fail (javascript:)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "javascript:alert(document.domain)"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to XSS attempt should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'protocol' in res.data.lower()
|
||||
|
||||
# Test 11: UPDATE to file:// protocol should fail (unless ALLOW_FILE_URI is set)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "file:///etc/passwd"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to file:// should fail by default"
|
||||
|
||||
# Test 12: UPDATE other fields without URL should succeed
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"title": "Updated title without URL change"}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating other fields without URL should succeed"
|
||||
|
||||
# Test 13: Verify URL is still valid after non-URL update
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == test_url, "URL should remain unchanged"
|
||||
assert res.json.get('title') == "Updated title without URL change"
|
||||
|
||||
# Test 14: UPDATE to valid different URL should succeed
|
||||
new_valid_url = test_url + "?new=param"
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": new_valid_url}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating to valid different URL should succeed"
|
||||
|
||||
# Test 15: Verify URL was actually updated
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == new_valid_url, "URL should be updated to new valid URL"
|
||||
|
||||
# Test 16: CREATE with XSS in URL parameters should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "http://example.com?xss=<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
# This should fail because of suspicious characters check
|
||||
assert res.status_code == 400, "Creating watch with XSS in URL params should fail"
|
||||
|
||||
# Cleanup
|
||||
client.delete(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -1,805 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Comprehensive security and edge case tests for the API.
|
||||
Tests critical areas that were identified as gaps in the existing test suite.
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import threading
|
||||
import uuid as uuid_module
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
def set_original_response(datastore_path):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>Which is across multiple lines</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_uuid(val):
|
||||
try:
|
||||
uuid_module.UUID(str(val))
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 1: CRITICAL SECURITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_path_traversal_in_uuids(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that path traversal attacks via UUID parameter are blocked.
|
||||
Addresses CVE-like vulnerabilities where ../../../ in UUID could access arbitrary files.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a valid watch first
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid watch"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
valid_uuid = res.json.get('uuid')
|
||||
|
||||
# Test 1: Path traversal with ../../../
|
||||
res = client.get(
|
||||
f"/api/v1/watch/../../etc/passwd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Path traversal should be rejected"
|
||||
|
||||
# Test 2: Encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/..%2F..%2F..%2Fetc%2Fpasswd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Encoded path traversal should be rejected"
|
||||
|
||||
# Test 3: Double-encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/%2e%2e%2f%2e%2e%2f%2e%2e%2f",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Double-encoded traversal should be rejected"
|
||||
|
||||
# Test 4: Try to access datastore file
|
||||
res = client.get(
|
||||
"/api/v1/watch/../url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Access to datastore should be blocked"
|
||||
|
||||
# Test 5: Null byte injection
|
||||
res = client.get(
|
||||
f"/api/v1/watch/{valid_uuid}%00.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (ignoring null byte) or reject - but not crash
|
||||
assert res.status_code in [200, 400, 404]
|
||||
|
||||
# Test 6: DELETE with path traversal
|
||||
res = client.delete(
|
||||
"/api/v1/watch/../../datastore/url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404, 405], "DELETE with traversal should be blocked (405=method not allowed is also acceptable)"
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=valid_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_injection_via_headers_and_proxy(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that injection attacks via headers and proxy fields are properly sanitized.
|
||||
Addresses XSS and injection vulnerabilities.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: XSS in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {
|
||||
"User-Agent": "<script>alert(1)</script>",
|
||||
"X-Custom": "'; DROP TABLE watches; --"
|
||||
}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Headers are metadata used for HTTP requests, not HTML rendering
|
||||
# Storing them as-is is expected behavior
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
# Verify headers are stored (API returns JSON, not HTML, so no XSS risk)
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Null bytes in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {"X-Test": "value\x00null"}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle null bytes gracefully (reject or sanitize)
|
||||
assert res.status_code in [201, 400]
|
||||
|
||||
# Test 3: Malformed proxy string
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"proxy": "http://evil.com:8080@victim.com"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject invalid proxy format
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Control characters in notification title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_title": "Test\r\nInjected-Header: value"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize control characters
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_large_payload_dos(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that excessively large payloads are rejected to prevent DoS.
|
||||
Addresses memory leak issues found in changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Huge ignore_text array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"ignore_text": ["a" * 10000] * 100 # 1MB of data
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Massive headers object
|
||||
huge_headers = {f"X-Header-{i}": "x" * 1000 for i in range(100)}
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": huge_headers
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or truncate
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Huge browser_steps array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test" * 1000, "optional_value": ""}
|
||||
] * 100
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or limit
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Extremely long title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "x" * 100000 # 100KB title
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject (exceeds maxLength: 5000)
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_utf8_encoding_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test UTF-8 encoding edge cases that have caused bugs on Windows.
|
||||
Addresses 18+ encoding bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Unicode in title (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test 中文 Ελληνικά 日本語 🔥"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
|
||||
# Verify it round-trips correctly
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
assert "中文" in res.json.get('title')
|
||||
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Unicode in URL query parameters
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url + "?search=日本語"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle URL encoding properly
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Null byte in title (should be rejected or sanitized)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test\x00Title"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle gracefully
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: BOM (Byte Order Mark) in title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "\ufeffTest with BOM"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_concurrency_race_conditions(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test concurrent API requests to detect race conditions.
|
||||
Addresses 20+ concurrency bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Concurrency test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Concurrent updates to same watch
|
||||
# Note: Flask test client is not thread-safe, so we test sequential updates instead
|
||||
# Real concurrency issues would be caught in integration tests with actual HTTP requests
|
||||
results = []
|
||||
for i in range(10):
|
||||
try:
|
||||
r = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": f"Title {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
results.append(r.status_code)
|
||||
except Exception as e:
|
||||
results.append(str(e))
|
||||
|
||||
# All updates should succeed (200) without crashes
|
||||
assert all(r == 200 for r in results), f"Some updates failed: {results}"
|
||||
|
||||
# Test 2: Update while watch is being checked
|
||||
# Queue a recheck
|
||||
client.get(
|
||||
url_for("watch", uuid=watch_uuid, recheck=True),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
|
||||
# Immediately update it
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": "Updated during check"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should succeed without error
|
||||
assert res.status_code == 200
|
||||
|
||||
# Test 3: Delete watch that's being processed
|
||||
# Create another watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid2 = res.json.get('uuid')
|
||||
|
||||
# Queue it for checking
|
||||
client.get(url_for("watch", uuid=watch_uuid2, recheck=True), headers={'x-api-key': api_key})
|
||||
|
||||
# Immediately delete it
|
||||
res = client.delete(url_for("watch", uuid=watch_uuid2), headers={'x-api-key': api_key})
|
||||
# Should succeed or return appropriate error
|
||||
assert res.status_code in [204, 404, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 2: IMPORTANT FUNCTIONALITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_time_validation_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test time_between_check validation edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Zero interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 0}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Zero interval should be rejected"
|
||||
|
||||
# Test 2: Negative interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": -100}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Negative interval should be rejected"
|
||||
|
||||
# Test 3: All fields null with use_default=false
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": None, "days": None, "hours": None, "minutes": None, "seconds": None}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "All null intervals should be rejected when not using default"
|
||||
|
||||
# Test 4: Extremely large interval (overflow risk)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": 999999999}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 5: Valid minimal interval (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 60}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_browser_steps_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test browser_steps validation for invalid operations and structures.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Empty browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "", "selector": "", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid as null)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid operation type
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "invalid_operation", "selector": "#test", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (validation happens at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Missing required fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click"} # Missing selector and optional_value
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to schema validation
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Extra fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test", "optional_value": "", "extra_field": "value"}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to additionalProperties: false
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_queue_manipulation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test queue behavior under stress and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Create many watches rapidly
|
||||
watch_uuids = []
|
||||
for i in range(20):
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": f"Watch {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
if res.status_code == 201:
|
||||
watch_uuids.append(res.json.get('uuid'))
|
||||
|
||||
assert len(watch_uuids) == 20, "Should be able to create 20 watches"
|
||||
|
||||
# Test 2: Recheck all when watches exist
|
||||
res = client.get(
|
||||
url_for("createwatch", recheck_all='1'),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
# Should return success (200 or 202 for background processing)
|
||||
assert res.status_code in [200, 202]
|
||||
|
||||
# Test 3: Verify queue doesn't overflow with moderate load
|
||||
# The app has MAX_QUEUE_SIZE = 5000, we're well below that
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Cleanup
|
||||
for uuid in watch_uuids:
|
||||
client.delete(url_for("watch", uuid=uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 3: EDGE CASES & POLISH
|
||||
# ============================================================================
|
||||
|
||||
def test_api_history_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test history API with invalid timestamps and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create watch and generate history
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Get history with invalid timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="invalid"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Invalid timestamp should return 404"
|
||||
|
||||
# Test 2: Future timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="9999999999"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Future timestamp should return 404"
|
||||
|
||||
# Test 3: Negative timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="-1"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Negative timestamp should return 404"
|
||||
|
||||
# Test 4: Diff with reversed timestamps (from > to)
|
||||
# First get actual timestamps
|
||||
res = client.get(
|
||||
url_for("watchhistory", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
if len(res.json) >= 2:
|
||||
timestamps = sorted(res.json.keys())
|
||||
# Try reversed order
|
||||
res = client.get(
|
||||
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp=timestamps[-1], to_timestamp=timestamps[0]),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (show reverse diff) or return error
|
||||
assert res.status_code in [200, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_notification_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test notification configuration edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Invalid notification URL
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": ["invalid://url", "ftp://test.com"]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (apprise validates at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid notification format
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_format": "invalid_format"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected by schema
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 3: Empty notification arrays
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": []
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_tag_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test tag/group API edge cases including XSS and path traversal.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Test 1: Empty tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (empty title)
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 2: XSS in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
# Verify title is stored safely
|
||||
res = client.get(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
# Should be escaped or sanitized
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Path traversal in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "../../etc/passwd"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (it's just a string, not a path)
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Very long tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "x" * 10000}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (exceeds maxLength)
|
||||
assert res.status_code == 400
|
||||
|
||||
|
||||
def test_api_authentication_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test API authentication edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Missing API key
|
||||
res = client.get(url_for("createwatch"))
|
||||
assert res.status_code == 403, "Missing API key should be forbidden"
|
||||
|
||||
# Test 2: Invalid API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "invalid_key_12345"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 3: API key with special characters
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "key<script>alert(1)</script>"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 4: Very long API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "x" * 10000}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 5: Case sensitivity of API key
|
||||
wrong_case_key = api_key.upper() if api_key.islower() else api_key.lower()
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': wrong_case_key}
|
||||
)
|
||||
# Should be forbidden (keys are case-sensitive)
|
||||
assert res.status_code == 403, "Wrong case API key should be forbidden"
|
||||
|
||||
# Test 6: Valid API key should work
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 200, "Valid API key should work"
|
||||
@@ -325,3 +325,274 @@ def test_time_unit_translations(client, live_server, measure_memory_usage, datas
|
||||
assert b"Time Between Check" not in res.data, "Should not have English 'Time Between Check'"
|
||||
assert "Chrome 請求".encode() not in res.data, "Should not have incorrect 'Chrome 請求' (Chrome requests)"
|
||||
assert "使用預設通知".encode() not in res.data, "Should not have incorrect '使用預設通知' (Use default notification)"
|
||||
|
||||
|
||||
def test_accept_language_header_zh_tw(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending zh-TW in Accept-Language header get Traditional Chinese.
|
||||
This tests the locale alias mapping for issue #3779.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Clear any session data to simulate a fresh visitor
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
# Request the index page with zh-TW in Accept-Language header (what browsers send)
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Traditional Chinese content, not Simplified Chinese
|
||||
# Traditional: 選擇語言, Simplified: 选择语言
|
||||
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' (Select Language)"
|
||||
assert '选择语言'.encode() not in res.data, "Should not get Simplified Chinese '选择语言'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format
|
||||
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected BCP 47 language tag zh-Hant-TW in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (Taiwan flag for Traditional Chinese)
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Taiwan flag 'fi fi-tw' for Traditional Chinese"
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show China flag 'fi fi-cn' for Traditional Chinese"
|
||||
|
||||
# Verify we're getting Traditional Chinese text throughout the page
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Traditional Chinese translations (not English)
|
||||
assert "小時".encode() in res.data, "Expected Traditional Chinese '小時' for Hours"
|
||||
assert "分鐘".encode() in res.data, "Expected Traditional Chinese '分鐘' for Minutes"
|
||||
assert b"Hours" not in res.data or "小時".encode() in res.data, "Should have Traditional Chinese, not English"
|
||||
|
||||
|
||||
def test_accept_language_header_en_variants(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending en-GB and en-US in Accept-Language header get the correct English variant.
|
||||
This ensures the locale selector works properly for English variants.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Test 1: British English (en-GB)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en-GB,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format with hyphen
|
||||
assert b'<html lang="en-GB"' in res.data, "Expected BCP 47 language tag en-GB in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (UK flag for en-GB)
|
||||
assert b'<span class="fi fi-gb fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected UK flag 'fi fi-gb' for British English"
|
||||
|
||||
# Test 2: American English (en-US)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en-US,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format with hyphen
|
||||
assert b'<html lang="en-US"' in res.data, "Expected BCP 47 language tag en-US in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (US flag for en-US)
|
||||
assert b'<span class="fi fi-us fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected US flag 'fi fi-us' for American English"
|
||||
|
||||
# Test 3: Generic 'en' should fall back to one of the English variants
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content (either variant is fine)
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
|
||||
def test_accept_language_header_zh_simplified(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending zh or zh-CN in Accept-Language header get Simplified Chinese.
|
||||
This ensures Simplified Chinese still works correctly and doesn't get confused with Traditional.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Test 1: Generic 'zh' should get Simplified Chinese
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Simplified Chinese content, not Traditional Chinese
|
||||
# Simplified: 选择语言, Traditional: 選擇語言
|
||||
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' (Select Language)"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese '選擇語言'"
|
||||
|
||||
# Check HTML lang attribute
|
||||
assert b'<html lang="zh"' in res.data, "Expected language tag zh in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (China flag for Simplified Chinese)
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected China flag 'fi fi-cn' for Simplified Chinese"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show Taiwan flag 'fi fi-tw' for Simplified Chinese"
|
||||
|
||||
# Test 2: 'zh-CN' should also get Simplified Chinese
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Simplified Chinese content
|
||||
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' with zh-CN header"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese with zh-CN header"
|
||||
|
||||
# Check that the correct flag icon is shown (China flag for zh-CN)
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected China flag 'fi fi-cn' for zh-CN header"
|
||||
|
||||
# Verify Simplified Chinese in settings page
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Simplified Chinese translations (not Traditional or English)
|
||||
# Simplified: 小时, Traditional: 小時
|
||||
assert "小时".encode() in res.data, "Expected Simplified Chinese '小时' for Hours"
|
||||
assert "分钟".encode() in res.data, "Expected Simplified Chinese '分钟' for Minutes"
|
||||
assert "秒".encode() in res.data, "Expected Simplified Chinese '秒' for Seconds"
|
||||
# Make sure it's not Traditional Chinese
|
||||
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時'"
|
||||
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘'"
|
||||
|
||||
|
||||
def test_session_locale_overrides_accept_language(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that session locale preference overrides browser Accept-Language header.
|
||||
|
||||
Scenario:
|
||||
1. Browser auto-detects zh-TW (Traditional Chinese) from Accept-Language header
|
||||
2. User explicitly selects Korean language
|
||||
3. On subsequent page loads, Korean should be shown (not Traditional Chinese)
|
||||
even though the Accept-Language header still says zh-TW
|
||||
|
||||
This tests the session override behavior for issue #3779.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Step 1: Clear session and make first request with zh-TW header (auto-detect)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should initially get Traditional Chinese from auto-detect
|
||||
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' from auto-detect"
|
||||
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected zh-Hant-TW language tag"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Taiwan flag 'fi fi-tw' from auto-detect"
|
||||
|
||||
# Step 2: User explicitly selects Korean language
|
||||
res = client.get(
|
||||
url_for("set_language", locale="ko"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Browser still sends zh-TW
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Step 3: Make another request with same zh-TW header
|
||||
# Session should override the Accept-Language header
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still sending zh-TW!
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should now get Korean (session overrides auto-detect)
|
||||
# Korean: 언어 선택, Traditional Chinese: 選擇語言
|
||||
assert '언어 선택'.encode() in res.data, "Expected Korean '언어 선택' (Select Language) from session"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese when Korean is set in session"
|
||||
|
||||
# Check HTML lang attribute is Korean
|
||||
assert b'<html lang="ko"' in res.data, "Expected Korean language tag 'ko' in HTML"
|
||||
|
||||
# Check that Korean flag is shown (not Taiwan flag)
|
||||
assert b'<span class="fi fi-kr fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Korean flag 'fi fi-kr' from session preference"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show Taiwan flag when Korean is set in session"
|
||||
|
||||
# Verify Korean text on settings page as well
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still zh-TW!
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Korean translations (not Traditional Chinese or English)
|
||||
# Korean: 시간 (Hours), 분 (Minutes), 초 (Seconds)
|
||||
# Traditional Chinese: 小時, 分鐘, 秒
|
||||
assert "시간".encode() in res.data, "Expected Korean '시간' for Hours"
|
||||
assert "분".encode() in res.data, "Expected Korean '분' for Minutes"
|
||||
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時' when Korean is set"
|
||||
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘' when Korean is set"
|
||||
|
||||
@@ -64,19 +64,6 @@ def is_safe_valid_url(test_url):
|
||||
import re
|
||||
import validators
|
||||
|
||||
# Validate input type first - must be a non-empty string
|
||||
if test_url is None:
|
||||
logger.warning('URL validation failed: URL is None')
|
||||
return False
|
||||
|
||||
if not isinstance(test_url, str):
|
||||
logger.warning(f'URL validation failed: URL must be a string, got {type(test_url).__name__}')
|
||||
return False
|
||||
|
||||
if not test_url.strip():
|
||||
logger.warning('URL validation failed: URL is empty or whitespace only')
|
||||
return False
|
||||
|
||||
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
|
||||
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
|
||||
|
||||
|
||||
@@ -183,30 +183,15 @@ components:
|
||||
properties:
|
||||
weeks:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 52000
|
||||
nullable: true
|
||||
days:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 365000
|
||||
nullable: true
|
||||
hours:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 8760000
|
||||
nullable: true
|
||||
minutes:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 525600000
|
||||
nullable: true
|
||||
seconds:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 31536000000
|
||||
nullable: true
|
||||
description: Time intervals between checks. All fields must be non-negative. At least one non-zero value required when not using default settings.
|
||||
description: Time intervals between checks
|
||||
time_between_check_use_default:
|
||||
type: boolean
|
||||
default: true
|
||||
@@ -215,9 +200,7 @@ components:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
maxLength: 1000
|
||||
maxItems: 100
|
||||
description: Notification URLs for this web page change monitor (watch). Maximum 100 URLs.
|
||||
description: Notification URLs for this web page change monitor (watch)
|
||||
notification_title:
|
||||
type: string
|
||||
description: Custom notification title
|
||||
@@ -241,19 +224,14 @@ components:
|
||||
operation:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
selector:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
optional_value:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
required: [operation, selector, optional_value]
|
||||
additionalProperties: false
|
||||
maxItems: 100
|
||||
description: Browser automation steps. Maximum 100 steps allowed.
|
||||
description: Browser automation steps
|
||||
processor:
|
||||
type: string
|
||||
enum: [restock_diff, text_json_diff]
|
||||
|
||||
Reference in New Issue
Block a user