Compare commits

...

4 Commits

Author SHA1 Message Date
dgtlmoon
e76f710164 adding test 2025-10-28 11:30:18 +01:00
dgtlmoon
33a3d2cfb6 Handle format= in apprise URLs 2025-10-28 11:06:23 +01:00
dgtlmoon
f304ae19db Adding small amount of cache to common functions (#3565) 2025-10-28 10:43:20 +01:00
dgtlmoon
2116b2cb93 CVE-2025-62780 - Stored XSS in Watch update via API 2025-10-28 10:09:30 +01:00
12 changed files with 156 additions and 24 deletions

View File

@@ -1,5 +1,7 @@
import os
from changedetectionio.strtobool import strtobool
from changedetectionio.html_tools import is_safe_url
from flask_expects_json import expects_json
from changedetectionio import queuedWatchMetaData
@@ -121,6 +123,10 @@ class Watch(Resource):
if validation_error:
return validation_error, 400
# XSS etc protection
if request.json.get('url') and not is_safe_url(request.json.get('url')):
return "Invalid URL", 400
watch.update(request.json)
return "OK", 200

View File

@@ -133,6 +133,11 @@ def get_socketio_path():
# Socket.IO will be available at {prefix}/socket.io/
return prefix
@app.template_global('is_safe_url')
def _is_safe_url(test_url):
    """Template-side entry point for the URL safety check.

    Registered as the template global ``is_safe_url`` and simply forwards
    ``test_url`` to ``html_tools.is_safe_url``, returning its result.
    The import is performed at call time, inside the function.
    """
    from . import html_tools
    return html_tools.is_safe_url(test_url)
@app.template_filter('format_number_locale')
def _jinja2_filter_format_number_locale(value: float) -> str:

View File

@@ -550,7 +550,7 @@ def validate_url(test_url):
# This should be wtforms.validators.
raise ValidationError(message)
from .model.Watch import is_safe_url
from changedetectionio.html_tools import is_safe_url
if not is_safe_url(test_url):
# This should be wtforms.validators.
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')

View File

@@ -1,3 +1,5 @@
from functools import lru_cache
from loguru import logger
from typing import List
import html
@@ -13,6 +15,7 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
# 'price' , 'lowPrice', 'highPrice' are usually under here
# All of those may or may not appear on different websites - I didn't find a way to do case-insensitive searching here
@@ -22,9 +25,25 @@ class JSONNotFound(ValueError):
def __init__(self, msg):
ValueError.__init__(self, msg)
def is_safe_url(test_url):
    """Return True when ``test_url`` begins with a permitted protocol.

    The permitted protocols come from the ``SAFE_PROTOCOL_REGEX`` environment
    variable (read on every call), falling back to the module-level
    ``SAFE_PROTOCOL_REGEX`` default. Matching is case-insensitive and is done
    against the URL with surrounding whitespace stripped.

    :param test_url: The URL to check. Anything that is not a ``str``
        (``None``, numbers, lists, ...) is treated as unsafe.
    :return: ``True`` if the URL matches the permitted-protocol pattern,
        otherwise ``False``.
    """
    import os

    # Callers pass values taken straight from request JSON / stored watch data.
    # A non-string can never be a valid URL, so report it as unsafe instead of
    # letting the regex calls below raise TypeError.
    if not isinstance(test_url, str):
        return False

    # See https://github.com/dgtlmoon/changedetection.io/issues/1358
    # Remove 'source:' so we don't get 'source:javascript:' etc.
    # 'source:' is a valid way to tell us to return the source.
    r = re.compile(re.escape('source:'), re.IGNORECASE)
    test_url = r.sub('', test_url)

    pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
    return bool(pattern.match(test_url.strip()))
# Doesn't look like python supports forward slash auto enclosure in re.findall
# So convert it to inline flag "(?i)foobar" type configuration
@lru_cache(maxsize=100)
def perl_style_slash_enclosed_regex_to_options(regex):
res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE)

View File

@@ -1,5 +1,5 @@
from blinker import signal
from changedetectionio.html_tools import is_safe_url
from changedetectionio.strtobool import strtobool
from changedetectionio.jinja2_custom import render as jinja_render
from . import watch_base
@@ -21,23 +21,6 @@ FAVICON_RESAVE_THRESHOLD_SECONDS=86400
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
def is_safe_url(test_url):
    """Tell whether ``test_url`` starts with a protocol allowed by SAFE_PROTOCOL_REGEX.

    The pattern is taken from the ``SAFE_PROTOCOL_REGEX`` environment variable
    when set, otherwise from the module-level constant of the same name, and
    is applied case-insensitively to the whitespace-stripped URL.
    """
    # See https://github.com/dgtlmoon/changedetection.io/issues/1358
    # 'source:' is a valid way to ask for the page source, so every occurrence
    # (in any letter case) is dropped first; that way 'source:javascript:' etc.
    # is judged by what remains.
    without_source = re.sub(re.escape('source:'), '', test_url, flags=re.IGNORECASE)

    allowed_protocols = os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX)
    return re.match(allowed_protocols, without_source.strip(), re.IGNORECASE) is not None
class model(watch_base):
__newest_history_key = None
__history_n = 0

View File

@@ -360,6 +360,18 @@ def process_notification(n_object: NotificationContextData, datastore):
# texty types
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
else:
# ?format was IN the apprise URL, they are kind of on their own here, we will try our best
if 'format=html' in url:
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n')
# This will also prevent apprise from doing conversion
apprise_input_format = NotifyFormat.HTML.value
requested_output_format = NotifyFormat.HTML.value
elif 'format=text' in url:
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
apprise_input_format = NotifyFormat.TEXT.value
requested_output_format = NotifyFormat.TEXT.value
sent_objs.append({'title': n_title,
'body': n_body,
'url': url})

View File

@@ -1,4 +1,5 @@
from changedetectionio.strtobool import strtobool
from changedetectionio.html_tools import is_safe_url
from flask import (
flash
@@ -340,7 +341,6 @@ class ChangeDetectionStore:
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
flash("Error fetching metadata for {}".format(url), 'error')
return False
from .model.Watch import is_safe_url
if not is_safe_url(url):
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
return None

View File

@@ -53,7 +53,7 @@
<a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
<strong>Change</strong>Detection.io</a>
{% endif %}
{% if current_diff_url %}
{% if current_diff_url and is_safe_url(current_diff_url) %}
<a class="current-diff-url" href="{{ current_diff_url }}">
<span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
{% else %}

View File

@@ -4,7 +4,7 @@ from email import message_from_string
from email.policy import default as email_policy
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE
from changedetectionio.notification_service import NotificationContextData
from changedetectionio.notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \
set_longer_modified_response, delete_all_watches
@@ -99,6 +99,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
text_content = text_part.get_content()
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
assert 'fallback-body\r\n' in text_content # The plaintext part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in text_content
# Second part should be text/html
html_part = parts[1]
@@ -107,6 +108,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
assert 'some text<br>' in html_content # We converted \n from the notification body
assert 'fallback-body<br>' in html_content # kept the original <br>
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_content
delete_all_watches(client)
@@ -680,3 +682,73 @@ def test_check_html_document_plaintext_notification(client, live_server, measure
delete_all_watches(client)
def test_check_html_notification_with_apprise_format_is_html(client, live_server, measure_memory_usage):
    """Send an HTML notification through a mailto:// URL that itself carries ``format=html``.

    The received email must be multipart/alternative with a text/plain part
    followed by a text/html part, and neither part may still contain
    CUSTOM_LINEBREAK_PLACEHOLDER.
    """
    # No live_server_setup(live_server) call here - setup is done per function in conftest
    set_original_response()

    # The apprise URL explicitly requests format=html
    notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com&format=html'

    # Application-level notification settings; the body mixes a plain "\n" and a literal <br>
    settings_form = {
        "application-notification_urls": notification_url,
        "application-notification_title": "fallback-title " + default_notification_title,
        "application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
        "application-notification_format": 'html',
        "requests-time_between_check-minutes": 180,
        'application-fetch_backend': "html_requests",
    }
    settings_response = client.post(
        url_for("settings.settings_page"),
        data=settings_form,
        follow_redirects=True,
    )
    assert b"Settings updated." in settings_response.data

    # Create a watch on the test endpoint
    test_url = url_for('test_endpoint', _external=True)
    add_response = client.post(
        url_for("ui.ui_views.form_quick_watch_add"),
        data={"url": test_url, "tags": 'nice one'},
        follow_redirects=True,
    )
    assert b"Watch added" in add_response.data

    wait_for_all_checks(client)

    # Change the served content and recheck so a notification is produced
    set_longer_modified_response()
    time.sleep(2)
    client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
    wait_for_all_checks(client)
    time.sleep(3)

    raw_email = get_last_message_from_smtp_server()
    assert len(raw_email) >= 1

    # Parse with the stdlib email package rather than string-matching the raw message
    email_msg = message_from_string(raw_email, policy=email_policy)

    # Expect two bodies: multipart/alternative with text/plain and text/html
    assert email_msg.is_multipart()
    assert email_msg.get_content_type() == 'multipart/alternative'

    parts = list(email_msg.iter_parts())
    assert len(parts) == 2
    plain_part, html_part = parts

    # Part one: the plaintext version
    assert plain_part.get_content_type() == 'text/plain'
    plain_body = plain_part.get_content()
    assert '(added) So let\'s see what happens.\r\n' in plain_body
    assert 'fallback-body\r\n' in plain_body
    assert CUSTOM_LINEBREAK_PLACEHOLDER not in plain_body

    # Part two: the HTML version
    assert html_part.get_content_type() == 'text/html'
    html_body = html_part.get_content()
    assert 'some text<br>' in html_body  # "\n" from the notification body became <br>
    assert 'fallback-body<br>' in html_body  # the original <br> was kept
    assert '(added) So let\'s see what happens.<br>' in html_body
    assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_body

    delete_all_watches(client)

View File

@@ -370,7 +370,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
######################################################
# HTTP PUT try a field that doenst exist
# HTTP PUT try a field that doesn't exist
# HTTP PUT an update
res = client.put(
@@ -383,6 +383,17 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
# Message will come from `flask_expects_json`
assert b'Additional properties are not allowed' in res.data
# Try a XSS URL
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({
'url': 'javascript:alert(document.domain)'
}),
)
assert res.status_code == 400
# Cleanup everything
delete_all_watches(client)

View File

@@ -1,6 +1,8 @@
import os
from flask import url_for
from changedetectionio.tests.util import set_modified_response
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .. import strtobool
@@ -132,6 +134,26 @@ def test_xss(client, live_server, measure_memory_usage):
assert b"<img src=x onerror=alert(" not in res.data
assert b"&lt;img" in res.data
# Check that even forcing an update directly still doesn't get to the frontend
set_original_response()
XSS_HACK = 'javascript:alert(document.domain)'
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response()
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Write the hostile URL straight into the datastore, bypassing the add_watch() call used above
live_server.app.config['DATASTORE'].data['watching'][uuid]['url']=XSS_HACK
res = client.get(url_for("ui.ui_views.preview_page", uuid=uuid))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
# Capture the response: without the assignment the assert below would silently
# re-check the preview page response and the diff page would never be verified
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
res = client.get(url_for("watchlist.index"))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
def test_xss_watch_last_error(client, live_server, measure_memory_usage):
set_original_response()

View File

@@ -1,3 +1,5 @@
from functools import lru_cache
import arrow
from enum import IntEnum
@@ -12,7 +14,7 @@ class Weekday(IntEnum):
Saturday = 5
Sunday = 6
@lru_cache(maxsize=100)
def am_i_inside_time(
day_of_week: str,
time_str: str,