Unify safe URL checking to the one function

This commit is contained in:
dgtlmoon
2025-10-28 09:59:13 +01:00
parent 552e98519b
commit d48c82052a
9 changed files with 54 additions and 49 deletions
+2 -4
View File
@@ -1,10 +1,9 @@
import os
from changedetectionio.strtobool import strtobool
from flask_restful import abort, Resource
from flask import request
import validators
from functools import wraps
from . import auth, validate_openapi_request
from ..html_tools import is_safe_valid_url
def default_content_type(content_type='text/plain'):
@@ -50,14 +49,13 @@ class Import(Resource):
urls = request.get_data().decode('utf8').splitlines()
added = []
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
for url in urls:
url = url.strip()
if not len(url):
continue
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
if not validators.url(url, simple_host=allow_simplehost):
if not is_safe_valid_url(url):
return f"Invalid or unsupported URL - {url}", 400
if dedupe and self.datastore.url_exists(url):
+3 -5
View File
@@ -1,7 +1,7 @@
import os
from changedetectionio.strtobool import strtobool
from changedetectionio.html_tools import is_safe_url
from changedetectionio.html_tools import is_safe_valid_url
from flask_expects_json import expects_json
from changedetectionio import queuedWatchMetaData
@@ -124,7 +124,7 @@ class Watch(Resource):
return validation_error, 400
# XSS etc protection
if request.json.get('url') and not is_safe_url(request.json.get('url')):
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
return "Invalid URL", 400
watch.update(request.json)
@@ -232,9 +232,7 @@ class CreateWatch(Resource):
json_data = request.get_json()
url = json_data['url'].strip()
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
if not validators.url(url, simple_host=allow_simplehost):
if not is_safe_valid_url(url):
return "Invalid or unsupported URL", 400
if json_data.get('proxy'):
+5 -5
View File
@@ -133,10 +133,10 @@ def get_socketio_path():
# Socket.IO will be available at {prefix}/socket.io/
return prefix
@app.template_global('is_safe_url')
def _is_safe_url(test_url):
from .html_tools import is_safe_url
return is_safe_url(test_url)
@app.template_global('is_safe_valid_url')
def _is_safe_valid_url(test_url):
from .html_tools import is_safe_valid_url
return is_safe_valid_url(test_url)
@app.template_filter('format_number_locale')
@@ -387,7 +387,7 @@ def changedetection_app(config=None, datastore_o=None):
# We would sometimes get login loop errors on sites hosted in sub-paths
# note for the future:
# if not is_safe_url(next):
# if not is_safe_valid_url(next):
# return flask.abort(400)
return redirect(url_for('watchlist.index'))
+3 -15
View File
@@ -28,11 +28,8 @@ from wtforms.utils import unset_value
from wtforms.validators import ValidationError
from validators.url import url as url_validator
from changedetectionio.widgets import TernaryNoneBooleanField
# default
# each select <option data-enabled="enabled-0-0"
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
@@ -541,19 +538,10 @@ class validateURL(object):
def validate_url(test_url):
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
try:
url_validator(test_url, simple_host=allow_simplehost)
except validators.ValidationError:
#@todo check for xss
message = f"'{test_url}' is not a valid URL."
from changedetectionio.html_tools import is_safe_valid_url
if not is_safe_valid_url(test_url):
# This should be wtforms.validators.
raise ValidationError(message)
from changedetectionio.html_tools import is_safe_url
if not is_safe_url(test_url):
# This should be wtforms.validators.
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
raise ValidationError('Watch protocol is not permitted or invalid URL format')
class ValidateSinglePythonRegexString(object):
+30 -6
View File
@@ -1,3 +1,4 @@
from functools import lru_cache
from loguru import logger
from typing import List
import html
@@ -13,8 +14,6 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
# 'price' , 'lowPrice', 'highPrice' are usually under here
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
@@ -23,20 +22,45 @@ class JSONNotFound(ValueError):
def __init__(self, msg):
ValueError.__init__(self, msg)
def is_safe_url(test_url):
@lru_cache(maxsize=10000)
def is_safe_valid_url(test_url):
import os
import validators
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio import strtobool
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
# 'source:' is a valid way to tell us to return the source
r = re.compile(re.escape('source:'), re.IGNORECASE)
r = re.compile('^source:', re.IGNORECASE)
test_url = r.sub('', test_url)
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
if not pattern.match(test_url.strip()):
# Check the actual rendered URL in case of any Jinja markup
try:
test_url = jinja_render(test_url)
except Exception as e:
logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
return False
# Be sure the protocol is safe (no file, etcetc)
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', safe_protocol_regex), re.IGNORECASE)
if not pattern.match(test_url.strip()):
logger.warning(f'URL "{test_url}" is not safe, aborting.')
return False
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
try:
if not validators.url(test_url, simple_host=allow_simplehost):
logger.warning(f'URLf "{test_url}" failed validation, aborting.')
return False
except validators.ValidationError:
logger.warning(f'URL f"{test_url}" failed validation, aborting.')
return False
return True
# Doesn't look like python supports forward slash auto enclosure in re.findall
+3 -6
View File
@@ -1,5 +1,5 @@
from blinker import signal
from changedetectionio.html_tools import is_safe_url
from changedetectionio.html_tools import is_safe_valid_url
from changedetectionio.strtobool import strtobool
from changedetectionio.jinja2_custom import render as jinja_render
from . import watch_base
@@ -12,9 +12,6 @@ from .. import jinja2_custom as safe_jinja
from ..diff import ADDED_PLACEMARKER_OPEN
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
# Allowable protocols, protects against javascript: etc
# file:// is further checked by ALLOW_FILE_URI
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
@@ -63,7 +60,7 @@ class model(watch_base):
def link(self):
url = self.get('url', '')
if not is_safe_url(url):
if not is_safe_valid_url(url):
return 'DISABLED'
ready_url = url
@@ -84,7 +81,7 @@ class model(watch_base):
ready_url=ready_url.replace('source:', '')
# Also double check it after any Jinja2 formatting just incase
if not is_safe_url(ready_url):
if not is_safe_valid_url(ready_url):
return 'DISABLED'
return ready_url
+3 -3
View File
@@ -1,5 +1,5 @@
from changedetectionio.strtobool import strtobool
from changedetectionio.html_tools import is_safe_url
from changedetectionio.html_tools import is_safe_valid_url
from flask import (
flash
@@ -341,8 +341,8 @@ class ChangeDetectionStore:
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
flash("Error fetching metadata for {}".format(url), 'error')
return False
if not is_safe_url(url):
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
if not is_safe_valid_url(url):
flash('Watch protocol is not permitted or invalid URL format', 'error')
return None
if tag and type(tag) == str:
+1 -1
View File
@@ -53,7 +53,7 @@
<a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
<strong>Change</strong>Detection.io</a>
{% endif %}
{% if current_diff_url and is_safe_url(current_diff_url) %}
{% if current_diff_url and is_safe_valid_url(current_diff_url) %}
<a class="current-diff-url" href="{{ current_diff_url }}">
<span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
{% else %}
+4 -4
View File
@@ -48,7 +48,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
@@ -56,7 +56,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
@@ -64,7 +64,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
assert b'Watch protocol is not permitted or invalid URL format' in res.data
res = client.post(
@@ -73,7 +73,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
assert b'Watch protocol is not permitted or invalid URL format' in res.data
def _runner_test_various_file_slash(client, file_uri):