mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-20 06:00:20 +00:00
Compare commits
2 Commits
master
...
3779-autod
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4c1687633d | ||
|
|
87e686ff27 |
@@ -68,17 +68,13 @@ class Watch(Resource):
|
||||
import time
|
||||
from copy import deepcopy
|
||||
watch = None
|
||||
# Retry up to 20 times if dict is being modified
|
||||
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
|
||||
for attempt in range(20):
|
||||
for _ in range(20):
|
||||
try:
|
||||
watch = deepcopy(self.datastore.data['watching'].get(uuid))
|
||||
break
|
||||
except RuntimeError:
|
||||
# Dict changed during deepcopy, retry after yielding to scheduler
|
||||
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
|
||||
if attempt < 19: # Don't yield on last attempt
|
||||
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
|
||||
# Incase dict changed, try again
|
||||
time.sleep(0.01)
|
||||
|
||||
if not watch:
|
||||
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
|
||||
@@ -130,31 +126,17 @@ class Watch(Resource):
|
||||
|
||||
if request.json.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not plist or request.json.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
if not request.json.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(request.json)
|
||||
if validation_error:
|
||||
return validation_error, 400
|
||||
|
||||
# XSS etc protection - validate URL if it's being updated
|
||||
if 'url' in request.json:
|
||||
new_url = request.json.get('url')
|
||||
|
||||
# URL must be a non-empty string
|
||||
if new_url is None:
|
||||
return "URL cannot be null", 400
|
||||
|
||||
if not isinstance(new_url, str):
|
||||
return "URL must be a string", 400
|
||||
|
||||
if not new_url.strip():
|
||||
return "URL cannot be empty or whitespace only", 400
|
||||
|
||||
if not is_safe_valid_url(new_url.strip()):
|
||||
return "Invalid or unsupported URL format. URL must use http://, https://, or ftp:// protocol", 400
|
||||
# XSS etc protection
|
||||
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
|
||||
return "Invalid URL", 400
|
||||
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
from changedetectionio import processors
|
||||
@@ -250,10 +232,6 @@ class WatchSingleHistory(Resource):
|
||||
if timestamp == 'latest':
|
||||
timestamp = list(watch.history.keys())[-1]
|
||||
|
||||
# Validate that the timestamp exists in history
|
||||
if timestamp not in watch.history:
|
||||
abort(404, message=f"No history snapshot found for timestamp '{timestamp}'")
|
||||
|
||||
if request.args.get('html'):
|
||||
content = watch.get_fetched_html(timestamp)
|
||||
if content:
|
||||
@@ -441,9 +419,8 @@ class CreateWatch(Resource):
|
||||
|
||||
if json_data.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not plist or json_data.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
if not json_data.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(json_data)
|
||||
|
||||
@@ -69,19 +69,6 @@
|
||||
}
|
||||
});
|
||||
|
||||
// Handle Enter key in search input
|
||||
if (searchInput) {
|
||||
searchInput.addEventListener('keydown', function(e) {
|
||||
if (e.key === 'Enter') {
|
||||
e.preventDefault();
|
||||
if (searchForm) {
|
||||
// Trigger form submission programmatically
|
||||
searchForm.dispatchEvent(new Event('submit'));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Handle form submission
|
||||
if (searchForm) {
|
||||
searchForm.addEventListener('submit', function(e) {
|
||||
@@ -101,8 +88,8 @@
|
||||
params.append('tags', tags);
|
||||
}
|
||||
|
||||
// Navigate to search results (always redirect to watchlist home)
|
||||
window.location.href = '/?' + params.toString();
|
||||
// Navigate to search results
|
||||
window.location.href = '?' + params.toString();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -348,8 +348,7 @@ class ChangeDetectionStore:
|
||||
r = requests.request(method="GET",
|
||||
url=url,
|
||||
# So we know to return the JSON instead of the human-friendly "help" page
|
||||
headers={'App-Guid': self.__data['app_guid']},
|
||||
timeout=5.0) # 5 second timeout to prevent blocking
|
||||
headers={'App-Guid': self.__data['app_guid']})
|
||||
res = r.json()
|
||||
|
||||
# List of permissible attributes we accept from the wild internet
|
||||
|
||||
@@ -58,7 +58,7 @@ def is_valid_uuid(val):
|
||||
|
||||
|
||||
def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
@@ -506,7 +506,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Enable password check and diff page access bypass
|
||||
@@ -548,172 +548,3 @@ def test_api_conflict_UI_password(client, live_server, measure_memory_usage, dat
|
||||
assert len(res.json)
|
||||
|
||||
|
||||
def test_api_url_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test URL validation for edge cases in both CREATE and UPDATE endpoints.
|
||||
Addresses security issues where empty/null/invalid URLs could bypass validation.
|
||||
|
||||
This test ensures that:
|
||||
- CREATE endpoint rejects null, empty, and invalid URLs
|
||||
- UPDATE endpoint rejects attempts to change URL to null, empty, or invalid
|
||||
- UPDATE endpoint allows updating other fields without touching URL
|
||||
- URL validation properly checks protocol, format, and safety
|
||||
"""
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: CREATE with null URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": None}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with null URL should fail"
|
||||
|
||||
# Test 2: CREATE with empty string URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with empty string URL should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'required' in res.data.lower()
|
||||
|
||||
# Test 3: CREATE with whitespace-only URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": " "}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with whitespace-only URL should fail"
|
||||
|
||||
# Test 4: CREATE with invalid protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "javascript:alert(1)"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with javascript: protocol should fail"
|
||||
|
||||
# Test 5: CREATE with missing protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "example.com"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch without protocol should fail"
|
||||
|
||||
# Test 6: CREATE with valid URL should succeed (baseline)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid URL test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 201, "Creating watch with valid URL should succeed"
|
||||
assert is_valid_uuid(res.json.get('uuid'))
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 7: UPDATE to null URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": None}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to null should fail"
|
||||
# Accept either OpenAPI validation error or our custom validation error
|
||||
assert b'URL cannot be null' in res.data or b'OpenAPI validation failed' in res.data or b'validation error' in res.data.lower()
|
||||
|
||||
# Test 8: UPDATE to empty string URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": ""}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to empty string should fail"
|
||||
# Accept either our custom validation error or OpenAPI/schema validation error
|
||||
assert b'URL cannot be empty' in res.data or b'OpenAPI validation' in res.data or b'Invalid or unsupported URL' in res.data
|
||||
|
||||
# Test 9: UPDATE to whitespace-only URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": " \t\n "}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to whitespace should fail"
|
||||
# Accept either our custom validation error or generic validation error
|
||||
assert b'URL cannot be empty' in res.data or b'Invalid or unsupported URL' in res.data or b'validation' in res.data.lower()
|
||||
|
||||
# Test 10: UPDATE to invalid protocol should fail (javascript:)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "javascript:alert(document.domain)"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to XSS attempt should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'protocol' in res.data.lower()
|
||||
|
||||
# Test 11: UPDATE to file:// protocol should fail (unless ALLOW_FILE_URI is set)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "file:///etc/passwd"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to file:// should fail by default"
|
||||
|
||||
# Test 12: UPDATE other fields without URL should succeed
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"title": "Updated title without URL change"}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating other fields without URL should succeed"
|
||||
|
||||
# Test 13: Verify URL is still valid after non-URL update
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == test_url, "URL should remain unchanged"
|
||||
assert res.json.get('title') == "Updated title without URL change"
|
||||
|
||||
# Test 14: UPDATE to valid different URL should succeed
|
||||
new_valid_url = test_url + "?new=param"
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": new_valid_url}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating to valid different URL should succeed"
|
||||
|
||||
# Test 15: Verify URL was actually updated
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == new_valid_url, "URL should be updated to new valid URL"
|
||||
|
||||
# Test 16: CREATE with XSS in URL parameters should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "http://example.com?xss=<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
# This should fail because of suspicious characters check
|
||||
assert res.status_code == 400, "Creating watch with XSS in URL params should fail"
|
||||
|
||||
# Cleanup
|
||||
client.delete(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -1,805 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Comprehensive security and edge case tests for the API.
|
||||
Tests critical areas that were identified as gaps in the existing test suite.
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import threading
|
||||
import uuid as uuid_module
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
def set_original_response(datastore_path):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>Which is across multiple lines</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_uuid(val):
|
||||
try:
|
||||
uuid_module.UUID(str(val))
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 1: CRITICAL SECURITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_path_traversal_in_uuids(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that path traversal attacks via UUID parameter are blocked.
|
||||
Addresses CVE-like vulnerabilities where ../../../ in UUID could access arbitrary files.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a valid watch first
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid watch"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
valid_uuid = res.json.get('uuid')
|
||||
|
||||
# Test 1: Path traversal with ../../../
|
||||
res = client.get(
|
||||
f"/api/v1/watch/../../etc/passwd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Path traversal should be rejected"
|
||||
|
||||
# Test 2: Encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/..%2F..%2F..%2Fetc%2Fpasswd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Encoded path traversal should be rejected"
|
||||
|
||||
# Test 3: Double-encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/%2e%2e%2f%2e%2e%2f%2e%2e%2f",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Double-encoded traversal should be rejected"
|
||||
|
||||
# Test 4: Try to access datastore file
|
||||
res = client.get(
|
||||
"/api/v1/watch/../url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Access to datastore should be blocked"
|
||||
|
||||
# Test 5: Null byte injection
|
||||
res = client.get(
|
||||
f"/api/v1/watch/{valid_uuid}%00.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (ignoring null byte) or reject - but not crash
|
||||
assert res.status_code in [200, 400, 404]
|
||||
|
||||
# Test 6: DELETE with path traversal
|
||||
res = client.delete(
|
||||
"/api/v1/watch/../../datastore/url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404, 405], "DELETE with traversal should be blocked (405=method not allowed is also acceptable)"
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=valid_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_injection_via_headers_and_proxy(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that injection attacks via headers and proxy fields are properly sanitized.
|
||||
Addresses XSS and injection vulnerabilities.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: XSS in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {
|
||||
"User-Agent": "<script>alert(1)</script>",
|
||||
"X-Custom": "'; DROP TABLE watches; --"
|
||||
}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Headers are metadata used for HTTP requests, not HTML rendering
|
||||
# Storing them as-is is expected behavior
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
# Verify headers are stored (API returns JSON, not HTML, so no XSS risk)
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Null bytes in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {"X-Test": "value\x00null"}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle null bytes gracefully (reject or sanitize)
|
||||
assert res.status_code in [201, 400]
|
||||
|
||||
# Test 3: Malformed proxy string
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"proxy": "http://evil.com:8080@victim.com"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject invalid proxy format
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Control characters in notification title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_title": "Test\r\nInjected-Header: value"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize control characters
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_large_payload_dos(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that excessively large payloads are rejected to prevent DoS.
|
||||
Addresses memory leak issues found in changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Huge ignore_text array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"ignore_text": ["a" * 10000] * 100 # 1MB of data
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Massive headers object
|
||||
huge_headers = {f"X-Header-{i}": "x" * 1000 for i in range(100)}
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": huge_headers
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or truncate
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Huge browser_steps array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test" * 1000, "optional_value": ""}
|
||||
] * 100
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or limit
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Extremely long title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "x" * 100000 # 100KB title
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject (exceeds maxLength: 5000)
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_utf8_encoding_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test UTF-8 encoding edge cases that have caused bugs on Windows.
|
||||
Addresses 18+ encoding bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Unicode in title (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test 中文 Ελληνικά 日本語 🔥"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
|
||||
# Verify it round-trips correctly
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
assert "中文" in res.json.get('title')
|
||||
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Unicode in URL query parameters
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url + "?search=日本語"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle URL encoding properly
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Null byte in title (should be rejected or sanitized)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test\x00Title"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle gracefully
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: BOM (Byte Order Mark) in title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "\ufeffTest with BOM"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_concurrency_race_conditions(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test concurrent API requests to detect race conditions.
|
||||
Addresses 20+ concurrency bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Concurrency test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Concurrent updates to same watch
|
||||
# Note: Flask test client is not thread-safe, so we test sequential updates instead
|
||||
# Real concurrency issues would be caught in integration tests with actual HTTP requests
|
||||
results = []
|
||||
for i in range(10):
|
||||
try:
|
||||
r = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": f"Title {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
results.append(r.status_code)
|
||||
except Exception as e:
|
||||
results.append(str(e))
|
||||
|
||||
# All updates should succeed (200) without crashes
|
||||
assert all(r == 200 for r in results), f"Some updates failed: {results}"
|
||||
|
||||
# Test 2: Update while watch is being checked
|
||||
# Queue a recheck
|
||||
client.get(
|
||||
url_for("watch", uuid=watch_uuid, recheck=True),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
|
||||
# Immediately update it
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": "Updated during check"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should succeed without error
|
||||
assert res.status_code == 200
|
||||
|
||||
# Test 3: Delete watch that's being processed
|
||||
# Create another watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid2 = res.json.get('uuid')
|
||||
|
||||
# Queue it for checking
|
||||
client.get(url_for("watch", uuid=watch_uuid2, recheck=True), headers={'x-api-key': api_key})
|
||||
|
||||
# Immediately delete it
|
||||
res = client.delete(url_for("watch", uuid=watch_uuid2), headers={'x-api-key': api_key})
|
||||
# Should succeed or return appropriate error
|
||||
assert res.status_code in [204, 404, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 2: IMPORTANT FUNCTIONALITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_time_validation_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test time_between_check validation edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Zero interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 0}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Zero interval should be rejected"
|
||||
|
||||
# Test 2: Negative interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": -100}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Negative interval should be rejected"
|
||||
|
||||
# Test 3: All fields null with use_default=false
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": None, "days": None, "hours": None, "minutes": None, "seconds": None}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "All null intervals should be rejected when not using default"
|
||||
|
||||
# Test 4: Extremely large interval (overflow risk)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": 999999999}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 5: Valid minimal interval (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 60}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_browser_steps_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test browser_steps validation for invalid operations and structures.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Empty browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "", "selector": "", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid as null)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid operation type
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "invalid_operation", "selector": "#test", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (validation happens at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Missing required fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click"} # Missing selector and optional_value
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to schema validation
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Extra fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test", "optional_value": "", "extra_field": "value"}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to additionalProperties: false
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_queue_manipulation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test queue behavior under stress and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Create many watches rapidly
|
||||
watch_uuids = []
|
||||
for i in range(20):
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": f"Watch {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
if res.status_code == 201:
|
||||
watch_uuids.append(res.json.get('uuid'))
|
||||
|
||||
assert len(watch_uuids) == 20, "Should be able to create 20 watches"
|
||||
|
||||
# Test 2: Recheck all when watches exist
|
||||
res = client.get(
|
||||
url_for("createwatch", recheck_all='1'),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
# Should return success (200 or 202 for background processing)
|
||||
assert res.status_code in [200, 202]
|
||||
|
||||
# Test 3: Verify queue doesn't overflow with moderate load
|
||||
# The app has MAX_QUEUE_SIZE = 5000, we're well below that
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Cleanup
|
||||
for uuid in watch_uuids:
|
||||
client.delete(url_for("watch", uuid=uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 3: EDGE CASES & POLISH
|
||||
# ============================================================================
|
||||
|
||||
def test_api_history_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test history API with invalid timestamps and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create watch and generate history
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Get history with invalid timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="invalid"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Invalid timestamp should return 404"
|
||||
|
||||
# Test 2: Future timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="9999999999"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Future timestamp should return 404"
|
||||
|
||||
# Test 3: Negative timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="-1"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Negative timestamp should return 404"
|
||||
|
||||
# Test 4: Diff with reversed timestamps (from > to)
|
||||
# First get actual timestamps
|
||||
res = client.get(
|
||||
url_for("watchhistory", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
if len(res.json) >= 2:
|
||||
timestamps = sorted(res.json.keys())
|
||||
# Try reversed order
|
||||
res = client.get(
|
||||
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp=timestamps[-1], to_timestamp=timestamps[0]),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (show reverse diff) or return error
|
||||
assert res.status_code in [200, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_notification_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test notification configuration edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Invalid notification URL
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": ["invalid://url", "ftp://test.com"]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (apprise validates at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid notification format
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_format": "invalid_format"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected by schema
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 3: Empty notification arrays
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": []
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_tag_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test tag/group API edge cases including XSS and path traversal.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Test 1: Empty tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (empty title)
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 2: XSS in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
# Verify title is stored safely
|
||||
res = client.get(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
# Should be escaped or sanitized
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Path traversal in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "../../etc/passwd"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (it's just a string, not a path)
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Very long tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "x" * 10000}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (exceeds maxLength)
|
||||
assert res.status_code == 400
|
||||
|
||||
|
||||
def test_api_authentication_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test API authentication edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Missing API key
|
||||
res = client.get(url_for("createwatch"))
|
||||
assert res.status_code == 403, "Missing API key should be forbidden"
|
||||
|
||||
# Test 2: Invalid API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "invalid_key_12345"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 3: API key with special characters
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "key<script>alert(1)</script>"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 4: Very long API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "x" * 10000}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 5: Case sensitivity of API key
|
||||
wrong_case_key = api_key.upper() if api_key.islower() else api_key.lower()
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': wrong_case_key}
|
||||
)
|
||||
# Should be forbidden (keys are case-sensitive)
|
||||
assert res.status_code == 403, "Wrong case API key should be forbidden"
|
||||
|
||||
# Test 6: Valid API key should work
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 200, "Valid API key should work"
|
||||
@@ -64,19 +64,6 @@ def is_safe_valid_url(test_url):
|
||||
import re
|
||||
import validators
|
||||
|
||||
# Validate input type first - must be a non-empty string
|
||||
if test_url is None:
|
||||
logger.warning('URL validation failed: URL is None')
|
||||
return False
|
||||
|
||||
if not isinstance(test_url, str):
|
||||
logger.warning(f'URL validation failed: URL must be a string, got {type(test_url).__name__}')
|
||||
return False
|
||||
|
||||
if not test_url.strip():
|
||||
logger.warning('URL validation failed: URL is empty or whitespace only')
|
||||
return False
|
||||
|
||||
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
|
||||
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
|
||||
|
||||
|
||||
@@ -183,30 +183,15 @@ components:
|
||||
properties:
|
||||
weeks:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 52000
|
||||
nullable: true
|
||||
days:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 365000
|
||||
nullable: true
|
||||
hours:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 8760000
|
||||
nullable: true
|
||||
minutes:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 525600000
|
||||
nullable: true
|
||||
seconds:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 31536000000
|
||||
nullable: true
|
||||
description: Time intervals between checks. All fields must be non-negative. At least one non-zero value required when not using default settings.
|
||||
description: Time intervals between checks
|
||||
time_between_check_use_default:
|
||||
type: boolean
|
||||
default: true
|
||||
@@ -215,9 +200,7 @@ components:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
maxLength: 1000
|
||||
maxItems: 100
|
||||
description: Notification URLs for this web page change monitor (watch). Maximum 100 URLs.
|
||||
description: Notification URLs for this web page change monitor (watch)
|
||||
notification_title:
|
||||
type: string
|
||||
description: Custom notification title
|
||||
@@ -241,19 +224,14 @@ components:
|
||||
operation:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
selector:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
optional_value:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
required: [operation, selector, optional_value]
|
||||
additionalProperties: false
|
||||
maxItems: 100
|
||||
description: Browser automation steps. Maximum 100 steps allowed.
|
||||
description: Browser automation steps
|
||||
processor:
|
||||
type: string
|
||||
enum: [restock_diff, text_json_diff]
|
||||
|
||||
Reference in New Issue
Block a user