mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-05 17:16:12 +00:00
Compare commits
1 Commits
url-valida
...
speed-up-w
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a738ba0d7 |
@@ -1,9 +1,10 @@
|
||||
import os
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from flask_restful import abort, Resource
|
||||
from flask import request
|
||||
import validators
|
||||
from functools import wraps
|
||||
from . import auth, validate_openapi_request
|
||||
from ..validate_url import is_safe_valid_url
|
||||
|
||||
|
||||
def default_content_type(content_type='text/plain'):
|
||||
@@ -49,13 +50,14 @@ class Import(Resource):
|
||||
|
||||
urls = request.get_data().decode('utf8').splitlines()
|
||||
added = []
|
||||
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
|
||||
for url in urls:
|
||||
url = url.strip()
|
||||
if not len(url):
|
||||
continue
|
||||
|
||||
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
|
||||
if not is_safe_valid_url(url):
|
||||
if not validators.url(url, simple_host=allow_simplehost):
|
||||
return f"Invalid or unsupported URL - {url}", 400
|
||||
|
||||
if dedupe and self.datastore.url_exists(url):
|
||||
|
||||
@@ -1,12 +1,14 @@
|
||||
import os
|
||||
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
|
||||
from flask_expects_json import expects_json
|
||||
from changedetectionio import queuedWatchMetaData
|
||||
from changedetectionio import worker_handler
|
||||
from flask_restful import abort, Resource
|
||||
from flask import request, make_response, send_from_directory
|
||||
import validators
|
||||
from . import auth
|
||||
import copy
|
||||
|
||||
@@ -122,7 +124,7 @@ class Watch(Resource):
|
||||
return validation_error, 400
|
||||
|
||||
# XSS etc protection
|
||||
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
|
||||
if request.json.get('url') and not is_safe_url(request.json.get('url')):
|
||||
return "Invalid URL", 400
|
||||
|
||||
watch.update(request.json)
|
||||
@@ -230,7 +232,9 @@ class CreateWatch(Resource):
|
||||
json_data = request.get_json()
|
||||
url = json_data['url'].strip()
|
||||
|
||||
if not is_safe_valid_url(url):
|
||||
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
|
||||
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
|
||||
if not validators.url(url, simple_host=allow_simplehost):
|
||||
return "Invalid or unsupported URL", 400
|
||||
|
||||
if json_data.get('proxy'):
|
||||
|
||||
@@ -39,7 +39,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
|
||||
try:
|
||||
# Use native janus async interface - no threads needed!
|
||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=1.0)
|
||||
queued_item_data = await asyncio.wait_for(q.async_get(), timeout=0.3)
|
||||
|
||||
except asyncio.TimeoutError:
|
||||
# No jobs available, continue loop
|
||||
|
||||
@@ -133,10 +133,10 @@ def get_socketio_path():
|
||||
# Socket.IO will be available at {prefix}/socket.io/
|
||||
return prefix
|
||||
|
||||
@app.template_global('is_safe_valid_url')
|
||||
def _is_safe_valid_url(test_url):
|
||||
from .validate_url import is_safe_valid_url
|
||||
return is_safe_valid_url(test_url)
|
||||
@app.template_global('is_safe_url')
|
||||
def _is_safe_url(test_url):
|
||||
from .html_tools import is_safe_url
|
||||
return is_safe_url(test_url)
|
||||
|
||||
|
||||
@app.template_filter('format_number_locale')
|
||||
@@ -387,7 +387,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
# We would sometimes get login loop errors on sites hosted in sub-paths
|
||||
|
||||
# note for the future:
|
||||
# if not is_safe_valid_url(next):
|
||||
# if not is_safe_url(next):
|
||||
# return flask.abort(400)
|
||||
return redirect(url_for('watchlist.index'))
|
||||
|
||||
|
||||
@@ -28,8 +28,11 @@ from wtforms.utils import unset_value
|
||||
|
||||
from wtforms.validators import ValidationError
|
||||
|
||||
from validators.url import url as url_validator
|
||||
|
||||
from changedetectionio.widgets import TernaryNoneBooleanField
|
||||
|
||||
|
||||
# default
|
||||
# each select <option data-enabled="enabled-0-0"
|
||||
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
|
||||
@@ -538,10 +541,19 @@ class validateURL(object):
|
||||
|
||||
|
||||
def validate_url(test_url):
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
if not is_safe_valid_url(test_url):
|
||||
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
|
||||
try:
|
||||
url_validator(test_url, simple_host=allow_simplehost)
|
||||
except validators.ValidationError:
|
||||
#@todo check for xss
|
||||
message = f"'{test_url}' is not a valid URL."
|
||||
# This should be wtforms.validators.
|
||||
raise ValidationError('Watch protocol is not permitted or invalid URL format')
|
||||
raise ValidationError(message)
|
||||
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
if not is_safe_url(test_url):
|
||||
# This should be wtforms.validators.
|
||||
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
|
||||
|
||||
|
||||
class ValidateSinglePythonRegexString(object):
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from functools import lru_cache
|
||||
|
||||
from loguru import logger
|
||||
from typing import List
|
||||
import html
|
||||
@@ -15,6 +13,8 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
|
||||
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
|
||||
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
|
||||
|
||||
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
|
||||
|
||||
# 'price' , 'lowPrice', 'highPrice' are usually under here
|
||||
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
|
||||
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
|
||||
@@ -23,9 +23,24 @@ class JSONNotFound(ValueError):
|
||||
def __init__(self, msg):
|
||||
ValueError.__init__(self, msg)
|
||||
|
||||
def is_safe_url(test_url):
|
||||
import os
|
||||
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
|
||||
|
||||
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
|
||||
# 'source:' is a valid way to tell us to return the source
|
||||
|
||||
r = re.compile(re.escape('source:'), re.IGNORECASE)
|
||||
test_url = r.sub('', test_url)
|
||||
|
||||
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
|
||||
if not pattern.match(test_url.strip()):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
# Doesn't look like python supports forward slash auto enclosure in re.findall
|
||||
# So convert it to inline flag "(?i)foobar" type configuration
|
||||
@lru_cache(maxsize=100)
|
||||
def perl_style_slash_enclosed_regex_to_options(regex):
|
||||
|
||||
res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE)
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from blinker import signal
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import watch_base
|
||||
@@ -13,6 +12,9 @@ from .. import jinja2_custom as safe_jinja
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
# Allowable protocols, protects against javascript: etc
|
||||
# file:// is further checked by ALLOW_FILE_URI
|
||||
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
|
||||
|
||||
@@ -61,7 +63,7 @@ class model(watch_base):
|
||||
def link(self):
|
||||
|
||||
url = self.get('url', '')
|
||||
if not is_safe_valid_url(url):
|
||||
if not is_safe_url(url):
|
||||
return 'DISABLED'
|
||||
|
||||
ready_url = url
|
||||
@@ -82,7 +84,7 @@ class model(watch_base):
|
||||
ready_url=ready_url.replace('source:', '')
|
||||
|
||||
# Also double check it after any Jinja2 formatting just incase
|
||||
if not is_safe_valid_url(ready_url):
|
||||
if not is_safe_url(ready_url):
|
||||
return 'DISABLED'
|
||||
return ready_url
|
||||
|
||||
|
||||
@@ -360,18 +360,6 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
# texty types
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
|
||||
|
||||
else:
|
||||
# ?format was IN the apprise URL, they are kind of on their own here, we will try our best
|
||||
if 'format=html' in url:
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n')
|
||||
# This will also prevent apprise from doing conversion
|
||||
apprise_input_format = NotifyFormat.HTML.value
|
||||
requested_output_format = NotifyFormat.HTML.value
|
||||
elif 'format=text' in url:
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
|
||||
apprise_input_format = NotifyFormat.TEXT.value
|
||||
requested_output_format = NotifyFormat.TEXT.value
|
||||
|
||||
sent_objs.append({'title': n_title,
|
||||
'body': n_body,
|
||||
'url': url})
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from changedetectionio.strtobool import strtobool
|
||||
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
|
||||
from flask import (
|
||||
flash
|
||||
@@ -342,10 +341,8 @@ class ChangeDetectionStore:
|
||||
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
|
||||
flash("Error fetching metadata for {}".format(url), 'error')
|
||||
return False
|
||||
|
||||
if not is_safe_valid_url(url):
|
||||
flash('Watch protocol is not permitted or invalid URL format', 'error')
|
||||
|
||||
if not is_safe_url(url):
|
||||
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
|
||||
return None
|
||||
|
||||
if tag and type(tag) == str:
|
||||
|
||||
@@ -53,7 +53,7 @@
|
||||
<a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
|
||||
<strong>Change</strong>Detection.io</a>
|
||||
{% endif %}
|
||||
{% if current_diff_url and is_safe_valid_url(current_diff_url) %}
|
||||
{% if current_diff_url and is_safe_url(current_diff_url) %}
|
||||
<a class="current-diff-url" href="{{ current_diff_url }}">
|
||||
<span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
|
||||
{% else %}
|
||||
|
||||
@@ -4,7 +4,7 @@ from email import message_from_string
|
||||
from email.policy import default as email_policy
|
||||
|
||||
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE
|
||||
from changedetectionio.notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
|
||||
wait_for_all_checks, \
|
||||
set_longer_modified_response, delete_all_watches
|
||||
@@ -99,7 +99,6 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert 'fallback-body\r\n' in text_content # The plaintext part
|
||||
assert CUSTOM_LINEBREAK_PLACEHOLDER not in text_content
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
@@ -108,7 +107,6 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
assert 'some text<br>' in html_content # We converted \n from the notification body
|
||||
assert 'fallback-body<br>' in html_content # kept the original <br>
|
||||
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
|
||||
assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_content
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
@@ -682,73 +680,3 @@ def test_check_html_document_plaintext_notification(client, live_server, measure
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_check_html_notification_with_apprise_format_is_html(client, live_server, measure_memory_usage):
|
||||
## live_server_setup(live_server) # Setup on conftest per function
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com&format=html'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
|
||||
"application-notification_format": 'html',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert 'fallback-body\r\n' in text_content # The plaintext part
|
||||
assert CUSTOM_LINEBREAK_PLACEHOLDER not in text_content
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert 'some text<br>' in html_content # We converted \n from the notification body
|
||||
assert 'fallback-body<br>' in html_content # kept the original <br>
|
||||
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
|
||||
assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_content
|
||||
delete_all_watches(client)
|
||||
@@ -64,21 +64,29 @@ def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usa
|
||||
# Should not have template error
|
||||
assert b'Invalid template' not in res.data
|
||||
|
||||
|
||||
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
|
||||
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
||||
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_return_query', _external=True)
|
||||
|
||||
full_url = test_url + "?date={{ ''.__class__.__mro__[1].__subclasses__()}}"
|
||||
|
||||
# because url_for() will URL-encode the var, but we dont here
|
||||
full_url = "{}?{}".format(test_url,
|
||||
"date={{ ''.__class__.__mro__[1].__subclasses__()}}", )
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": full_url, "tags": "test"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" not in res.data
|
||||
assert b"Watch added" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# It should report nothing found (no new 'has-unread-changes' class)
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'is invalid and cannot be used' in res.data
|
||||
# Some of the spewed output from the subclasses
|
||||
assert b'dict_values' not in res.data
|
||||
|
||||
def test_timezone(mocker):
|
||||
"""Verify that timezone is parsed."""
|
||||
|
||||
@@ -25,7 +25,7 @@ def set_original_response():
|
||||
return None
|
||||
|
||||
def test_bad_access(client, live_server, measure_memory_usage):
|
||||
|
||||
|
||||
res = client.post(
|
||||
url_for("imports.import_page"),
|
||||
data={"urls": 'https://localhost'},
|
||||
@@ -48,7 +48,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'Watch protocol is not permitted or invalid URL format' in res.data
|
||||
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
@@ -56,7 +56,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'Watch protocol is not permitted or invalid URL format' in res.data
|
||||
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
@@ -64,7 +64,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'Watch protocol is not permitted or invalid URL format' in res.data
|
||||
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
|
||||
|
||||
|
||||
res = client.post(
|
||||
@@ -73,15 +73,8 @@ def test_bad_access(client, live_server, measure_memory_usage):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'Watch protocol is not permitted or invalid URL format' in res.data
|
||||
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
|
||||
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": 'https://i-wanna-xss-you.com?hereis=<script>alert(1)</script>', "tags": ''},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b'Watch protocol is not permitted or invalid URL format' in res.data
|
||||
|
||||
def _runner_test_various_file_slash(client, file_uri):
|
||||
|
||||
@@ -118,8 +111,8 @@ def test_file_slash_access(client, live_server, measure_memory_usage):
|
||||
|
||||
test_file_path = os.path.abspath(__file__)
|
||||
_runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}")
|
||||
# _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
|
||||
# _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
|
||||
_runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
|
||||
_runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
|
||||
|
||||
def test_xss(client, live_server, measure_memory_usage):
|
||||
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from functools import lru_cache
|
||||
|
||||
import arrow
|
||||
from enum import IntEnum
|
||||
|
||||
@@ -14,7 +12,7 @@ class Weekday(IntEnum):
|
||||
Saturday = 5
|
||||
Sunday = 6
|
||||
|
||||
@lru_cache(maxsize=100)
|
||||
|
||||
def am_i_inside_time(
|
||||
day_of_week: str,
|
||||
time_str: str,
|
||||
|
||||
@@ -1,109 +0,0 @@
|
||||
from functools import lru_cache
|
||||
from loguru import logger
|
||||
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
|
||||
|
||||
|
||||
def normalize_url_encoding(url):
|
||||
"""
|
||||
Safely encode a URL's query parameters, regardless of whether they're already encoded.
|
||||
|
||||
Why this is necessary:
|
||||
URLs can arrive in various states - some with already encoded query parameters (%20 for spaces),
|
||||
some with unencoded parameters (literal spaces), or a mix of both. The validators.url() function
|
||||
requires proper encoding, but simply encoding an already-encoded URL would double-encode it
|
||||
(e.g., %20 would become %2520).
|
||||
|
||||
This function solves the problem by:
|
||||
1. Parsing the URL to extract query parameters
|
||||
2. parse_qsl() automatically decodes parameters if they're encoded
|
||||
3. urlencode() re-encodes them properly
|
||||
4. Returns a consistently encoded URL that will pass validation
|
||||
|
||||
Example:
|
||||
- Input: "http://example.com/test?time=2025-10-28 09:19" (space not encoded)
|
||||
- Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
|
||||
|
||||
- Input: "http://example.com/test?time=2025-10-28%2009:19" (already encoded)
|
||||
- Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
|
||||
|
||||
Returns a properly encoded URL string.
|
||||
"""
|
||||
try:
|
||||
# Parse the URL into components (scheme, netloc, path, params, query, fragment)
|
||||
parsed = urlparse(url)
|
||||
|
||||
# Parse query string - this automatically decodes it if encoded
|
||||
# parse_qsl handles both encoded and unencoded query strings gracefully
|
||||
query_params = parse_qsl(parsed.query, keep_blank_values=True)
|
||||
|
||||
# Re-encode the query string properly using standard URL encoding
|
||||
encoded_query = urlencode(query_params, safe='')
|
||||
|
||||
# Reconstruct the URL with properly encoded query string
|
||||
normalized = urlunparse((
|
||||
parsed.scheme,
|
||||
parsed.netloc,
|
||||
parsed.path,
|
||||
parsed.params,
|
||||
encoded_query, # Use the re-encoded query
|
||||
parsed.fragment
|
||||
))
|
||||
|
||||
return normalized
|
||||
except Exception as e:
|
||||
# If parsing fails for any reason, return original URL
|
||||
logger.debug(f"URL normalization failed for '{url}': {e}")
|
||||
return url
|
||||
|
||||
|
||||
@lru_cache(maxsize=10000)
|
||||
def is_safe_valid_url(test_url):
|
||||
from changedetectionio import strtobool
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
import os
|
||||
import re
|
||||
import validators
|
||||
|
||||
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
|
||||
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
|
||||
|
||||
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
|
||||
|
||||
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
|
||||
# 'source:' is a valid way to tell us to return the source
|
||||
|
||||
r = re.compile('^source:', re.IGNORECASE)
|
||||
test_url = r.sub('', test_url)
|
||||
|
||||
# Check the actual rendered URL in case of any Jinja markup
|
||||
try:
|
||||
test_url = jinja_render(test_url)
|
||||
except Exception as e:
|
||||
logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
|
||||
return False
|
||||
|
||||
# Check query parameters and fragment
|
||||
if re.search(r'[<>]', test_url):
|
||||
logger.warning(f'URL "{test_url}" contains suspicious characters')
|
||||
return False
|
||||
|
||||
# Normalize URL encoding - handle both encoded and unencoded query parameters
|
||||
test_url = normalize_url_encoding(test_url)
|
||||
|
||||
# Be sure the protocol is safe (no file, etcetc)
|
||||
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', safe_protocol_regex), re.IGNORECASE)
|
||||
if not pattern.match(test_url.strip()):
|
||||
logger.warning(f'URL "{test_url}" is not safe, aborting.')
|
||||
return False
|
||||
|
||||
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
|
||||
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
|
||||
try:
|
||||
if not test_url.strip().lower().startswith('file:') and not validators.url(test_url, simple_host=allow_simplehost):
|
||||
logger.warning(f'URL "{test_url}" failed validation, aborting.')
|
||||
return False
|
||||
except validators.ValidationError:
|
||||
logger.warning(f'URL f"{test_url}" failed validation, aborting.')
|
||||
return False
|
||||
|
||||
return True
|
||||
Reference in New Issue
Block a user