mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-20 22:20:47 +00:00
Compare commits
7 Commits
0.52.7
...
memory-imp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fbdd993c04 | ||
|
|
499f480950 | ||
|
|
b2e75c312e | ||
|
|
103e2d879b | ||
|
|
151e603af7 | ||
|
|
7311af4b58 | ||
|
|
af193e8d7a |
@@ -16,6 +16,7 @@ recursive-include changedetectionio/widgets *
|
||||
prune changedetectionio/static/package-lock.json
|
||||
prune changedetectionio/static/styles/node_modules
|
||||
prune changedetectionio/static/styles/package-lock.json
|
||||
include changedetectionio/favicon_utils.py
|
||||
include changedetection.py
|
||||
include requirements.txt
|
||||
include README-pip.md
|
||||
|
||||
@@ -2,6 +2,7 @@ import os
|
||||
import threading
|
||||
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
from changedetectionio.favicon_utils import get_favicon_mime_type
|
||||
|
||||
from . import auth
|
||||
from changedetectionio import queuedWatchMetaData, strtobool
|
||||
@@ -68,13 +69,17 @@ class Watch(Resource):
|
||||
import time
|
||||
from copy import deepcopy
|
||||
watch = None
|
||||
for _ in range(20):
|
||||
# Retry up to 20 times if dict is being modified
|
||||
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
|
||||
for attempt in range(20):
|
||||
try:
|
||||
watch = deepcopy(self.datastore.data['watching'].get(uuid))
|
||||
break
|
||||
except RuntimeError:
|
||||
# Incase dict changed, try again
|
||||
time.sleep(0.01)
|
||||
# Dict changed during deepcopy, retry after yielding to scheduler
|
||||
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
|
||||
if attempt < 19: # Don't yield on last attempt
|
||||
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
|
||||
|
||||
if not watch:
|
||||
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
|
||||
@@ -126,17 +131,31 @@ class Watch(Resource):
|
||||
|
||||
if request.json.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not request.json.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
if not plist or request.json.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(request.json)
|
||||
if validation_error:
|
||||
return validation_error, 400
|
||||
|
||||
# XSS etc protection
|
||||
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
|
||||
return "Invalid URL", 400
|
||||
# XSS etc protection - validate URL if it's being updated
|
||||
if 'url' in request.json:
|
||||
new_url = request.json.get('url')
|
||||
|
||||
# URL must be a non-empty string
|
||||
if new_url is None:
|
||||
return "URL cannot be null", 400
|
||||
|
||||
if not isinstance(new_url, str):
|
||||
return "URL must be a string", 400
|
||||
|
||||
if not new_url.strip():
|
||||
return "URL cannot be empty or whitespace only", 400
|
||||
|
||||
if not is_safe_valid_url(new_url.strip()):
|
||||
return "Invalid or unsupported URL format. URL must use http://, https://, or ftp:// protocol", 400
|
||||
|
||||
# Handle processor-config-* fields separately (save to JSON, not datastore)
|
||||
from changedetectionio import processors
|
||||
@@ -232,6 +251,10 @@ class WatchSingleHistory(Resource):
|
||||
if timestamp == 'latest':
|
||||
timestamp = list(watch.history.keys())[-1]
|
||||
|
||||
# Validate that the timestamp exists in history
|
||||
if timestamp not in watch.history:
|
||||
abort(404, message=f"No history snapshot found for timestamp '{timestamp}'")
|
||||
|
||||
if request.args.get('html'):
|
||||
content = watch.get_fetched_html(timestamp)
|
||||
if content:
|
||||
@@ -380,16 +403,9 @@ class WatchFavicon(Resource):
|
||||
|
||||
favicon_filename = watch.get_favicon_filename()
|
||||
if favicon_filename:
|
||||
try:
|
||||
import magic
|
||||
mime = magic.from_file(
|
||||
os.path.join(watch.watch_data_dir, favicon_filename),
|
||||
mime=True
|
||||
)
|
||||
except ImportError:
|
||||
# Fallback, no python-magic
|
||||
import mimetypes
|
||||
mime, encoding = mimetypes.guess_type(favicon_filename)
|
||||
# Use cached MIME type detection
|
||||
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
|
||||
mime = get_favicon_mime_type(filepath)
|
||||
|
||||
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
||||
response.headers['Content-type'] = mime
|
||||
@@ -419,8 +435,9 @@ class CreateWatch(Resource):
|
||||
|
||||
if json_data.get('proxy'):
|
||||
plist = self.datastore.proxy_list
|
||||
if not json_data.get('proxy') in plist:
|
||||
return "Invalid proxy choice, currently supported proxies are '{}'".format(', '.join(plist)), 400
|
||||
if not plist or json_data.get('proxy') not in plist:
|
||||
proxy_list_str = ', '.join(plist) if plist else 'none configured'
|
||||
return f"Invalid proxy choice, currently supported proxies are '{proxy_list_str}'", 400
|
||||
|
||||
# Validate time_between_check when not using defaults
|
||||
validation_error = validate_time_between_check_required(json_data)
|
||||
|
||||
@@ -30,13 +30,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
app: Flask application instance
|
||||
datastore: Application datastore
|
||||
executor: ThreadPoolExecutor for queue operations (optional)
|
||||
|
||||
Returns:
|
||||
"restart" if worker should restart, "shutdown" for clean exit
|
||||
"""
|
||||
# Set a descriptive name for this task
|
||||
task = asyncio.current_task()
|
||||
if task:
|
||||
task.set_name(f"async-worker-{worker_id}")
|
||||
|
||||
logger.info(f"Starting async worker {worker_id}")
|
||||
# Read restart policy from environment
|
||||
max_jobs = int(os.getenv("WORKER_MAX_JOBS", "10"))
|
||||
max_runtime_seconds = int(os.getenv("WORKER_MAX_RUNTIME", "3600")) # 1 hour default
|
||||
|
||||
jobs_processed = 0
|
||||
start_time = time.time()
|
||||
|
||||
logger.info(f"Starting async worker {worker_id} (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
update_handler = None
|
||||
@@ -51,7 +61,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No jobs available, continue loop
|
||||
# No jobs available - check if we should restart based on time while idle
|
||||
runtime = time.time() - start_time
|
||||
if runtime >= max_runtime_seconds:
|
||||
logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
|
||||
return "restart"
|
||||
continue
|
||||
except Exception as e:
|
||||
# Handle expected Empty exception from queue timeout
|
||||
@@ -488,6 +502,19 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
# Small yield for normal completion
|
||||
await asyncio.sleep(0.01)
|
||||
|
||||
# Job completed - increment counter and check restart conditions
|
||||
jobs_processed += 1
|
||||
runtime = time.time() - start_time
|
||||
|
||||
# Check if we should restart (only when idle, between jobs)
|
||||
should_restart_jobs = jobs_processed >= max_jobs
|
||||
should_restart_time = runtime >= max_runtime_seconds
|
||||
|
||||
if should_restart_jobs or should_restart_time:
|
||||
reason = f"{jobs_processed} jobs" if should_restart_jobs else f"{runtime:.0f}s runtime"
|
||||
logger.info(f"Worker {worker_id} restarting after {reason} ({jobs_processed} jobs, {runtime:.0f}s runtime)")
|
||||
return "restart"
|
||||
|
||||
# Check if we should exit
|
||||
if app.config.exit.is_set():
|
||||
break
|
||||
@@ -495,10 +522,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||
import sys
|
||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
|
||||
if not in_pytest:
|
||||
logger.info(f"Worker {worker_id} shutting down")
|
||||
|
||||
return "shutdown"
|
||||
|
||||
|
||||
def cleanup_error_artifacts(uuid, datastore):
|
||||
"""Helper function to clean up error artifacts"""
|
||||
|
||||
43
changedetectionio/favicon_utils.py
Normal file
43
changedetectionio/favicon_utils.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""
|
||||
Favicon utilities for changedetection.io
|
||||
Handles favicon MIME type detection with caching
|
||||
"""
|
||||
|
||||
from functools import lru_cache
|
||||
|
||||
|
||||
@lru_cache(maxsize=1000)
|
||||
def get_favicon_mime_type(filepath):
|
||||
"""
|
||||
Detect MIME type of favicon by reading file content using puremagic.
|
||||
Results are cached to avoid repeatedly reading the same files.
|
||||
|
||||
Args:
|
||||
filepath: Full path to the favicon file
|
||||
|
||||
Returns:
|
||||
MIME type string (e.g., 'image/png')
|
||||
"""
|
||||
mime = None
|
||||
|
||||
try:
|
||||
import puremagic
|
||||
with open(filepath, 'rb') as f:
|
||||
content_bytes = f.read(200) # Read first 200 bytes
|
||||
|
||||
detections = puremagic.magic_string(content_bytes)
|
||||
if detections:
|
||||
mime = detections[0].mime_type
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Fallback to mimetypes if puremagic fails
|
||||
if not mime:
|
||||
import mimetypes
|
||||
mime, _ = mimetypes.guess_type(filepath)
|
||||
|
||||
# Final fallback based on extension
|
||||
if not mime:
|
||||
mime = 'image/x-icon' if filepath.endswith('.ico') else 'image/png'
|
||||
|
||||
return mime
|
||||
@@ -44,6 +44,8 @@ from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, Watch
|
||||
from changedetectionio.api.Search import Search
|
||||
from .time_handler import is_within_schedule
|
||||
from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale
|
||||
from changedetectionio.favicon_utils import get_favicon_mime_type
|
||||
|
||||
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||
|
||||
datastore = None
|
||||
@@ -69,9 +71,13 @@ socketio_server = None
|
||||
CORS(app)
|
||||
|
||||
# Super handy for compressing large BrowserSteps responses and others
|
||||
FlaskCompress(app)
|
||||
app.config['COMPRESS_MIN_SIZE'] = 4096
|
||||
# Flask-Compress handles HTTP compression, Socket.IO compression disabled to prevent memory leak
|
||||
compress = FlaskCompress()
|
||||
app.config['COMPRESS_MIN_SIZE'] = 2096
|
||||
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml']
|
||||
# Use gzip only - smaller memory footprint than zstd/brotli (4-8KB vs 200-500KB contexts)
|
||||
app.config['COMPRESS_ALGORITHM'] = ['gzip']
|
||||
compress.init_app(app)
|
||||
app.config['TEMPLATES_AUTO_RELOAD'] = False
|
||||
|
||||
|
||||
@@ -400,11 +406,27 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
language_codes = get_language_codes()
|
||||
|
||||
def get_locale():
|
||||
# Locale aliases: map browser language codes to translation directory names
|
||||
# This handles cases where browsers send standard codes (e.g., zh-TW)
|
||||
# but our translations use more specific codes (e.g., zh_Hant_TW)
|
||||
locale_aliases = {
|
||||
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
|
||||
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
|
||||
}
|
||||
|
||||
# 1. Try to get locale from session (user explicitly selected)
|
||||
if 'locale' in session:
|
||||
return session['locale']
|
||||
|
||||
# 2. Fall back to Accept-Language header
|
||||
return request.accept_languages.best_match(language_codes)
|
||||
# Get the best match from browser's Accept-Language header
|
||||
browser_locale = request.accept_languages.best_match(language_codes + list(locale_aliases.keys()))
|
||||
|
||||
# 3. Check if we need to map the browser locale to our internal locale
|
||||
if browser_locale in locale_aliases:
|
||||
return locale_aliases[browser_locale]
|
||||
|
||||
return browser_locale
|
||||
|
||||
# Initialize Babel with locale selector
|
||||
babel = Babel(app, locale_selector=get_locale)
|
||||
@@ -666,16 +688,9 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
favicon_filename = watch.get_favicon_filename()
|
||||
if favicon_filename:
|
||||
try:
|
||||
import magic
|
||||
mime = magic.from_file(
|
||||
os.path.join(watch.watch_data_dir, favicon_filename),
|
||||
mime=True
|
||||
)
|
||||
except ImportError:
|
||||
# Fallback, no python-magic
|
||||
import mimetypes
|
||||
mime, encoding = mimetypes.guess_type(favicon_filename)
|
||||
# Use cached MIME type detection
|
||||
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
|
||||
mime = get_favicon_mime_type(filepath)
|
||||
|
||||
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
||||
response.headers['Content-type'] = mime
|
||||
|
||||
@@ -69,7 +69,7 @@ class RecheckPriorityQueue:
|
||||
# Emit signals
|
||||
self._emit_put_signals(item)
|
||||
|
||||
logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
|
||||
logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -240,7 +240,10 @@ def init_socketio(app, datastore):
|
||||
async_mode=async_mode,
|
||||
cors_allowed_origins=cors_origins, # None means same-origin only
|
||||
logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
|
||||
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')))
|
||||
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
|
||||
# Disable WebSocket compression to prevent memory accumulation
|
||||
# Flask-Compress already handles HTTP response compression
|
||||
engineio_options={'http_compression': False, 'compression_threshold': 0})
|
||||
|
||||
# Set up event handlers
|
||||
logger.info("Socket.IO: Registering connect event handler")
|
||||
|
||||
@@ -91,8 +91,8 @@ REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notif
|
||||
# And again with brotli+screenshot attachment
|
||||
SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5 REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --dist=load tests/test_backend.py tests/test_rss.py tests/test_unique_lines.py tests/test_notification.py tests/test_access_control.py
|
||||
|
||||
# Try high concurrency
|
||||
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
||||
# Try high concurrency with aggressive worker restarts
|
||||
FETCH_WORKERS=50 WORKER_MAX_RUNTIME=2 WORKER_MAX_JOBS=1 pytest tests/test_history_consistency.py -vv -l -s
|
||||
|
||||
# Check file:// will pickup a file when enabled
|
||||
echo "Hello world" > /tmp/test-file.txt
|
||||
|
||||
@@ -69,6 +69,19 @@
|
||||
}
|
||||
});
|
||||
|
||||
// Handle Enter key in search input
|
||||
if (searchInput) {
|
||||
searchInput.addEventListener('keydown', function(e) {
|
||||
if (e.key === 'Enter') {
|
||||
e.preventDefault();
|
||||
if (searchForm) {
|
||||
// Trigger form submission programmatically
|
||||
searchForm.dispatchEvent(new Event('submit'));
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Handle form submission
|
||||
if (searchForm) {
|
||||
searchForm.addEventListener('submit', function(e) {
|
||||
@@ -88,8 +101,8 @@
|
||||
params.append('tags', tags);
|
||||
}
|
||||
|
||||
// Navigate to search results
|
||||
window.location.href = '?' + params.toString();
|
||||
// Navigate to search results (always redirect to watchlist home)
|
||||
window.location.href = '/?' + params.toString();
|
||||
});
|
||||
}
|
||||
});
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import gc
|
||||
import shutil
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
@@ -125,6 +126,10 @@ class ChangeDetectionStore:
|
||||
if 'application' in from_disk['settings']:
|
||||
self.__data['settings']['application'].update(from_disk['settings']['application'])
|
||||
|
||||
# from_disk no longer needed - free memory immediately
|
||||
del from_disk
|
||||
gc.collect()
|
||||
|
||||
# Convert each existing watch back to the Watch.model object
|
||||
for uuid, watch in self.__data['watching'].items():
|
||||
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
|
||||
@@ -348,7 +353,8 @@ class ChangeDetectionStore:
|
||||
r = requests.request(method="GET",
|
||||
url=url,
|
||||
# So we know to return the JSON instead of the human-friendly "help" page
|
||||
headers={'App-Guid': self.__data['app_guid']})
|
||||
headers={'App-Guid': self.__data['app_guid']},
|
||||
timeout=5.0) # 5 second timeout to prevent blocking
|
||||
res = r.json()
|
||||
|
||||
# List of permissible attributes we accept from the wild internet
|
||||
@@ -449,7 +455,7 @@ class ChangeDetectionStore:
|
||||
data = deepcopy(self.__data)
|
||||
except RuntimeError as e:
|
||||
# Try again in 15 seconds
|
||||
time.sleep(15)
|
||||
time.sleep(1)
|
||||
logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
|
||||
self.sync_to_json()
|
||||
return
|
||||
|
||||
@@ -58,7 +58,7 @@ def is_valid_uuid(val):
|
||||
|
||||
|
||||
def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
@@ -506,7 +506,7 @@ def test_api_import(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
def test_api_conflict_UI_password(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Enable password check and diff page access bypass
|
||||
@@ -548,3 +548,172 @@ def test_api_conflict_UI_password(client, live_server, measure_memory_usage, dat
|
||||
assert len(res.json)
|
||||
|
||||
|
||||
def test_api_url_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test URL validation for edge cases in both CREATE and UPDATE endpoints.
|
||||
Addresses security issues where empty/null/invalid URLs could bypass validation.
|
||||
|
||||
This test ensures that:
|
||||
- CREATE endpoint rejects null, empty, and invalid URLs
|
||||
- UPDATE endpoint rejects attempts to change URL to null, empty, or invalid
|
||||
- UPDATE endpoint allows updating other fields without touching URL
|
||||
- URL validation properly checks protocol, format, and safety
|
||||
"""
|
||||
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: CREATE with null URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": None}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with null URL should fail"
|
||||
|
||||
# Test 2: CREATE with empty string URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with empty string URL should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'required' in res.data.lower()
|
||||
|
||||
# Test 3: CREATE with whitespace-only URL should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": " "}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with whitespace-only URL should fail"
|
||||
|
||||
# Test 4: CREATE with invalid protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "javascript:alert(1)"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch with javascript: protocol should fail"
|
||||
|
||||
# Test 5: CREATE with missing protocol should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "example.com"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400, "Creating watch without protocol should fail"
|
||||
|
||||
# Test 6: CREATE with valid URL should succeed (baseline)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid URL test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 201, "Creating watch with valid URL should succeed"
|
||||
assert is_valid_uuid(res.json.get('uuid'))
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 7: UPDATE to null URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": None}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to null should fail"
|
||||
# Accept either OpenAPI validation error or our custom validation error
|
||||
assert b'URL cannot be null' in res.data or b'OpenAPI validation failed' in res.data or b'validation error' in res.data.lower()
|
||||
|
||||
# Test 8: UPDATE to empty string URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": ""}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to empty string should fail"
|
||||
# Accept either our custom validation error or OpenAPI/schema validation error
|
||||
assert b'URL cannot be empty' in res.data or b'OpenAPI validation' in res.data or b'Invalid or unsupported URL' in res.data
|
||||
|
||||
# Test 9: UPDATE to whitespace-only URL should fail
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": " \t\n "}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to whitespace should fail"
|
||||
# Accept either our custom validation error or generic validation error
|
||||
assert b'URL cannot be empty' in res.data or b'Invalid or unsupported URL' in res.data or b'validation' in res.data.lower()
|
||||
|
||||
# Test 10: UPDATE to invalid protocol should fail (javascript:)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "javascript:alert(document.domain)"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to XSS attempt should fail"
|
||||
assert b'Invalid or unsupported URL' in res.data or b'protocol' in res.data.lower()
|
||||
|
||||
# Test 11: UPDATE to file:// protocol should fail (unless ALLOW_FILE_URI is set)
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": "file:///etc/passwd"}),
|
||||
)
|
||||
assert res.status_code == 400, "Updating watch URL to file:// should fail by default"
|
||||
|
||||
# Test 12: UPDATE other fields without URL should succeed
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"title": "Updated title without URL change"}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating other fields without URL should succeed"
|
||||
|
||||
# Test 13: Verify URL is still valid after non-URL update
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == test_url, "URL should remain unchanged"
|
||||
assert res.json.get('title') == "Updated title without URL change"
|
||||
|
||||
# Test 14: UPDATE to valid different URL should succeed
|
||||
new_valid_url = test_url + "?new=param"
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({"url": new_valid_url}),
|
||||
)
|
||||
assert res.status_code == 200, "Updating to valid different URL should succeed"
|
||||
|
||||
# Test 15: Verify URL was actually updated
|
||||
res = client.get(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.json.get('url') == new_valid_url, "URL should be updated to new valid URL"
|
||||
|
||||
# Test 16: CREATE with XSS in URL parameters should fail
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": "http://example.com?xss=<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
# This should fail because of suspicious characters check
|
||||
assert res.status_code == 400, "Creating watch with XSS in URL params should fail"
|
||||
|
||||
# Cleanup
|
||||
client.delete(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
delete_all_watches(client)
|
||||
|
||||
805
changedetectionio/tests/test_api_security.py
Normal file
805
changedetectionio/tests/test_api_security.py
Normal file
@@ -0,0 +1,805 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Comprehensive security and edge case tests for the API.
|
||||
Tests critical areas that were identified as gaps in the existing test suite.
|
||||
"""
|
||||
|
||||
import time
|
||||
import json
|
||||
import threading
|
||||
import uuid as uuid_module
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
import os
|
||||
|
||||
|
||||
def set_original_response(datastore_path):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
Some initial text<br>
|
||||
<p>Which is across multiple lines</p>
|
||||
</body>
|
||||
</html>
|
||||
"""
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
|
||||
def is_valid_uuid(val):
|
||||
try:
|
||||
uuid_module.UUID(str(val))
|
||||
return True
|
||||
except ValueError:
|
||||
return False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 1: CRITICAL SECURITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_path_traversal_in_uuids(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that path traversal attacks via UUID parameter are blocked.
|
||||
Addresses CVE-like vulnerabilities where ../../../ in UUID could access arbitrary files.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a valid watch first
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Valid watch"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
valid_uuid = res.json.get('uuid')
|
||||
|
||||
# Test 1: Path traversal with ../../../
|
||||
res = client.get(
|
||||
f"/api/v1/watch/../../etc/passwd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Path traversal should be rejected"
|
||||
|
||||
# Test 2: Encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/..%2F..%2F..%2Fetc%2Fpasswd",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Encoded path traversal should be rejected"
|
||||
|
||||
# Test 3: Double-encoded path traversal
|
||||
res = client.get(
|
||||
"/api/v1/watch/%2e%2e%2f%2e%2e%2f%2e%2e%2f",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Double-encoded traversal should be rejected"
|
||||
|
||||
# Test 4: Try to access datastore file
|
||||
res = client.get(
|
||||
"/api/v1/watch/../url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404], "Access to datastore should be blocked"
|
||||
|
||||
# Test 5: Null byte injection
|
||||
res = client.get(
|
||||
f"/api/v1/watch/{valid_uuid}%00.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (ignoring null byte) or reject - but not crash
|
||||
assert res.status_code in [200, 400, 404]
|
||||
|
||||
# Test 6: DELETE with path traversal
|
||||
res = client.delete(
|
||||
"/api/v1/watch/../../datastore/url-watches.json",
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code in [400, 404, 405], "DELETE with traversal should be blocked (405=method not allowed is also acceptable)"
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=valid_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_injection_via_headers_and_proxy(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that injection attacks via headers and proxy fields are properly sanitized.
|
||||
Addresses XSS and injection vulnerabilities.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: XSS in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {
|
||||
"User-Agent": "<script>alert(1)</script>",
|
||||
"X-Custom": "'; DROP TABLE watches; --"
|
||||
}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Headers are metadata used for HTTP requests, not HTML rendering
|
||||
# Storing them as-is is expected behavior
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
# Verify headers are stored (API returns JSON, not HTML, so no XSS risk)
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Null bytes in headers
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": {"X-Test": "value\x00null"}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle null bytes gracefully (reject or sanitize)
|
||||
assert res.status_code in [201, 400]
|
||||
|
||||
# Test 3: Malformed proxy string
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"proxy": "http://evil.com:8080@victim.com"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject invalid proxy format
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Control characters in notification title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_title": "Test\r\nInjected-Header: value"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize control characters
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_large_payload_dos(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that excessively large payloads are rejected to prevent DoS.
|
||||
Addresses memory leak issues found in changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Huge ignore_text array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"ignore_text": ["a" * 10000] * 100 # 1MB of data
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Massive headers object
|
||||
huge_headers = {f"X-Header-{i}": "x" * 1000 for i in range(100)}
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"headers": huge_headers
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or truncate
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Huge browser_steps array
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test" * 1000, "optional_value": ""}
|
||||
] * 100
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject or limit
|
||||
assert res.status_code in [201, 400, 413]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Extremely long title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "x" * 100000 # 100KB title
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should reject (exceeds maxLength: 5000)
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_utf8_encoding_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test UTF-8 encoding edge cases that have caused bugs on Windows.
|
||||
Addresses 18+ encoding bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Unicode in title (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test 中文 Ελληνικά 日本語 🔥"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
|
||||
# Verify it round-trips correctly
|
||||
res = client.get(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
assert res.status_code == 200
|
||||
assert "中文" in res.json.get('title')
|
||||
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Unicode in URL query parameters
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url + "?search=日本語"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle URL encoding properly
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Null byte in title (should be rejected or sanitized)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "Test\x00Title"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should handle gracefully
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: BOM (Byte Order Mark) in title
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"title": "\ufeffTest with BOM"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_concurrency_race_conditions(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test concurrent API requests to detect race conditions.
|
||||
Addresses 20+ concurrency bugs from changelog.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create a watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": "Concurrency test"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Concurrent updates to same watch
|
||||
# Note: Flask test client is not thread-safe, so we test sequential updates instead
|
||||
# Real concurrency issues would be caught in integration tests with actual HTTP requests
|
||||
results = []
|
||||
for i in range(10):
|
||||
try:
|
||||
r = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": f"Title {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
results.append(r.status_code)
|
||||
except Exception as e:
|
||||
results.append(str(e))
|
||||
|
||||
# All updates should succeed (200) without crashes
|
||||
assert all(r == 200 for r in results), f"Some updates failed: {results}"
|
||||
|
||||
# Test 2: Update while watch is being checked
|
||||
# Queue a recheck
|
||||
client.get(
|
||||
url_for("watch", uuid=watch_uuid, recheck=True),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
|
||||
# Immediately update it
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
data=json.dumps({"title": "Updated during check"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should succeed without error
|
||||
assert res.status_code == 200
|
||||
|
||||
# Test 3: Delete watch that's being processed
|
||||
# Create another watch
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid2 = res.json.get('uuid')
|
||||
|
||||
# Queue it for checking
|
||||
client.get(url_for("watch", uuid=watch_uuid2, recheck=True), headers={'x-api-key': api_key})
|
||||
|
||||
# Immediately delete it
|
||||
res = client.delete(url_for("watch", uuid=watch_uuid2), headers={'x-api-key': api_key})
|
||||
# Should succeed or return appropriate error
|
||||
assert res.status_code in [204, 404, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 2: IMPORTANT FUNCTIONALITY TESTS
|
||||
# ============================================================================
|
||||
|
||||
def test_api_time_validation_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test time_between_check validation edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Zero interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 0}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Zero interval should be rejected"
|
||||
|
||||
# Test 2: Negative interval
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": -100}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "Negative interval should be rejected"
|
||||
|
||||
# Test 3: All fields null with use_default=false
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": None, "days": None, "hours": None, "minutes": None, "seconds": None}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 400, "All null intervals should be rejected when not using default"
|
||||
|
||||
# Test 4: Extremely large interval (overflow risk)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"weeks": 999999999}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should either accept (with limits) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 5: Valid minimal interval (should work)
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"time_between_check_use_default": False,
|
||||
"time_between_check": {"seconds": 60}
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_browser_steps_validation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test browser_steps validation for invalid operations and structures.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Empty browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "", "selector": "", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid as null)
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid operation type
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "invalid_operation", "selector": "#test", "optional_value": ""}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (validation happens at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Missing required fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click"} # Missing selector and optional_value
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to schema validation
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 4: Extra fields in browser step
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"browser_steps": [
|
||||
{"operation": "click", "selector": "#test", "optional_value": "", "extra_field": "value"}
|
||||
]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected due to additionalProperties: false
|
||||
assert res.status_code == 400
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_queue_manipulation(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test queue behavior under stress and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Create many watches rapidly
|
||||
watch_uuids = []
|
||||
for i in range(20):
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url, "title": f"Watch {i}"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
if res.status_code == 201:
|
||||
watch_uuids.append(res.json.get('uuid'))
|
||||
|
||||
assert len(watch_uuids) == 20, "Should be able to create 20 watches"
|
||||
|
||||
# Test 2: Recheck all when watches exist
|
||||
res = client.get(
|
||||
url_for("createwatch", recheck_all='1'),
|
||||
headers={'x-api-key': api_key},
|
||||
)
|
||||
# Should return success (200 or 202 for background processing)
|
||||
assert res.status_code in [200, 202]
|
||||
|
||||
# Test 3: Verify queue doesn't overflow with moderate load
|
||||
# The app has MAX_QUEUE_SIZE = 5000, we're well below that
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Cleanup
|
||||
for uuid in watch_uuids:
|
||||
client.delete(url_for("watch", uuid=uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# TIER 3: EDGE CASES & POLISH
|
||||
# ============================================================================
|
||||
|
||||
def test_api_history_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test history API with invalid timestamps and edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Create watch and generate history
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({"url": test_url}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
watch_uuid = res.json.get('uuid')
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Test 1: Get history with invalid timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="invalid"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Invalid timestamp should return 404"
|
||||
|
||||
# Test 2: Future timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="9999999999"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Future timestamp should return 404"
|
||||
|
||||
# Test 3: Negative timestamp
|
||||
res = client.get(
|
||||
url_for("watchsinglehistory", uuid=watch_uuid, timestamp="-1"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 404, "Negative timestamp should return 404"
|
||||
|
||||
# Test 4: Diff with reversed timestamps (from > to)
|
||||
# First get actual timestamps
|
||||
res = client.get(
|
||||
url_for("watchhistory", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
if len(res.json) >= 2:
|
||||
timestamps = sorted(res.json.keys())
|
||||
# Try reversed order
|
||||
res = client.get(
|
||||
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp=timestamps[-1], to_timestamp=timestamps[0]),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
# Should either work (show reverse diff) or return error
|
||||
assert res.status_code in [200, 400]
|
||||
|
||||
# Cleanup
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_notification_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test notification configuration edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Invalid notification URL
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": ["invalid://url", "ftp://test.com"]
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (apprise validates at runtime) or reject
|
||||
assert res.status_code in [201, 400]
|
||||
if res.status_code == 201:
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 2: Invalid notification format
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_format": "invalid_format"
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected by schema
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 3: Empty notification arrays
|
||||
res = client.post(
|
||||
url_for("createwatch"),
|
||||
data=json.dumps({
|
||||
"url": test_url,
|
||||
"notification_urls": []
|
||||
}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (empty is valid)
|
||||
assert res.status_code == 201
|
||||
watch_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("watch", uuid=watch_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_api_tag_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test tag/group API edge cases including XSS and path traversal.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
|
||||
# Test 1: Empty tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": ""}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (empty title)
|
||||
assert res.status_code == 400
|
||||
|
||||
# Test 2: XSS in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "<script>alert(1)</script>"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept but sanitize
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
# Verify title is stored safely
|
||||
res = client.get(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
# Should be escaped or sanitized
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 3: Path traversal in tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "../../etc/passwd"}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should accept (it's just a string, not a path)
|
||||
if res.status_code == 201:
|
||||
tag_uuid = res.json.get('uuid')
|
||||
client.delete(url_for("tag", uuid=tag_uuid), headers={'x-api-key': api_key})
|
||||
|
||||
# Test 4: Very long tag title
|
||||
res = client.post(
|
||||
url_for("tag"),
|
||||
data=json.dumps({"title": "x" * 10000}),
|
||||
headers={'content-type': 'application/json', 'x-api-key': api_key},
|
||||
)
|
||||
# Should be rejected (exceeds maxLength)
|
||||
assert res.status_code == 400
|
||||
|
||||
|
||||
def test_api_authentication_edge_cases(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test API authentication edge cases.
|
||||
"""
|
||||
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
|
||||
# Test 1: Missing API key
|
||||
res = client.get(url_for("createwatch"))
|
||||
assert res.status_code == 403, "Missing API key should be forbidden"
|
||||
|
||||
# Test 2: Invalid API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "invalid_key_12345"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 3: API key with special characters
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "key<script>alert(1)</script>"}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 4: Very long API key
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': "x" * 10000}
|
||||
)
|
||||
assert res.status_code == 403, "Invalid API key should be forbidden"
|
||||
|
||||
# Test 5: Case sensitivity of API key
|
||||
wrong_case_key = api_key.upper() if api_key.islower() else api_key.lower()
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': wrong_case_key}
|
||||
)
|
||||
# Should be forbidden (keys are case-sensitive)
|
||||
assert res.status_code == 403, "Wrong case API key should be forbidden"
|
||||
|
||||
# Test 6: Valid API key should work
|
||||
res = client.get(
|
||||
url_for("createwatch"),
|
||||
headers={'x-api-key': api_key}
|
||||
)
|
||||
assert res.status_code == 200, "Valid API key should work"
|
||||
@@ -325,3 +325,274 @@ def test_time_unit_translations(client, live_server, measure_memory_usage, datas
|
||||
assert b"Time Between Check" not in res.data, "Should not have English 'Time Between Check'"
|
||||
assert "Chrome 請求".encode() not in res.data, "Should not have incorrect 'Chrome 請求' (Chrome requests)"
|
||||
assert "使用預設通知".encode() not in res.data, "Should not have incorrect '使用預設通知' (Use default notification)"
|
||||
|
||||
|
||||
def test_accept_language_header_zh_tw(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending zh-TW in Accept-Language header get Traditional Chinese.
|
||||
This tests the locale alias mapping for issue #3779.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Clear any session data to simulate a fresh visitor
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
# Request the index page with zh-TW in Accept-Language header (what browsers send)
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Traditional Chinese content, not Simplified Chinese
|
||||
# Traditional: 選擇語言, Simplified: 选择语言
|
||||
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' (Select Language)"
|
||||
assert '选择语言'.encode() not in res.data, "Should not get Simplified Chinese '选择语言'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format
|
||||
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected BCP 47 language tag zh-Hant-TW in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (Taiwan flag for Traditional Chinese)
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Taiwan flag 'fi fi-tw' for Traditional Chinese"
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show China flag 'fi fi-cn' for Traditional Chinese"
|
||||
|
||||
# Verify we're getting Traditional Chinese text throughout the page
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Traditional Chinese translations (not English)
|
||||
assert "小時".encode() in res.data, "Expected Traditional Chinese '小時' for Hours"
|
||||
assert "分鐘".encode() in res.data, "Expected Traditional Chinese '分鐘' for Minutes"
|
||||
assert b"Hours" not in res.data or "小時".encode() in res.data, "Should have Traditional Chinese, not English"
|
||||
|
||||
|
||||
def test_accept_language_header_en_variants(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending en-GB and en-US in Accept-Language header get the correct English variant.
|
||||
This ensures the locale selector works properly for English variants.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Test 1: British English (en-GB)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en-GB,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format with hyphen
|
||||
assert b'<html lang="en-GB"' in res.data, "Expected BCP 47 language tag en-GB in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (UK flag for en-GB)
|
||||
assert b'<span class="fi fi-gb fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected UK flag 'fi fi-gb' for British English"
|
||||
|
||||
# Test 2: American English (en-US)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en-US,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
# Check HTML lang attribute uses BCP 47 format with hyphen
|
||||
assert b'<html lang="en-US"' in res.data, "Expected BCP 47 language tag en-US in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (US flag for en-US)
|
||||
assert b'<span class="fi fi-us fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected US flag 'fi fi-us' for American English"
|
||||
|
||||
# Test 3: Generic 'en' should fall back to one of the English variants
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'en'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get English content (either variant is fine)
|
||||
assert b"Select Language" in res.data, "Expected English text 'Select Language'"
|
||||
|
||||
|
||||
def test_accept_language_header_zh_simplified(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that browsers sending zh or zh-CN in Accept-Language header get Simplified Chinese.
|
||||
This ensures Simplified Chinese still works correctly and doesn't get confused with Traditional.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Test 1: Generic 'zh' should get Simplified Chinese
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Simplified Chinese content, not Traditional Chinese
|
||||
# Simplified: 选择语言, Traditional: 選擇語言
|
||||
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' (Select Language)"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese '選擇語言'"
|
||||
|
||||
# Check HTML lang attribute
|
||||
assert b'<html lang="zh"' in res.data, "Expected language tag zh in HTML"
|
||||
|
||||
# Check that the correct flag icon is shown (China flag for Simplified Chinese)
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected China flag 'fi fi-cn' for Simplified Chinese"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show Taiwan flag 'fi fi-tw' for Simplified Chinese"
|
||||
|
||||
# Test 2: 'zh-CN' should also get Simplified Chinese
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-CN,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should get Simplified Chinese content
|
||||
assert '选择语言'.encode() in res.data, "Expected Simplified Chinese '选择语言' with zh-CN header"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese with zh-CN header"
|
||||
|
||||
# Check that the correct flag icon is shown (China flag for zh-CN)
|
||||
assert b'<span class="fi fi-cn fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected China flag 'fi fi-cn' for zh-CN header"
|
||||
|
||||
# Verify Simplified Chinese in settings page
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh,en;q=0.9'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Simplified Chinese translations (not Traditional or English)
|
||||
# Simplified: 小时, Traditional: 小時
|
||||
assert "小时".encode() in res.data, "Expected Simplified Chinese '小时' for Hours"
|
||||
assert "分钟".encode() in res.data, "Expected Simplified Chinese '分钟' for Minutes"
|
||||
assert "秒".encode() in res.data, "Expected Simplified Chinese '秒' for Seconds"
|
||||
# Make sure it's not Traditional Chinese
|
||||
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時'"
|
||||
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘'"
|
||||
|
||||
|
||||
def test_session_locale_overrides_accept_language(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that session locale preference overrides browser Accept-Language header.
|
||||
|
||||
Scenario:
|
||||
1. Browser auto-detects zh-TW (Traditional Chinese) from Accept-Language header
|
||||
2. User explicitly selects Korean language
|
||||
3. On subsequent page loads, Korean should be shown (not Traditional Chinese)
|
||||
even though the Accept-Language header still says zh-TW
|
||||
|
||||
This tests the session override behavior for issue #3779.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Step 1: Clear session and make first request with zh-TW header (auto-detect)
|
||||
with client.session_transaction() as sess:
|
||||
sess.clear()
|
||||
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should initially get Traditional Chinese from auto-detect
|
||||
assert '選擇語言'.encode() in res.data, "Expected Traditional Chinese '選擇語言' from auto-detect"
|
||||
assert b'<html lang="zh-Hant-TW"' in res.data, "Expected zh-Hant-TW language tag"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Taiwan flag 'fi fi-tw' from auto-detect"
|
||||
|
||||
# Step 2: User explicitly selects Korean language
|
||||
res = client.get(
|
||||
url_for("set_language", locale="ko"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Browser still sends zh-TW
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Step 3: Make another request with same zh-TW header
|
||||
# Session should override the Accept-Language header
|
||||
res = client.get(
|
||||
url_for("watchlist.index"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still sending zh-TW!
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Should now get Korean (session overrides auto-detect)
|
||||
# Korean: 언어 선택, Traditional Chinese: 選擇語言
|
||||
assert '언어 선택'.encode() in res.data, "Expected Korean '언어 선택' (Select Language) from session"
|
||||
assert '選擇語言'.encode() not in res.data, "Should not get Traditional Chinese when Korean is set in session"
|
||||
|
||||
# Check HTML lang attribute is Korean
|
||||
assert b'<html lang="ko"' in res.data, "Expected Korean language tag 'ko' in HTML"
|
||||
|
||||
# Check that Korean flag is shown (not Taiwan flag)
|
||||
assert b'<span class="fi fi-kr fis" id="language-selector-flag">' in res.data, \
|
||||
"Expected Korean flag 'fi fi-kr' from session preference"
|
||||
assert b'<span class="fi fi-tw fis" id="language-selector-flag">' not in res.data, \
|
||||
"Should not show Taiwan flag when Korean is set in session"
|
||||
|
||||
# Verify Korean text on settings page as well
|
||||
res = client.get(
|
||||
url_for("settings.settings_page"),
|
||||
headers={'Accept-Language': 'zh-TW,zh;q=0.9,en;q=0.8'}, # Still zh-TW!
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert res.status_code == 200
|
||||
|
||||
# Check Korean translations (not Traditional Chinese or English)
|
||||
# Korean: 시간 (Hours), 분 (Minutes), 초 (Seconds)
|
||||
# Traditional Chinese: 小時, 分鐘, 秒
|
||||
assert "시간".encode() in res.data, "Expected Korean '시간' for Hours"
|
||||
assert "분".encode() in res.data, "Expected Korean '분' for Minutes"
|
||||
assert "小時".encode() not in res.data, "Should not have Traditional Chinese '小時' when Korean is set"
|
||||
assert "分鐘".encode() not in res.data, "Should not have Traditional Chinese '分鐘' when Korean is set"
|
||||
|
||||
@@ -64,6 +64,19 @@ def is_safe_valid_url(test_url):
|
||||
import re
|
||||
import validators
|
||||
|
||||
# Validate input type first - must be a non-empty string
|
||||
if test_url is None:
|
||||
logger.warning('URL validation failed: URL is None')
|
||||
return False
|
||||
|
||||
if not isinstance(test_url, str):
|
||||
logger.warning(f'URL validation failed: URL must be a string, got {type(test_url).__name__}')
|
||||
return False
|
||||
|
||||
if not test_url.strip():
|
||||
logger.warning('URL validation failed: URL is empty or whitespace only')
|
||||
return False
|
||||
|
||||
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
|
||||
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
|
||||
|
||||
|
||||
@@ -132,11 +132,19 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
try:
|
||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||
# If we reach here, worker exited cleanly
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||
break
|
||||
result = await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||
|
||||
if result == "restart":
|
||||
# Worker requested restart - immediately loop back and restart
|
||||
if not in_pytest:
|
||||
logger.debug(f"Async worker {worker_id} restarting")
|
||||
continue
|
||||
else:
|
||||
# Worker exited cleanly (shutdown)
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||
break
|
||||
|
||||
except asyncio.CancelledError:
|
||||
# Task was cancelled (normal shutdown)
|
||||
if not in_pytest:
|
||||
@@ -147,7 +155,7 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
|
||||
if not in_pytest:
|
||||
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
|
||||
await asyncio.sleep(5)
|
||||
|
||||
|
||||
if not in_pytest:
|
||||
logger.info(f"Async worker {worker_id} shutdown complete")
|
||||
|
||||
@@ -161,7 +169,11 @@ def add_worker(update_q, notification_q, app, datastore):
|
||||
"""Add a new async worker (for dynamic scaling)"""
|
||||
global worker_threads
|
||||
|
||||
worker_id = len(worker_threads)
|
||||
# Reuse lowest available ID to prevent unbounded growth over time
|
||||
used_ids = {w.worker_id for w in worker_threads}
|
||||
worker_id = 0
|
||||
while worker_id in used_ids:
|
||||
worker_id += 1
|
||||
logger.info(f"Adding async worker {worker_id}")
|
||||
|
||||
try:
|
||||
@@ -251,7 +263,7 @@ def queue_item_async_safe(update_q, item, silent=False):
|
||||
return False
|
||||
|
||||
if not silent:
|
||||
logger.debug(f"Successfully queued item: {item_uuid}")
|
||||
logger.trace(f"Successfully queued item: {item_uuid}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
|
||||
@@ -183,15 +183,30 @@ components:
|
||||
properties:
|
||||
weeks:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 52000
|
||||
nullable: true
|
||||
days:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 365000
|
||||
nullable: true
|
||||
hours:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 8760000
|
||||
nullable: true
|
||||
minutes:
|
||||
type: integer
|
||||
minimum: 0
|
||||
maximum: 525600000
|
||||
nullable: true
|
||||
seconds:
|
||||
type: integer
|
||||
description: Time intervals between checks
|
||||
minimum: 0
|
||||
maximum: 31536000000
|
||||
nullable: true
|
||||
description: Time intervals between checks. All fields must be non-negative. At least one non-zero value required when not using default settings.
|
||||
time_between_check_use_default:
|
||||
type: boolean
|
||||
default: true
|
||||
@@ -200,7 +215,9 @@ components:
|
||||
type: array
|
||||
items:
|
||||
type: string
|
||||
description: Notification URLs for this web page change monitor (watch)
|
||||
maxLength: 1000
|
||||
maxItems: 100
|
||||
description: Notification URLs for this web page change monitor (watch). Maximum 100 URLs.
|
||||
notification_title:
|
||||
type: string
|
||||
description: Custom notification title
|
||||
@@ -224,14 +241,19 @@ components:
|
||||
operation:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
selector:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
optional_value:
|
||||
type: string
|
||||
maxLength: 5000
|
||||
nullable: true
|
||||
required: [operation, selector, optional_value]
|
||||
description: Browser automation steps
|
||||
additionalProperties: false
|
||||
maxItems: 100
|
||||
description: Browser automation steps. Maximum 100 steps allowed.
|
||||
processor:
|
||||
type: string
|
||||
enum: [restock_diff, text_json_diff]
|
||||
|
||||
Reference in New Issue
Block a user