Security - SSRF in ChangeDetection.io via urlparse/urllib3 Parser Differential

This commit is contained in:
dgtlmoon
2026-05-25 17:57:04 +02:00
parent 851c054f8b
commit 08017d66d6
5 changed files with 186 additions and 20 deletions
@@ -9,7 +9,7 @@ import asyncio
from changedetectionio import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.validate_url import is_private_hostname
from changedetectionio.validate_url import is_private_hostname, is_url_private_or_parser_confused
# "html_requests" is listed as the default fetcher in store.py!
@@ -87,10 +87,12 @@ class fetcher(Fetcher):
try:
# Fresh DNS check at fetch time — catches DNS rebinding regardless of add-time cache.
# Validates every hostname both urlparse and urllib3 see, so parser-differential
# payloads (GHSA-rph4-96w6-q594) cannot smuggle an internal target past the gate.
if not allow_iana_restricted:
parsed_initial = urlparse(url)
if parsed_initial.hostname and is_private_hostname(parsed_initial.hostname):
raise Exception(f"Fetch blocked: '{url}' resolves to a private/reserved IP address. "
if is_url_private_or_parser_confused(url):
raise Exception(f"Fetch blocked: '{url}' resolves to a private/reserved IP address "
f"or contains a parser-differential payload. "
f"Set ALLOW_IANA_RESTRICTED_ADDRESSES=true to allow.")
r = session.request(method=request_method,
@@ -111,9 +113,9 @@ class fetcher(Fetcher):
location = r.headers.get('Location', '')
redirect_url = urljoin(current_url, location)
if not allow_iana_restricted:
parsed_redirect = urlparse(redirect_url)
if parsed_redirect.hostname and is_private_hostname(parsed_redirect.hostname):
raise Exception(f"Redirect blocked: '{redirect_url}' resolves to a private/reserved IP address.")
if is_url_private_or_parser_confused(redirect_url):
raise Exception(f"Redirect blocked: '{redirect_url}' resolves to a private/reserved IP address "
f"or contains a parser-differential payload.")
current_url = redirect_url
r = session.request('GET', redirect_url,
headers=request_headers,
@@ -60,7 +60,7 @@ from apprise.utils.logic import dict_full_update
from loguru import logger
from requests.structures import CaseInsensitiveDict
from changedetectionio.validate_url import is_private_hostname
from changedetectionio.validate_url import is_private_hostname, is_url_private_or_parser_confused
SUPPORTED_HTTP_METHODS = {"get", "post", "put", "delete", "patch", "head"}
@@ -198,12 +198,14 @@ def apprise_http_custom_handler(
url = re.sub(rf"^{schema}", "https" if schema.endswith("s") else "http", parsed_url.get("url"))
# SSRF protection — block private/loopback addresses unless explicitly allowed
# SSRF protection — block private/loopback addresses unless explicitly allowed.
# Uses parser-agnostic check so urlparse/urllib3 differentials (GHSA-rph4-96w6-q594)
# can't smuggle an internal target past the gate.
if not os.getenv('ALLOW_IANA_RESTRICTED_ADDRESSES', '').lower() in ('true', '1', 'yes'):
hostname = urlparse(url).hostname or ''
if hostname and is_private_hostname(hostname):
if is_url_private_or_parser_confused(url):
raise ValueError(
f"Notification target '{hostname}' is a private/reserved address. "
f"Notification target '{url}' is a private/reserved address "
f"or contains a parser-differential payload. "
f"Set ALLOW_IANA_RESTRICTED_ADDRESSES=true to allow."
)
+6 -6
View File
@@ -5,7 +5,7 @@ import hashlib
from changedetectionio.browser_steps.browser_steps import browser_steps_get_valid_steps
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_private_hostname
from changedetectionio.validate_url import is_private_hostname, is_url_private_or_parser_confused
from copy import deepcopy
from abc import abstractmethod
import os
@@ -104,13 +104,13 @@ class difference_detection_processor():
"""
if strtobool(os.getenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false')):
return
parsed = urlparse(self.watch.link)
if not parsed.hostname:
return
loop = asyncio.get_running_loop()
if await loop.run_in_executor(None, is_private_hostname, parsed.hostname):
# Use the parser-agnostic check so urlparse/urllib3 differentials (GHSA-rph4-96w6-q594)
# can't slip a private/internal hostname past this pre-flight gate.
if await loop.run_in_executor(None, is_url_private_or_parser_confused, self.watch.link):
raise Exception(
f"Fetch blocked: '{self.watch.link}' resolves to a private/reserved IP address. "
f"Fetch blocked: '{self.watch.link}' resolves to a private/reserved IP address "
f"or contains a parser-differential payload. "
f"Set ALLOW_IANA_RESTRICTED_ADDRESSES=true to allow."
)
+111 -2
View File
@@ -760,7 +760,9 @@ def test_ssrf_private_ip_blocked(client, live_server, monkeypatch, measure_memor
f = RequestsFetcher()
with patch('changedetectionio.content_fetchers.requests.is_private_hostname', return_value=True):
# Patch the underlying is_private_hostname in validate_url — the fetcher now goes through
# is_url_private_or_parser_confused() (GHSA-rph4-96w6-q594), which calls it transitively.
with patch('changedetectionio.validate_url.is_private_hostname', return_value=True):
with pytest.raises(Exception, match='private/reserved'):
f._run_sync(
url='http://example.com/',
@@ -784,7 +786,7 @@ def test_ssrf_private_ip_blocked(client, live_server, monkeypatch, measure_memor
return hostname in {'169.254.169.254', '10.0.0.1', '172.16.0.1',
'192.168.0.1', '127.0.0.1', '::1'}
with patch('changedetectionio.content_fetchers.requests.is_private_hostname',
with patch('changedetectionio.validate_url.is_private_hostname',
side_effect=_private_only_for_redirect):
with patch('requests.Session.request', return_value=mock_redirect):
with pytest.raises(Exception, match='Redirect blocked'):
@@ -829,6 +831,113 @@ def test_unresolvable_hostname_is_allowed(client, live_server, monkeypatch):
"Unresolvable hostname watch should appear in the watch overview list"
def test_ghsa_rph4_96w6_q594_urlparse_urllib3_parser_differential_ssrf(client, live_server, monkeypatch, measure_memory_usage, datastore_path):
"""
GHSA-rph4-96w6-q594: SSRF via urlparse/urllib3 parser differential.
A URL like http://INTERNAL:8888\\@PUBLIC/ is parsed two different ways:
- urlparse() treats \\@ as a credential separator hostname = PUBLIC
- urllib3 treats \\ as a path character hostname = INTERNAL
The pre-fetch SSRF check used urlparse(), but requests/urllib3 actually connected
to INTERNAL. Fix: parser-agnostic gate that (a) blocks any URL containing a
backslash and (b) validates every hostname both parsers produce.
Covers:
1. extract_url_hostnames() reveals BOTH hostnames for the payload
2. is_url_private_or_parser_confused() blocks backslash payloads outright
3. is_safe_valid_url() rejects backslash payloads at add-time
4. The /api/v1/watch add endpoint rejects the payload
5. The requests fetcher refuses the payload at fetch-time
6. The redirect-following loop refuses a backslash payload in Location
"""
from unittest.mock import patch, MagicMock
from changedetectionio.validate_url import (
extract_url_hostnames,
is_safe_valid_url,
is_url_private_or_parser_confused,
)
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false')
# The published proof-of-concept payload — backslash splits the two parsers' views.
payload = "http://169.254.169.254:8888" + chr(92) + "@httpbin.org/latest/meta-data/"
# ---------------------------------------------------------------
# 1. extract_url_hostnames() returns BOTH parsers' hostnames
# ---------------------------------------------------------------
hosts = extract_url_hostnames(payload)
assert '169.254.169.254' in hosts, \
f"urllib3 sees 169.254.169.254 as the connect target; extract_url_hostnames must surface it. Got {hosts!r}"
assert 'httpbin.org' in hosts, \
f"urlparse sees httpbin.org; extract_url_hostnames must surface it too. Got {hosts!r}"
# ---------------------------------------------------------------
# 2. Parser-agnostic gate blocks the payload
# ---------------------------------------------------------------
assert is_url_private_or_parser_confused(payload), \
"Parser-differential payload must be blocked by the SSRF gate"
# And a plain backslash anywhere in the URL is enough to block, even without a private IP
assert is_url_private_or_parser_confused("http://example.com" + chr(92) + "@evil.com/"), \
"Any backslash in a URL must trigger the parser-differential block"
# Sanity: a regular public URL is not blocked
assert not is_url_private_or_parser_confused("http://example.com/path"), \
"Plain public URLs must continue to pass the gate"
# ---------------------------------------------------------------
# 3. is_safe_valid_url() rejects backslash payloads at add-time
# ---------------------------------------------------------------
assert not is_safe_valid_url(payload), \
"is_safe_valid_url must reject URLs containing a backslash (parser-differential vector)"
# ---------------------------------------------------------------
# 4. The watch-add API endpoint rejects the payload
# ---------------------------------------------------------------
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
res = client.post(
url_for('createwatch'),
data='{"url": "%s", "fetch_backend": "html_requests"}' % payload,
headers={'x-api-key': api_key, 'Content-Type': 'application/json'},
)
assert res.status_code >= 400, \
f"API must refuse to create a watch for parser-differential URL; got status {res.status_code} body {res.data!r}"
# ---------------------------------------------------------------
# 5. Requests fetcher refuses the payload at fetch-time
# ---------------------------------------------------------------
from changedetectionio.content_fetchers.requests import fetcher as RequestsFetcher
f = RequestsFetcher()
with pytest.raises(Exception, match='private/reserved|parser-differential'):
f._run_sync(
url=payload,
timeout=5,
request_headers={},
request_body=None,
request_method='GET',
)
# ---------------------------------------------------------------
# 6. A 302 Location header pointing at a backslash payload is blocked
# (open-redirect → SSRF via parser differential)
# ---------------------------------------------------------------
mock_redirect = MagicMock()
mock_redirect.is_redirect = True
mock_redirect.status_code = 302
mock_redirect.headers = {'Location': payload}
with patch('requests.Session.request', return_value=mock_redirect):
with pytest.raises(Exception, match='Redirect blocked'):
f._run_sync(
url='http://example.com/',
timeout=5,
request_headers={},
request_body=None,
request_method='GET',
)
def test_ghsa_8757_69j2_hx56_backup_restore_history_path_traversal(client, live_server, measure_memory_usage, datastore_path):
"""
GHSA-8757-69j2-hx56: Crafted backup ZIP with absolute path in history.txt must not
+53
View File
@@ -80,6 +80,52 @@ def is_private_hostname(hostname):
return False
def extract_url_hostnames(url):
"""Return every hostname this URL could resolve to under different URL parsers.
Why: urllib's urlparse() and urllib3's parse_url() disagree on URLs containing
a backslash (e.g. http://INTERNAL:8888\\@PUBLIC/ urlparse extracts PUBLIC, but
urllib3/requests will actually connect to INTERNAL). Any SSRF check that trusts
only one parser can be bypassed by the other. Callers should reject the fetch
if ANY hostname returned here is private/reserved.
See GHSA-rph4-96w6-q594.
"""
hostnames = set()
try:
h = urlparse(url).hostname
if h:
hostnames.add(h)
except Exception:
pass
try:
from urllib3.util.url import parse_url as _u3_parse_url
u3 = _u3_parse_url(url)
if u3.host:
# urllib3 keeps IPv6 brackets in `.host`; strip them so socket.getaddrinfo() accepts the literal.
hostnames.add(u3.host.strip('[]'))
except Exception:
pass
return hostnames
def is_url_private_or_parser_confused(url):
"""SSRF gate that defends against urlparse/urllib3 parser-differential attacks.
Returns True (block the fetch) when:
* the URL contains a backslash no legitimate URL needs one, and it is the
established vector for the parser-differential bypass (GHSA-rph4-96w6-q594), OR
* any hostname produced by urlparse OR urllib3 resolves to a private/reserved IP.
"""
if '\\' in url:
logger.warning(f"URL '{url}' contains a backslash — rejected to prevent urlparse/urllib3 parser-differential SSRF.")
return True
for hostname in extract_url_hostnames(url):
if is_private_hostname(hostname):
return True
return False
def is_llm_api_base_safe(api_base):
"""SSRF guard for the LLM `api_base` setting (GHSA-jrxm-qjfh-g54f).
@@ -178,6 +224,13 @@ def is_safe_valid_url(test_url):
logger.warning(f'URL "{test_url}" contains suspicious characters')
return False
# Reject backslashes — urllib's urlparse and urllib3's parse_url disagree on URLs containing
# a backslash (e.g. http://INTERNAL:8888\@PUBLIC/), which is the documented SSRF bypass in
# GHSA-rph4-96w6-q594. A backslash has no legitimate use in an HTTP URL, so block at add-time.
if '\\' in test_url:
logger.warning(f'URL "{test_url}" contains a backslash — rejected (parser-differential SSRF vector).')
return False
# Normalize URL encoding - handle both encoded and unencoded query parameters
test_url = normalize_url_encoding(test_url)