Compare commits

..

1 Commits

Author SHA1 Message Date
dgtlmoon
d07859f694 Restock - No need to extract the text because it's not used anyway 2026-03-04 12:12:11 +01:00
15 changed files with 85 additions and 259 deletions

View File

@@ -66,27 +66,27 @@ jobs:
echo ${{ github.ref }} > changedetectionio/tag.txt
- name: Set up QEMU
uses: docker/setup-qemu-action@v4
uses: docker/setup-qemu-action@v3
with:
image: tonistiigi/binfmt:latest
platforms: all
- name: Login to GitHub Container Registry
uses: docker/login-action@v4
uses: docker/login-action@v3
with:
registry: ghcr.io
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Login to Docker Hub Container Registry
uses: docker/login-action@v4
uses: docker/login-action@v3
with:
username: ${{ secrets.DOCKER_HUB_USERNAME }}
password: ${{ secrets.DOCKER_HUB_ACCESS_TOKEN }}
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v4
uses: docker/setup-buildx-action@v3
with:
install: true
version: latest
@@ -95,7 +95,7 @@ jobs:
# master branch -> :dev container tag
- name: Docker meta :dev
if: ${{ github.ref == 'refs/heads/master' && github.event_name != 'release' }}
uses: docker/metadata-action@v6
uses: docker/metadata-action@v5
id: meta_dev
with:
images: |
@@ -107,7 +107,7 @@ jobs:
- name: Build and push :dev
id: docker_build
if: ${{ github.ref == 'refs/heads/master' && github.event_name != 'release' }}
uses: docker/build-push-action@v7
uses: docker/build-push-action@v6
with:
context: ./
file: ./Dockerfile
@@ -131,7 +131,7 @@ jobs:
- name: Docker meta :tag
if: github.event_name == 'release' && startsWith(github.event.release.tag_name, '0.')
uses: docker/metadata-action@v6
uses: docker/metadata-action@v5
id: meta
with:
images: |
@@ -146,7 +146,7 @@ jobs:
- name: Build and push :tag
id: docker_build_tag_release
if: github.event_name == 'release' && startsWith(github.event.release.tag_name, '0.')
uses: docker/build-push-action@v7
uses: docker/build-push-action@v6
with:
context: ./
file: ./Dockerfile

View File

@@ -60,14 +60,14 @@ jobs:
# Just test that the build works, some libraries won't compile on ARM/rPi etc
- name: Set up QEMU
uses: docker/setup-qemu-action@v4
uses: docker/setup-qemu-action@v3
with:
image: tonistiigi/binfmt:latest
platforms: all
- name: Set up Docker Buildx
id: buildx
uses: docker/setup-buildx-action@v4
uses: docker/setup-buildx-action@v3
with:
install: true
version: latest
@@ -75,7 +75,7 @@ jobs:
- name: Test that the docker containers can build (${{ matrix.platform }} - ${{ matrix.dockerfile }})
id: docker_build
uses: docker/build-push-action@v7
uses: docker/build-push-action@v6
# https://github.com/docker/build-push-action#customizing
with:
context: ./

View File

@@ -42,10 +42,10 @@ jobs:
run: echo "date=$(date +'%Y-%m-%d')" >> $GITHUB_OUTPUT
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v4
uses: docker/setup-buildx-action@v3
- name: Build changedetection.io container for testing under Python ${{ env.PYTHON_VERSION }}
uses: docker/build-push-action@v7
uses: docker/build-push-action@v6
with:
context: ./
file: ./Dockerfile

View File

@@ -61,22 +61,8 @@ import time
# ==============================================================================
import multiprocessing
import os
import sys
# Limit glibc malloc arena count to prevent RSS growth from concurrent requests.
# Default: glibc creates up to 8×CPU_cores arenas. Each concurrent thread/connection
# can trigger a new arena, and freed memory stays mapped in those arenas as RSS forever.
# With MALLOC_ARENA_MAX=2, at most 2 arenas are used; freed pages return to the OS faster.
# Must be set before worker threads start; env var is read lazily by glibc on first arena creation.
if 'MALLOC_ARENA_MAX' not in os.environ:
os.environ['MALLOC_ARENA_MAX'] = '2'
try:
import ctypes as _ctypes
_ctypes.CDLL('libc.so.6').mallopt(-8, 2) # M_ARENA_MAX = -8
except Exception:
pass
# Set spawn as global default (safety net - all our code uses explicit contexts anyway)
# Skip in tests to avoid breaking pytest-flask's LiveServer fixture (uses unpicklable local functions)
if 'pytest' not in sys.modules:

View File

@@ -81,7 +81,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
sorted_tags = sorted(datastore.data['settings']['application'].get('tags').items(), key=lambda x: x[1]['title'])
proxy_list = datastore.proxy_list
output = render_template(
"watch-overview.html",
active_tag=active_tag,
@@ -93,7 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
form=form,
generate_tag_colors=processors.generate_processor_badge_colors,
guid=datastore.data['app_guid'],
has_proxies=proxy_list,
has_proxies=datastore.proxy_list,
hosted_sticky=os.getenv("SALTED_PASS", False) == False,
now_time_server=round(time.time()),
pagination=pagination,
@@ -111,16 +110,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
watches=sorted_watches
)
# Return freed template-building memory to the OS immediately.
# render_template allocates ~20MB of intermediate strings that are freed on return,
# but glibc keeps those pages mapped in its arenas as RSS. malloc_trim() forces
# glibc to release them, preventing RSS growth from concurrent Chrome connections.
try:
import ctypes
ctypes.CDLL('libc.so.6').malloc_trim(0)
except Exception:
pass
if session.get('share-link'):
del (session['share-link'])

View File

@@ -213,13 +213,12 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
{%- set checking_now = is_checking_now(watch) -%}
{%- set history_n = watch.history_n -%}
{%- set favicon = watch.get_favicon_filename() -%}
{%- set error_texts = watch.compile_error_texts(has_proxies=has_proxies) -%}
{%- set system_use_url_watchlist = datastore.data['settings']['application']['ui'].get('use_page_title_in_list') -%}
{# Class settings mirrored in changedetectionio/static/js/realtime.js for the frontend #}
{%- set row_classes = [
loop.cycle('pure-table-odd', 'pure-table-even'),
'processor-' ~ watch['processor'],
'has-error' if error_texts|length > 2 else '',
'has-error' if watch.compile_error_texts()|length > 2 else '',
'paused' if watch.paused is defined and watch.paused != False else '',
'unviewed' if watch.has_unviewed else '',
'has-restock-info' if watch.has_restock_info else 'no-restock-info',
@@ -272,7 +271,7 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
{% endif %}
<a class="external" target="_blank" rel="noopener" href="{{ watch.link.replace('source:','') }}">&nbsp;</a>
</span>
<div class="error-text" style="display:none;">{{ error_texts|safe }}</div>
<div class="error-text" style="display:none;">{{ watch.compile_error_texts(has_proxies=datastore.proxy_list)|safe }}</div>
{%- if watch['processor'] == 'text_json_diff' -%}
{%- if watch['has_ldjson_price_data'] and not watch['track_ldjson_price_data'] -%}
<div class="ldjson-price-track-offer">Switch to Restock & Price watch mode? <a href="{{url_for('price_data_follower.accept', uuid=watch.uuid)}}" class="pure-button button-xsmall">Yes</a> <a href="{{url_for('price_data_follower.reject', uuid=watch.uuid)}}" class="">No</a></div>
@@ -306,20 +305,12 @@ html[data-darkmode="true"] .watch-tag-list.tag-{{ class_name }} {
{%- endif -%}
{%- if watch.get('restock') and watch['restock'].get('price') -%}
{%- set restock = watch['restock'] -%}
{%- set price = restock.get('price') -%}
{%- set cur = restock.get('currency','') -%}
{%- if price is not none and (price|string)|regex_search('\d') -%}
<span class="restock-label price" title="{{ _('Price') }}">
{# @todo: make parse_currency/parse_decimal aware of the locale of the actual web page and use that instead changedetectionio/processors/restock_diff/__init__.py #}
{%- if price is number -%}{# It's a number so we can convert it to their locale' #}
{{ price|format_number_locale }} {{ cur }}<!-- as number -->
{%- else -%}{# It's totally fine if it arrives as something else, the website might be something weird in this field #}
{{ price }} {{ cur }}<!-- as string -->
{%- if watch['restock']['price'] is number -%}
<span class="restock-label price" title="{{ _('Price') }}">
{{ watch['restock']['price']|format_number_locale if watch['restock'].get('price') else '' }} {{ watch['restock'].get('currency','') }}
</span>
{%- else -%} <!-- watch['restock']['price']' is not a number, cant output it -->
{%- endif -%}
</span>
{%- endif -%}
{%- elif not watch.has_restock_info -%}
<span class="restock-label error">{{ _('No information') }}</span>
{%- endif -%}

View File

@@ -148,32 +148,10 @@ class fetcher(Fetcher):
# Default to UTF-8 for XML if no encoding found
r.encoding = 'utf-8'
else:
# No charset in HTTP header - sniff encoding in priority order matching browsers
# (WHATWG encoding sniffing algorithm):
# 1. BOM - highest confidence, check before anything else
# 2. <meta charset> in first 2kb
# 3. chardet statistical detection - last resort
# See: https://github.com/dgtlmoon/changedetection.io/issues/3952
boms = [
(b'\xef\xbb\xbf', 'utf-8-sig'),
(b'\xff\xfe', 'utf-16-le'),
(b'\xfe\xff', 'utf-16-be'),
]
bom_encoding = next((enc for bom, enc in boms if r.content.startswith(bom)), None)
if bom_encoding:
logger.info(f"URL: {url} Using encoding '{bom_encoding}' detected from BOM")
r.encoding = bom_encoding
else:
meta_charset_match = re.search(rb'<meta[^>]+charset\s*=\s*["\']?\s*([^"\'\s;>]+)', r.content[:2000], re.IGNORECASE)
if meta_charset_match:
encoding = meta_charset_match.group(1).decode('ascii', errors='ignore')
logger.info(f"URL: {url} No content-type encoding in HTTP headers - Using encoding '{encoding}' from HTML meta charset tag")
r.encoding = encoding
else:
encoding = chardet.detect(r.content)['encoding']
logger.warning(f"URL: {url} No charset in headers or meta tag, guessed encoding as '{encoding}' via chardet")
if encoding:
r.encoding = encoding
# For other content types, use chardet
encoding = chardet.detect(r.content)['encoding']
if encoding:
r.encoding = encoding
self.headers = r.headers

View File

@@ -4,7 +4,6 @@ import flask_login
import locale
import os
import queue
import re
import sys
import threading
import time
@@ -218,12 +217,8 @@ def _jinja2_filter_format_number_locale(value: float) -> str:
"Formats for example 4000.10 to the local locale default of 4,000.10"
# Format the number with two decimal places (locale format string will return 6 decimal)
formatted_value = locale.format_string("%.2f", value, grouping=True)
return formatted_value
@app.template_filter('regex_search')
def _jinja2_filter_regex_search(value, pattern):
import re
return re.search(pattern, str(value)) is not None
return formatted_value
@app.template_global('is_checking_now')
def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"):
@@ -388,8 +383,6 @@ def _jinja2_filter_fetcher_status_icons(fetcher_name):
return ''
_RE_SANITIZE_TAG = re.compile(r'[^a-zA-Z0-9]')
@app.template_filter('sanitize_tag_class')
def _jinja2_filter_sanitize_tag_class(tag_title):
"""Sanitize a tag title to create a valid CSS class name.
@@ -401,8 +394,9 @@ def _jinja2_filter_sanitize_tag_class(tag_title):
Returns:
str: A sanitized string suitable for use as a CSS class name
"""
import re
# Remove all non-alphanumeric characters and convert to lowercase
sanitized = _RE_SANITIZE_TAG.sub('', tag_title).lower()
sanitized = re.sub(r'[^a-zA-Z0-9]', '', tag_title).lower()
# Ensure it starts with a letter (CSS requirement)
if sanitized and not sanitized[0].isalpha():
sanitized = 'tag' + sanitized
@@ -490,21 +484,28 @@ def changedetection_app(config=None, datastore_o=None):
available_languages = get_available_languages()
language_codes = get_language_codes()
_locale_aliases = {
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
}
_locale_match_list = language_codes + list(_locale_aliases.keys())
def get_locale():
# Locale aliases: map browser language codes to translation directory names
# This handles cases where browsers send standard codes (e.g., zh-TW)
# but our translations use more specific codes (e.g., zh_Hant_TW)
locale_aliases = {
'zh-TW': 'zh_Hant_TW', # Traditional Chinese: browser sends zh-TW, we use zh_Hant_TW
'zh_TW': 'zh_Hant_TW', # Also handle underscore variant
}
# 1. Try to get locale from session (user explicitly selected)
if 'locale' in session:
return session['locale']
# 2. Fall back to Accept-Language header
browser_locale = request.accept_languages.best_match(_locale_match_list)
# 3. Map browser locale to our internal locale if needed
return _locale_aliases.get(browser_locale, browser_locale)
# Get the best match from browser's Accept-Language header
browser_locale = request.accept_languages.best_match(language_codes + list(locale_aliases.keys()))
# 3. Check if we need to map the browser locale to our internal locale
if browser_locale in locale_aliases:
return locale_aliases[browser_locale]
return browser_locale
# Initialize Babel with locale selector
babel = Babel(app, locale_selector=get_locale)
@@ -1017,16 +1018,15 @@ def check_for_new_version():
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
session = requests.Session()
session.verify = False
while not app.config.exit.is_set():
try:
r = session.post("https://changedetection.io/check-ver.php",
r = requests.post("https://changedetection.io/check-ver.php",
data={'version': __version__,
'app_guid': datastore.data['app_guid'],
'watch_count': len(datastore.data['watching'])
})
},
verify=False)
except:
pass

View File

@@ -43,11 +43,6 @@ from ..html_tools import TRANSLATE_WHITESPACE_TABLE
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024*20))
# Module-level favicon filename cache: data_dir → basename (or None)
# Keyed by data_dir so it survives Watch object recreation, deepcopy, and concurrent requests.
# Invalidated explicitly in bump_favicon() when a new favicon is saved.
_FAVICON_FILENAME_CACHE: dict = {}
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
@@ -811,8 +806,9 @@ class model(EntityPersistenceMixin, watch_base):
with open(fname, 'wb') as f:
f.write(decoded)
# Invalidate module-level favicon filename cache for this watch
_FAVICON_FILENAME_CACHE.pop(self.data_dir, None)
# Invalidate favicon filename cache
if hasattr(self, '_favicon_filename_cache'):
delattr(self, '_favicon_filename_cache')
# A signal that could trigger the socket server to update the browser also
watch_check_update = signal('watch_favicon_bump')
@@ -827,23 +823,35 @@ class model(EntityPersistenceMixin, watch_base):
def get_favicon_filename(self) -> str | None:
"""
Find any favicon.* file in the watch data directory.
Find any favicon.* file in the current working directory
and return the contents of the newest one.
Uses a module-level cache keyed by data_dir to survive Watch object recreation,
deepcopy (which drops instance attrs), and concurrent request races.
Invalidated by bump_favicon() when a new favicon is saved.
MEMORY LEAK FIX: Cache the result to avoid repeated glob.glob() operations.
glob.glob() causes millions of fnmatch allocations when called for every watch on page load.
Returns:
str: Basename of the favicon file, or None if not found.
str: Basename of the newest favicon file, or None if not found.
"""
if self.data_dir in _FAVICON_FILENAME_CACHE:
return _FAVICON_FILENAME_CACHE[self.data_dir]
# Check cache first (prevents 26M+ allocations from repeated glob operations)
cache_key = '_favicon_filename_cache'
if hasattr(self, cache_key):
return getattr(self, cache_key)
import glob
# Search for all favicon.* files
files = glob.glob(os.path.join(self.data_dir, "favicon.*"))
fname = os.path.basename(files[0]) if files else None
_FAVICON_FILENAME_CACHE[self.data_dir] = fname
return fname
if not files:
result = None
else:
# Find the newest by modification time
newest_file = max(files, key=os.path.getmtime)
result = os.path.basename(newest_file)
# Cache the result
setattr(self, cache_key, result)
return result
def get_screenshot_as_thumbnail(self, max_age=3200):
"""Return path to a square thumbnail of the most recent screenshot.
@@ -1174,13 +1182,18 @@ class model(EntityPersistenceMixin, watch_base):
def compile_error_texts(self, has_proxies=None):
"""Compile error texts for this watch.
Accepts has_proxies parameter to ensure it works even outside app context"""
from flask import url_for, has_request_context
from flask import url_for
from markupsafe import Markup
output = [] # Initialize as list since we're using append
last_error = self.get('last_error','')
has_app_context = has_request_context()
try:
url_for('settings.settings_page')
except Exception as e:
has_app_context = False
else:
has_app_context = True
# has app+request context, we can use url_for()
if has_app_context:

View File

@@ -260,16 +260,6 @@ class difference_detection_processor():
# @todo .quit here could go on close object, so we can run JS if change-detected
await self.fetcher.quit(watch=self.watch)
# Sanitize lone surrogates - these can appear when servers return malformed/mixed-encoding
# content that gets decoded into surrogate characters (e.g. \udcad). Without this,
# encode('utf-8') raises UnicodeEncodeError downstream in checksums, diffs, file writes, etc.
# Covers all fetchers (requests, playwright, puppeteer, selenium) in one place.
# Also note: By this point we SHOULD know the original encoding so it can safely convert to utf-8 for the rest of the app.
# See: https://github.com/dgtlmoon/changedetection.io/issues/3952
if self.fetcher.content and isinstance(self.fetcher.content, str):
self.fetcher.content = self.fetcher.content.encode('utf-8', errors='replace').decode('utf-8')
# After init, call run_changedetection() which will do the actual change-detection
def get_extra_watch_config(self, filename):

View File

@@ -31,7 +31,6 @@ class Restock(dict):
if standardized_value:
# Convert to float
# @todo locale needs to be the locale of the webpage
return float(parse_decimal(standardized_value, locale='en'))
return None

View File

@@ -283,7 +283,4 @@ def query_price_availability(extracted_data):
if not result.get('availability') and 'availability' in microdata:
result['availability'] = microdata['availability']
# result['price'] could be float or str here, depending on the website, for example it might contain "1,00" commas, etc.
# using something like babel you need to know the locale of the website and even then it can be problematic
# we dont really do anything with the price data so far.. so just accept it the way it comes.
return result

View File

@@ -1,7 +1,6 @@
#!/usr/bin/env python3
# coding=utf-8
import hashlib
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
@@ -12,69 +11,6 @@ import os
def test_surrogate_characters_in_content_are_sanitized():
"""Lone surrogates can appear in requests' r.text when a server returns malformed/mixed-encoding
content. Without sanitization, encoding to UTF-8 raises UnicodeEncodeError.
See: https://github.com/dgtlmoon/changedetection.io/issues/3952
"""
content_with_surrogate = '<html><body>Hello \udcad World</body></html>'
# Confirm the raw problem exists
with pytest.raises(UnicodeEncodeError):
content_with_surrogate.encode('utf-8')
# Our fix: sanitize after fetcher.run() in processors/base.py call_browser()
sanitized = content_with_surrogate.encode('utf-8', errors='replace').decode('utf-8')
assert 'Hello' in sanitized
assert 'World' in sanitized
assert '\udcad' not in sanitized
# Checksum computation (processors/base.py get_raw_document_checksum) must not crash
hashlib.md5(sanitized.encode('utf-8')).hexdigest()
def test_utf8_content_without_charset_header(client, live_server, datastore_path):
"""Server returns UTF-8 content but no charset in Content-Type header.
chardet can misdetect such pages as UTF-7 (Python 3.14 then produces surrogates).
Our fix tries UTF-8 first before falling back to chardet.
See: https://github.com/dgtlmoon/changedetection.io/issues/3952
"""
from .util import write_test_file_and_sync
# UTF-8 encoded content with non-ASCII chars - no charset will be in the header
html = '<html><body><p>Español</p><p>Français</p><p>日本語</p></body></html>'
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), html.encode('utf-8'), mode='wb')
test_url = url_for('test_endpoint', content_type="text/html", _external=True)
client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("ui.ui_preview.preview_page", uuid="first"), follow_redirects=True)
# Should decode correctly as UTF-8, not produce mojibake (Español) or replacement chars
assert 'Español'.encode('utf-8') in res.data
assert 'Français'.encode('utf-8') in res.data
assert '日本語'.encode('utf-8') in res.data
def test_shiftjis_with_meta_charset(client, live_server, datastore_path):
"""Server returns Shift-JIS content with no charset in HTTP header, but the HTML
declares <meta charset="Shift-JIS">. We should use the meta tag, not chardet.
Real-world case: https://github.com/dgtlmoon/changedetection.io/issues/3952
"""
from .util import write_test_file_and_sync
japanese_text = '日本語のページ'
html = f'<html><head><meta http-equiv="Content-Type" content="text/html;charset=Shift-JIS"></head><body><p>{japanese_text}</p></body></html>'
write_test_file_and_sync(os.path.join(datastore_path, "endpoint-content.txt"), html.encode('shift_jis'), mode='wb')
test_url = url_for('test_endpoint', content_type="text/html", _external=True)
client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("ui.ui_preview.preview_page", uuid="first"), follow_redirects=True)
assert japanese_text.encode('utf-8') in res.data
def set_html_response(datastore_path):
test_return_data = """
<html><body><span class="nav_second_img_text">

View File

@@ -467,38 +467,3 @@ def test_special_prop_examples(client, live_server, measure_memory_usage, datast
assert b'155.55' in res.data
delete_all_watches(client)
def test_itemprop_as_str(client, live_server, measure_memory_usage, datastore_path):
test_return_data = f"""<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<span itemprop="offers" itemscope itemtype="http://schema.org/Offer">
<meta content="767.55" itemprop="price"/>
<meta content="EUR" itemprop="priceCurrency"/>
<meta content="InStock" itemprop="availability"/>
<meta content="https://www.123-test.dk" itemprop="url"/>
</span>
</body>
</html>
"""
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
f.write(test_return_data)
test_url = url_for('test_endpoint', _external=True)
client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'restock tests', 'processor': 'restock_diff'},
follow_redirects=True
)
client.get(url_for("ui.form_watch_checknow"))
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'767.55' in res.data

View File

@@ -100,19 +100,6 @@ def is_safe_valid_url(test_url):
logger.warning('URL validation failed: URL is empty or whitespace only')
return False
# Per-request cache: same URL is often validated 2-3x per watchlist render (sort + display).
# Flask's g is scoped to one request and auto-cleared on teardown, so dynamic Jinja2 URLs
# like {{microtime()}} are always re-evaluated on the next request.
# Falls back gracefully when called outside a request context (e.g. background workers).
_cache_key = test_url
try:
from flask import g
_cache = g.setdefault('_url_validation_cache', {})
if _cache_key in _cache:
return _cache[_cache_key]
except RuntimeError:
_cache = None # No app context
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
@@ -125,14 +112,11 @@ def is_safe_valid_url(test_url):
test_url = r.sub('', test_url)
# Check the actual rendered URL in case of any Jinja markup
# Only run jinja_render when the URL actually contains Jinja2 syntax - creating a new
# ImmutableSandboxedEnvironment is expensive and is called once per watch per page load
if '{%' in test_url or '{{' in test_url:
try:
test_url = jinja_render(test_url)
except Exception as e:
logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
return False
try:
test_url = jinja_render(test_url)
except Exception as e:
logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
return False
# Check query parameters and fragment
if re.search(r'[<>]', test_url):
@@ -158,6 +142,4 @@ def is_safe_valid_url(test_url):
logger.warning(f'URL f"{test_url}" failed validation, aborting.')
return False
if _cache is not None:
_cache[_cache_key] = True
return True