Compare commits

...

2 Commits

Author SHA1 Message Date
dgtlmoon
9a3dba3d5c Hardening 2026-01-02 17:13:21 +01:00
dgtlmoon
2e447b47f1 UI - Handling redirects on login to the correct page 2026-01-02 16:58:42 +01:00
6 changed files with 482 additions and 19 deletions

View File

@@ -26,6 +26,7 @@ from flask import (
session,
url_for,
)
from urllib.parse import urlparse
from flask_compress import Compress as FlaskCompress
from flask_login import current_user
from flask_restful import abort, Api
@@ -350,6 +351,13 @@ def changedetection_app(config=None, datastore_o=None):
global datastore, socketio_server
datastore = datastore_o
# Import and create a wrapper for is_safe_url that has access to app
from changedetectionio.is_safe_url import is_safe_url as _is_safe_url
def is_safe_url(target):
"""Wrapper for is_safe_url that passes the app instance"""
return _is_safe_url(target, app)
# so far just for read-only via tests, but this will be moved eventually to be the main source
# (instead of the global var)
app.config['DATASTORE'] = datastore_o
@@ -471,11 +479,21 @@ def changedetection_app(config=None, datastore_o=None):
@login_manager.unauthorized_handler
def unauthorized_handler():
return redirect(url_for('login', next=url_for('watchlist.index')))
# Pass the current request path so users are redirected back after login
return redirect(url_for('login', redirect=request.path))
@app.route('/logout')
def logout():
flask_login.logout_user()
# Check if there's a redirect parameter to return to after re-login
redirect_url = request.args.get('redirect')
# If redirect is provided and safe, pass it to login page
if redirect_url and is_safe_url(redirect_url):
return redirect(url_for('login', redirect=redirect_url))
# Otherwise just go to watchlist
return redirect(url_for('watchlist.index'))
@app.route('/set-language/<locale>')
@@ -487,20 +505,36 @@ def changedetection_app(config=None, datastore_o=None):
else:
logger.error(f"Invalid locale {locale}, available: {language_codes}")
# Redirect back to the page they came from, or home
# Check if there's a redirect parameter to return to the same page
redirect_url = request.args.get('redirect')
# If redirect is provided and safe, use it
if redirect_url and is_safe_url(redirect_url):
return redirect(redirect_url)
# Otherwise redirect to watchlist
return redirect(url_for('watchlist.index'))
# https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39
# You can divide up the stuff like this
@app.route('/login', methods=['GET', 'POST'])
def login():
# Extract and validate the redirect parameter
redirect_url = request.args.get('redirect') or request.form.get('redirect')
# Validate the redirect URL - default to watchlist if invalid
if redirect_url and is_safe_url(redirect_url):
validated_redirect = redirect_url
else:
validated_redirect = url_for('watchlist.index')
if request.method == 'GET':
if flask_login.current_user.is_authenticated:
# Already logged in - redirect immediately to the target
flash(gettext("Already logged in"))
return redirect(url_for("watchlist.index"))
return redirect(validated_redirect)
flash(gettext("You must be logged in, please log in."), 'error')
output = render_template("login.html")
output = render_template("login.html", redirect_url=validated_redirect)
return output
user = User()
@@ -510,23 +544,13 @@ def changedetection_app(config=None, datastore_o=None):
if (user.check_password(password)):
flask_login.login_user(user, remember=True)
# For now there's nothing else interesting here other than the index/list page
# It's more reliable and safe to ignore the 'next' redirect
# When we used...
# next = request.args.get('next')
# return redirect(next or url_for('watchlist.index'))
# We would sometimes get login loop errors on sites hosted in sub-paths
# note for the future:
# if not is_safe_valid_url(next):
# return flask.abort(400)
return redirect(url_for('watchlist.index'))
# Redirect to the validated URL after successful login
return redirect(validated_redirect)
else:
flash(gettext('Incorrect password'), 'error')
return redirect(url_for('login'))
return redirect(url_for('login', redirect=redirect_url if redirect_url else None))
@app.before_request
def before_request_handle_cookie_x_settings():

View File

@@ -0,0 +1,113 @@
"""
URL redirect validation module for preventing open redirect vulnerabilities.
This module provides functionality to safely validate redirect URLs, ensuring they:
1. Point to internal routes only (no external redirects)
2. Are properly normalized (preventing browser parsing differences)
3. Match registered Flask routes (no fake/non-existent pages)
4. Are fully logged for security monitoring
References:
- https://flask-login.readthedocs.io/ (safe redirect patterns)
- https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-v-user-logins
- https://www.pythonkitchen.com/how-prevent-open-redirect-vulnerab-flask/
"""
from urllib.parse import urlparse, urljoin
from flask import request
from loguru import logger
def is_safe_url(target, app):
"""
Validate that a redirect URL is safe to prevent open redirect vulnerabilities.
This follows Flask/Werkzeug best practices by ensuring the redirect URL:
1. Is a relative path starting with exactly one '/'
2. Does not start with '//' (double-slash attack)
3. Has no external protocol handlers
4. Points to a valid registered route in the application
5. Is properly normalized to prevent browser parsing differences
Args:
target: The URL to validate (e.g., '/settings', '/login#top')
app: The Flask application instance (needed for route validation)
Returns:
bool: True if the URL is safe for redirection, False otherwise
Examples:
>>> is_safe_url('/settings', app)
True
>>> is_safe_url('//evil.com', app)
False
>>> is_safe_url('/settings#general', app)
True
>>> is_safe_url('/fake-page', app)
False
"""
if not target:
return False
# Normalize the URL to prevent browser parsing differences
# Strip whitespace and replace backslashes (which some browsers interpret as forward slashes)
target = target.strip()
target = target.replace('\\', '/')
# First, check if it starts with // or more (double-slash attack)
if target.startswith('//'):
logger.warning(f"Blocked redirect attempt with double-slash: {target}")
return False
# Parse the URL to check for scheme and netloc
parsed = urlparse(target)
# Block any URL with a scheme (http://, https://, javascript:, etc.)
if parsed.scheme:
logger.warning(f"Blocked redirect attempt with scheme: {target}")
return False
# Block any URL with a network location (netloc)
# This catches patterns like //evil.com, user@host, etc.
if parsed.netloc:
logger.warning(f"Blocked redirect attempt with netloc: {target}")
return False
# At this point, we have a relative URL with no scheme or netloc
# Use urljoin to resolve it and verify it points to the same host
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
# Check: ensure the resolved URL has the same netloc as current host
if not (test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc):
logger.warning(f"Blocked redirect attempt with mismatched netloc: {target}")
return False
# Additional validation: Check if the URL matches a registered route
# This prevents redirects to non-existent pages or unintended endpoints
try:
# Get the path without query string and fragment
# Fragments (like #general) are automatically stripped by urlparse
path = parsed.path
# Create a URL adapter bound to the server name
adapter = app.url_map.bind(ref_url.netloc)
# Try to match the path to a registered route
# This will raise NotFound if the route doesn't exist
endpoint, values = adapter.match(path, return_rule=False)
# Block redirects to static file endpoints - these are catch-all routes
# that would match arbitrary paths, potentially allowing unintended redirects
if endpoint in ('static_content', 'static', 'static_flags'):
logger.warning(f"Blocked redirect to static endpoint: {target}")
return False
# Successfully matched a valid route
logger.debug(f"Validated safe redirect to endpoint '{endpoint}': {target}")
return True
except Exception as e:
# Route doesn't exist or can't be matched
logger.warning(f"Blocked redirect to non-existent route: {target} (error: {e})")
return False

View File

@@ -97,7 +97,7 @@
{% endif %}
{% if current_user.is_authenticated %}
<li class="pure-menu-item">
<a href="{{url_for('logout')}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
<a href="{{url_for('logout', redirect=request.path)}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
</li>
{% endif %}
{% if current_user.is_authenticated or not has_password %}
@@ -261,7 +261,7 @@
<div class="modal-body">
<div class="language-list">
{% for locale, lang_data in available_languages.items()|sort %}
<a href="{{ url_for('set_language', locale=locale) }}" class="language-option" data-locale="{{ locale }}">
<a href="{{ url_for('set_language', locale=locale, redirect=request.path) }}" class="language-option" data-locale="{{ locale }}">
<span class="{{ lang_data.flag }}" style="display: inline-block; width: 1.5em; height: 1.5em; vertical-align: middle; margin-right: 0.5em; border-radius: 50%; overflow: hidden;"></span> <span class="language-name">{{ lang_data.name }}</span>
</a>
{% endfor %}

View File

@@ -5,6 +5,7 @@
<div class="inner">
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" id="redirect" name="redirect" value="{{ redirect_url }}">
<fieldset>
<div class="pure-control-group">
<label for="password">{{ _('Password') }}</label>

View File

@@ -110,3 +110,42 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
assert res.status_code == 200
assert b"Annulla" in res.data, "Italian text should persist across requests"
def test_set_language_with_redirect(client, live_server, measure_memory_usage, datastore_path):
"""
Test that changing language keeps the user on the same page.
Example: User is on /settings, changes language, stays on /settings.
"""
from flask import url_for
# Set language with a redirect parameter (simulating language change from /settings)
res = client.get(
url_for("set_language", locale="de", redirect="/settings"),
follow_redirects=False
)
# Should redirect back to settings
assert res.status_code in [302, 303]
assert '/settings' in res.location
# Verify language was set in session
with client.session_transaction() as sess:
assert sess.get('locale') == 'de'
# Test with invalid locale (should still redirect safely)
res = client.get(
url_for("set_language", locale="invalid_locale", redirect="/settings"),
follow_redirects=False
)
assert res.status_code in [302, 303]
assert '/settings' in res.location
# Test with malicious redirect (should default to watchlist)
res = client.get(
url_for("set_language", locale="en", redirect="https://evil.com"),
follow_redirects=False
)
assert res.status_code in [302, 303]
# Should not redirect to evil.com
assert 'evil.com' not in res.location

View File

@@ -192,3 +192,289 @@ def test_xss_watch_last_error(client, live_server, measure_memory_usage, datasto
assert b'&lt;a href=&#34;https://foobar&#34;&gt;&lt;/a&gt;&lt;script&gt;alert(123);&lt;/script&gt;' in res.data
assert b"https://foobar" in res.data # this text should be there
def test_login_redirect_safe_urls(client, live_server, measure_memory_usage, datastore_path):
"""
Test that safe redirect URLs work correctly in login flow.
This verifies the fix for open redirect vulnerabilities while maintaining
legitimate redirect functionality for both authenticated and unauthenticated users.
"""
# Test 1: Accessing /login?redirect=/settings when not logged in
# Should show the login form with redirect parameter preserved
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# Check that the redirect is preserved in the hidden form field
assert b'name="redirect"' in res.data
# Test 2: Valid internal redirect with query parameters
res = client.get(
url_for("login", redirect="/settings?tab=notifications"),
follow_redirects=False
)
assert res.status_code == 200
# Check that the redirect is preserved
assert b'value="/settings?tab=notifications"' in res.data
# Test 3: Malicious external URL should be blocked and default to watchlist
res = client.get(
url_for("login", redirect="https://evil.com/phishing"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# The redirect parameter in the form should NOT contain the evil URL
# Check the actual input value, not just anywhere in the page
assert b'value="https://evil.com' not in res.data
assert b'value="/evil.com' not in res.data
assert b'name="redirect"' in res.data
# Test 4: Double-slash attack should be blocked
res = client.get(
url_for("login", redirect="//evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//evil.com"' not in res.data
# Test 5: Protocol handler exploit should be blocked
res = client.get(
url_for("login", redirect="javascript:alert(document.domain)"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have javascript: in the redirect input value
assert b'value="javascript:' not in res.data
# Test 6: At-symbol obfuscation attack should be blocked
res = client.get(
url_for("login", redirect="//@evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//@evil.com"' not in res.data
# Test 7: Multiple slashes attack should be blocked
res = client.get(
url_for("login", redirect="////evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="////evil.com"' not in res.data
def test_login_redirect_with_password(client, live_server, measure_memory_usage, datastore_path):
"""
Test that redirect functionality works correctly when a password is set.
This ensures that notifications can always link to /login and users will
be redirected to the correct page after authentication.
"""
# Set a password
from changedetectionio import store
import base64
import hashlib
# Generate a test password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
# Set the password in the datastore
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Test 1: Try to access /login?redirect=/settings without being logged in
# Should show login form and preserve redirect parameter
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
assert res.status_code == 200
assert b"Password" in res.data
# Check that redirect parameter is preserved in the form
assert b'name="redirect"' in res.data
assert b'value="/settings"' in res.data
# Test 2: Submit correct password with redirect parameter
# Should redirect to /settings after successful login
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
# Should be on settings page
assert b"Settings" in res.data or b"settings" in res.data
# Test 3: Now that we're logged in, accessing /login?redirect=/settings
# should redirect immediately without showing login form
res = client.get(
url_for("login", redirect="/"),
follow_redirects=True
)
assert res.status_code == 200
assert b"Already logged in" in res.data
# Test 4: Malicious redirect should be blocked even with correct password
res = client.post(
url_for("login"),
data={"password": password, "redirect": "https://evil.com"},
follow_redirects=True
)
# Should redirect to watchlist index instead of evil.com
assert b"evil.com" not in res.data
# Logout for cleanup
client.get(url_for("logout"))
# Test 5: Incorrect password with redirect should stay on login page
res = client.post(
url_for("login"),
data={"password": "wrongpassword", "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
assert b"Incorrect password" in res.data or b"password" in res.data
# Clear the password
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_login_redirect_from_protected_page(client, live_server, measure_memory_usage, datastore_path):
"""
Test the complete redirect flow: accessing a protected page while logged out
should redirect to login with the page URL, then redirect back after login.
This is the real-world scenario where users try to access /edit/uuid or /settings
and need to login first.
"""
import base64
import hashlib
# Add a watch first
set_original_response(datastore_path=datastore_path)
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Set a password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Logout to ensure we're not authenticated
client.get(url_for("logout"))
# Try to access a protected page (edit page for first watch)
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
follow_redirects=False
)
# Should redirect to login with the edit page as redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved in the login form
# It should contain the edit page URL
assert b'name="redirect"' in res.data
assert b'value="/edit/first"' in res.data or b'value="%2Fedit%2Ffirst"' in res.data
# Now login with correct password and the redirect parameter
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/edit/first"},
follow_redirects=False
)
# Should redirect to the edit page
assert res.status_code in [302, 303]
assert '/edit/first' in res.location
# Follow the redirect to verify we're on the edit page
res = client.get(res.location, follow_redirects=True)
assert res.status_code == 200
# Should see edit page content
assert b'Edit' in res.data or b'Watching' in res.data
# Cleanup
client.get(url_for("logout"))
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_logout_with_redirect(client, live_server, measure_memory_usage, datastore_path):
"""
Test that logout preserves the current page URL, so after re-login
the user returns to where they were before logging out.
Example: User is on /edit/uuid, clicks logout, then logs back in and
returns to /edit/uuid.
"""
import base64
import hashlib
# Set a password and login
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Login
res = client.post(
url_for("login"),
data={"password": password},
follow_redirects=True
)
assert res.status_code == 200
# Now logout with a redirect parameter (simulating logout from /settings)
res = client.get(
url_for("logout", redirect="/settings"),
follow_redirects=False
)
# Should redirect to login with the redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved
assert b'value="/settings"' in res.data or b'value="%2Fsettings"' in res.data
# Login again with the redirect
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=False
)
# Should redirect back to settings
assert res.status_code in [302, 303]
assert '/settings' in res.location or 'settings' in res.location
# Cleanup
del client.application.config['DATASTORE'].data['settings']['application']['password']