Compare commits

..

1 Commits

Author SHA1 Message Date
dgtlmoon
fe45bfc27a more resilient same UUID being processed 2026-01-02 17:25:58 +01:00
13 changed files with 53 additions and 869 deletions

View File

@@ -302,28 +302,18 @@ class WatchHistoryDiff(Resource):
from_version_file_contents = watch.get_history_snapshot(from_timestamp)
to_version_file_contents = watch.get_history_snapshot(to_timestamp)
# Get diff preferences from query parameters (matching UI preferences in DIFF_PREFERENCES_CONFIG)
# Support both 'type' (UI parameter) and 'word_diff' (API parameter) for backward compatibility
diff_type = request.args.get('type', 'diffLines')
if diff_type == 'diffWords':
word_diff = True
# Get diff preferences (using defaults similar to the existing code)
diff_prefs = {
'diff_ignoreWhitespace': False,
'diff_changesOnly': True
}
# Get boolean diff preferences with defaults from DIFF_PREFERENCES_CONFIG
changes_only = strtobool(request.args.get('changesOnly', 'true'))
ignore_whitespace = strtobool(request.args.get('ignoreWhitespace', 'false'))
include_removed = strtobool(request.args.get('removed', 'true'))
include_added = strtobool(request.args.get('added', 'true'))
include_replaced = strtobool(request.args.get('replaced', 'true'))
# Generate the diff with all preferences
# Generate the diff
content = diff.render_diff(
previous_version_file_contents=from_version_file_contents,
newest_version_file_contents=to_version_file_contents,
ignore_junk=ignore_whitespace,
include_equal=changes_only,
include_removed=include_removed,
include_added=include_added,
include_replaced=include_replaced,
ignore_junk=diff_prefs.get('diff_ignoreWhitespace'),
include_equal=not diff_prefs.get('diff_changesOnly'),
word_diff=word_diff,
)

View File

@@ -204,7 +204,7 @@ class fetcher(Fetcher):
import re
self.delete_browser_steps_screenshots()
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 12)) + self.render_extract_delay
n = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
extra_wait = min(n, 15)
logger.debug(f"Extra wait set to {extra_wait}s, requested was {n}s.")
@@ -288,27 +288,28 @@ class fetcher(Fetcher):
# Enable Network domain to detect when first bytes arrive
await self.page._client.send('Network.enable')
# Now set up the frame navigation handlers
async def handle_frame_navigation(event=None):
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
logger.debug(f"Frame navigated: {event}")
w = extra_wait - 2 if extra_wait > 4 else 2
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
await asyncio.sleep(w)
logger.debug("Issuing stopLoading command...")
await self.page._client.send('Page.stopLoading')
logger.debug("stopLoading command sent!")
async def setup_frame_handlers_on_first_response(event):
# Only trigger for the main document response
if event.get('type') == 'Document':
logger.debug("First response received, setting up frame handlers for forced page stop load.")
# De-register this listener - we only need it once
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
# Now set up the frame navigation handlers
async def handle_frame_navigation(event):
# Wait n seconds after the frameStartedLoading, not from any frameStartedLoading/frameStartedNavigating
logger.debug(f"Frame navigated: {event}")
w = extra_wait - 2 if extra_wait > 4 else 2
logger.debug(f"Waiting {w} seconds before calling Page.stopLoading...")
await asyncio.sleep(w)
logger.debug("Issuing stopLoading command...")
await self.page._client.send('Page.stopLoading')
logger.debug("stopLoading command sent!")
self.page._client.on('Page.frameStartedNavigating', lambda e: asyncio.create_task(handle_frame_navigation(e)))
self.page._client.on('Page.frameStartedLoading', lambda e: asyncio.create_task(handle_frame_navigation(e)))
self.page._client.on('Page.frameStoppedLoading', lambda e: logger.debug(f"Frame stopped loading: {e}"))
logger.debug("First response received, setting up frame handlers for forced page stop load DONE SETUP")
# De-register this listener - we only need it once
self.page._client.remove_listener('Network.responseReceived', setup_frame_handlers_on_first_response)
# Listen for first response to trigger frame handler setup
self.page._client.on('Network.responseReceived', setup_frame_handlers_on_first_response)
@@ -317,11 +318,8 @@ class fetcher(Fetcher):
attempt=0
while not response:
logger.debug(f"Attempting page fetch {url} attempt {attempt}")
asyncio.create_task(handle_frame_navigation())
response = await self.page.goto(url, timeout=0)
await asyncio.sleep(1 + extra_wait)
await self.page._client.send('Page.stopLoading')
if response:
break
if not response:

View File

@@ -26,7 +26,6 @@ from flask import (
session,
url_for,
)
from urllib.parse import urlparse
from flask_compress import Compress as FlaskCompress
from flask_login import current_user
from flask_restful import abort, Api
@@ -351,13 +350,6 @@ def changedetection_app(config=None, datastore_o=None):
global datastore, socketio_server
datastore = datastore_o
# Import and create a wrapper for is_safe_url that has access to app
from changedetectionio.is_safe_url import is_safe_url as _is_safe_url
def is_safe_url(target):
"""Wrapper for is_safe_url that passes the app instance"""
return _is_safe_url(target, app)
# so far just for read-only via tests, but this will be moved eventually to be the main source
# (instead of the global var)
app.config['DATASTORE'] = datastore_o
@@ -479,21 +471,11 @@ def changedetection_app(config=None, datastore_o=None):
@login_manager.unauthorized_handler
def unauthorized_handler():
# Pass the current request path so users are redirected back after login
return redirect(url_for('login', redirect=request.path))
return redirect(url_for('login', next=url_for('watchlist.index')))
@app.route('/logout')
def logout():
flask_login.logout_user()
# Check if there's a redirect parameter to return to after re-login
redirect_url = request.args.get('redirect')
# If redirect is provided and safe, pass it to login page
if redirect_url and is_safe_url(redirect_url):
return redirect(url_for('login', redirect=redirect_url))
# Otherwise just go to watchlist
return redirect(url_for('watchlist.index'))
@app.route('/set-language/<locale>')
@@ -505,36 +487,20 @@ def changedetection_app(config=None, datastore_o=None):
else:
logger.error(f"Invalid locale {locale}, available: {language_codes}")
# Check if there's a redirect parameter to return to the same page
redirect_url = request.args.get('redirect')
# If redirect is provided and safe, use it
if redirect_url and is_safe_url(redirect_url):
return redirect(redirect_url)
# Otherwise redirect to watchlist
# Redirect back to the page they came from, or home
return redirect(url_for('watchlist.index'))
# https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39
# You can divide up the stuff like this
@app.route('/login', methods=['GET', 'POST'])
def login():
# Extract and validate the redirect parameter
redirect_url = request.args.get('redirect') or request.form.get('redirect')
# Validate the redirect URL - default to watchlist if invalid
if redirect_url and is_safe_url(redirect_url):
validated_redirect = redirect_url
else:
validated_redirect = url_for('watchlist.index')
if request.method == 'GET':
if flask_login.current_user.is_authenticated:
# Already logged in - redirect immediately to the target
flash(gettext("Already logged in"))
return redirect(validated_redirect)
return redirect(url_for("watchlist.index"))
flash(gettext("You must be logged in, please log in."), 'error')
output = render_template("login.html", redirect_url=validated_redirect)
output = render_template("login.html")
return output
user = User()
@@ -544,13 +510,23 @@ def changedetection_app(config=None, datastore_o=None):
if (user.check_password(password)):
flask_login.login_user(user, remember=True)
# Redirect to the validated URL after successful login
return redirect(validated_redirect)
# For now there's nothing else interesting here other than the index/list page
# It's more reliable and safe to ignore the 'next' redirect
# When we used...
# next = request.args.get('next')
# return redirect(next or url_for('watchlist.index'))
# We would sometimes get login loop errors on sites hosted in sub-paths
# note for the future:
# if not is_safe_valid_url(next):
# return flask.abort(400)
return redirect(url_for('watchlist.index'))
else:
flash(gettext('Incorrect password'), 'error')
return redirect(url_for('login', redirect=redirect_url if redirect_url else None))
return redirect(url_for('login'))
@app.before_request
def before_request_handle_cookie_x_settings():

View File

@@ -1,113 +0,0 @@
"""
URL redirect validation module for preventing open redirect vulnerabilities.
This module provides functionality to safely validate redirect URLs, ensuring they:
1. Point to internal routes only (no external redirects)
2. Are properly normalized (preventing browser parsing differences)
3. Match registered Flask routes (no fake/non-existent pages)
4. Are fully logged for security monitoring
References:
- https://flask-login.readthedocs.io/ (safe redirect patterns)
- https://blog.miguelgrinberg.com/post/the-flask-mega-tutorial-part-v-user-logins
- https://www.pythonkitchen.com/how-prevent-open-redirect-vulnerab-flask/
"""
from urllib.parse import urlparse, urljoin
from flask import request
from loguru import logger
def is_safe_url(target, app):
"""
Validate that a redirect URL is safe to prevent open redirect vulnerabilities.
This follows Flask/Werkzeug best practices by ensuring the redirect URL:
1. Is a relative path starting with exactly one '/'
2. Does not start with '//' (double-slash attack)
3. Has no external protocol handlers
4. Points to a valid registered route in the application
5. Is properly normalized to prevent browser parsing differences
Args:
target: The URL to validate (e.g., '/settings', '/login#top')
app: The Flask application instance (needed for route validation)
Returns:
bool: True if the URL is safe for redirection, False otherwise
Examples:
>>> is_safe_url('/settings', app)
True
>>> is_safe_url('//evil.com', app)
False
>>> is_safe_url('/settings#general', app)
True
>>> is_safe_url('/fake-page', app)
False
"""
if not target:
return False
# Normalize the URL to prevent browser parsing differences
# Strip whitespace and replace backslashes (which some browsers interpret as forward slashes)
target = target.strip()
target = target.replace('\\', '/')
# First, check if it starts with // or more (double-slash attack)
if target.startswith('//'):
logger.warning(f"Blocked redirect attempt with double-slash: {target}")
return False
# Parse the URL to check for scheme and netloc
parsed = urlparse(target)
# Block any URL with a scheme (http://, https://, javascript:, etc.)
if parsed.scheme:
logger.warning(f"Blocked redirect attempt with scheme: {target}")
return False
# Block any URL with a network location (netloc)
# This catches patterns like //evil.com, user@host, etc.
if parsed.netloc:
logger.warning(f"Blocked redirect attempt with netloc: {target}")
return False
# At this point, we have a relative URL with no scheme or netloc
# Use urljoin to resolve it and verify it points to the same host
ref_url = urlparse(request.host_url)
test_url = urlparse(urljoin(request.host_url, target))
# Check: ensure the resolved URL has the same netloc as current host
if not (test_url.scheme in ('http', 'https') and ref_url.netloc == test_url.netloc):
logger.warning(f"Blocked redirect attempt with mismatched netloc: {target}")
return False
# Additional validation: Check if the URL matches a registered route
# This prevents redirects to non-existent pages or unintended endpoints
try:
# Get the path without query string and fragment
# Fragments (like #general) are automatically stripped by urlparse
path = parsed.path
# Create a URL adapter bound to the server name
adapter = app.url_map.bind(ref_url.netloc)
# Try to match the path to a registered route
# This will raise NotFound if the route doesn't exist
endpoint, values = adapter.match(path, return_rule=False)
# Block redirects to static file endpoints - these are catch-all routes
# that would match arbitrary paths, potentially allowing unintended redirects
if endpoint in ('static_content', 'static', 'static_flags'):
logger.warning(f"Blocked redirect to static endpoint: {target}")
return False
# Successfully matched a valid route
logger.debug(f"Validated safe redirect to endpoint '{endpoint}': {target}")
return True
except Exception as e:
# Route doesn't exist or can't be matched
logger.warning(f"Blocked redirect to non-existent route: {target} (error: {e})")
return False

View File

@@ -15,22 +15,6 @@ document.addEventListener('DOMContentLoaded', function() {
// Open modal when language button is clicked
languageButton.addEventListener('click', function(e) {
e.preventDefault();
// Update all language links to include current hash in the redirect parameter
const currentPath = window.location.pathname;
const currentHash = window.location.hash;
if (currentHash) {
const languageOptions = languageModal.querySelectorAll('.language-option');
languageOptions.forEach(function(option) {
const url = new URL(option.href, window.location.origin);
// Update the redirect parameter to include the hash
const redirectPath = currentPath + currentHash;
url.searchParams.set('redirect', redirectPath);
option.setAttribute('href', url.pathname + url.search + url.hash);
});
}
languageModal.showModal();
});

View File

@@ -97,7 +97,7 @@
{% endif %}
{% if current_user.is_authenticated %}
<li class="pure-menu-item">
<a href="{{url_for('logout', redirect=request.path)}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
<a href="{{url_for('logout')}}" class="pure-menu-link">{{ _('LOG OUT') }}</a>
</li>
{% endif %}
{% if current_user.is_authenticated or not has_password %}
@@ -261,7 +261,7 @@
<div class="modal-body">
<div class="language-list">
{% for locale, lang_data in available_languages.items()|sort %}
<a href="{{ url_for('set_language', locale=locale, redirect=request.path) }}" class="language-option" data-locale="{{ locale }}">
<a href="{{ url_for('set_language', locale=locale) }}" class="language-option" data-locale="{{ locale }}">
<span class="{{ lang_data.flag }}" style="display: inline-block; width: 1.5em; height: 1.5em; vertical-align: middle; margin-right: 0.5em; border-radius: 50%; overflow: hidden;"></span> <span class="language-name">{{ lang_data.name }}</span>
</a>
{% endfor %}

View File

@@ -5,7 +5,6 @@
<div class="inner">
<form class="pure-form pure-form-stacked" action="{{url_for('login')}}" method="POST">
<input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
<input type="hidden" id="redirect" name="redirect" value="{{ redirect_url }}">
<fieldset>
<div class="pure-control-group">
<label for="password">{{ _('Password') }}</label>

View File

@@ -165,83 +165,18 @@ def test_api_simple(client, live_server, measure_memory_usage, datastore_path):
assert b'<div id' in res.data
# Fetch the difference between two versions (default text format)
# Fetch the difference between two versions
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest'),
headers={'x-api-key': api_key},
)
assert b'(changed) Which is across' in res.data
# Test htmlcolor format
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?format=htmlcolor',
headers={'x-api-key': api_key},
)
assert b'aria-label="Changed text" title="Changed text">Which is across multiple lines' in res.data
# Test html format
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?format=html',
headers={'x-api-key': api_key},
)
assert res.status_code == 200
assert b'<br>' in res.data
# Test markdown format
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?format=markdown',
headers={'x-api-key': api_key},
)
assert res.status_code == 200
# Test new diff preference parameters
# Test removed=false (should hide removed content)
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?removed=false',
headers={'x-api-key': api_key},
)
# Should not contain removed content indicator
assert b'(removed)' not in res.data
# Should still contain added content
assert b'(added)' in res.data or b'which has this one new line' in res.data
# Test added=false (should hide added content)
# Note: The test data has replacements, not pure additions, so we test differently
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?added=false&replaced=false',
headers={'x-api-key': api_key},
)
# With both added and replaced disabled, should have minimal content
# Should not contain added indicators
assert b'(added)' not in res.data
# Test replaced=false (should hide replaced/changed content)
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?replaced=false',
headers={'x-api-key': api_key},
)
# Should not contain changed content indicator
assert b'(changed)' not in res.data
# Test type=diffWords for word-level diff
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?type=diffWords&format=htmlcolor',
headers={'x-api-key': api_key},
)
# Should contain HTML formatted diff
assert res.status_code == 200
assert len(res.data) > 0
# Test combined parameters: show only additions with word diff
res = client.get(
url_for("watchhistorydiff", uuid=watch_uuid, from_timestamp='previous', to_timestamp='latest')+'?removed=false&replaced=false&type=diffWords',
headers={'x-api-key': api_key},
)
assert res.status_code == 200
# Should not contain removed or changed markers
assert b'(removed)' not in res.data
assert b'(changed)' not in res.data
# Fetch the whole watch
res = client.get(

View File

@@ -110,42 +110,3 @@ def test_language_persistence_in_session(client, live_server, measure_memory_usa
assert res.status_code == 200
assert b"Annulla" in res.data, "Italian text should persist across requests"
def test_set_language_with_redirect(client, live_server, measure_memory_usage, datastore_path):
"""
Test that changing language keeps the user on the same page.
Example: User is on /settings, changes language, stays on /settings.
"""
from flask import url_for
# Set language with a redirect parameter (simulating language change from /settings)
res = client.get(
url_for("set_language", locale="de", redirect="/settings"),
follow_redirects=False
)
# Should redirect back to settings
assert res.status_code in [302, 303]
assert '/settings' in res.location
# Verify language was set in session
with client.session_transaction() as sess:
assert sess.get('locale') == 'de'
# Test with invalid locale (should still redirect safely)
res = client.get(
url_for("set_language", locale="invalid_locale", redirect="/settings"),
follow_redirects=False
)
assert res.status_code in [302, 303]
assert '/settings' in res.location
# Test with malicious redirect (should default to watchlist)
res = client.get(
url_for("set_language", locale="en", redirect="https://evil.com"),
follow_redirects=False
)
assert res.status_code in [302, 303]
# Should not redirect to evil.com
assert 'evil.com' not in res.location

View File

@@ -240,6 +240,7 @@ def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage, da
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, datastore_path):
delete_all_watches(client)
@@ -298,26 +299,7 @@ def test_itemprop_percent_threshold(client, live_server, measure_memory_usage, d
assert b'has-unread-changes' not in res.data
# Re #2600 - Switch the mode to normal type and back, and see if the values stick..
###################################################################################
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"restock_settings-follow_price_changes": "y",
"restock_settings-price_change_threshold_percent": 5.05,
"processor": "text_json_diff",
"url": test_url,
'fetch_backend': "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# And back again
live_server.app.config['DATASTORE'].data['watching'][uuid]['processor'] = 'restock_diff'
res = client.get(url_for("ui.ui_edit.edit_page", uuid=uuid))
assert b'type="text" value="5.05"' in res.data
delete_all_watches(client)
@@ -461,4 +443,3 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
res = client.get(url_for("watchlist.index"))
assert b'ception' not in res.data
assert b'155.55' in res.data

View File

@@ -192,289 +192,3 @@ def test_xss_watch_last_error(client, live_server, measure_memory_usage, datasto
assert b'&lt;a href=&#34;https://foobar&#34;&gt;&lt;/a&gt;&lt;script&gt;alert(123);&lt;/script&gt;' in res.data
assert b"https://foobar" in res.data # this text should be there
def test_login_redirect_safe_urls(client, live_server, measure_memory_usage, datastore_path):
"""
Test that safe redirect URLs work correctly in login flow.
This verifies the fix for open redirect vulnerabilities while maintaining
legitimate redirect functionality for both authenticated and unauthenticated users.
"""
# Test 1: Accessing /login?redirect=/settings when not logged in
# Should show the login form with redirect parameter preserved
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# Check that the redirect is preserved in the hidden form field
assert b'name="redirect"' in res.data
# Test 2: Valid internal redirect with query parameters
res = client.get(
url_for("login", redirect="/settings?tab=notifications"),
follow_redirects=False
)
assert res.status_code == 200
# Check that the redirect is preserved
assert b'value="/settings?tab=notifications"' in res.data
# Test 3: Malicious external URL should be blocked and default to watchlist
res = client.get(
url_for("login", redirect="https://evil.com/phishing"),
follow_redirects=False
)
# Should show login form
assert res.status_code == 200
# The redirect parameter in the form should NOT contain the evil URL
# Check the actual input value, not just anywhere in the page
assert b'value="https://evil.com' not in res.data
assert b'value="/evil.com' not in res.data
assert b'name="redirect"' in res.data
# Test 4: Double-slash attack should be blocked
res = client.get(
url_for("login", redirect="//evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//evil.com"' not in res.data
# Test 5: Protocol handler exploit should be blocked
res = client.get(
url_for("login", redirect="javascript:alert(document.domain)"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have javascript: in the redirect input value
assert b'value="javascript:' not in res.data
# Test 6: At-symbol obfuscation attack should be blocked
res = client.get(
url_for("login", redirect="//@evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="//@evil.com"' not in res.data
# Test 7: Multiple slashes attack should be blocked
res = client.get(
url_for("login", redirect="////evil.com"),
follow_redirects=False
)
assert res.status_code == 200
# Should not have the malicious URL in the redirect input value
assert b'value="////evil.com"' not in res.data
def test_login_redirect_with_password(client, live_server, measure_memory_usage, datastore_path):
"""
Test that redirect functionality works correctly when a password is set.
This ensures that notifications can always link to /login and users will
be redirected to the correct page after authentication.
"""
# Set a password
from changedetectionio import store
import base64
import hashlib
# Generate a test password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
# Set the password in the datastore
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Test 1: Try to access /login?redirect=/settings without being logged in
# Should show login form and preserve redirect parameter
res = client.get(
url_for("login", redirect="/settings"),
follow_redirects=False
)
assert res.status_code == 200
assert b"Password" in res.data
# Check that redirect parameter is preserved in the form
assert b'name="redirect"' in res.data
assert b'value="/settings"' in res.data
# Test 2: Submit correct password with redirect parameter
# Should redirect to /settings after successful login
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
# Should be on settings page
assert b"Settings" in res.data or b"settings" in res.data
# Test 3: Now that we're logged in, accessing /login?redirect=/settings
# should redirect immediately without showing login form
res = client.get(
url_for("login", redirect="/"),
follow_redirects=True
)
assert res.status_code == 200
assert b"Already logged in" in res.data
# Test 4: Malicious redirect should be blocked even with correct password
res = client.post(
url_for("login"),
data={"password": password, "redirect": "https://evil.com"},
follow_redirects=True
)
# Should redirect to watchlist index instead of evil.com
assert b"evil.com" not in res.data
# Logout for cleanup
client.get(url_for("logout"))
# Test 5: Incorrect password with redirect should stay on login page
res = client.post(
url_for("login"),
data={"password": "wrongpassword", "redirect": "/settings"},
follow_redirects=True
)
assert res.status_code == 200
assert b"Incorrect password" in res.data or b"password" in res.data
# Clear the password
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_login_redirect_from_protected_page(client, live_server, measure_memory_usage, datastore_path):
"""
Test the complete redirect flow: accessing a protected page while logged out
should redirect to login with the page URL, then redirect back after login.
This is the real-world scenario where users try to access /edit/uuid or /settings
and need to login first.
"""
import base64
import hashlib
# Add a watch first
set_original_response(datastore_path=datastore_path)
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Set a password
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Logout to ensure we're not authenticated
client.get(url_for("logout"))
# Try to access a protected page (edit page for first watch)
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
follow_redirects=False
)
# Should redirect to login with the edit page as redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved in the login form
# It should contain the edit page URL
assert b'name="redirect"' in res.data
assert b'value="/edit/first"' in res.data or b'value="%2Fedit%2Ffirst"' in res.data
# Now login with correct password and the redirect parameter
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/edit/first"},
follow_redirects=False
)
# Should redirect to the edit page
assert res.status_code in [302, 303]
assert '/edit/first' in res.location
# Follow the redirect to verify we're on the edit page
res = client.get(res.location, follow_redirects=True)
assert res.status_code == 200
# Should see edit page content
assert b'Edit' in res.data or b'Watching' in res.data
# Cleanup
client.get(url_for("logout"))
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_logout_with_redirect(client, live_server, measure_memory_usage, datastore_path):
"""
Test that logout preserves the current page URL, so after re-login
the user returns to where they were before logging out.
Example: User is on /edit/uuid, clicks logout, then logs back in and
returns to /edit/uuid.
"""
import base64
import hashlib
# Set a password and login
password = "test123"
salt = os.urandom(32)
key = hashlib.pbkdf2_hmac('sha256', password.encode('utf-8'), salt, 100000)
salted_pass = base64.b64encode(salt + key).decode('ascii')
client.application.config['DATASTORE'].data['settings']['application']['password'] = salted_pass
# Login
res = client.post(
url_for("login"),
data={"password": password},
follow_redirects=True
)
assert res.status_code == 200
# Now logout with a redirect parameter (simulating logout from /settings)
res = client.get(
url_for("logout", redirect="/settings"),
follow_redirects=False
)
# Should redirect to login with the redirect parameter
assert res.status_code in [302, 303]
assert '/login' in res.location
assert 'redirect=' in res.location or 'redirect=%2F' in res.location
# Follow the redirect to login page
res = client.get(res.location, follow_redirects=False)
assert res.status_code == 200
assert b'Password' in res.data
# The redirect parameter should be preserved
assert b'value="/settings"' in res.data or b'value="%2Fsettings"' in res.data
# Login again with the redirect
res = client.post(
url_for("login"),
data={"password": password, "redirect": "/settings"},
follow_redirects=False
)
# Should redirect back to settings
assert res.status_code in [302, 303]
assert '/settings' in res.location or 'settings' in res.location
# Cleanup
del client.application.config['DATASTORE'].data['settings']['application']['password']

View File

@@ -28,7 +28,7 @@ info:
For example: `x-api-key: YOUR_API_KEY`
version: 0.1.4
version: 0.1.3
contact:
name: ChangeDetection.io
url: https://github.com/dgtlmoon/changedetection.io
@@ -761,9 +761,9 @@ paths:
get:
operationId: getWatchHistoryDiff
tags: [Watch History]
summary: Get the difference between two snapshots
summary: Get diff between two snapshots
description: |
Generate a difference (comparison) between two historical snapshots of a web page change monitor (watch).
Generate a formatted diff (comparison) between two historical snapshots of a web page change monitor (watch).
This endpoint compares content between two points in time and returns the differences in your chosen format.
Perfect for reviewing what changed between specific versions or comparing recent changes.
@@ -798,10 +798,6 @@ paths:
# Compare two specific timestamps in plain text with word-level diff
curl -X GET "http://localhost:5000/api/v1/watch/095be615-a8ad-4c33-8e9c-c7612fbf6c9f/difference/1640995200/1640998800?format=text&word_diff=true" \
-H "x-api-key: YOUR_API_KEY"
# Show only additions (hide removed/replaced content), ignore whitespace
curl -X GET "http://localhost:5000/api/v1/watch/095be615-a8ad-4c33-8e9c-c7612fbf6c9f/difference/previous/latest?format=htmlcolor&removed=false&replaced=false&ignoreWhitespace=true" \
-H "x-api-key: YOUR_API_KEY"
- lang: 'Python'
source: |
import requests
@@ -826,20 +822,6 @@ paths:
params={'format': 'text', 'word_diff': 'true'}
)
print(response.text)
# Show only additions, ignore whitespace and use word-level diff
response = requests.get(
f'http://localhost:5000/api/v1/watch/{uuid}/difference/previous/latest',
headers=headers,
params={
'format': 'htmlcolor',
'type': 'diffWords',
'removed': 'false',
'replaced': 'false',
'ignoreWhitespace': 'true'
}
)
print(response.text)
parameters:
- name: uuid
in: path
@@ -879,10 +861,9 @@ paths:
- `text` (default): Plain text with (removed) and (added) prefixes
- `html`: Basic HTML format
- `htmlcolor`: Rich HTML with colored backgrounds (red for deletions, green for additions)
- `markdown`: Markdown format with HTML rendering
schema:
type: string
enum: [text, html, htmlcolor, markdown]
enum: [text, html, htmlcolor]
default: text
- name: word_diff
in: query
@@ -907,69 +888,6 @@ paths:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "false"
- name: type
in: query
description: |
Diff granularity type:
- `diffLines` (default): Line-level comparison, showing which lines changed
- `diffWords`: Word-level comparison, showing which words changed within lines
This parameter is an alternative to `word_diff` for better alignment with the UI.
If both are specified, `type=diffWords` will enable word-level diffing.
schema:
type: string
enum: [diffLines, diffWords]
default: diffLines
- name: changesOnly
in: query
description: |
When enabled, only show lines/content that changed (no surrounding context).
When disabled, include unchanged lines for context around changes.
Accepts: true, false, 1, 0, yes, no, on, off
schema:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "true"
- name: ignoreWhitespace
in: query
description: |
When enabled, ignore whitespace-only changes (spaces, tabs, newlines).
Useful for focusing on content changes and ignoring formatting differences.
Accepts: true, false, 1, 0, yes, no, on, off
schema:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "false"
- name: removed
in: query
description: |
Include removed/deleted content in the diff output.
When disabled, content that was deleted will not appear in the diff.
Accepts: true, false, 1, 0, yes, no, on, off
schema:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "true"
- name: added
in: query
description: |
Include added/new content in the diff output.
When disabled, content that was added will not appear in the diff.
Accepts: true, false, 1, 0, yes, no, on, off
schema:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "true"
- name: replaced
in: query
description: |
Include replaced/modified content in the diff output.
When disabled, content that was modified (changed from one value to another) will not appear in the diff.
Accepts: true, false, 1, 0, yes, no, on, off
schema:
type: string
enum: ["true", "false", "1", "0", "yes", "no", "on", "off"]
default: "true"
responses:
'200':
description: Formatted diff between the two snapshots

File diff suppressed because one or more lines are too long