mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-03 05:40:23 +00:00
Compare commits
4 Commits
resilient-
...
2600-mode-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
e16e72ee07 | ||
|
|
b1257dd196 | ||
|
|
7e61f5b663 | ||
|
|
afa8451448 |
@@ -204,7 +204,7 @@ class fetcher(Fetcher):
|
||||
import re
|
||||
self.delete_browser_steps_screenshots()
|
||||
|
||||
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
|
||||
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 12)) + self.render_extract_delay
|
||||
extra_wait = min(n, 15)
|
||||
|
||||
logger.debug(f"Extra wait set to {extra_wait}s, requested was {n}s.")
|
||||
@@ -288,28 +288,27 @@ class fetcher(Fetcher):
|
||||
# Enable Network domain to detect when first bytes arrive
|
||||
await self.page._client.send('Network.enable')
|
||||
|
||||
# Now set up the frame navigation handlers
|
||||
async def handle_frame_navigation(event=None):
|
||||
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
|
||||
logger.debug(f"Frame navigated: {event}")
|
||||
w = extra_wait - 2 if extra_wait > 4 else 2
|
||||
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
|
||||
await asyncio.sleep(w)
|
||||
logger.debug("Issuing stopLoading command...")
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
logger.debug("stopLoading command sent!")
|
||||
|
||||
async def setup_frame_handlers_on_first_response(event):
|
||||
# Only trigger for the main document response
|
||||
if event.get('type') == 'Document':
|
||||
logger.debug("First response received, setting up frame handlers for forced page stop load.")
|
||||
|
||||
# De-register this listener - we only need it once
|
||||
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
|
||||
# Now set up the frame navigation handlers
|
||||
async def handle_frame_navigation(event):
|
||||
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
|
||||
logger.debug(f"Frame navigated: {event}")
|
||||
w = extra_wait - 2 if extra_wait > 4 else 2
|
||||
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
|
||||
await asyncio.sleep(w)
|
||||
logger.debug("Issuing stopLoading command...")
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
logger.debug("stopLoading command sent!")
|
||||
|
||||
self.page._client.on('Page.frameStartedNavigating', lambda e: asyncio.create_task(handle_frame_navigation(e)))
|
||||
self.page._client.on('Page.frameStartedLoading', lambda e: asyncio.create_task(handle_frame_navigation(e)))
|
||||
self.page._client.on('Page.frameStoppedLoading', lambda e: logger.debug(f"Frame stopped loading: {e}"))
|
||||
logger.debug("First response received, setting up frame handlers for forced page stop load DONE SETUP")
|
||||
# De-register this listener - we only need it once
|
||||
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
|
||||
# Listen for first response to trigger frame handler setup
|
||||
self.page._client.on('Network.responseReceived', setup_frame_handlers_on_first_response)
|
||||
@@ -318,8 +317,11 @@ class fetcher(Fetcher):
|
||||
attempt=0
|
||||
while not response:
|
||||
logger.debug(f"Attempting page fetch {url} attempt {attempt}")
|
||||
asyncio.create_task(handle_frame_navigation())
|
||||
response = await self.page.goto(url, timeout=0)
|
||||
await asyncio.sleep(1 + extra_wait)
|
||||
await self.page._client.send('Page.stopLoading')
|
||||
|
||||
if response:
|
||||
break
|
||||
if not response:
|
||||
|
||||
@@ -26,6 +26,7 @@ from flask import (
|
||||
session,
|
||||
url_for,
|
||||
)
|
||||
from urllib.parse import urlparse
|
||||
from flask_compress import Compress as FlaskCompress
|
||||
from flask_login import current_user
|
||||
from flask_restful import abort, Api
|
||||
@@ -350,6 +351,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
global datastore, socketio_server
|
||||
datastore = datastore_o
|
||||
|
||||
# Import and create a wrapper for is_safe_url that has access to app
|
||||
from changedetectionio.is_safe_url import is_safe_url as _is_safe_url
|
||||
|
||||
def is_safe_url(target):
|
||||
"""Wrapper for is_safe_url that passes the app instance"""
|
||||
return _is_safe_url(target, app)
|
||||
|
||||
# so far just for read-only via tests, but this will be moved eventually to be the main source
|
||||
# (instead of the global var)
|
||||
app.config['DATASTORE'] = datastore_o
|
||||
@@ -471,11 +479,21 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
@login_manager.unauthorized_handler
|
||||
def unauthorized_handler():
|
||||
return redirect(url_for('login', next=url_for('watchlist.index')))
|
||||
# Pass the current request path so users are redirected back after login
|
||||
return redirect(url_for('login', redirect=request.path))
|
||||
|
||||
@app.route('/logout')
|
||||
def logout():
|
||||
flask_login.logout_user()
|
||||
|
||||
# Check if there's a redirect parameter to return to after re-login
|
||||
redirect_url = request.args.get('redirect')
|
||||
|
||||
# If redirect is provided and safe, pass it to login page
|
||||
if redirect_url and is_safe_url(redirect_url):
|
||||
return redirect(url_for('login', redirect=redirect_url))
|
||||
|
||||
# Otherwise just go to watchlist
|
||||
return redirect(url_for('watchlist.index'))
|
||||
|
||||
@app.route('/set-language/<locale>')
|
||||
@@ -487,20 +505,36 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
else:
|
||||
logger.error(f"Invalid locale {locale}, available: {language_codes}")
|
||||
|
||||
# Redirect back to the page they came from, or home
|
||||
# Check if there's a redirect parameter to return to the same page
|
||||
redirect_url = request.args.get('redirect')
|
||||
|
||||
# If redirect is provided and safe, use it
|
||||
if redirect_url and is_safe_url(redirect_url):
|
||||
return redirect(redirect_url)
|
||||
|
||||
# Otherwise redirect to watchlist
|
||||
return redirect(url_for('watchlist.index'))
|
||||
|
||||
# https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39
|
||||
# You can divide up the stuff like this
|
||||
@app.route('/login', methods=['GET', 'POST'])
|
||||
def login():
|
||||
# Extract and validate the redirect parameter
|
||||
redirect_url = request.args.get('redirect') or request.form.get('redirect')
|
||||
|
||||
# Validate the redirect URL - default to watchlist if invalid
|
||||
if redirect_url and is_safe_url(redirect_url):
|
||||
validated_redirect = redirect_url
|
||||
else:
|
||||
validated_redirect = url_for('watchlist.index')
|
||||
|
||||
if request.method == 'GET':
|
||||
if flask_login.current_user.is_authenticated:
|
||||
# Already logged in - redirect immediately to the target
|
||||
flash(gettext("Already logged in"))
|
||||
return redirect(url_for("watchlist.index"))
|
||||
return redirect(validated_redirect)
|
||||
flash(gettext("You must be logged in, please log in."), 'error')
|
||||
output = render_template("login.html")
|
||||
output = render_template("login.html", redirect_url=validated_redirect)
|
||||
return output
|
||||
|
||||
user = User()
|
||||
@@ -510,23 +544,13 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
|
||||
if (user.check_password(password)):
|
||||
flask_login.login_user(user, remember=True)
|
||||
|
||||
# For now there's nothing else interesting here other than the index/list page
|
||||
# It's more reliable and safe to ignore the 'next' redirect
|
||||
# When we used...
|
||||
# next = request.args.get('next')
|
||||
# return redirect(next or url_for('watchlist.index'))
|
||||
# We would sometimes get login loop errors on sites hosted in sub-paths
|
||||
|
||||
# note for the future:
|
||||
# if not is_safe_valid_url(next):
|
||||
# return flask.abort(400)
|
||||
return redirect(url_for('watchlist.index'))
|
||||
# Redirect to the validated URL after successful login
|
||||
return redirect(validated_redirect)
|
||||
|
||||
else:
|
||||
flash(gettext('Incorrect password'), 'error')
|
||||
|
||||
return redirect(url_for('login'))
|
||||
return redirect(url_for('login', redirect=redirect_url if redirect_url else None))
|
||||
|
||||
@app.before_request
|
||||
def before_request_handle_cookie_x_settings():
|
||||
|
||||
113
changedetectionio/is_safe_url.py
Normal file
113
changedetectionio/is_safe_url.py
Normal file
@@ -0,0 +1,113 @@
|
||||
"""
|
||||
URL redirect validation module for preventing open redirect vulnerabilities.
|
||||
|
||||
This module provides functionality to safely validate redirect URLs, ensuring they:
|
||||
1. Point to internal routes only (no external redirects)
|
||||
2. Are properly normalized (preventing browser parsing differences)
|
||||
3. Match registered Flask routes (no fake/non-existent pages)
|
||||
4. Are fully logged for security monitoring
|
||||
|
||||
References:
|
||||
- https://flask-login.readthedocs.io/ (safe redirect patterns)
|
||||
- https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-v-user-logins
|
||||
- https://www.pythonkitchen.com/how-prevent-open-redirect-vulnerab-flask/
|
||||
"""
|
||||
|
||||
from urllib.parse import urlparse, urljoin
|
||||
from flask import request
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def is_safe_url(target, app):
|
||||
"""
|
||||
Validate that a redirect URL is safe to prevent open redirect vulnerabilities.
|
||||
|
||||
This follows Flask/Werkzeug best practices by ensuring the redirect URL:
|
||||
1. Is a relative path starting with exactly one '/'
|
||||
2. Does not start with '//' (double-slash attack)
|
||||
3. Has no external protocol handlers
|
||||
4. Points to a valid registered route in the application
|
||||
5. Is properly normalized to prevent browser parsing differences
|
||||
|
||||
Args:
|
||||
target: The URL to validate (e.g., '/settings', '/login#top')
|
||||
app: The Flask application instance (needed for route validation)
|
||||
|
||||
Returns:
|
||||
bool: True if the URL is safe for redirection, False otherwise
|
||||
|
||||
Examples:
|
||||
>>> is_safe_url('/settings', app)
|
||||
True
|
||||
>>> is_safe_url('//evil.com', app)
|
||||
False
|
||||
>>> is_safe_url('/settings#general', app)
|
||||
True
|
||||
>>> is_safe_url('/fake-page', app)
|
||||
False
|
||||
"""
|
||||
if not target:
|
||||
return False
|
||||
|
||||
# Normalize the URL to prevent browser parsing differences
|
||||
# Strip whitespace and replace backslashes (which some browsers interpret as forward slashes)
|
||||
target = target.strip()
|
||||
target = target.replace('\\', '/')
|
||||
|
||||
# First, check if it starts with // or more (double-slash attack)
|
||||
if target.startswith('//'):
|
||||
logger.warning(f"Blocked redirect attempt with double-slash: {target}")
|
||||
return False
|
||||
|
||||
# Parse the URL to check for scheme and netloc
|
||||
parsed = urlparse(target)
|
||||
|
||||
# Block any URL with a scheme (http://, https://, javascript:, etc.)
|
||||
if parsed.scheme:
|
||||
logger.warning(f"Blocked redirect attempt with scheme: {target}")
|
||||
return False
|
||||
|
||||
# Block any URL with a network location (netloc)
|
||||
# This catches patterns like //evil.com, user@host, etc.
|
||||
if parsed.netloc:
|
||||
logger.warning(f"Blocked redirect attempt with netloc: {target}")
|
||||
return False
|
||||
|
||||
# At this point, we have a relative URL with no scheme or netloc
|
||||
# Use urljoin to resolve it and verify it points to the same host
|
||||
ref_url = urlparse(request.host_url)
|
||||
test_url = urlparse(urljoin(request.host_url, target))
|
||||
|
||||
# Check: ensure the resolved URL has the same netloc as current host
|
||||
if not (test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc):
|
||||
logger.warning(f"Blocked redirect attempt with mismatched netloc: {target}")
|
||||
return False
|
||||
|
||||
# Additional validation: Check if the URL matches a registered route
|
||||
# This prevents redirects to non-existent pages or unintended endpoints
|
||||
try:
|
||||
# Get the path without query string and fragment
|
||||
# Fragments (like #general) are automatically stripped by urlparse
|
||||
path = parsed.path
|
||||
|
||||
# Create a URL adapter bound to the server name
|
||||
adapter = app.url_map.bind(ref_url.netloc)
|
||||
|
||||
# Try to match the path to a registered route
|
||||
# This will raise NotFound if the route doesn't exist
|
||||
endpoint, values = adapter.match(path, return_rule=False)
|
||||
|
||||
# Block redirects to static file endpoints - these are catch-all routes
|
||||
# that would match arbitrary paths, potentially allowing unintended redirects
|
||||
if endpoint in ('static_content', 'static', 'static_flags'):
|
||||
logger.warning(f"Blocked redirect to static endpoint: {target}")
|
||||
return False
|
||||
|
||||
# Successfully matched a valid route
|
||||
logger.debug(f"Validated safe redirect to endpoint '{endpoint}': {target}")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
# Route doesn't exist or can't be matched
|
||||
logger.warning(f"Blocked redirect to non-existent route: {target} (error: {e})")
|
||||
return False
|
||||
@@ -97,7 +97,7 @@
|
||||
{% endif %}
|
||||
{% if current_user.is_authenticated %}
|
||||
<li class="pure-menu-item">
|
||||
<a href="{{url_for('logout')}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
|
||||
<a href="{{url_for('logout', redirect=request.path)}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
|
||||
</li>
|
||||
{% endif %}
|
||||
{% if current_user.is_authenticated or not has_password %}
|
||||
@@ -261,7 +261,7 @@
|
||||
<div class="modal-body">
|
||||
<div class="language-list">
|
||||
{% for locale, lang_data in available_languages.items()|sort %}
|
||||
<a href="{{ url_for('set_language', locale=locale) }}" class="language-option" data-locale="{{ locale }}">
|
||||
<a href="{{ url_for('set_language', locale=locale, redirect=request.path) }}" class="language-option" data-locale="{{ locale }}">
|
||||
<span class="{{ lang_data.flag }}" style="display: inline-block; width: 1.5em; height: 1.5em; vertical-align: middle; margin-right: 0.5em; border-radius: 50%; overflow: hidden;"></span> <span class="language-name">{{ lang_data.name }}</span>
|
||||
</a>
|
||||
{% endfor %}
|
||||
|
||||
@@ -5,6 +5,7 @@
|
||||
<div class="inner">
|
||||
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
|
||||
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
|
||||
<input type="hidden" id="redirect" name="redirect" value="{{ redirect_url }}">
|
||||
<fieldset>
|
||||
<div class="pure-control-group">
|
||||
<label for="password">{{ _('Password') }}</label>
|
||||
|
||||
@@ -110,3 +110,42 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
|
||||
|
||||
assert res.status_code == 200
|
||||
assert b"Annulla" in res.data, "Italian text should persist across requests"
|
||||
|
||||
|
||||
def test_set_language_with_redirect(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that changing language keeps the user on the same page.
|
||||
Example: User is on /settings, changes language, stays on /settings.
|
||||
"""
|
||||
from flask import url_for
|
||||
|
||||
# Set language with a redirect parameter (simulating language change from /settings)
|
||||
res = client.get(
|
||||
url_for("set_language", locale="de", redirect="/settings"),
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
# Should redirect back to settings
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/settings' in res.location
|
||||
|
||||
# Verify language was set in session
|
||||
with client.session_transaction() as sess:
|
||||
assert sess.get('locale') == 'de'
|
||||
|
||||
# Test with invalid locale (should still redirect safely)
|
||||
res = client.get(
|
||||
url_for("set_language", locale="invalid_locale", redirect="/settings"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/settings' in res.location
|
||||
|
||||
# Test with malicious redirect (should default to watchlist)
|
||||
res = client.get(
|
||||
url_for("set_language", locale="en", redirect="https://evil.com"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code in [302, 303]
|
||||
# Should not redirect to evil.com
|
||||
assert 'evil.com' not in res.location
|
||||
|
||||
@@ -240,7 +240,6 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
|
||||
|
||||
|
||||
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -299,7 +298,26 @@ def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, d
|
||||
assert b'has-unread-changes' not in res.data
|
||||
|
||||
|
||||
# Re #2600 - Switch the mode to normal type and back, and see if the values stick..
|
||||
###################################################################################
|
||||
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||
data={"restock_settings-follow_price_changes": "y",
|
||||
"restock_settings-price_change_threshold_percent": 5.05,
|
||||
"processor": "text_json_diff",
|
||||
"url": test_url,
|
||||
'fetch_backend': "html_requests",
|
||||
"time_between_check_use_default": "y"
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Updated watch." in res.data
|
||||
# And back again
|
||||
live_server.app.config['DATASTORE'].data['watching'][uuid]['processor'] = 'restock_diff'
|
||||
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
|
||||
assert b'type="text" value="5.05"' in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -443,3 +461,4 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'ception' not in res.data
|
||||
assert b'155.55' in res.data
|
||||
|
||||
|
||||
@@ -192,3 +192,289 @@ def test_xss_watch_last_error(client, live_server, measure_memory_usage, datasto
|
||||
assert b'<a href="https://foobar"></a><script>alert(123);</script>' in res.data
|
||||
assert b"https://foobar" in res.data # this text should be there
|
||||
|
||||
|
||||
def test_login_redirect_safe_urls(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that safe redirect URLs work correctly in login flow.
|
||||
This verifies the fix for open redirect vulnerabilities while maintaining
|
||||
legitimate redirect functionality for both authenticated and unauthenticated users.
|
||||
"""
|
||||
|
||||
# Test 1: Accessing /login?redirect=/settings when not logged in
|
||||
# Should show the login form with redirect parameter preserved
|
||||
res = client.get(
|
||||
url_for("login", redirect="/settings"),
|
||||
follow_redirects=False
|
||||
)
|
||||
# Should show login form
|
||||
assert res.status_code == 200
|
||||
# Check that the redirect is preserved in the hidden form field
|
||||
assert b'name="redirect"' in res.data
|
||||
|
||||
# Test 2: Valid internal redirect with query parameters
|
||||
res = client.get(
|
||||
url_for("login", redirect="/settings?tab=notifications"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Check that the redirect is preserved
|
||||
assert b'value="/settings?tab=notifications"' in res.data
|
||||
|
||||
# Test 3: Malicious external URL should be blocked and default to watchlist
|
||||
res = client.get(
|
||||
url_for("login", redirect="https://evil.com/phishing"),
|
||||
follow_redirects=False
|
||||
)
|
||||
# Should show login form
|
||||
assert res.status_code == 200
|
||||
# The redirect parameter in the form should NOT contain the evil URL
|
||||
# Check the actual input value, not just anywhere in the page
|
||||
assert b'value="https://evil.com' not in res.data
|
||||
assert b'value="/evil.com' not in res.data
|
||||
assert b'name="redirect"' in res.data
|
||||
|
||||
# Test 4: Double-slash attack should be blocked
|
||||
res = client.get(
|
||||
url_for("login", redirect="//evil.com"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should not have the malicious URL in the redirect input value
|
||||
assert b'value="//evil.com"' not in res.data
|
||||
|
||||
# Test 5: Protocol handler exploit should be blocked
|
||||
res = client.get(
|
||||
url_for("login", redirect="javascript:alert(document.domain)"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should not have javascript: in the redirect input value
|
||||
assert b'value="javascript:' not in res.data
|
||||
|
||||
# Test 6: At-symbol obfuscation attack should be blocked
|
||||
res = client.get(
|
||||
url_for("login", redirect="//@evil.com"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should not have the malicious URL in the redirect input value
|
||||
assert b'value="//@evil.com"' not in res.data
|
||||
|
||||
# Test 7: Multiple slashes attack should be blocked
|
||||
res = client.get(
|
||||
url_for("login", redirect="////evil.com"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should not have the malicious URL in the redirect input value
|
||||
assert b'value="////evil.com"' not in res.data
|
||||
|
||||
|
||||
def test_login_redirect_with_password(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that redirect functionality works correctly when a password is set.
|
||||
This ensures that notifications can always link to /login and users will
|
||||
be redirected to the correct page after authentication.
|
||||
"""
|
||||
|
||||
# Set a password
|
||||
from changedetectionio import store
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
# Generate a test password
|
||||
password = "test123"
|
||||
salt = os.urandom(32)
|
||||
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||
|
||||
# Set the password in the datastore
|
||||
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||
|
||||
# Test 1: Try to access /login?redirect=/settings without being logged in
|
||||
# Should show login form and preserve redirect parameter
|
||||
res = client.get(
|
||||
url_for("login", redirect="/settings"),
|
||||
follow_redirects=False
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert b"Password" in res.data
|
||||
# Check that redirect parameter is preserved in the form
|
||||
assert b'name="redirect"' in res.data
|
||||
assert b'value="/settings"' in res.data
|
||||
|
||||
# Test 2: Submit correct password with redirect parameter
|
||||
# Should redirect to /settings after successful login
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": password, "redirect": "/settings"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
# Should be on settings page
|
||||
assert b"Settings" in res.data or b"settings" in res.data
|
||||
|
||||
# Test 3: Now that we're logged in, accessing /login?redirect=/settings
|
||||
# should redirect immediately without showing login form
|
||||
res = client.get(
|
||||
url_for("login", redirect="/"),
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert b"Already logged in" in res.data
|
||||
|
||||
# Test 4: Malicious redirect should be blocked even with correct password
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": password, "redirect": "https://evil.com"},
|
||||
follow_redirects=True
|
||||
)
|
||||
# Should redirect to watchlist index instead of evil.com
|
||||
assert b"evil.com" not in res.data
|
||||
|
||||
# Logout for cleanup
|
||||
client.get(url_for("logout"))
|
||||
|
||||
# Test 5: Incorrect password with redirect should stay on login page
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": "wrongpassword", "redirect": "/settings"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
assert b"Incorrect password" in res.data or b"password" in res.data
|
||||
|
||||
# Clear the password
|
||||
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||
|
||||
|
||||
def test_login_redirect_from_protected_page(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test the complete redirect flow: accessing a protected page while logged out
|
||||
should redirect to login with the page URL, then redirect back after login.
|
||||
This is the real-world scenario where users try to access /edit/uuid or /settings
|
||||
and need to login first.
|
||||
"""
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
# Add a watch first
|
||||
set_original_response(datastore_path=datastore_path)
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": url_for('test_endpoint', _external=True)},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"1 Imported" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Set a password
|
||||
password = "test123"
|
||||
salt = os.urandom(32)
|
||||
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||
|
||||
# Logout to ensure we're not authenticated
|
||||
client.get(url_for("logout"))
|
||||
|
||||
# Try to access a protected page (edit page for first watch)
|
||||
res = client.get(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
# Should redirect to login with the edit page as redirect parameter
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/login' in res.location
|
||||
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
|
||||
|
||||
# Follow the redirect to login page
|
||||
res = client.get(res.location, follow_redirects=False)
|
||||
assert res.status_code == 200
|
||||
assert b'Password' in res.data
|
||||
|
||||
# The redirect parameter should be preserved in the login form
|
||||
# It should contain the edit page URL
|
||||
assert b'name="redirect"' in res.data
|
||||
assert b'value="/edit/first"' in res.data or b'value="%2Fedit%2Ffirst"' in res.data
|
||||
|
||||
# Now login with correct password and the redirect parameter
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": password, "redirect": "/edit/first"},
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
# Should redirect to the edit page
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/edit/first' in res.location
|
||||
|
||||
# Follow the redirect to verify we're on the edit page
|
||||
res = client.get(res.location, follow_redirects=True)
|
||||
assert res.status_code == 200
|
||||
# Should see edit page content
|
||||
assert b'Edit' in res.data or b'Watching' in res.data
|
||||
|
||||
# Cleanup
|
||||
client.get(url_for("logout"))
|
||||
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||
|
||||
|
||||
def test_logout_with_redirect(client, live_server, measure_memory_usage, datastore_path):
|
||||
"""
|
||||
Test that logout preserves the current page URL, so after re-login
|
||||
the user returns to where they were before logging out.
|
||||
Example: User is on /edit/uuid, clicks logout, then logs back in and
|
||||
returns to /edit/uuid.
|
||||
"""
|
||||
import base64
|
||||
import hashlib
|
||||
|
||||
# Set a password and login
|
||||
password = "test123"
|
||||
salt = os.urandom(32)
|
||||
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
|
||||
salted_pass = base64.b64encode(salt + key).decode('ascii')
|
||||
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
|
||||
|
||||
# Login
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": password},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 200
|
||||
|
||||
# Now logout with a redirect parameter (simulating logout from /settings)
|
||||
res = client.get(
|
||||
url_for("logout", redirect="/settings"),
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
# Should redirect to login with the redirect parameter
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/login' in res.location
|
||||
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
|
||||
|
||||
# Follow the redirect to login page
|
||||
res = client.get(res.location, follow_redirects=False)
|
||||
assert res.status_code == 200
|
||||
assert b'Password' in res.data
|
||||
# The redirect parameter should be preserved
|
||||
assert b'value="/settings"' in res.data or b'value="%2Fsettings"' in res.data
|
||||
|
||||
# Login again with the redirect
|
||||
res = client.post(
|
||||
url_for("login"),
|
||||
data={"password": password, "redirect": "/settings"},
|
||||
follow_redirects=False
|
||||
)
|
||||
|
||||
# Should redirect back to settings
|
||||
assert res.status_code in [302, 303]
|
||||
assert '/settings' in res.location or 'settings' in res.location
|
||||
|
||||
# Cleanup
|
||||
del client.application.config['DATASTORE'].data['settings']['application']['password']
|
||||
|
||||
|
||||
Reference in New Issue
Block a user