mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-17 12:40:37 +00:00
Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9a3dba3d5c | ||
|
|
2e447b47f1 |
@@ -26,6 +26,7 @@ from flask import (
|
|||||||
session,
|
session,
|
||||||
url_for,
|
url_for,
|
||||||
)
|
)
|
||||||
|
from urllib.parse import urlparse
|
||||||
from flask_compress import Compress as FlaskCompress
|
from flask_compress import Compress as FlaskCompress
|
||||||
from flask_login import current_user
|
from flask_login import current_user
|
||||||
from flask_restful import abort, Api
|
from flask_restful import abort, Api
|
||||||
@@ -350,6 +351,13 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
global datastore, socketio_server
|
global datastore, socketio_server
|
||||||
datastore = datastore_o
|
datastore = datastore_o
|
||||||
|
|
||||||
|
# Import and create a wrapper for is_safe_url that has access to app
|
||||||
|
from changedetectionio.is_safe_url import is_safe_url as _is_safe_url
|
||||||
|
|
||||||
|
def is_safe_url(target):
|
||||||
|
"""Wrapper for is_safe_url that passes the app instance"""
|
||||||
|
return _is_safe_url(target, app)
|
||||||
|
|
||||||
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
||||||
# (instead of the global var)
|
# (instead of the global var)
|
||||||
app.config['DATASTORE'] = datastore_o
|
app.config['DATASTORE'] = datastore_o
|
||||||
@@ -471,11 +479,21 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
|
|
||||||
@login_manager.unauthorized_handler
|
@login_manager.unauthorized_handler
|
||||||
def unauthorized_handler():
|
def unauthorized_handler():
|
||||||
return redirect(url_for('login', next=url_for('watchlist.index')))
|
# Pass the current request path so users are redirected back after login
|
||||||
|
return redirect(url_for('login', redirect=request.path))
|
||||||
|
|
||||||
@app.route('/logout')
|
@app.route('/logout')
|
||||||
def logout():
|
def logout():
|
||||||
flask_login.logout_user()
|
flask_login.logout_user()
|
||||||
|
|
||||||
|
# Check if there's a redirect parameter to return to after re-login
|
||||||
|
redirect_url = request.args.get('redirect')
|
||||||
|
|
||||||
|
# If redirect is provided and safe, pass it to login page
|
||||||
|
if redirect_url and is_safe_url(redirect_url):
|
||||||
|
return redirect(url_for('login', redirect=redirect_url))
|
||||||
|
|
||||||
|
# Otherwise just go to watchlist
|
||||||
return redirect(url_for('watchlist.index'))
|
return redirect(url_for('watchlist.index'))
|
||||||
|
|
||||||
@app.route('/set-language/<locale>')
|
@app.route('/set-language/<locale>')
|
||||||
@@ -487,20 +505,36 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
else:
|
else:
|
||||||
logger.error(f"Invalid locale {locale}, available: {language_codes}")
|
logger.error(f"Invalid locale {locale}, available: {language_codes}")
|
||||||
|
|
||||||
# Redirect back to the page they came from, or home
|
# Check if there's a redirect parameter to return to the same page
|
||||||
|
redirect_url = request.args.get('redirect')
|
||||||
|
|
||||||
|
# If redirect is provided and safe, use it
|
||||||
|
if redirect_url and is_safe_url(redirect_url):
|
||||||
|
return redirect(redirect_url)
|
||||||
|
|
||||||
|
# Otherwise redirect to watchlist
|
||||||
return redirect(url_for('watchlist.index'))
|
return redirect(url_for('watchlist.index'))
|
||||||
|
|
||||||
# https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39
|
# https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39
|
||||||
# You can divide up the stuff like this
|
# You can divide up the stuff like this
|
||||||
@app.route('/login', methods=['GET', 'POST'])
|
@app.route('/login', methods=['GET', 'POST'])
|
||||||
def login():
|
def login():
|
||||||
|
# Extract and validate the redirect parameter
|
||||||
|
redirect_url = request.args.get('redirect') or request.form.get('redirect')
|
||||||
|
|
||||||
|
# Validate the redirect URL - default to watchlist if invalid
|
||||||
|
if redirect_url and is_safe_url(redirect_url):
|
||||||
|
validated_redirect = redirect_url
|
||||||
|
else:
|
||||||
|
validated_redirect = url_for('watchlist.index')
|
||||||
|
|
||||||
if request.method == 'GET':
|
if request.method == 'GET':
|
||||||
if flask_login.current_user.is_authenticated:
|
if flask_login.current_user.is_authenticated:
|
||||||
|
# Already logged in - redirect immediately to the target
|
||||||
flash(gettext("Already logged in"))
|
flash(gettext("Already logged in"))
|
||||||
return redirect(url_for("watchlist.index"))
|
return redirect(validated_redirect)
|
||||||
flash(gettext("You must be logged in, please log in."), 'error')
|
flash(gettext("You must be logged in, please log in."), 'error')
|
||||||
output = render_template("login.html")
|
output = render_template("login.html", redirect_url=validated_redirect)
|
||||||
return output
|
return output
|
||||||
|
|
||||||
user = User()
|
user = User()
|
||||||
@@ -510,23 +544,13 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
|
|
||||||
if (user.check_password(password)):
|
if (user.check_password(password)):
|
||||||
flask_login.login_user(user, remember=True)
|
flask_login.login_user(user, remember=True)
|
||||||
|
# Redirect to the validated URL after successful login
|
||||||
# For now there's nothing else interesting here other than the index/list page
|
return redirect(validated_redirect)
|
||||||
# It's more reliable and safe to ignore the 'next' redirect
|
|
||||||
# When we used...
|
|
||||||
# next = request.args.get('next')
|
|
||||||
# return redirect(next or url_for('watchlist.index'))
|
|
||||||
# We would sometimes get login loop errors on sites hosted in sub-paths
|
|
||||||
|
|
||||||
# note for the future:
|
|
||||||
# if not is_safe_valid_url(next):
|
|
||||||
# return flask.abort(400)
|
|
||||||
return redirect(url_for('watchlist.index'))
|
|
||||||
|
|
||||||
else:
|
else:
|
||||||
flash(gettext('Incorrect password'), 'error')
|
flash(gettext('Incorrect password'), 'error')
|
||||||
|
|
||||||
return redirect(url_for('login'))
|
return redirect(url_for('login', redirect=redirect_url if redirect_url else None))
|
||||||
|
|
||||||
@app.before_request
|
@app.before_request
|
||||||
def before_request_handle_cookie_x_settings():
|
def before_request_handle_cookie_x_settings():
|
||||||
|
|||||||
113
changedetectionio/is_safe_url.py
Normal file
113
changedetectionio/is_safe_url.py
Normal file
@@ -0,0 +1,113 @@
|
|||||||
|
"""
|
||||||
|
URL redirect validation module for preventing open redirect vulnerabilities.
|
||||||
|
|
||||||
|
This module provides functionality to safely validate redirect URLs, ensuring they:
|
||||||
|
1. Point to internal routes only (no external redirects)
|
||||||
|
2. Are properly normalized (preventing browser parsing differences)
|
||||||
|
3. Match registered Flask routes (no fake/non-existent pages)
|
||||||
|
4. Are fully logged for security monitoring
|
||||||
|
|
||||||
|
References:
|
||||||
|
- https://flask-login.readthedocs.io/ (safe redirect patterns)
|
||||||
|
- https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-v-user-logins
|
||||||
|
- https://www.pythonkitchen.com/how-prevent-open-redirect-vulnerab-flask/
|
||||||
|
"""
|
||||||
|
|
||||||
|
from urllib.parse import urlparse, urljoin
|
||||||
|
from flask import request
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
|
||||||
|
def is_safe_url(target, app):
|
||||||
|
"""
|
||||||
|
Validate that a redirect URL is safe to prevent open redirect vulnerabilities.
|
||||||
|
|
||||||
|
This follows Flask/Werkzeug best practices by ensuring the redirect URL:
|
||||||
|
1. Is a relative path starting with exactly one '/'
|
||||||
|
2. Does not start with '//' (double-slash attack)
|
||||||
|
3. Has no external protocol handlers
|
||||||
|
4. Points to a valid registered route in the application
|
||||||
|
5. Is properly normalized to prevent browser parsing differences
|
||||||
|
|
||||||
|
Args:
|
||||||
|
target: The URL to validate (e.g., '/settings', '/login#top')
|
||||||
|
app: The Flask application instance (needed for route validation)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if the URL is safe for redirection, False otherwise
|
||||||
|
|
||||||
|
Examples:
|
||||||
|
>>> is_safe_url('/settings', app)
|
||||||
|
True
|
||||||
|
>>> is_safe_url('//evil.com', app)
|
||||||
|
False
|
||||||
|
>>> is_safe_url('/settings#general', app)
|
||||||
|
True
|
||||||
|
>>> is_safe_url('/fake-page', app)
|
||||||
|
False
|
||||||
|
"""
|
||||||
|
if not target:
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Normalize the URL to prevent browser parsing differences
|
||||||
|
# Strip whitespace and replace backslashes (which some browsers interpret as forward slashes)
|
||||||
|
target = target.strip()
|
||||||
|
target = target.replace('\\', '/')
|
||||||
|
|
||||||
|
# First, check if it starts with // or more (double-slash attack)
|
||||||
|
if target.startswith('//'):
|
||||||
|
logger.warning(f"Blocked redirect attempt with double-slash: {target}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Parse the URL to check for scheme and netloc
|
||||||
|
parsed = urlparse(target)
|
||||||
|
|
||||||
|
# Block any URL with a scheme (http://, https://, javascript:, etc.)
|
||||||
|
if parsed.scheme:
|
||||||
|
logger.warning(f"Blocked redirect attempt with scheme: {target}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Block any URL with a network location (netloc)
|
||||||
|
# This catches patterns like //evil.com, user@host, etc.
|
||||||
|
if parsed.netloc:
|
||||||
|
logger.warning(f"Blocked redirect attempt with netloc: {target}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# At this point, we have a relative URL with no scheme or netloc
|
||||||
|
# Use urljoin to resolve it and verify it points to the same host
|
||||||
|
ref_url = urlparse(request.host_url)
|
||||||
|
test_url = urlparse(urljoin(request.host_url, target))
|
||||||
|
|
||||||
|
# Check: ensure the resolved URL has the same netloc as current host
|
||||||
|
if not (test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc):
|
||||||
|
logger.warning(f"Blocked redirect attempt with mismatched netloc: {target}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Additional validation: Check if the URL matches a registered route
|
||||||
|
# This prevents redirects to non-existent pages or unintended endpoints
|
||||||
|
try:
|
||||||
|
# Get the path without query string and fragment
|
||||||
|
# Fragments (like #general) are automatically stripped by urlparse
|
||||||
|
path = parsed.path
|
||||||
|
|
||||||
|
# Create a URL adapter bound to the server name
|
||||||
|
adapter = app.url_map.bind(ref_url.netloc)
|
||||||
|
|
||||||
|
# Try to match the path to a registered route
|
||||||
|
# This will raise NotFound if the route doesn't exist
|
||||||
|
endpoint, values = adapter.match(path, return_rule=False)
|
||||||
|
|
||||||
|
# Block redirects to static file endpoints - these are catch-all routes
|
||||||
|
# that would match arbitrary paths, potentially allowing unintended redirects
|
||||||
|
if endpoint in ('static_content', 'static', 'static_flags'):
|
||||||
|
logger.warning(f"Blocked redirect to static endpoint: {target}")
|
||||||
|
return False
|
||||||
|
|
||||||
|
# Successfully matched a valid route
|
||||||
|
logger.debug(f"Validated safe redirect to endpoint '{endpoint}': {target}")
|
||||||
|
return True
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
# Route doesn't exist or can't be matched
|
||||||
|
logger.warning(f"Blocked redirect to non-existent route: {target} (error: {e})")
|
||||||
|
return False
|
||||||
@@ -97,7 +97,7 @@
|
|||||||
{% endif %}
|
{% endif %}
|
||||||
{% if current_user.is_authenticated %}
|
{% if current_user.is_authenticated %}
|
||||||
<li class="pure-menu-item">
|
<li class="pure-menu-item">
|
||||||
<a href="{{url_for('logout')}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
|
<a href="{{url_for('logout', redirect=request.path)}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
|
||||||
</li>
|
</li>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% if current_user.is_authenticated or not has_password %}
|
{% if current_user.is_authenticated or not has_password %}
|
||||||
@@ -261,7 +261,7 @@
|
|||||||
<div class="modal-body">
|
<div class="modal-body">
|
||||||
<div class="language-list">
|
<div class="language-list">
|
||||||
{% for locale, lang_data in available_languages.items()|sort %}
|
{% for locale, lang_data in available_languages.items()|sort %}
|
||||||
<a href="{{ url_for('set_language', locale=locale) }}" class="language-option" data-locale="{{ locale }}">
|
<a href="{{ url_for('set_language', locale=locale, redirect=request.path) }}" class="language-option" data-locale="{{ locale }}">
|
||||||
<span class="{{ lang_data.flag }}" style="display: inline-block; width: 1.5em; height: 1.5em; vertical-align: middle; margin-right: 0.5em; border-radius: 50%; overflow: hidden;"></span> <span class="language-name">{{ lang_data.name }}</span>
|
<span class="{{ lang_data.flag }}" style="display: inline-block; width: 1.5em; height: 1.5em; vertical-align: middle; margin-right: 0.5em; border-radius: 50%; overflow: hidden;"></span> <span class="language-name">{{ lang_data.name }}</span>
|
||||||
</a>
|
</a>
|
||||||
{% endfor %}
|
{% endfor %}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@
|
|||||||
<div class="inner">
|
<div class="inner">
|
||||||
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
|
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
|
||||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||||
|
<input type="hidden" id="redirect" name="redirect" value="{{ redirect_url }}">
|
||||||
<fieldset>
|
<fieldset>
|
||||||
<div class="pure-control-group">
|
<div class="pure-control-group">
|
||||||
<label for="password">{{ _('Password') }}</label>
|
<label for="password">{{ _('Password') }}</label>
|
||||||
|
|||||||
@@ -110,3 +110,42 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
|
|||||||
|
|
||||||
assert res.status_code == 200
|
assert res.status_code == 200
|
||||||
assert b"Annulla" in res.data, "Italian text should persist across requests"
|
assert b"Annulla" in res.data, "Italian text should persist across requests"
|
||||||
|
|
||||||
|
|
||||||
|
def test_set_language_with_redirect(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that changing language keeps the user on the same page.
|
||||||
|
Example: User is on /settings, changes language, stays on /settings.
|
||||||
|
"""
|
||||||
|
from flask import url_for
|
||||||
|
|
||||||
|
# Set language with a redirect parameter (simulating language change from /settings)
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="de", redirect="/settings"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should redirect back to settings
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/settings' in res.location
|
||||||
|
|
||||||
|
# Verify language was set in session
|
||||||
|
with client.session_transaction() as sess:
|
||||||
|
assert sess.get('locale') == 'de'
|
||||||
|
|
||||||
|
# Test with invalid locale (should still redirect safely)
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="invalid_locale", redirect="/settings"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/settings' in res.location
|
||||||
|
|
||||||
|
# Test with malicious redirect (should default to watchlist)
|
||||||
|
res = client.get(
|
||||||
|
url_for("set_language", locale="en", redirect="https://evil.com"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
# Should not redirect to evil.com
|
||||||
|
assert 'evil.com' not in res.location
|
||||||
|
|||||||
@@ -192,3 +192,289 @@ def test_xss_watch_last_error(client, live_server, measure_memory_usage, datasto
|
|||||||
assert b'<a href="https://foobar"></a><script>alert(123);</script>' in res.data
|
assert b'<a href="https://foobar"></a><script>alert(123);</script>' in res.data
|
||||||
assert b"https://foobar" in res.data # this text should be there
|
assert b"https://foobar" in res.data # this text should be there
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_redirect_safe_urls(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that safe redirect URLs work correctly in login flow.
|
||||||
|
This verifies the fix for open redirect vulnerabilities while maintaining
|
||||||
|
legitimate redirect functionality for both authenticated and unauthenticated users.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Test 1: Accessing /login?redirect=/settings when not logged in
|
||||||
|
# Should show the login form with redirect parameter preserved
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="/settings"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
# Should show login form
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Check that the redirect is preserved in the hidden form field
|
||||||
|
assert b'name="redirect"' in res.data
|
||||||
|
|
||||||
|
# Test 2: Valid internal redirect with query parameters
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="/settings?tab=notifications"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Check that the redirect is preserved
|
||||||
|
assert b'value="/settings?tab=notifications"' in res.data
|
||||||
|
|
||||||
|
# Test 3: Malicious external URL should be blocked and default to watchlist
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="https://evil.com/phishing"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
# Should show login form
|
||||||
|
assert res.status_code == 200
|
||||||
|
# The redirect parameter in the form should NOT contain the evil URL
|
||||||
|
# Check the actual input value, not just anywhere in the page
|
||||||
|
assert b'value="https://evil.com' not in res.data
|
||||||
|
assert b'value="/evil.com' not in res.data
|
||||||
|
assert b'name="redirect"' in res.data
|
||||||
|
|
||||||
|
# Test 4: Double-slash attack should be blocked
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="//evil.com"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should not have the malicious URL in the redirect input value
|
||||||
|
assert b'value="//evil.com"' not in res.data
|
||||||
|
|
||||||
|
# Test 5: Protocol handler exploit should be blocked
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="javascript:alert(document.domain)"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should not have javascript: in the redirect input value
|
||||||
|
assert b'value="javascript:' not in res.data
|
||||||
|
|
||||||
|
# Test 6: At-symbol obfuscation attack should be blocked
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="//@evil.com"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should not have the malicious URL in the redirect input value
|
||||||
|
assert b'value="//@evil.com"' not in res.data
|
||||||
|
|
||||||
|
# Test 7: Multiple slashes attack should be blocked
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="////evil.com"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should not have the malicious URL in the redirect input value
|
||||||
|
assert b'value="////evil.com"' not in res.data
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_redirect_with_password(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that redirect functionality works correctly when a password is set.
|
||||||
|
This ensures that notifications can always link to /login and users will
|
||||||
|
be redirected to the correct page after authentication.
|
||||||
|
"""
|
||||||
|
|
||||||
|
# Set a password
|
||||||
|
from changedetectionio import store
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
# Generate a test password
|
||||||
|
password = "test123"
|
||||||
|
salt = os.urandom(32)
|
||||||
|
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||||
|
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||||
|
|
||||||
|
# Set the password in the datastore
|
||||||
|
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||||
|
|
||||||
|
# Test 1: Try to access /login?redirect=/settings without being logged in
|
||||||
|
# Should show login form and preserve redirect parameter
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="/settings"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"Password" in res.data
|
||||||
|
# Check that redirect parameter is preserved in the form
|
||||||
|
assert b'name="redirect"' in res.data
|
||||||
|
assert b'value="/settings"' in res.data
|
||||||
|
|
||||||
|
# Test 2: Submit correct password with redirect parameter
|
||||||
|
# Should redirect to /settings after successful login
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": password, "redirect": "/settings"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should be on settings page
|
||||||
|
assert b"Settings" in res.data or b"settings" in res.data
|
||||||
|
|
||||||
|
# Test 3: Now that we're logged in, accessing /login?redirect=/settings
|
||||||
|
# should redirect immediately without showing login form
|
||||||
|
res = client.get(
|
||||||
|
url_for("login", redirect="/"),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"Already logged in" in res.data
|
||||||
|
|
||||||
|
# Test 4: Malicious redirect should be blocked even with correct password
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": password, "redirect": "https://evil.com"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
# Should redirect to watchlist index instead of evil.com
|
||||||
|
assert b"evil.com" not in res.data
|
||||||
|
|
||||||
|
# Logout for cleanup
|
||||||
|
client.get(url_for("logout"))
|
||||||
|
|
||||||
|
# Test 5: Incorrect password with redirect should stay on login page
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": "wrongpassword", "redirect": "/settings"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b"Incorrect password" in res.data or b"password" in res.data
|
||||||
|
|
||||||
|
# Clear the password
|
||||||
|
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||||
|
|
||||||
|
|
||||||
|
def test_login_redirect_from_protected_page(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test the complete redirect flow: accessing a protected page while logged out
|
||||||
|
should redirect to login with the page URL, then redirect back after login.
|
||||||
|
This is the real-world scenario where users try to access /edit/uuid or /settings
|
||||||
|
and need to login first.
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
# Add a watch first
|
||||||
|
set_original_response(datastore_path=datastore_path)
|
||||||
|
res = client.post(
|
||||||
|
url_for("imports.import_page"),
|
||||||
|
data={"urls": url_for('test_endpoint', _external=True)},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Set a password
|
||||||
|
password = "test123"
|
||||||
|
salt = os.urandom(32)
|
||||||
|
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||||
|
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||||
|
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||||
|
|
||||||
|
# Logout to ensure we're not authenticated
|
||||||
|
client.get(url_for("logout"))
|
||||||
|
|
||||||
|
# Try to access a protected page (edit page for first watch)
|
||||||
|
res = client.get(
|
||||||
|
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should redirect to login with the edit page as redirect parameter
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/login' in res.location
|
||||||
|
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
|
||||||
|
|
||||||
|
# Follow the redirect to login page
|
||||||
|
res = client.get(res.location, follow_redirects=False)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b'Password' in res.data
|
||||||
|
|
||||||
|
# The redirect parameter should be preserved in the login form
|
||||||
|
# It should contain the edit page URL
|
||||||
|
assert b'name="redirect"' in res.data
|
||||||
|
assert b'value="/edit/first"' in res.data or b'value="%2Fedit%2Ffirst"' in res.data
|
||||||
|
|
||||||
|
# Now login with correct password and the redirect parameter
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": password, "redirect": "/edit/first"},
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should redirect to the edit page
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/edit/first' in res.location
|
||||||
|
|
||||||
|
# Follow the redirect to verify we're on the edit page
|
||||||
|
res = client.get(res.location, follow_redirects=True)
|
||||||
|
assert res.status_code == 200
|
||||||
|
# Should see edit page content
|
||||||
|
assert b'Edit' in res.data or b'Watching' in res.data
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
client.get(url_for("logout"))
|
||||||
|
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||||
|
|
||||||
|
|
||||||
|
def test_logout_with_redirect(client, live_server, measure_memory_usage, datastore_path):
|
||||||
|
"""
|
||||||
|
Test that logout preserves the current page URL, so after re-login
|
||||||
|
the user returns to where they were before logging out.
|
||||||
|
Example: User is on /edit/uuid, clicks logout, then logs back in and
|
||||||
|
returns to /edit/uuid.
|
||||||
|
"""
|
||||||
|
import base64
|
||||||
|
import hashlib
|
||||||
|
|
||||||
|
# Set a password and login
|
||||||
|
password = "test123"
|
||||||
|
salt = os.urandom(32)
|
||||||
|
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||||
|
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||||
|
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||||
|
|
||||||
|
# Login
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": password},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert res.status_code == 200
|
||||||
|
|
||||||
|
# Now logout with a redirect parameter (simulating logout from /settings)
|
||||||
|
res = client.get(
|
||||||
|
url_for("logout", redirect="/settings"),
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should redirect to login with the redirect parameter
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/login' in res.location
|
||||||
|
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
|
||||||
|
|
||||||
|
# Follow the redirect to login page
|
||||||
|
res = client.get(res.location, follow_redirects=False)
|
||||||
|
assert res.status_code == 200
|
||||||
|
assert b'Password' in res.data
|
||||||
|
# The redirect parameter should be preserved
|
||||||
|
assert b'value="/settings"' in res.data or b'value="%2Fsettings"' in res.data
|
||||||
|
|
||||||
|
# Login again with the redirect
|
||||||
|
res = client.post(
|
||||||
|
url_for("login"),
|
||||||
|
data={"password": password, "redirect": "/settings"},
|
||||||
|
follow_redirects=False
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should redirect back to settings
|
||||||
|
assert res.status_code in [302, 303]
|
||||||
|
assert '/settings' in res.location or 'settings' in res.location
|
||||||
|
|
||||||
|
# Cleanup
|
||||||
|
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user