mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 06:37:41 +00:00 
			
		
		
		
	Compare commits
	
		
			11 Commits
		
	
	
		
			send-test-
			...
			url-valida
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d07cb2a7a8 | ||
|   | f9cd4da2bb | ||
|   | 4e699cc13b | ||
|   | 964302cf3c | ||
|   | 00b31cabd5 | ||
|   | ac13d8cbde | ||
|   | a037cf7b9a | ||
|   | 8a7ea79fb3 | ||
|   | ec0d7cff21 | ||
|   | d48c82052a | ||
|   | 552e98519b | 
| @@ -1,10 +1,9 @@ | ||||
| import os | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from flask_restful import abort, Resource | ||||
| from flask import request | ||||
| import validators | ||||
| from functools import wraps | ||||
| from . import auth, validate_openapi_request | ||||
| from ..validate_url import is_safe_valid_url | ||||
|  | ||||
|  | ||||
| def default_content_type(content_type='text/plain'): | ||||
| @@ -50,14 +49,13 @@ class Import(Resource): | ||||
|  | ||||
|         urls = request.get_data().decode('utf8').splitlines() | ||||
|         added = [] | ||||
|         allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False')) | ||||
|         for url in urls: | ||||
|             url = url.strip() | ||||
|             if not len(url): | ||||
|                 continue | ||||
|  | ||||
|             # If hosts that only contain alphanumerics are allowed ("localhost" for example) | ||||
|             if not validators.url(url, simple_host=allow_simplehost): | ||||
|             if not is_safe_valid_url(url): | ||||
|                 return f"Invalid or unsupported URL - {url}", 400 | ||||
|  | ||||
|             if dedupe and self.datastore.url_exists(url): | ||||
|   | ||||
| @@ -1,14 +1,12 @@ | ||||
| import os | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
| from changedetectionio.validate_url import is_safe_valid_url | ||||
|  | ||||
| from flask_expects_json import expects_json | ||||
| from changedetectionio import queuedWatchMetaData | ||||
| from changedetectionio import worker_handler | ||||
| from flask_restful import abort, Resource | ||||
| from flask import request, make_response, send_from_directory | ||||
| import validators | ||||
| from . import auth | ||||
| import copy | ||||
|  | ||||
| @@ -124,7 +122,7 @@ class Watch(Resource): | ||||
|             return validation_error, 400 | ||||
|  | ||||
|         # XSS etc protection | ||||
|         if request.json.get('url') and not is_safe_url(request.json.get('url')): | ||||
|         if request.json.get('url') and not is_safe_valid_url(request.json.get('url')): | ||||
|             return "Invalid URL", 400 | ||||
|  | ||||
|         watch.update(request.json) | ||||
| @@ -232,9 +230,7 @@ class CreateWatch(Resource): | ||||
|         json_data = request.get_json() | ||||
|         url = json_data['url'].strip() | ||||
|  | ||||
|         # If hosts that only contain alphanumerics are allowed ("localhost" for example) | ||||
|         allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False')) | ||||
|         if not validators.url(url, simple_host=allow_simplehost): | ||||
|         if not is_safe_valid_url(url): | ||||
|             return "Invalid or unsupported URL", 400 | ||||
|  | ||||
|         if json_data.get('proxy'): | ||||
|   | ||||
| @@ -133,10 +133,10 @@ def get_socketio_path(): | ||||
|     # Socket.IO will be available at {prefix}/socket.io/ | ||||
|     return prefix | ||||
|  | ||||
| @app.template_global('is_safe_url') | ||||
| def _is_safe_url(test_url): | ||||
|     from .html_tools import is_safe_url | ||||
|     return is_safe_url(test_url) | ||||
| @app.template_global('is_safe_valid_url') | ||||
| def _is_safe_valid_url(test_url): | ||||
|     from .validate_url import is_safe_valid_url | ||||
|     return is_safe_valid_url(test_url) | ||||
|  | ||||
|  | ||||
| @app.template_filter('format_number_locale') | ||||
| @@ -387,7 +387,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|             # We would sometimes get login loop errors on sites hosted in sub-paths | ||||
|  | ||||
|             # note for the future: | ||||
|             #            if not is_safe_url(next): | ||||
|             #            if not is_safe_valid_url(next): | ||||
|             #                return flask.abort(400) | ||||
|             return redirect(url_for('watchlist.index')) | ||||
|  | ||||
|   | ||||
| @@ -28,11 +28,8 @@ from wtforms.utils import unset_value | ||||
|  | ||||
| from wtforms.validators import ValidationError | ||||
|  | ||||
| from validators.url import url as url_validator | ||||
|  | ||||
| from changedetectionio.widgets import TernaryNoneBooleanField | ||||
|  | ||||
|  | ||||
| # default | ||||
| # each select <option data-enabled="enabled-0-0" | ||||
| from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config | ||||
| @@ -541,19 +538,10 @@ class validateURL(object): | ||||
|  | ||||
|  | ||||
| def validate_url(test_url): | ||||
|     # If hosts that only contain alphanumerics are allowed ("localhost" for example) | ||||
|     try: | ||||
|         url_validator(test_url, simple_host=allow_simplehost) | ||||
|     except validators.ValidationError: | ||||
|         #@todo check for xss | ||||
|         message = f"'{test_url}' is not a valid URL." | ||||
|     from changedetectionio.validate_url import is_safe_valid_url | ||||
|     if not is_safe_valid_url(test_url): | ||||
|         # This should be wtforms.validators. | ||||
|         raise ValidationError(message) | ||||
|  | ||||
|     from changedetectionio.html_tools import is_safe_url | ||||
|     if not is_safe_url(test_url): | ||||
|         # This should be wtforms.validators. | ||||
|         raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format') | ||||
|         raise ValidationError('Watch protocol is not permitted or invalid URL format') | ||||
|  | ||||
|  | ||||
| class ValidateSinglePythonRegexString(object): | ||||
|   | ||||
| @@ -15,8 +15,6 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S) | ||||
| META_CS  = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I) | ||||
| META_CT  = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I) | ||||
|  | ||||
| SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):' | ||||
|  | ||||
| # 'price' , 'lowPrice', 'highPrice' are usually under here | ||||
| # All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here | ||||
| LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"] | ||||
| @@ -25,22 +23,6 @@ class JSONNotFound(ValueError): | ||||
|     def __init__(self, msg): | ||||
|         ValueError.__init__(self, msg) | ||||
|  | ||||
| def is_safe_url(test_url): | ||||
|     import os | ||||
|     # See https://github.com/dgtlmoon/changedetection.io/issues/1358 | ||||
|  | ||||
|     # Remove 'source:' prefix so we dont get 'source:javascript:' etc | ||||
|     # 'source:' is a valid way to tell us to return the source | ||||
|  | ||||
|     r = re.compile(re.escape('source:'), re.IGNORECASE) | ||||
|     test_url = r.sub('', test_url) | ||||
|  | ||||
|     pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE) | ||||
|     if not pattern.match(test_url.strip()): | ||||
|         return False | ||||
|  | ||||
|     return True | ||||
|  | ||||
| # Doesn't look like python supports forward slash auto enclosure in re.findall | ||||
| # So convert it to inline flag "(?i)foobar" type configuration | ||||
| @lru_cache(maxsize=100) | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| from blinker import signal | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
| from changedetectionio.validate_url import is_safe_valid_url | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.jinja2_custom import render as jinja_render | ||||
| from . import watch_base | ||||
| @@ -12,9 +13,6 @@ from .. import jinja2_custom as safe_jinja | ||||
| from ..diff import ADDED_PLACEMARKER_OPEN | ||||
| from ..html_tools import TRANSLATE_WHITESPACE_TABLE | ||||
|  | ||||
| # Allowable protocols, protects against javascript: etc | ||||
| # file:// is further checked by ALLOW_FILE_URI | ||||
| SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):' | ||||
| FAVICON_RESAVE_THRESHOLD_SECONDS=86400 | ||||
|  | ||||
|  | ||||
| @@ -63,7 +61,7 @@ class model(watch_base): | ||||
|     def link(self): | ||||
|  | ||||
|         url = self.get('url', '') | ||||
|         if not is_safe_url(url): | ||||
|         if not is_safe_valid_url(url): | ||||
|             return 'DISABLED' | ||||
|  | ||||
|         ready_url = url | ||||
| @@ -84,7 +82,7 @@ class model(watch_base): | ||||
|             ready_url=ready_url.replace('source:', '') | ||||
|  | ||||
|         # Also double check it after any Jinja2 formatting just incase | ||||
|         if not is_safe_url(ready_url): | ||||
|         if not is_safe_valid_url(ready_url): | ||||
|             return 'DISABLED' | ||||
|         return ready_url | ||||
|  | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.html_tools import is_safe_url | ||||
|  | ||||
| from changedetectionio.validate_url import is_safe_valid_url | ||||
|  | ||||
| from flask import ( | ||||
|     flash | ||||
| @@ -341,8 +342,10 @@ class ChangeDetectionStore: | ||||
|                 logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}") | ||||
|                 flash("Error fetching metadata for {}".format(url), 'error') | ||||
|                 return False | ||||
|         if not is_safe_url(url): | ||||
|             flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error') | ||||
|  | ||||
|         if not is_safe_valid_url(url): | ||||
|             flash('Watch protocol is not permitted or invalid URL format', 'error') | ||||
|  | ||||
|             return None | ||||
|  | ||||
|         if tag and type(tag) == str: | ||||
|   | ||||
| @@ -53,7 +53,7 @@ | ||||
|           <a class="pure-menu-heading" href="{{url_for('watchlist.index')}}"> | ||||
|             <strong>Change</strong>Detection.io</a> | ||||
|         {% endif %} | ||||
|         {% if current_diff_url and is_safe_url(current_diff_url) %} | ||||
|         {% if current_diff_url and is_safe_valid_url(current_diff_url) %} | ||||
|           <a class="current-diff-url" href="{{ current_diff_url }}"> | ||||
|             <span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a> | ||||
|         {% else %} | ||||
|   | ||||
| @@ -64,29 +64,21 @@ def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usa | ||||
|     # Should not have template error | ||||
|     assert b'Invalid template' not in res.data | ||||
|  | ||||
|  | ||||
| # https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456 | ||||
| def test_jinja2_security_url_query(client, live_server, measure_memory_usage): | ||||
|      | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     test_url = url_for('test_return_query', _external=True) | ||||
|  | ||||
|     # because url_for() will URL-encode the var, but we dont here | ||||
|     full_url = "{}?{}".format(test_url, | ||||
|                               "date={{ ''.__class__.__mro__[1].__subclasses__()}}", ) | ||||
|     full_url = test_url + "?date={{ ''.__class__.__mro__[1].__subclasses__()}}" | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
|         data={"url": full_url, "tags": "test"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b"Watch added" in res.data | ||||
|     wait_for_all_checks(client) | ||||
|     assert b"Watch added" not in res.data | ||||
|  | ||||
|     # It should report nothing found (no new 'has-unread-changes' class) | ||||
|     res = client.get(url_for("watchlist.index")) | ||||
|     assert b'is invalid and cannot be used' in res.data | ||||
|     # Some of the spewed output from the subclasses | ||||
|     assert b'dict_values' not in res.data | ||||
|  | ||||
| def test_timezone(mocker): | ||||
|     """Verify that timezone is parsed.""" | ||||
|   | ||||
| @@ -25,7 +25,7 @@ def set_original_response(): | ||||
|     return None | ||||
|  | ||||
| def test_bad_access(client, live_server, measure_memory_usage): | ||||
|      | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("imports.import_page"), | ||||
|         data={"urls": 'https://localhost'}, | ||||
| @@ -48,7 +48,7 @@ def test_bad_access(client, live_server, measure_memory_usage): | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data | ||||
|     assert b'Watch protocol is not permitted or invalid URL format' in res.data | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
| @@ -56,7 +56,7 @@ def test_bad_access(client, live_server, measure_memory_usage): | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data | ||||
|     assert b'Watch protocol is not permitted or invalid URL format' in res.data | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
| @@ -64,7 +64,7 @@ def test_bad_access(client, live_server, measure_memory_usage): | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data | ||||
|     assert b'Watch protocol is not permitted or invalid URL format' in res.data | ||||
|  | ||||
|  | ||||
|     res = client.post( | ||||
| @@ -73,8 +73,15 @@ def test_bad_access(client, live_server, measure_memory_usage): | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data | ||||
|     assert b'Watch protocol is not permitted or invalid URL format' in res.data | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
|         data={"url": 'https://i-wanna-xss-you.com?hereis=<script>alert(1)</script>', "tags": ''}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b'Watch protocol is not permitted or invalid URL format' in res.data | ||||
|  | ||||
| def _runner_test_various_file_slash(client, file_uri): | ||||
|  | ||||
| @@ -111,8 +118,8 @@ def test_file_slash_access(client, live_server, measure_memory_usage): | ||||
|  | ||||
|     test_file_path = os.path.abspath(__file__) | ||||
|     _runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}") | ||||
|     _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}") | ||||
|     _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509 | ||||
| #    _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}") | ||||
| #    _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509 | ||||
|  | ||||
| def test_xss(client, live_server, measure_memory_usage): | ||||
|      | ||||
|   | ||||
							
								
								
									
										109
									
								
								changedetectionio/validate_url.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										109
									
								
								changedetectionio/validate_url.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,109 @@ | ||||
| from functools import lru_cache | ||||
| from loguru import logger | ||||
| from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode | ||||
|  | ||||
|  | ||||
| def normalize_url_encoding(url): | ||||
|     """ | ||||
|     Safely encode a URL's query parameters, regardless of whether they're already encoded. | ||||
|  | ||||
|     Why this is necessary: | ||||
|     URLs can arrive in various states - some with already encoded query parameters (%20 for spaces), | ||||
|     some with unencoded parameters (literal spaces), or a mix of both. The validators.url() function | ||||
|     requires proper encoding, but simply encoding an already-encoded URL would double-encode it | ||||
|     (e.g., %20 would become %2520). | ||||
|  | ||||
|     This function solves the problem by: | ||||
|     1. Parsing the URL to extract query parameters | ||||
|     2. parse_qsl() automatically decodes parameters if they're encoded | ||||
|     3. urlencode() re-encodes them properly | ||||
|     4. Returns a consistently encoded URL that will pass validation | ||||
|  | ||||
|     Example: | ||||
|     - Input:  "http://example.com/test?time=2025-10-28 09:19"  (space not encoded) | ||||
|     - Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded) | ||||
|  | ||||
|     - Input:  "http://example.com/test?time=2025-10-28%2009:19" (already encoded) | ||||
|     - Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded) | ||||
|  | ||||
|     Returns a properly encoded URL string. | ||||
|     """ | ||||
|     try: | ||||
|         # Parse the URL into components (scheme, netloc, path, params, query, fragment) | ||||
|         parsed = urlparse(url) | ||||
|  | ||||
|         # Parse query string - this automatically decodes it if encoded | ||||
|         # parse_qsl handles both encoded and unencoded query strings gracefully | ||||
|         query_params = parse_qsl(parsed.query, keep_blank_values=True) | ||||
|  | ||||
|         # Re-encode the query string properly using standard URL encoding | ||||
|         encoded_query = urlencode(query_params, safe='') | ||||
|  | ||||
|         # Reconstruct the URL with properly encoded query string | ||||
|         normalized = urlunparse(( | ||||
|             parsed.scheme, | ||||
|             parsed.netloc, | ||||
|             parsed.path, | ||||
|             parsed.params, | ||||
|             encoded_query,  # Use the re-encoded query | ||||
|             parsed.fragment | ||||
|         )) | ||||
|  | ||||
|         return normalized | ||||
|     except Exception as e: | ||||
|         # If parsing fails for any reason, return original URL | ||||
|         logger.debug(f"URL normalization failed for '{url}': {e}") | ||||
|         return url | ||||
|  | ||||
|  | ||||
| @lru_cache(maxsize=10000) | ||||
| def is_safe_valid_url(test_url): | ||||
|     from changedetectionio import strtobool | ||||
|     from changedetectionio.jinja2_custom import render as jinja_render | ||||
|     import os | ||||
|     import re | ||||
|     import validators | ||||
|  | ||||
|     allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false')) | ||||
|     safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):' | ||||
|  | ||||
|     # See https://github.com/dgtlmoon/changedetection.io/issues/1358 | ||||
|  | ||||
|     # Remove 'source:' prefix so we dont get 'source:javascript:' etc | ||||
|     # 'source:' is a valid way to tell us to return the source | ||||
|  | ||||
|     r = re.compile('^source:', re.IGNORECASE) | ||||
|     test_url = r.sub('', test_url) | ||||
|  | ||||
|     # Check the actual rendered URL in case of any Jinja markup | ||||
|     try: | ||||
|         test_url = jinja_render(test_url) | ||||
|     except Exception as e: | ||||
|         logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}') | ||||
|         return False | ||||
|  | ||||
|     # Check query parameters and fragment | ||||
|     if re.search(r'[<>]', test_url): | ||||
|         logger.warning(f'URL "{test_url}" contains suspicious characters') | ||||
|         return False | ||||
|  | ||||
|     # Normalize URL encoding - handle both encoded and unencoded query parameters | ||||
|     test_url = normalize_url_encoding(test_url) | ||||
|  | ||||
|     # Be sure the protocol is safe (no file, etcetc) | ||||
|     pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', safe_protocol_regex), re.IGNORECASE) | ||||
|     if not pattern.match(test_url.strip()): | ||||
|         logger.warning(f'URL "{test_url}" is not safe, aborting.') | ||||
|         return False | ||||
|  | ||||
|     # If hosts that only contain alphanumerics are allowed ("localhost" for example) | ||||
|     allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False')) | ||||
|     try: | ||||
|         if not test_url.strip().lower().startswith('file:') and not validators.url(test_url, simple_host=allow_simplehost): | ||||
|             logger.warning(f'URL "{test_url}" failed validation, aborting.') | ||||
|             return False | ||||
|     except validators.ValidationError: | ||||
|         logger.warning(f'URL f"{test_url}" failed validation, aborting.') | ||||
|         return False | ||||
|  | ||||
|     return True | ||||
		Reference in New Issue
	
	Block a user