mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-03 16:17:51 +00:00 
			
		
		
		
	Compare commits
	
		
			11 Commits
		
	
	
		
			0.50.35
			...
			url-valida
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					d07cb2a7a8 | ||
| 
						 | 
					f9cd4da2bb | ||
| 
						 | 
					4e699cc13b | ||
| 
						 | 
					964302cf3c | ||
| 
						 | 
					00b31cabd5 | ||
| 
						 | 
					ac13d8cbde | ||
| 
						 | 
					a037cf7b9a | ||
| 
						 | 
					8a7ea79fb3 | ||
| 
						 | 
					ec0d7cff21 | ||
| 
						 | 
					d48c82052a | ||
| 
						 | 
					552e98519b | 
@@ -1,10 +1,9 @@
 | 
			
		||||
import os
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from flask_restful import abort, Resource
 | 
			
		||||
from flask import request
 | 
			
		||||
import validators
 | 
			
		||||
from functools import wraps
 | 
			
		||||
from . import auth, validate_openapi_request
 | 
			
		||||
from ..validate_url import is_safe_valid_url
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def default_content_type(content_type='text/plain'):
 | 
			
		||||
@@ -50,14 +49,13 @@ class Import(Resource):
 | 
			
		||||
 | 
			
		||||
        urls = request.get_data().decode('utf8').splitlines()
 | 
			
		||||
        added = []
 | 
			
		||||
        allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
 | 
			
		||||
        for url in urls:
 | 
			
		||||
            url = url.strip()
 | 
			
		||||
            if not len(url):
 | 
			
		||||
                continue
 | 
			
		||||
 | 
			
		||||
            # If hosts that only contain alphanumerics are allowed ("localhost" for example)
 | 
			
		||||
            if not validators.url(url, simple_host=allow_simplehost):
 | 
			
		||||
            if not is_safe_valid_url(url):
 | 
			
		||||
                return f"Invalid or unsupported URL - {url}", 400
 | 
			
		||||
 | 
			
		||||
            if dedupe and self.datastore.url_exists(url):
 | 
			
		||||
 
 | 
			
		||||
@@ -1,14 +1,12 @@
 | 
			
		||||
import os
 | 
			
		||||
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from changedetectionio.html_tools import is_safe_url
 | 
			
		||||
from changedetectionio.validate_url import is_safe_valid_url
 | 
			
		||||
 | 
			
		||||
from flask_expects_json import expects_json
 | 
			
		||||
from changedetectionio import queuedWatchMetaData
 | 
			
		||||
from changedetectionio import worker_handler
 | 
			
		||||
from flask_restful import abort, Resource
 | 
			
		||||
from flask import request, make_response, send_from_directory
 | 
			
		||||
import validators
 | 
			
		||||
from . import auth
 | 
			
		||||
import copy
 | 
			
		||||
 | 
			
		||||
@@ -124,7 +122,7 @@ class Watch(Resource):
 | 
			
		||||
            return validation_error, 400
 | 
			
		||||
 | 
			
		||||
        # XSS etc protection
 | 
			
		||||
        if request.json.get('url') and not is_safe_url(request.json.get('url')):
 | 
			
		||||
        if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
 | 
			
		||||
            return "Invalid URL", 400
 | 
			
		||||
 | 
			
		||||
        watch.update(request.json)
 | 
			
		||||
@@ -232,9 +230,7 @@ class CreateWatch(Resource):
 | 
			
		||||
        json_data = request.get_json()
 | 
			
		||||
        url = json_data['url'].strip()
 | 
			
		||||
 | 
			
		||||
        # If hosts that only contain alphanumerics are allowed ("localhost" for example)
 | 
			
		||||
        allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
 | 
			
		||||
        if not validators.url(url, simple_host=allow_simplehost):
 | 
			
		||||
        if not is_safe_valid_url(url):
 | 
			
		||||
            return "Invalid or unsupported URL", 400
 | 
			
		||||
 | 
			
		||||
        if json_data.get('proxy'):
 | 
			
		||||
 
 | 
			
		||||
@@ -133,10 +133,10 @@ def get_socketio_path():
 | 
			
		||||
    # Socket.IO will be available at {prefix}/socket.io/
 | 
			
		||||
    return prefix
 | 
			
		||||
 | 
			
		||||
@app.template_global('is_safe_url')
 | 
			
		||||
def _is_safe_url(test_url):
 | 
			
		||||
    from .html_tools import is_safe_url
 | 
			
		||||
    return is_safe_url(test_url)
 | 
			
		||||
@app.template_global('is_safe_valid_url')
 | 
			
		||||
def _is_safe_valid_url(test_url):
 | 
			
		||||
    from .validate_url import is_safe_valid_url
 | 
			
		||||
    return is_safe_valid_url(test_url)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@app.template_filter('format_number_locale')
 | 
			
		||||
@@ -387,7 +387,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
            # We would sometimes get login loop errors on sites hosted in sub-paths
 | 
			
		||||
 | 
			
		||||
            # note for the future:
 | 
			
		||||
            #            if not is_safe_url(next):
 | 
			
		||||
            #            if not is_safe_valid_url(next):
 | 
			
		||||
            #                return flask.abort(400)
 | 
			
		||||
            return redirect(url_for('watchlist.index'))
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -28,11 +28,8 @@ from wtforms.utils import unset_value
 | 
			
		||||
 | 
			
		||||
from wtforms.validators import ValidationError
 | 
			
		||||
 | 
			
		||||
from validators.url import url as url_validator
 | 
			
		||||
 | 
			
		||||
from changedetectionio.widgets import TernaryNoneBooleanField
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# default
 | 
			
		||||
# each select <option data-enabled="enabled-0-0"
 | 
			
		||||
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
 | 
			
		||||
@@ -541,19 +538,10 @@ class validateURL(object):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def validate_url(test_url):
 | 
			
		||||
    # If hosts that only contain alphanumerics are allowed ("localhost" for example)
 | 
			
		||||
    try:
 | 
			
		||||
        url_validator(test_url, simple_host=allow_simplehost)
 | 
			
		||||
    except validators.ValidationError:
 | 
			
		||||
        #@todo check for xss
 | 
			
		||||
        message = f"'{test_url}' is not a valid URL."
 | 
			
		||||
    from changedetectionio.validate_url import is_safe_valid_url
 | 
			
		||||
    if not is_safe_valid_url(test_url):
 | 
			
		||||
        # This should be wtforms.validators.
 | 
			
		||||
        raise ValidationError(message)
 | 
			
		||||
 | 
			
		||||
    from changedetectionio.html_tools import is_safe_url
 | 
			
		||||
    if not is_safe_url(test_url):
 | 
			
		||||
        # This should be wtforms.validators.
 | 
			
		||||
        raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
 | 
			
		||||
        raise ValidationError('Watch protocol is not permitted or invalid URL format')
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class ValidateSinglePythonRegexString(object):
 | 
			
		||||
 
 | 
			
		||||
@@ -15,8 +15,6 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
 | 
			
		||||
META_CS  = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
 | 
			
		||||
META_CT  = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
 | 
			
		||||
 | 
			
		||||
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
 | 
			
		||||
 | 
			
		||||
# 'price' , 'lowPrice', 'highPrice' are usually under here
 | 
			
		||||
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
 | 
			
		||||
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
 | 
			
		||||
@@ -25,22 +23,6 @@ class JSONNotFound(ValueError):
 | 
			
		||||
    def __init__(self, msg):
 | 
			
		||||
        ValueError.__init__(self, msg)
 | 
			
		||||
 | 
			
		||||
def is_safe_url(test_url):
 | 
			
		||||
    import os
 | 
			
		||||
    # See https://github.com/dgtlmoon/changedetection.io/issues/1358
 | 
			
		||||
 | 
			
		||||
    # Remove 'source:' prefix so we dont get 'source:javascript:' etc
 | 
			
		||||
    # 'source:' is a valid way to tell us to return the source
 | 
			
		||||
 | 
			
		||||
    r = re.compile(re.escape('source:'), re.IGNORECASE)
 | 
			
		||||
    test_url = r.sub('', test_url)
 | 
			
		||||
 | 
			
		||||
    pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
 | 
			
		||||
    if not pattern.match(test_url.strip()):
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    return True
 | 
			
		||||
 | 
			
		||||
# Doesn't look like python supports forward slash auto enclosure in re.findall
 | 
			
		||||
# So convert it to inline flag "(?i)foobar" type configuration
 | 
			
		||||
@lru_cache(maxsize=100)
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,6 @@
 | 
			
		||||
from blinker import signal
 | 
			
		||||
from changedetectionio.html_tools import is_safe_url
 | 
			
		||||
from changedetectionio.validate_url import is_safe_valid_url
 | 
			
		||||
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from changedetectionio.jinja2_custom import render as jinja_render
 | 
			
		||||
from . import watch_base
 | 
			
		||||
@@ -12,9 +13,6 @@ from .. import jinja2_custom as safe_jinja
 | 
			
		||||
from ..diff import ADDED_PLACEMARKER_OPEN
 | 
			
		||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
 | 
			
		||||
 | 
			
		||||
# Allowable protocols, protects against javascript: etc
 | 
			
		||||
# file:// is further checked by ALLOW_FILE_URI
 | 
			
		||||
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
 | 
			
		||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -63,7 +61,7 @@ class model(watch_base):
 | 
			
		||||
    def link(self):
 | 
			
		||||
 | 
			
		||||
        url = self.get('url', '')
 | 
			
		||||
        if not is_safe_url(url):
 | 
			
		||||
        if not is_safe_valid_url(url):
 | 
			
		||||
            return 'DISABLED'
 | 
			
		||||
 | 
			
		||||
        ready_url = url
 | 
			
		||||
@@ -84,7 +82,7 @@ class model(watch_base):
 | 
			
		||||
            ready_url=ready_url.replace('source:', '')
 | 
			
		||||
 | 
			
		||||
        # Also double check it after any Jinja2 formatting just incase
 | 
			
		||||
        if not is_safe_url(ready_url):
 | 
			
		||||
        if not is_safe_valid_url(ready_url):
 | 
			
		||||
            return 'DISABLED'
 | 
			
		||||
        return ready_url
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -1,5 +1,6 @@
 | 
			
		||||
from changedetectionio.strtobool import strtobool
 | 
			
		||||
from changedetectionio.html_tools import is_safe_url
 | 
			
		||||
 | 
			
		||||
from changedetectionio.validate_url import is_safe_valid_url
 | 
			
		||||
 | 
			
		||||
from flask import (
 | 
			
		||||
    flash
 | 
			
		||||
@@ -341,8 +342,10 @@ class ChangeDetectionStore:
 | 
			
		||||
                logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
 | 
			
		||||
                flash("Error fetching metadata for {}".format(url), 'error')
 | 
			
		||||
                return False
 | 
			
		||||
        if not is_safe_url(url):
 | 
			
		||||
            flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
 | 
			
		||||
 | 
			
		||||
        if not is_safe_valid_url(url):
 | 
			
		||||
            flash('Watch protocol is not permitted or invalid URL format', 'error')
 | 
			
		||||
 | 
			
		||||
            return None
 | 
			
		||||
 | 
			
		||||
        if tag and type(tag) == str:
 | 
			
		||||
 
 | 
			
		||||
@@ -53,7 +53,7 @@
 | 
			
		||||
          <a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
 | 
			
		||||
            <strong>Change</strong>Detection.io</a>
 | 
			
		||||
        {% endif %}
 | 
			
		||||
        {% if current_diff_url and is_safe_url(current_diff_url) %}
 | 
			
		||||
        {% if current_diff_url and is_safe_valid_url(current_diff_url) %}
 | 
			
		||||
          <a class="current-diff-url" href="{{ current_diff_url }}">
 | 
			
		||||
            <span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
 | 
			
		||||
        {% else %}
 | 
			
		||||
 
 | 
			
		||||
@@ -64,29 +64,21 @@ def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usa
 | 
			
		||||
    # Should not have template error
 | 
			
		||||
    assert b'Invalid template' not in res.data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
 | 
			
		||||
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    # Add our URL to the import page
 | 
			
		||||
    test_url = url_for('test_return_query', _external=True)
 | 
			
		||||
 | 
			
		||||
    # because url_for() will URL-encode the var, but we dont here
 | 
			
		||||
    full_url = "{}?{}".format(test_url,
 | 
			
		||||
                              "date={{ ''.__class__.__mro__[1].__subclasses__()}}", )
 | 
			
		||||
    full_url = test_url + "?date={{ ''.__class__.__mro__[1].__subclasses__()}}"
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("ui.ui_views.form_quick_watch_add"),
 | 
			
		||||
        data={"url": full_url, "tags": "test"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
    assert b"Watch added" in res.data
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
    assert b"Watch added" not in res.data
 | 
			
		||||
 | 
			
		||||
    # It should report nothing found (no new 'has-unread-changes' class)
 | 
			
		||||
    res = client.get(url_for("watchlist.index"))
 | 
			
		||||
    assert b'is invalid and cannot be used' in res.data
 | 
			
		||||
    # Some of the spewed output from the subclasses
 | 
			
		||||
    assert b'dict_values' not in res.data
 | 
			
		||||
 | 
			
		||||
def test_timezone(mocker):
 | 
			
		||||
    """Verify that timezone is parsed."""
 | 
			
		||||
 
 | 
			
		||||
@@ -25,7 +25,7 @@ def set_original_response():
 | 
			
		||||
    return None
 | 
			
		||||
 | 
			
		||||
def test_bad_access(client, live_server, measure_memory_usage):
 | 
			
		||||
    
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("imports.import_page"),
 | 
			
		||||
        data={"urls": 'https://localhost'},
 | 
			
		||||
@@ -48,7 +48,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
 | 
			
		||||
    assert b'Watch protocol is not permitted or invalid URL format' in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("ui.ui_views.form_quick_watch_add"),
 | 
			
		||||
@@ -56,7 +56,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
 | 
			
		||||
    assert b'Watch protocol is not permitted or invalid URL format' in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("ui.ui_views.form_quick_watch_add"),
 | 
			
		||||
@@ -64,7 +64,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
 | 
			
		||||
    assert b'Watch protocol is not permitted or invalid URL format' in res.data
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
@@ -73,8 +73,15 @@ def test_bad_access(client, live_server, measure_memory_usage):
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
 | 
			
		||||
    assert b'Watch protocol is not permitted or invalid URL format' in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("ui.ui_views.form_quick_watch_add"),
 | 
			
		||||
        data={"url": 'https://i-wanna-xss-you.com?hereis=<script>alert(1)</script>', "tags": ''},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b'Watch protocol is not permitted or invalid URL format' in res.data
 | 
			
		||||
 | 
			
		||||
def _runner_test_various_file_slash(client, file_uri):
 | 
			
		||||
 | 
			
		||||
@@ -111,8 +118,8 @@ def test_file_slash_access(client, live_server, measure_memory_usage):
 | 
			
		||||
 | 
			
		||||
    test_file_path = os.path.abspath(__file__)
 | 
			
		||||
    _runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}")
 | 
			
		||||
    _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
 | 
			
		||||
    _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
 | 
			
		||||
#    _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
 | 
			
		||||
#    _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
 | 
			
		||||
 | 
			
		||||
def test_xss(client, live_server, measure_memory_usage):
 | 
			
		||||
    
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										109
									
								
								changedetectionio/validate_url.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										109
									
								
								changedetectionio/validate_url.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,109 @@
 | 
			
		||||
from functools import lru_cache
 | 
			
		||||
from loguru import logger
 | 
			
		||||
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def normalize_url_encoding(url):
 | 
			
		||||
    """
 | 
			
		||||
    Safely encode a URL's query parameters, regardless of whether they're already encoded.
 | 
			
		||||
 | 
			
		||||
    Why this is necessary:
 | 
			
		||||
    URLs can arrive in various states - some with already encoded query parameters (%20 for spaces),
 | 
			
		||||
    some with unencoded parameters (literal spaces), or a mix of both. The validators.url() function
 | 
			
		||||
    requires proper encoding, but simply encoding an already-encoded URL would double-encode it
 | 
			
		||||
    (e.g., %20 would become %2520).
 | 
			
		||||
 | 
			
		||||
    This function solves the problem by:
 | 
			
		||||
    1. Parsing the URL to extract query parameters
 | 
			
		||||
    2. parse_qsl() automatically decodes parameters if they're encoded
 | 
			
		||||
    3. urlencode() re-encodes them properly
 | 
			
		||||
    4. Returns a consistently encoded URL that will pass validation
 | 
			
		||||
 | 
			
		||||
    Example:
 | 
			
		||||
    - Input:  "http://example.com/test?time=2025-10-28 09:19"  (space not encoded)
 | 
			
		||||
    - Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
 | 
			
		||||
 | 
			
		||||
    - Input:  "http://example.com/test?time=2025-10-28%2009:19" (already encoded)
 | 
			
		||||
    - Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
 | 
			
		||||
 | 
			
		||||
    Returns a properly encoded URL string.
 | 
			
		||||
    """
 | 
			
		||||
    try:
 | 
			
		||||
        # Parse the URL into components (scheme, netloc, path, params, query, fragment)
 | 
			
		||||
        parsed = urlparse(url)
 | 
			
		||||
 | 
			
		||||
        # Parse query string - this automatically decodes it if encoded
 | 
			
		||||
        # parse_qsl handles both encoded and unencoded query strings gracefully
 | 
			
		||||
        query_params = parse_qsl(parsed.query, keep_blank_values=True)
 | 
			
		||||
 | 
			
		||||
        # Re-encode the query string properly using standard URL encoding
 | 
			
		||||
        encoded_query = urlencode(query_params, safe='')
 | 
			
		||||
 | 
			
		||||
        # Reconstruct the URL with properly encoded query string
 | 
			
		||||
        normalized = urlunparse((
 | 
			
		||||
            parsed.scheme,
 | 
			
		||||
            parsed.netloc,
 | 
			
		||||
            parsed.path,
 | 
			
		||||
            parsed.params,
 | 
			
		||||
            encoded_query,  # Use the re-encoded query
 | 
			
		||||
            parsed.fragment
 | 
			
		||||
        ))
 | 
			
		||||
 | 
			
		||||
        return normalized
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        # If parsing fails for any reason, return original URL
 | 
			
		||||
        logger.debug(f"URL normalization failed for '{url}': {e}")
 | 
			
		||||
        return url
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@lru_cache(maxsize=10000)
 | 
			
		||||
def is_safe_valid_url(test_url):
 | 
			
		||||
    from changedetectionio import strtobool
 | 
			
		||||
    from changedetectionio.jinja2_custom import render as jinja_render
 | 
			
		||||
    import os
 | 
			
		||||
    import re
 | 
			
		||||
    import validators
 | 
			
		||||
 | 
			
		||||
    allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
 | 
			
		||||
    safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
 | 
			
		||||
 | 
			
		||||
    # See https://github.com/dgtlmoon/changedetection.io/issues/1358
 | 
			
		||||
 | 
			
		||||
    # Remove 'source:' prefix so we dont get 'source:javascript:' etc
 | 
			
		||||
    # 'source:' is a valid way to tell us to return the source
 | 
			
		||||
 | 
			
		||||
    r = re.compile('^source:', re.IGNORECASE)
 | 
			
		||||
    test_url = r.sub('', test_url)
 | 
			
		||||
 | 
			
		||||
    # Check the actual rendered URL in case of any Jinja markup
 | 
			
		||||
    try:
 | 
			
		||||
        test_url = jinja_render(test_url)
 | 
			
		||||
    except Exception as e:
 | 
			
		||||
        logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    # Check query parameters and fragment
 | 
			
		||||
    if re.search(r'[<>]', test_url):
 | 
			
		||||
        logger.warning(f'URL "{test_url}" contains suspicious characters')
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    # Normalize URL encoding - handle both encoded and unencoded query parameters
 | 
			
		||||
    test_url = normalize_url_encoding(test_url)
 | 
			
		||||
 | 
			
		||||
    # Be sure the protocol is safe (no file, etcetc)
 | 
			
		||||
    pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', safe_protocol_regex), re.IGNORECASE)
 | 
			
		||||
    if not pattern.match(test_url.strip()):
 | 
			
		||||
        logger.warning(f'URL "{test_url}" is not safe, aborting.')
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    # If hosts that only contain alphanumerics are allowed ("localhost" for example)
 | 
			
		||||
    allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
 | 
			
		||||
    try:
 | 
			
		||||
        if not test_url.strip().lower().startswith('file:') and not validators.url(test_url, simple_host=allow_simplehost):
 | 
			
		||||
            logger.warning(f'URL "{test_url}" failed validation, aborting.')
 | 
			
		||||
            return False
 | 
			
		||||
    except validators.ValidationError:
 | 
			
		||||
        logger.warning(f'URL f"{test_url}" failed validation, aborting.')
 | 
			
		||||
        return False
 | 
			
		||||
 | 
			
		||||
    return True
 | 
			
		||||
		Reference in New Issue
	
	Block a user