Compare commits

...

1 Commits

3 changed files with 61 additions and 9 deletions
+22
View File
@@ -1,12 +1,15 @@
import asyncio
import re
import hashlib
from changedetectionio.browser_steps.browser_steps import browser_steps_get_valid_steps
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_private_hostname
from copy import deepcopy
from abc import abstractmethod
import os
from urllib.parse import urlparse
from loguru import logger
SCREENSHOT_FORMAT_JPEG = 'JPEG'
@@ -95,6 +98,23 @@ class difference_detection_processor():
self.last_raw_content_checksum = None
async def validate_iana_url(self):
    """Pre-flight SSRF guard: refuse to fetch a watch whose host is private/reserved.

    The hostname lookup is handed to the event loop's default executor so the
    blocking DNS call inside ``is_private_hostname`` does not stall the loop.
    ``call_browser()`` awaits this before selecting a fetch backend.

    The check is skipped entirely when the ``ALLOW_IANA_RESTRICTED_ADDRESSES``
    environment variable is truthy, and also when the watch link has no
    parseable hostname.

    Raises:
        Exception: when the watch link's hostname is reported as private/reserved.
    """
    restricted_allowed = strtobool(os.getenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false'))
    if restricted_allowed:
        return

    hostname = urlparse(self.watch.link).hostname
    if not hostname:
        # Nothing to resolve; leave such links to other validation.
        return

    event_loop = asyncio.get_running_loop()
    # None selects the loop's default executor.
    host_is_private = await event_loop.run_in_executor(None, is_private_hostname, hostname)
    if not host_is_private:
        return

    raise Exception(
        f"Fetch blocked: '{self.watch.link}' resolves to a private/reserved IP address. "
        f"Set ALLOW_IANA_RESTRICTED_ADDRESSES=true to allow."
    )
async def call_browser(self, preferred_proxy_id=None):
from requests.structures import CaseInsensitiveDict
@@ -108,6 +128,8 @@ class difference_detection_processor():
"file:// type access is denied for security reasons."
)
await self.validate_iana_url()
# Requests, playwright, other browser via wss:// etc, fetch_extra_something
prefer_fetch_backend = self.watch.get('fetch_backend', 'system')
+33 -4
View File
@@ -34,6 +34,7 @@ def test_favicon(client, live_server, measure_memory_usage, datastore_path):
favicon_base_64=SVG_BASE64
)
res = client.get(url_for('static_content', group='favicon', filename=uuid))
assert res.status_code == 200
assert len(res.data) > 10
@@ -601,8 +602,6 @@ def test_ssrf_private_ip_blocked(client, live_server, monkeypatch, measure_memor
from changedetectionio.validate_url import is_safe_valid_url, is_private_hostname
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false')
# Clear any URL results cached while the env var was 'true'
is_safe_valid_url.cache_clear()
# ------------------------------------------------------------------
# 1. is_private_hostname() — unit tests across all reserved ranges
@@ -644,13 +643,11 @@ def test_ssrf_private_ip_blocked(client, live_server, monkeypatch, measure_memor
# 3. ALLOW_IANA_RESTRICTED_ADDRESSES=true bypasses the block
# ------------------------------------------------------------------
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'true')
is_safe_valid_url.cache_clear()
assert is_safe_valid_url('http://127.0.0.1/'), \
"Private IP should be allowed when ALLOW_IANA_RESTRICTED_ADDRESSES=true"
# Restore the block for the remaining assertions
monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false')
is_safe_valid_url.cache_clear()
# ------------------------------------------------------------------
# 4. UI form rejects private-IP URLs
@@ -708,3 +705,35 @@ def test_ssrf_private_ip_blocked(client, live_server, monkeypatch, measure_memor
request_body=None,
request_method='GET',
)
def test_unresolvable_hostname_is_allowed(client, live_server, monkeypatch):
    """An unresolvable hostname must be accepted at add-time, even with the SSRF block on.

    With ALLOW_IANA_RESTRICTED_ADDRESSES=false, a DNS failure (gaierror) is not
    evidence that the URL points at a private IP - the domain may simply be
    offline or not yet live, so rejecting it would be a false positive.
    DNS-rebinding protection is applied at fetch-time in call_browser() instead.
    """
    from changedetectionio.validate_url import is_safe_valid_url

    monkeypatch.setenv('ALLOW_IANA_RESTRICTED_ADDRESSES', 'false')

    offline_url = 'http://this-host-does-not-exist-xyz987.invalid/some/path'

    # Step 1: plain URL validation must let the unresolvable host through.
    assert is_safe_valid_url(offline_url), \
        "Unresolvable hostname should pass is_safe_valid_url — DNS failure is not a private-IP signal"

    # Step 2: the quick-add form must not bounce it with the invalid-URL message.
    form_payload = {'url': offline_url, 'tags': ''}
    add_response = client.post(
        url_for('ui.ui_views.form_quick_watch_add'),
        data=form_payload,
        follow_redirects=True
    )
    assert b'Watch protocol is not permitted or invalid URL format' not in add_response.data, \
        "UI should not reject a URL just because its hostname is unresolvable"

    # Step 3: the new watch must be listed on the overview page.
    overview_response = client.get(url_for('watchlist.index'))
    assert b'this-host-does-not-exist-xyz987.invalid' in overview_response.data, \
        "Unresolvable hostname watch should appear in the watch overview list"
+6 -5
View File
@@ -61,7 +61,9 @@ def normalize_url_encoding(url):
def is_private_hostname(hostname):
"""Return True if hostname resolves to an IANA-restricted (private/reserved) IP address.
Fails closed: unresolvable hostnames return True (block them).
Unresolvable hostnames return False (allow them) — DNS may be temporarily unavailable
or the domain not yet live. The actual DNS rebinding attack is mitigated by fetch-time
re-validation in requests.py, not by blocking unresolvable domains at add-time.
Never cached — callers that need fresh DNS resolution (e.g. at fetch time) can call
this directly without going through the lru_cached is_safe_valid_url().
"""
@@ -70,12 +72,11 @@ def is_private_hostname(hostname):
ip = ipaddress.ip_address(info[4][0])
if ip.is_private or ip.is_loopback or ip.is_link_local or ip.is_reserved:
return True
except socket.gaierror:
return True
except socket.gaierror as e:
logger.warning(f"{hostname} error checking {str(e)}")
return False
return False
@lru_cache(maxsize=10000)
def is_safe_valid_url(test_url):
from changedetectionio import strtobool
from changedetectionio.jinja2_custom import render as jinja_render