import os
from flask import url_for
from changedetectionio.tests.util import set_modified_response
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .. import strtobool
def set_original_response(datastore_path):
test_return_data = """
head title
Some initial text
Which is across multiple lines
So let's see what happens.
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
return None
def test_bad_access(client, live_server, measure_memory_usage, datastore_path):
res = client.post(
url_for("imports.import_page"),
data={"urls": 'https://localhost'},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Attempt to add a body with a GET method
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": 'javascript:alert(document.domain)',
"tags": "",
"method": "GET",
"fetch_backend": "html_requests",
"body": "",
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": ' javascript:alert(123)', "tags": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": '%20%20%20javascript:alert(123)%20%20', "tags": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": ' source:javascript:alert(document.domain)', "tags": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": 'https://i-wanna-xss-you.com?hereis=', "tags": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
def _runner_test_various_file_slash(client, file_uri):
client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": file_uri, "tags": ''},
follow_redirects=True
)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
substrings = [b"URLs with hostname components are not permitted", b"No connection adapters were found for"]
# If it is enabled at test time
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')):
if file_uri.startswith('file:///'):
# This one should be the full qualified path to the file and should get the contents of this file
res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"),
follow_redirects=True
)
assert b'_runner_test_various_file_slash' in res.data
else:
# This will give some error from requests or if it went to chrome, will give some other error :-)
assert any(s in res.data for s in substrings)
delete_all_watches(client)
def test_file_slash_access(client, live_server, measure_memory_usage, datastore_path):
# file: is NOT permitted by default, so it will be caught by ALLOW_FILE_URI check
test_file_path = os.path.abspath(__file__)
_runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}")
# _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
# _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
def test_xss(client, live_server, measure_memory_usage, datastore_path):
from changedetectionio.notification import (
default_notification_format
)
# the template helpers were named .jinja which meant they were not having jinja2 autoescape enabled.
res = client.post(
url_for("settings.settings_page"),
data={"application-notification_urls": '">
',
"application-notification_title": '">
',
"application-notification_body": '">
',
"application-notification_format": default_notification_format,
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"
',
"url": url_for('test_endpoint', _external=True),
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b"" not in res.data # this text should be there
assert b'<a href="https://foobar"></a><script>alert(123);</script>' in res.data
assert b"https://foobar" in res.data # this text should be there
def test_login_redirect_safe_urls(client, live_server, measure_memory_usage, datastore_path):
"""
Test that safe redirect URLs work correctly in login flow.
This verifies the fix for open redirect vulnerabilities while maintaining
legitimate redirect functionality for both authenticated and unauthenticated users.
"""
# Test 1: Accessing /login?redirect=/settings when not logged in
# Should show the login form with redirect parameter preserved
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# Check that the redirect is preserved in the hidden form field
assert b'name="redirect"' in res.data
# Test 2: Valid internal redirect with query parameters
res = client.get(
url_for("login", redirect="/settings?tab=notifications"),
follow_redirects=False
)
assert res.status_code == 200
# Check that the redirect is preserved
assert b'value="/settings?tab=notifications"' in res.data
# Test 3: Malicious external URL should be blocked and default to watchlist
res = client.get(
url_for("login", redirect="https://evil.com/phishing"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# The redirect parameter in the form should NOT contain the evil URL
# Check the actual input value, not just anywhere in the page
assert b'value="https://evil.com' not in res.data
assert b'value="/evil.com' not in res.data
assert b'name="redirect"' in res.data
# Test 4: Double-slash attack should be blocked
res = client.get(
url_for("login", redirect="//evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//evil.com"' not in res.data
# Test 5: Protocol handler exploit should be blocked
res = client.get(
url_for("login", redirect="javascript:alert(document.domain)"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have javascript: in the redirect input value
assert b'value="javascript:' not in res.data
# Test 6: At-symbol obfuscation attack should be blocked
res = client.get(
url_for("login", redirect="//@evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//@evil.com"' not in res.data
# Test 7: Multiple slashes attack should be blocked
res = client.get(
url_for("login", redirect="////evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="////evil.com"' not in res.data
def test_login_redirect_with_password(client, live_server, measure_memory_usage, datastore_path):
"""
Test that redirect functionality works correctly when a password is set.
This ensures that notifications can always link to /login and users will
be redirected to the correct page after authentication.
"""
# Set a password
from changedetectionio import store
import base64
import hashlib
# Generate a test password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
# Set the password in the datastore
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Test 1: Try to access /login?redirect=/settings without being logged in
# Should show login form and preserve redirect parameter
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
assert res.status_code == 200
assert b"Password" in res.data
# Check that redirect parameter is preserved in the form
assert b'name="redirect"' in res.data
assert b'value="/settings"' in res.data
# Test 2: Submit correct password with redirect parameter
# Should redirect to /settings after successful login
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
# Should be on settings page
assert b"Settings" in res.data or b"settings" in res.data
# Test 3: Now that we're logged in, accessing /login?redirect=/settings
# should redirect immediately without showing login form
res = client.get(
url_for("login", redirect="/"),
follow_redirects=True
)
assert res.status_code == 200
assert b"Already logged in" in res.data
# Test 4: Malicious redirect should be blocked even with correct password
res = client.post(
url_for("login"),
data={"password": password, "redirect": "https://evil.com"},
follow_redirects=True
)
# Should redirect to watchlist index instead of evil.com
assert b"evil.com" not in res.data
# Logout for cleanup
client.get(url_for("logout"))
# Test 5: Incorrect password with redirect should stay on login page
res = client.post(
url_for("login"),
data={"password": "wrongpassword", "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
assert b"Incorrect password" in res.data or b"password" in res.data
# Clear the password
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_login_redirect_from_protected_page(client, live_server, measure_memory_usage, datastore_path):
"""
Test the complete redirect flow: accessing a protected page while logged out
should redirect to login with the page URL, then redirect back after login.
This is the real-world scenario where users try to access /edit/uuid or /settings
and need to login first.
"""
import base64
import hashlib
# Add a watch first
set_original_response(datastore_path=datastore_path)
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Set a password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Logout to ensure we're not authenticated
client.get(url_for("logout"))
# Try to access a protected page (edit page for first watch)
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
follow_redirects=False
)
# Should redirect to login with the edit page as redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved in the login form
# It should contain the edit page URL
assert b'name="redirect"' in res.data
assert b'value="/edit/first"' in res.data or b'value="%2Fedit%2Ffirst"' in res.data
# Now login with correct password and the redirect parameter
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/edit/first"},
follow_redirects=False
)
# Should redirect to the edit page
assert res.status_code in [302, 303]
assert '/edit/first' in res.location
# Follow the redirect to verify we're on the edit page
res = client.get(res.location, follow_redirects=True)
assert res.status_code == 200
# Should see edit page content
assert b'Edit' in res.data or b'Watching' in res.data
# Cleanup
client.get(url_for("logout"))
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_logout_with_redirect(client, live_server, measure_memory_usage, datastore_path):
"""
Test that logout preserves the current page URL, so after re-login
the user returns to where they were before logging out.
Example: User is on /edit/uuid, clicks logout, then logs back in and
returns to /edit/uuid.
"""
import base64
import hashlib
# Set a password and login
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Login
res = client.post(
url_for("login"),
data={"password": password},
follow_redirects=True
)
assert res.status_code == 200
# Now logout with a redirect parameter (simulating logout from /settings)
res = client.get(
url_for("logout", redirect="/settings"),
follow_redirects=False
)
# Should redirect to login with the redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved
assert b'value="/settings"' in res.data or b'value="%2Fsettings"' in res.data
# Login again with the redirect
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=False
)
# Should redirect back to settings
assert res.status_code in [302, 303]
assert '/settings' in res.location or 'settings' in res.location
# Cleanup
del client.application.config['DATASTORE'].data['settings']['application']['password']