Compare commits

...

14 Commits

Author SHA1 Message Date
dgtlmoon
d450262d5b Small fix for notification format handling, enabling HTML Color for {{diff_removed}} and {{diff_added}} 2025-10-16 12:12:48 +02:00
dgtlmoon
6f926ed595 0.50.24
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-10-14 11:03:17 +02:00
dgtlmoon
249dc55212 Notification - Make sure all notification tokens have something set even for form validation, fixes hassio:// with {{ watch_uuid }} in notification URL form (#3504) 2025-10-14 10:58:53 +02:00
dgtlmoon
46252bc6f3 0.50.23
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
2025-10-13 11:54:01 +02:00
dgtlmoon
64350a2e78 Replace jinja2-time with arrow and improve timedate timezone integration, fixes timezones in templates such as {% now 'Europe/London', '%Y-%m-%d' %} etc (#3496) 2025-10-13 11:52:02 +02:00
dgtlmoon
2902c63a3b 0.50.22
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-10-12 22:48:14 +02:00
dgtlmoon
55b8588f1f Testing - Adding test for requests timeout setting #975 2025-10-12 21:54:25 +02:00
dgtlmoon
02ecc4ae9a UI - Add missing 'requests timeout in seconds' field to main settings, Re #975 2025-10-12 21:42:07 +02:00
dgtlmoon
3ee50b7832 UI - Proxy and external browser settings URL validation (#3494) 2025-10-12 21:24:59 +02:00
dgtlmoon
66ddd87ee4 Move proxy default selection to proxy tab 2025-10-12 19:26:04 +02:00
dgtlmoon
233189e4f7 Build - Splitting memory report (#3493) 2025-10-12 19:20:28 +02:00
dgtlmoon
b237fd7201 Replace stream/filetype detection library with puremagic, 20Mb less RAM usage (#3491) 2025-10-12 18:40:37 +02:00
dgtlmoon
3c81efe2f4 0.50.21
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-10-10 18:17:56 +02:00
dgtlmoon
0fcfb94690 Adding 'RSS reader mode' (see main Settings) (#3488) 2025-10-10 18:17:30 +02:00
50 changed files with 1847 additions and 198 deletions

View File

@@ -253,6 +253,30 @@ jobs:
docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt
docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt
- name: Extract and display memory test report
if: always()
run: |
# Extract test-memory.log from the container
echo "Extracting test-memory.log from container..."
docker cp test-cdio-basic-tests:/app/changedetectionio/test-memory.log output-logs/test-memory-${{ env.PYTHON_VERSION }}.log || echo "test-memory.log not found in container"
# Display the memory log contents for immediate visibility in workflow output
echo "=== Top 10 Highest Peak Memory Tests ==="
if [ -f output-logs/test-memory-${{ env.PYTHON_VERSION }}.log ]; then
# Sort by peak memory value (extract number before MB and sort numerically, reverse order)
grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | \
sed 's/.*Peak memory: //' | \
paste -d'|' - <(grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log) | \
sort -t'|' -k1 -nr | \
cut -d'|' -f2 | \
head -10
echo ""
echo "=== Full Memory Test Report ==="
cat output-logs/test-memory-${{ env.PYTHON_VERSION }}.log
else
echo "No memory log available"
fi
- name: Store everything including test-datastore
if: always()
uses: actions/upload-artifact@v4

View File

@@ -2,6 +2,7 @@ recursive-include changedetectionio/api *
recursive-include changedetectionio/blueprint *
recursive-include changedetectionio/conditions *
recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/jinja2_custom *
recursive-include changedetectionio/model *
recursive-include changedetectionio/notification *
recursive-include changedetectionio/processors *

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.20'
__version__ = '0.50.24'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -334,6 +334,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))
# Explicitly delete large content variables to free memory IMMEDIATELY after saving
# These are no longer needed after being saved to history
del contents
# Send notifications on second+ check
if watch.history_n >= 2:
logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
@@ -372,6 +376,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
'check_count': count})
# NOW clear fetcher content - after all processing is complete
# This is the last point where we need the fetcher data
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
update_handler.fetcher.clear_content()
logger.debug(f"Cleared fetcher content for UUID {uuid}")
except Exception as e:
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
@@ -392,7 +402,28 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
#logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}")
watch_check_update.send(watch_uuid=watch['uuid'])
update_handler = None
# Explicitly clean up update_handler and all its references
if update_handler:
# Clear fetcher content using the proper method
if hasattr(update_handler, 'fetcher') and update_handler.fetcher:
update_handler.fetcher.clear_content()
# Clear processor references
if hasattr(update_handler, 'content_processor'):
update_handler.content_processor = None
update_handler = None
# Clear local contents variable if it still exists
if 'contents' in locals():
del contents
# Note: We don't set watch = None here because:
# 1. watch is just a local reference to datastore.data['watching'][uuid]
# 2. Setting it to None doesn't affect the datastore
# 3. GC can't collect the object anyway (still referenced by datastore)
# 4. It would just cause confusion
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
except Exception as cleanup_error:
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")

View File

@@ -6,7 +6,7 @@ from loguru import logger
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT
from changedetectionio.content_fetchers.base import manage_user_agent
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render

View File

@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
def long_task(uuid, preferred_proxy):
import time
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
status = {'status': '', 'length': 0, 'text': ''}

View File

@@ -1,5 +1,5 @@
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.store import ChangeDetectionStore
from feedgen.feed import FeedGenerator
from flask import Blueprint, make_response, request, url_for, redirect

View File

@@ -119,7 +119,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
hide_remove_pass=os.getenv("SALTED_PASS", False),
min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)),
settings_application=datastore.data['settings']['application'],
timezone_default_config=datastore.data['settings']['application'].get('timezone'),
timezone_default_config=datastore.data['settings']['application'].get('scheduler_timezone_default'),
utc_time=utc_time,
)

View File

@@ -1,7 +1,7 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %}
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
@@ -72,25 +72,23 @@
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
{% if form.requests.proxy %}
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">
Choose a default proxy for all watches
</span>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
</div>
</div>
{% endif %}
</fieldset>
</div>
@@ -133,6 +131,10 @@
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.timeout) }}
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
</div>
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.default_ua) }}
<span class="pure-form-message-inline">
@@ -236,7 +238,7 @@ nav
<p><strong>UTC Time &amp Date from Server:</strong> <span id="utc-time" >{{ utc_time }}</span></p>
<p><strong>Local Time &amp Date in Browser:</strong> <span class="local-time" data-utc="{{ utc_time }}"></span></p>
<p>
{{ render_field(form.application.form.timezone) }}
{{ render_field(form.application.form.scheduler_timezone_default) }}
<datalist id="timezones" style="display: none;">
{% for tz_name in available_timezones %}
<option value="{{ tz_name }}">{{ tz_name }}</option>
@@ -314,17 +316,27 @@ nav
<p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites.
<div class="pure-control-group" id="extra-proxies-setting">
{{ render_field(form.requests.form.extra_proxies) }}
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }}
<span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br>
<span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span>
{% if form.requests.proxy %}
<div>
<br>
<div class="inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">Choose a default proxy for all watches</span>
</div>
</div>
{% endif %}
</div>
<div class="pure-control-group" id="extra-browsers-setting">
<p>
<span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br>
<span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span>
</p>
{{ render_field(form.requests.form.extra_browsers) }}
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }}
</div>
</div>
<div id="actions">
<div class="pure-control-group">

View File

@@ -187,7 +187,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
tz_name = time_schedule_limit.get('timezone')
if not tz_name:
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
if time_schedule_limit and time_schedule_limit.get('enabled'):
try:
@@ -257,7 +257,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
'using_global_webdriver_wait': not default['webdriver_delay'],
'uuid': uuid,
'watch': watch,

View File

@@ -2,6 +2,7 @@ from flask import Blueprint, request, make_response
import random
from loguru import logger
from changedetectionio.notification_service import NotificationContextData
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
@@ -19,6 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
import apprise
from changedetectionio.notification.handler import process_notification
from changedetectionio.notification.apprise_plugin.assets import apprise_asset
from changedetectionio.jinja2_custom import render as jinja_render
from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler
@@ -61,16 +63,20 @@ def construct_blueprint(datastore: ChangeDetectionStore):
return 'Error: No Notification URLs set/found'
for n_url in notification_urls:
# We are ONLY validating the apprise:// part here, convert all tags to something so as not to break apprise URLs
generic_notification_context_data = NotificationContextData()
generic_notification_context_data.set_random_for_validation()
n_url = jinja_render(template_str=n_url, **generic_notification_context_data).strip()
if len(n_url.strip()):
if not apobj.add(n_url):
return f'Error: {n_url} is not a valid AppRise URL.'
try:
# use the same as when it is triggered, but then override it with the form test values
n_object = {
n_object = NotificationContextData({
'watch_url': request.form.get('window_url', "https://changedetection.io"),
'notification_urls': notification_urls
}
})
# Only use if present, if not set in n_object it should use the default system value
if 'notification_format' in request.form and request.form['notification_format'].strip():

View File

@@ -64,6 +64,19 @@ class Fetcher():
# Time ONTOP of the system defined env minimum time
render_extract_delay = 0
def clear_content(self):
"""
Explicitly clear all content from memory to free up heap space.
Call this after content has been saved to disk.
"""
self.content = None
if hasattr(self, 'raw_content'):
self.raw_content = None
self.screenshot = None
self.xpath_data = None
# Keep headers and status_code as they're small
logger.trace("Fetcher content cleared from memory")
@abstractmethod
def get_error(self):
return self.error
@@ -128,7 +141,7 @@ class Fetcher():
async def iterate_browser_steps(self, start_url=None):
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
from playwright._impl._errors import TimeoutError, Error
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
step_n = 0
if self.browser_steps is not None and len(self.browser_steps):

View File

@@ -51,6 +51,7 @@ class fetcher(Fetcher):
session = requests.Session()
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
from requests_file import FileAdapter
session.mount('file://', FileAdapter())

View File

@@ -795,7 +795,7 @@ def ticker_thread_check_time_launch_checks():
else:
time_schedule_limit = watch.get('time_schedule_limit')
logger.trace(f"{uuid} Time scheduler - Using watch settings (not global settings)")
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
if time_schedule_limit and time_schedule_limit.get('enabled'):
try:

View File

@@ -5,6 +5,7 @@ from wtforms.widgets.core import TimeInput
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
from changedetectionio.conditions.form import ConditionFormRow
from changedetectionio.notification_service import NotificationContextData
from changedetectionio.strtobool import strtobool
from wtforms import (
@@ -469,11 +470,16 @@ class ValidateAppRiseServers(object):
import apprise
from .notification.apprise_plugin.assets import apprise_asset
from .notification.apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
from changedetectionio.jinja2_custom import render as jinja_render
apobj = apprise.Apprise(asset=apprise_asset)
for server_url in field.data:
url = server_url.strip()
generic_notification_context_data = NotificationContextData()
# Make sure something is atleast in all those regular token fields
generic_notification_context_data.set_random_for_validation()
url = jinja_render(template_str=server_url.strip(), **generic_notification_context_data).strip()
if url.startswith("#"):
continue
@@ -487,9 +493,8 @@ class ValidateJinja2Template(object):
"""
def __call__(self, form, field):
from changedetectionio import notification
from changedetectionio.jinja2_custom import create_jinja_env
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
from jinja2.sandbox import ImmutableSandboxedEnvironment
from jinja2.meta import find_undeclared_variables
import jinja2.exceptions
@@ -497,9 +502,11 @@ class ValidateJinja2Template(object):
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
try:
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader, extensions=['jinja2_time.TimeExtension'])
jinja2_env.globals.update(notification.valid_tokens)
# Extra validation tokens provided on the form_class(... extra_tokens={}) setup
# Use the shared helper to create a properly configured environment
jinja2_env = create_jinja_env(loader=BaseLoader)
# Add notification tokens for validation
jinja2_env.globals.update(NotificationContextData())
if hasattr(field, 'extra_notification_tokens'):
jinja2_env.globals.update(field.extra_notification_tokens)
@@ -511,6 +518,7 @@ class ValidateJinja2Template(object):
except jinja2.exceptions.SecurityError as e:
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
# Check for undeclared variables
ast = jinja2_env.parse(joined_data)
undefined = ", ".join(find_undeclared_variables(ast))
if undefined:
@@ -678,6 +686,51 @@ class ValidateCSSJSONXPATHInput(object):
except:
raise ValidationError("A system-error occurred when validating your jq expression")
class ValidateSimpleURL:
"""Validate that the value can be parsed by urllib.parse.urlparse() and has a scheme/netloc."""
def __init__(self, message=None):
self.message = message or "Invalid URL."
def __call__(self, form, field):
data = (field.data or "").strip()
if not data:
return # empty is OK — pair with validators.Optional()
from urllib.parse import urlparse
parsed = urlparse(data)
if not parsed.scheme or not parsed.netloc:
raise ValidationError(self.message)
class ValidateStartsWithRegex(object):
def __init__(self, regex, *, flags=0, message=None, allow_empty=True, split_lines=True):
# compile with given flags (well pass re.IGNORECASE below)
self.pattern = re.compile(regex, flags) if isinstance(regex, str) else regex
self.message = message
self.allow_empty = allow_empty
self.split_lines = split_lines
def __call__(self, form, field):
data = field.data
if not data:
return
# normalize into list of lines
if isinstance(data, str) and self.split_lines:
lines = data.splitlines()
elif isinstance(data, (list, tuple)):
lines = data
else:
lines = [data]
for line in lines:
stripped = line.strip()
if not stripped:
if self.allow_empty:
continue
raise ValidationError(self.message or "Empty value not allowed.")
if not self.pattern.match(stripped):
raise ValidationError(self.message or "Invalid value.")
class quickWatchForm(Form):
from . import processors
@@ -705,7 +758,7 @@ class commonSettingsForm(Form):
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
timezone = StringField("Timezone for watch schedule", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
scheduler_timezone_default = StringField("Default timezone for watch check scheduler", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
@@ -795,7 +848,7 @@ class processor_text_json_diff_form(commonSettingsForm):
if not super().validate():
return False
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
result = True
# Fail form validation when a body is set for a GET
@@ -858,23 +911,36 @@ class processor_text_json_diff_form(commonSettingsForm):
):
super().__init__(formdata, obj, prefix, data, meta, **kwargs)
if kwargs and kwargs.get('default_system_settings'):
default_tz = kwargs.get('default_system_settings').get('application', {}).get('timezone')
default_tz = kwargs.get('default_system_settings').get('application', {}).get('scheduler_timezone_default')
if default_tz:
self.time_schedule_limit.form.timezone.render_kw['placeholder'] = default_tz
class SingleExtraProxy(Form):
# maybe better to set some <script>var..
proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
# @todo do the validation here instead
proxy_url = StringField('Proxy URL', [
validators.Optional(),
ValidateStartsWithRegex(
regex=r'^(https?|socks5)://', # ✅ main pattern
flags=re.IGNORECASE, # ✅ makes it case-insensitive
message='Proxy URLs must start with http://, https:// or socks5://',
),
ValidateSimpleURL()
], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
class SingleExtraBrowser(Form):
browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
# @todo do the validation here instead
browser_connection_url = StringField('Browser connection URL', [
validators.Optional(),
ValidateStartsWithRegex(
regex=r'^(wss?|ws)://',
flags=re.IGNORECASE,
message='Browser URLs must start with wss:// or ws://'
),
ValidateSimpleURL()
], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
class DefaultUAInputForm(Form):
html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
@@ -885,7 +951,7 @@ class DefaultUAInputForm(Form):
class globalSettingsRequestForm(Form):
time_between_check = RequiredFormField(TimeBetweenCheckForm)
time_schedule_limit = FormField(ScheduleLimitForm)
proxy = RadioField('Proxy')
proxy = RadioField('Default proxy')
jitter_seconds = IntegerField('Random jitter seconds ± check',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
@@ -894,7 +960,12 @@ class globalSettingsRequestForm(Form):
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=1, max=50,
message="Should be between 1 and 50")])
timeout = IntegerField('Requests timeout in seconds',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=1, max=999,
message="Should be between 1 and 999")])
extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)
@@ -940,6 +1011,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
strip_ignored_lines = BooleanField('Strip ignored lines')
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0,

View File

@@ -0,0 +1,20 @@
"""
Jinja2 custom extensions and safe rendering utilities.
"""
from .extensions.TimeExtension import TimeExtension
from .safe_jinja import (
render,
render_fully_escaped,
create_jinja_env,
JINJA2_MAX_RETURN_PAYLOAD_SIZE,
DEFAULT_JINJA2_EXTENSIONS,
)
__all__ = [
'TimeExtension',
'render',
'render_fully_escaped',
'create_jinja_env',
'JINJA2_MAX_RETURN_PAYLOAD_SIZE',
'DEFAULT_JINJA2_EXTENSIONS',
]

View File

@@ -0,0 +1,221 @@
"""
Jinja2 TimeExtension - Custom date/time handling for templates.
This extension provides the {% now %} tag for Jinja2 templates, offering timezone-aware
date/time formatting with support for time offsets.
Why This Extension Exists:
The Arrow library has a now() function (arrow.now()), but Jinja2 templates cannot
directly call Python functions - they need extensions or filters to expose functionality.
This TimeExtension serves as a Jinja2-to-Arrow bridge that:
1. Makes Arrow accessible in templates - Jinja2 requires registering functions/tags
through extensions. You cannot use arrow.now() directly in a template.
2. Provides template-friendly syntax - Instead of complex Python code, you get clean tags:
{% now 'UTC' %}
{% now 'UTC' + 'hours=2' %}
{% now 'Europe/London', '%Y-%m-%d' %}
3. Adds convenience features on top of Arrow:
- Default timezone from environment variable (TZ) or config
- Default datetime format configuration
- Offset syntax parsing: 'hours=2,minutes=30' → shift(hours=2, minutes=30)
- Empty string timezone support to use configured defaults
4. Maintains security - Works within Jinja2's sandboxed environment so users
cannot access arbitrary Python code or objects.
Essentially, this is a Jinja2 wrapper around arrow.now() and arrow.shift() that
provides user-friendly template syntax while maintaining security.
Basic Usage:
{% now 'UTC' %}
# Output: Wed, 09 Dec 2015 23:33:01
Custom Format:
{% now 'UTC', '%Y-%m-%d %H:%M:%S' %}
# Output: 2015-12-09 23:33:01
Timezone Support:
{% now 'America/New_York' %}
{% now 'Europe/London' %}
{% now '' %} # Uses default timezone from environment.default_timezone
Time Offsets (Addition):
{% now 'UTC' + 'hours=2' %}
{% now 'UTC' + 'hours=2,minutes=30' %}
{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10' %}
Time Offsets (Subtraction):
{% now 'UTC' - 'minutes=11' %}
{% now 'UTC' - 'days=2,minutes=33,seconds=1' %}
Time Offsets with Custom Format:
{% now 'UTC' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}
# Output: 2015-12-10 01:33:01
Weekday Support (for finding next/previous weekday):
{% now 'UTC' + 'weekday=0' %} # Next Monday (0=Monday, 6=Sunday)
{% now 'UTC' + 'weekday=4' %} # Next Friday
Configuration:
- Default timezone: Set via TZ environment variable or override environment.default_timezone
- Default format: '%a, %d %b %Y %H:%M:%S' (can be overridden via environment.datetime_format)
Environment Customization:
from changedetectionio.jinja2_custom import create_jinja_env
jinja2_env = create_jinja_env()
jinja2_env.default_timezone = 'America/New_York' # Override default timezone
jinja2_env.datetime_format = '%Y-%m-%d %H:%M' # Override default format
Supported Offset Parameters:
- years, months, weeks, days
- hours, minutes, seconds, microseconds
- weekday (0=Monday through 6=Sunday, must be integer)
Note:
This extension uses the Arrow library for timezone-aware datetime handling.
All timezone names should be valid IANA timezone identifiers (e.g., 'America/New_York').
"""
import arrow
from jinja2 import nodes
from jinja2.ext import Extension
import os
class TimeExtension(Extension):
"""
Jinja2 Extension providing the {% now %} tag for timezone-aware date/time rendering.
This extension adds two attributes to the Jinja2 environment:
- datetime_format: Default strftime format string (default: '%a, %d %b %Y %H:%M:%S')
- default_timezone: Default timezone for rendering (default: TZ env var or 'UTC')
Both can be overridden after environment creation by setting the attributes directly.
"""
tags = {'now'}
def __init__(self, environment):
"""Jinja2 Extension constructor."""
super().__init__(environment)
environment.extend(
datetime_format='%a, %d %b %Y %H:%M:%S',
default_timezone=os.getenv('TZ', 'UTC').strip()
)
def _datetime(self, timezone, operator, offset, datetime_format):
"""
Get current datetime with time offset applied.
Args:
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
operator: '+' for addition or '-' for subtraction
offset: Comma-separated offset parameters (e.g., 'hours=2,minutes=30')
datetime_format: strftime format string or None to use environment default
Returns:
Formatted datetime string with offset applied
Example:
_datetime('UTC', '+', 'hours=2,minutes=30', '%Y-%m-%d %H:%M:%S')
# Returns current time + 2.5 hours
"""
# Use default timezone if none specified
if not timezone or timezone == '':
timezone = self.environment.default_timezone
d = arrow.now(timezone)
# parse shift params from offset and include operator
shift_params = {}
for param in offset.split(','):
interval, value = param.split('=')
shift_params[interval.strip()] = float(operator + value.strip())
# Fix weekday parameter can not be float
if 'weekday' in shift_params:
shift_params['weekday'] = int(shift_params['weekday'])
d = d.shift(**shift_params)
if datetime_format is None:
datetime_format = self.environment.datetime_format
return d.strftime(datetime_format)
def _now(self, timezone, datetime_format):
"""
Get current datetime without any offset.
Args:
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
datetime_format: strftime format string or None to use environment default
Returns:
Formatted datetime string for current time
Example:
_now('America/New_York', '%Y-%m-%d %H:%M:%S')
# Returns current time in New York timezone
"""
# Use default timezone if none specified
if not timezone or timezone == '':
timezone = self.environment.default_timezone
if datetime_format is None:
datetime_format = self.environment.datetime_format
return arrow.now(timezone).strftime(datetime_format)
def parse(self, parser):
"""
Parse the {% now %} tag and generate appropriate AST nodes.
This method is called by Jinja2 when it encounters a {% now %} tag.
It parses the tag syntax and determines whether to call _now() or _datetime()
based on whether offset operations (+ or -) are present.
Supported syntax:
{% now 'timezone' %} -> calls _now()
{% now 'timezone', 'format' %} -> calls _now()
{% now 'timezone' + 'offset' %} -> calls _datetime()
{% now 'timezone' + 'offset', 'format' %} -> calls _datetime()
{% now 'timezone' - 'offset', 'format' %} -> calls _datetime()
Args:
parser: Jinja2 parser instance
Returns:
nodes.Output: AST output node containing the formatted datetime string
"""
lineno = next(parser.stream).lineno
node = parser.parse_expression()
if parser.stream.skip_if('comma'):
datetime_format = parser.parse_expression()
else:
datetime_format = nodes.Const(None)
if isinstance(node, nodes.Add):
call_method = self.call_method(
'_datetime',
[node.left, nodes.Const('+'), node.right, datetime_format],
lineno=lineno,
)
elif isinstance(node, nodes.Sub):
call_method = self.call_method(
'_datetime',
[node.left, nodes.Const('-'), node.right, datetime_format],
lineno=lineno,
)
else:
call_method = self.call_method(
'_now',
[node, datetime_format],
lineno=lineno,
)
return nodes.Output([call_method], lineno=lineno)

View File

@@ -0,0 +1,55 @@
"""
Safe Jinja2 render with max payload sizes
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
"""
import jinja2.sandbox
import typing as t
import os
from .extensions.TimeExtension import TimeExtension
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
# Default extensions - can be overridden in create_jinja_env()
DEFAULT_JINJA2_EXTENSIONS = [TimeExtension]
def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment:
"""
Create a sandboxed Jinja2 environment with our custom extensions and default timezone.
Args:
extensions: List of extension classes to use (defaults to DEFAULT_JINJA2_EXTENSIONS)
**kwargs: Additional arguments to pass to ImmutableSandboxedEnvironment
Returns:
Configured Jinja2 environment
"""
if extensions is None:
extensions = DEFAULT_JINJA2_EXTENSIONS
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(
extensions=extensions,
**kwargs
)
# Get default timezone from environment variable
default_timezone = os.getenv('TZ', 'UTC').strip()
jinja2_env.default_timezone = default_timezone
return jinja2_env
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
# (Which also limits available functions that could be called)
def render(template_str, **args: t.Any) -> str:
jinja2_env = create_jinja_env()
output = jinja2_env.from_string(template_str).render(args)
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
def render_fully_escaped(content):
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
template = env.from_string("{{ some_html|e }}")
return template.render(some_html=content)

View File

@@ -55,11 +55,12 @@ class model(dict):
'rss_access_token': None,
'rss_content_format': RSS_FORMAT_TYPES[0][0],
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'scheduler_timezone_default': None, # Default IANA timezone name
'schema_version' : 0,
'shared_diff_access': False,
'strip_ignored_lines': False,
'tags': {}, #@todo use Tag.model initialisers
'timezone': None, # Default IANA timezone name
'webdriver_delay': None , # Extra delay in seconds before extracting text
'ui': {
'use_page_title_in_list': True,

View File

@@ -1,14 +1,14 @@
from blinker import signal
from changedetectionio.strtobool import strtobool
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
from . import watch_base
import os
import re
from pathlib import Path
from loguru import logger
from .. import safe_jinja
from .. import jinja2_custom as safe_jinja
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
# Allowable protocols, protects against javascript: etc

View File

@@ -16,20 +16,3 @@ valid_notification_formats = {
default_notification_format_for_watch: default_notification_format_for_watch
}
valid_tokens = {
'base_url': '',
'current_snapshot': '',
'diff': '',
'diff_added': '',
'diff_full': '',
'diff_patch': '',
'diff_removed': '',
'diff_url': '',
'preview_url': '',
'triggered_text': '',
'watch_tag': '',
'watch_title': '',
'watch_url': '',
'watch_uuid': '',
}

View File

@@ -3,16 +3,22 @@ import time
import apprise
from loguru import logger
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
from ..notification_service import NotificationContextData
def process_notification(n_object, datastore):
from changedetectionio.safe_jinja import render as jinja_render
def process_notification(n_object: NotificationContextData, datastore):
from changedetectionio.jinja2_custom import render as jinja_render
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
# be sure its registered
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
if not isinstance(n_object, NotificationContextData):
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
now = time.time()
if n_object.get('notification_timestamp'):
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
# Insert variables into the notification content
notification_parameters = create_notification_parameters(n_object, datastore)
@@ -47,7 +53,7 @@ def process_notification(n_object, datastore):
# Get the notification body from datastore
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
if n_object.get('notification_format', '').startswith('HTML'):
if n_format.lower().startswith('html'):
n_body = n_body.replace("\n", '<br>')
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
@@ -141,17 +147,15 @@ def process_notification(n_object, datastore):
# Notification title + body content parameters get created here.
# ( Where we prepare the tokens in the notification to be replaced with actual values )
def create_notification_parameters(n_object, datastore):
from copy import deepcopy
from . import valid_tokens
def create_notification_parameters(n_object: NotificationContextData, datastore):
if not isinstance(n_object, NotificationContextData):
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
# in the case we send a test notification from the main settings, there is no UUID.
uuid = n_object['uuid'] if 'uuid' in n_object else ''
if uuid:
watch_title = datastore.data['watching'][uuid].label
watch = datastore.data['watching'].get(n_object['uuid'])
if watch:
watch_title = datastore.data['watching'][n_object['uuid']].label
tag_list = []
tags = datastore.get_all_tags_for_watch(uuid)
tags = datastore.get_all_tags_for_watch(n_object['uuid'])
if tags:
for tag_uuid, tag in tags.items():
tag_list.append(tag.get('title'))
@@ -166,14 +170,10 @@ def create_notification_parameters(n_object, datastore):
watch_url = n_object['watch_url']
diff_url = "{}/diff/{}".format(base_url, uuid)
preview_url = "{}/preview/{}".format(base_url, uuid)
diff_url = "{}/diff/{}".format(base_url, n_object['uuid'])
preview_url = "{}/preview/{}".format(base_url, n_object['uuid'])
# Not sure deepcopy is needed here, but why not
tokens = deepcopy(valid_tokens)
# Valid_tokens also used as a field validator
tokens.update(
n_object.update(
{
'base_url': base_url,
'diff_url': diff_url,
@@ -181,13 +181,10 @@ def create_notification_parameters(n_object, datastore):
'watch_tag': watch_tag if watch_tag is not None else '',
'watch_title': watch_title if watch_title is not None else '',
'watch_url': watch_url,
'watch_uuid': uuid,
'watch_uuid': n_object['uuid'],
})
# n_object will contain diff, diff_added etc etc
tokens.update(n_object)
if watch:
n_object.update(datastore.data['watching'].get(n_object['uuid']).extra_notification_token_values())
if uuid:
tokens.update(datastore.data['watching'].get(uuid).extra_notification_token_values())
return tokens
return n_object

View File

@@ -6,9 +6,48 @@ Extracted from update_worker.py to provide standalone notification functionality
for both sync and async workers
"""
import time
from loguru import logger
import time
# What is passed around as notification context, also used as the complete list of valid {{ tokens }}
class NotificationContextData(dict):
def __init__(self, initial_data=None, **kwargs):
super().__init__({
'current_snapshot': None,
'diff': None,
'diff_added': None,
'diff_full': None,
'diff_patch': None,
'diff_removed': None,
'notification_timestamp': time.time(),
'screenshot': None,
'triggered_text': None,
'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', # Converted to 'watch_uuid' in create_notification_parameters
'watch_url': 'https://WATCH-PLACE-HOLDER/',
'base_url': None,
'diff_url': None,
'preview_url': None,
'watch_tag': None,
'watch_title': None
})
# Apply any initial data passed in
self.update({'watch_uuid': self.get('uuid')})
if initial_data:
self.update(initial_data)
# Apply any keyword arguments
if kwargs:
self.update(kwargs)
def set_random_for_validation(self):
import random, string
"""Randomly fills all dict keys with random strings (for validation/testing)."""
for key in self.keys():
if key in ['uuid', 'time', 'watch_uuid']:
continue
rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12))
self[key] = rand_str
class NotificationService:
"""
@@ -20,13 +59,16 @@ class NotificationService:
self.datastore = datastore
self.notification_q = notification_q
def queue_notification_for_watch(self, n_object, watch):
def queue_notification_for_watch(self, n_object: NotificationContextData, watch):
"""
Queue a notification for a watch with full diff rendering and template variables
"""
from changedetectionio import diff
from changedetectionio.notification import default_notification_format_for_watch
if not isinstance(n_object, NotificationContextData):
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
dates = []
trigger_text = ''
@@ -79,15 +121,15 @@ class NotificationService:
n_object.update({
'current_snapshot': snapshot_contents,
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
'notification_timestamp': now,
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'triggered_text': triggered_text,
'uuid': watch.get('uuid') if watch else None,
'watch_url': watch.get('url') if watch else None,
'watch_uuid': watch.get('uuid') if watch else None,
})
if watch:
@@ -140,7 +182,7 @@ class NotificationService:
"""
Send notification when content changes are detected
"""
n_object = {}
n_object = NotificationContextData()
watch = self.datastore.data['watching'].get(watch_uuid)
if not watch:
return
@@ -183,11 +225,13 @@ class NotificationService:
if not watch:
return
n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
", ".join(watch['include_filters']),
threshold),
'notification_format': 'text'}
n_object = NotificationContextData({
'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
", ".join(watch['include_filters']),
threshold),
'notification_format': 'text'
})
if len(watch['notification_urls']):
n_object['notification_urls'] = watch['notification_urls']
@@ -215,12 +259,14 @@ class NotificationService:
if not watch:
return
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
"did not appear on the page after {} attempts, did the page change layout? "
"Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
"Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
'notification_format': 'text'}
n_object = NotificationContextData({
'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
"did not appear on the page after {} attempts, did the page change layout? "
"Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
"Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
'notification_format': 'text'
})
if len(watch['notification_urls']):
n_object['notification_urls'] = watch['notification_urls']

View File

@@ -102,7 +102,7 @@ class difference_detection_processor():
self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid'))
# Tweak the base config with the per-watch ones
from changedetectionio.safe_jinja import render as jinja_render
from changedetectionio.jinja2_custom import render as jinja_render
request_headers = CaseInsensitiveDict()
ua = self.datastore.data['settings']['requests'].get('default_ua')

View File

@@ -64,24 +64,31 @@ class guess_stream_type():
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
test_content_normalized = re.sub(r'<\s+', '<', test_content)
# Magic will sometimes call text/plain as text/html!
# Use puremagic for lightweight MIME detection (saves ~14MB vs python-magic)
magic_result = None
try:
import magic
import puremagic
mime = magic.from_buffer(content[:200], mime=True) # Send the original content
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
if mime and "/" in mime:
magic_result = mime
# Ignore generic/fallback mime types from magic
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
logger.debug(f"Ignoring generic mime type '{mime}' from magic library")
# Trust magic for non-text types immediately
elif mime not in ['text/html', 'text/plain']:
magic_content_header = mime
# puremagic needs bytes, so encode if we have a string
content_bytes = content[:200].encode('utf-8') if isinstance(content, str) else content[:200]
# puremagic returns a list of PureMagic objects with confidence scores
detections = puremagic.magic_string(content_bytes)
if detections:
# Get the highest confidence detection
mime = detections[0].mime_type
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
if mime and "/" in mime:
magic_result = mime
# Ignore generic/fallback mime types
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
logger.debug(f"Ignoring generic mime type '{mime}' from puremagic library")
# Trust puremagic for non-text types immediately
elif mime not in ['text/html', 'text/plain']:
magic_content_header = mime
except Exception as e:
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection")
logger.error(f"Error getting a more precise mime type from 'puremagic' library ({str(e)}), using content-based detection")
# Content-based detection (most reliable for text formats)
# Check for HTML patterns first - if found, override magic's text/plain
@@ -103,7 +110,7 @@ class guess_stream_type():
# magic will call a rss document 'xml'
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
self.is_rss = True
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
# Only mark as generic XML if not already detected as RSS

View File

@@ -228,8 +228,21 @@ class ContentProcessor:
self.datastore = datastore
def preprocess_rss(self, content):
"""Convert CDATA/comments in RSS to usable text."""
return cdata_in_document_to_text(html_content=content)
"""
Convert CDATA/comments in RSS to usable text.
Supports two RSS processing modes:
- 'default': Inline CDATA replacement (original behavior)
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
"""
from changedetectionio import rss_tools
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
if rss_mode:
# Format RSS items nicely with CDATA content unmarked and converted to text
return rss_tools.format_rss_items(content)
else:
# Default: Original inline CDATA replacement
return cdata_in_document_to_text(html_content=content)
def preprocess_pdf(self, raw_content):
"""Convert PDF to HTML using external tool."""
@@ -384,6 +397,11 @@ class perform_site_check(difference_detection_processor):
# RSS preprocessing
if stream_content_type.is_rss:
content = content_processor.preprocess_rss(content)
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
stream_content_type.is_rss = False
stream_content_type.is_html = True
self.fetcher.content = content
# PDF preprocessing
if watch.is_pdf or stream_content_type.is_pdf:
@@ -538,6 +556,20 @@ class perform_site_check(difference_detection_processor):
else:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
# Note: Explicit cleanup is only needed here because text_json_diff handles
# large strings (100KB-300KB for RSS/HTML). The other processors work with
# small strings and don't need this.
#
# Python would clean these up automatically, but explicit `del` frees memory
# immediately rather than waiting for function return, reducing peak memory usage.
del content
if 'html_content' in locals() and html_content is not stripped_text:
del html_content
if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text:
del text_content_before_ignored_filter
if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text:
del text_for_checksuming
return changed_detected, update_obj, stripped_text
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):

View File

@@ -0,0 +1,130 @@
"""
RSS/Atom feed processing tools for changedetection.io
"""
from loguru import logger
import re
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
"""
Process CDATA sections in HTML/XML content - inline replacement.
Args:
html_content: The HTML/XML content to process
render_anchor_tag_content: Whether to render anchor tag content
Returns:
Processed HTML/XML content with CDATA sections replaced inline
"""
from xml.sax.saxutils import escape as xml_escape
from .html_tools import html_to_text
pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'
def repl(m):
text = m.group(1)
return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip()
return re.sub(pattern, repl, html_content)
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
Format RSS/Atom feed items in a readable text format using feedparser.
Converts RSS <item> or Atom <entry> elements to formatted text with:
- <title> → <h1>Title</h1>
- <link> → Link: [url]
- <guid> → Guid: [id]
- <pubDate> → PubDate: [date]
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
Args:
rss_content: The RSS/Atom feed content
render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility)
Returns:
Formatted HTML content ready for html_to_text conversion
"""
try:
import feedparser
from xml.sax.saxutils import escape as xml_escape
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
feed = feedparser.parse(rss_content)
formatted_items = []
# Determine feed type for appropriate labels when fields are missing
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
is_atom = feed.version and 'atom' in feed.version
for entry in feed.entries:
item_parts = []
# Title - feedparser handles CDATA and entity unescaping automatically
if hasattr(entry, 'title') and entry.title:
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
# Link
if hasattr(entry, 'link') and entry.link:
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
# GUID/ID
if hasattr(entry, 'id') and entry.id:
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
# Date - feedparser normalizes all date field names to 'published'
if hasattr(entry, 'published') and entry.published:
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
# Description/Content - feedparser handles CDATA and entity unescaping automatically
# Only add "Summary:" label for Atom <summary> tags
content = None
add_label = False
if hasattr(entry, 'content') and entry.content:
# Atom <content> - no label, just content
content = entry.content[0].value if entry.content[0].value else None
elif hasattr(entry, 'summary'):
# Could be RSS <description> or Atom <summary>
# feedparser maps both to entry.summary
content = entry.summary if entry.summary else None
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
if is_atom:
add_label = True
# Add content with or without label
if content:
if add_label:
item_parts.append(f'Summary:<br>{content}')
else:
item_parts.append(content)
else:
# No content - just show <none>
item_parts.append('&lt;none&gt;')
# Join all parts of this item
if item_parts:
formatted_items.append('\n'.join(item_parts))
# Wrap each item in a div with classes (first, last, item-N)
items_html = []
total_items = len(formatted_items)
for idx, item in enumerate(formatted_items):
classes = ['rss-item']
if idx == 0:
classes.append('first')
if idx == total_items - 1:
classes.append('last')
classes.append(f'item-{idx + 1}')
class_str = ' '.join(classes)
items_html.append(f'<div class="{class_str}">{item}</div>')
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
except Exception as e:
logger.warning(f"Error formatting RSS items: {str(e)}")
# Fall back to original content
return rss_content

View File

@@ -1,24 +0,0 @@
"""
Safe Jinja2 render with max payload sizes
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
"""
import jinja2.sandbox
import typing as t
import os
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
# (Which also limits available functions that could be called)
def render(template_str, **args: t.Any) -> str:
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
output = jinja2_env.from_string(template_str).render(args)
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
def render_fully_escaped(content):
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
template = env.from_string("{{ some_html|e }}")
return template.render(some_html=content)

View File

@@ -29,7 +29,7 @@ $(document).ready(function () {
$(this).text(new Date($(this).data("utc")).toLocaleString());
})
const timezoneInput = $('#application-timezone');
const timezoneInput = $('#application-scheduler_timezone_default');
if(timezoneInput.length) {
const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
if (!timezoneInput.val().trim()) {

View File

@@ -344,7 +344,7 @@ label {
}
}
#notification-customisation {
.grey-form-border {
border: 1px solid var(--color-border-notification);
padding: 0.5rem;
border-radius: 5px;

File diff suppressed because one or more lines are too long

View File

@@ -976,6 +976,10 @@ class ChangeDetectionStore:
if self.data['settings']['application'].get('extract_title_as_title'):
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def update_21(self):
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
del self.data['settings']['application']['timezone']
def add_notification_url(self, notification_url):

View File

@@ -33,7 +33,7 @@
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
</div>
</div>
<div id="notification-customisation" class="pure-control-group">
<div class="pure-control-group grey-form-border">
<div class="pure-control-group">
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
<span class="pure-form-message-inline">Title for all notifications</span>

View File

@@ -14,13 +14,31 @@
{% if field.errors is mapping and 'form' in field.errors %}
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
{% set errors = field.errors['form'] %}
{% for error in errors %}
<li>{{ error }}</li>
{% endfor %}
{% elif field.type == 'FieldList' %}
{# Handle FieldList of FormFields - errors is a list of dicts, one per entry #}
{% for idx, entry_errors in field.errors|enumerate %}
{% if entry_errors is mapping and entry_errors %}
{# Only show entries that have actual errors #}
<li><strong>Entry {{ idx + 1 }}:</strong>
<ul>
{% for field_name, messages in entry_errors.items() %}
{% for message in messages %}
<li>{{ field_name }}: {{ message }}</li>
{% endfor %}
{% endfor %}
</ul>
</li>
{% endif %}
{% endfor %}
{% else %}
{# regular list of errors with this field #}
{% set errors = field.errors %}
{% for error in field.errors %}
<li>{{ error }}</li>
{% endfor %}
{% endif %}
{% for error in errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
@@ -93,6 +111,39 @@
{{ field(**kwargs)|safe }}
{% endmacro %}
{% macro render_fieldlist_with_inline_errors(fieldlist) %}
{# Specialized macro for FieldList(FormField(...)) that renders errors inline with each field #}
<div {% if fieldlist.errors %} class="error" {% endif %}>{{ fieldlist.label }}</div>
<div {% if fieldlist.errors %} class="error" {% endif %}>
<ul id="{{ fieldlist.id }}">
{% for entry in fieldlist %}
<li {% if entry.errors %} class="error" {% endif %}>
<label for="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>{{ fieldlist.label.text }}-{{ loop.index0 }}</label>
<table id="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>
<tbody>
{% for subfield in entry %}
<tr {% if subfield.errors %} class="error" {% endif %}>
<th {% if subfield.errors %} class="error" {% endif %}><label for="{{ subfield.id }}" {% if subfield.errors %} class="error" {% endif %}>{{ subfield.label.text }}</label></th>
<td {% if subfield.errors %} class="error" {% endif %}>
{{ subfield(**kwargs)|safe }}
{% if subfield.errors %}
<ul class="errors">
{% for error in subfield.errors %}
<li class="error">{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</li>
{% endfor %}
</ul>
</div>
{% endmacro %}
{% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
<div class="fieldlist_formfields" id="{{ table_id }}">
<div class="fieldlist-header">

View File

@@ -4,6 +4,7 @@ import time
from threading import Thread
import pytest
import arrow
from changedetectionio import changedetection_app
from changedetectionio import store
import os
@@ -29,6 +30,17 @@ def reportlog(pytestconfig):
logger.remove(handler_id)
@pytest.fixture
def environment(mocker):
"""Mock arrow.now() to return a fixed datetime for testing jinja2 time extension."""
# Fixed datetime: Wed, 09 Dec 2015 23:33:01 UTC
# This is calculated to match the test expectations when offsets are applied
fixed_datetime = arrow.Arrow(2015, 12, 9, 23, 33, 1, tzinfo='UTC')
# Patch arrow.now in the TimeExtension module where it's actually used
mocker.patch('changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now', return_value=fixed_datetime)
return fixed_datetime
def format_memory_human(bytes_value):
"""Format memory in human-readable units (KB, MB, GB)"""
if bytes_value < 1024:

View File

@@ -49,3 +49,39 @@ def test_select_custom(client, live_server, measure_memory_usage):
#
# Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default
def test_custom_proxy_validation(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
# Goto settings, add our custom one
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
"requests-extra_proxies-0-proxy_url": "xxxxhtt/333??p://test:awesome@squid-custom:3128",
},
follow_redirects=True
)
assert b"Settings updated." not in res.data
assert b'Proxy URLs must start with' in res.data
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
"requests-extra_proxies-0-proxy_url": "https://",
},
follow_redirects=True
)
assert b"Settings updated." not in res.data
assert b"Invalid URL." in res.data

View File

@@ -165,6 +165,46 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
# Cleanup everything
delete_all_watches(client)
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
def test_requests_timeout(client, live_server, measure_memory_usage):
delay = 2
test_url = url_for('test_endpoint', delay=delay, _external=True)
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "",
"requests-time_between_check-minutes": 180,
"requests-timeout": delay - 1,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# requests takes >2 sec but we timeout at 1 second
res = client.get(url_for("watchlist.index"))
assert b'Read timed out. (read timeout=1)' in res.data
##### Now set a longer timeout
res = client.post(
url_for("settings.settings_page"),
data={"application-ui-use_page_title_in_list": "",
"requests-time_between_check-minutes": 180,
"requests-timeout": delay + 1, # timeout should be a second more than the reply time
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'Read timed out' not in res.data
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
"""
@@ -338,4 +378,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
assert b'&lt;string name=&#34;feed_update_receiver_name&#34;' in res.data
assert b'&lt;foobar' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -1,8 +1,10 @@
#!/usr/bin/env python3
import time
import arrow
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from ..jinja2_custom import render
# def test_setup(client, live_server, measure_memory_usage):
@@ -33,6 +35,35 @@ def test_jinja2_in_url_query(client, live_server, measure_memory_usage):
)
assert b'date=2' in res.data
# Test for issue #1493 - jinja2-time offset functionality
def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usage):
"""Test that jinja2 time offset expressions work in watch URLs (issue #1493)."""
# Add our URL to the import page with time offset expression
test_url = url_for('test_return_query', _external=True)
# Test the exact syntax from issue #1493 that was broken in jinja2-time
# This should work now with our custom TimeExtension
full_url = "{}?{}".format(test_url,
"timestamp={% now 'utc' - 'minutes=11', '%Y-%m-%d %H:%M' %}", )
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": full_url, "tags": "test"},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
# Verify the URL was processed correctly (should not have errors)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
# Should have a valid timestamp in the response
assert b'timestamp=' in res.data
# Should not have template error
assert b'Invalid template' not in res.data
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
@@ -56,3 +87,86 @@ def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
assert b'is invalid and cannot be used' in res.data
# Some of the spewed output from the subclasses
assert b'dict_values' not in res.data
def test_timezone(mocker):
"""Verify that timezone is parsed."""
timezone = 'America/Buenos_Aires'
currentDate = arrow.now(timezone)
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
arrowNowMock.return_value = currentDate
finalRender = render(f"{{% now '{timezone}' %}}")
assert finalRender == currentDate.strftime('%a, %d %b %Y %H:%M:%S')
def test_format(mocker):
"""Verify that format is parsed."""
timezone = 'utc'
format = '%d %b %Y %H:%M:%S'
currentDate = arrow.now(timezone)
arrowNowMock = mocker.patch("arrow.now")
arrowNowMock.return_value = currentDate
finalRender = render(f"{{% now '{timezone}', '{format}' %}}")
assert finalRender == currentDate.strftime(format)
def test_add_time(environment):
"""Verify that added time offset can be parsed."""
finalRender = render("{% now 'utc' + 'hours=2,seconds=30' %}")
assert finalRender == "Thu, 10 Dec 2015 01:33:31"
def test_add_weekday(mocker):
"""Verify that added weekday offset can be parsed."""
timezone = 'utc'
currentDate = arrow.now(timezone)
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
arrowNowMock.return_value = currentDate
finalRender = render(f"{{% now '{timezone}' + 'weekday=1' %}}")
assert finalRender == currentDate.shift(weekday=1).strftime('%a, %d %b %Y %H:%M:%S')
def test_substract_time(environment):
"""Verify that substracted time offset can be parsed."""
finalRender = render("{% now 'utc' - 'minutes=11' %}")
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
def test_offset_with_format(environment):
"""Verify that offset works together with datetime format."""
finalRender = render(
"{% now 'utc' - 'days=2,minutes=33,seconds=1', '%d %b %Y %H:%M:%S' %}"
)
assert finalRender == "07 Dec 2015 23:00:00"
def test_default_timezone_empty_string(environment):
"""Verify that empty timezone string uses the default timezone (UTC in test environment)."""
# Empty string should use the default timezone which is 'UTC' (or from application settings)
finalRender = render("{% now '' %}")
# Should render with default format and UTC timezone (matches environment fixture)
assert finalRender == "Wed, 09 Dec 2015 23:33:01"
def test_default_timezone_with_offset(environment):
"""Verify that empty timezone works with offset operations."""
# Empty string with offset should use default timezone
finalRender = render("{% now '' + 'hours=2', '%d %b %Y %H:%M:%S' %}")
assert finalRender == "10 Dec 2015 01:33:01"
def test_default_timezone_subtraction(environment):
"""Verify that empty timezone works with subtraction offset."""
finalRender = render("{% now '' - 'minutes=11' %}")
assert finalRender == "Wed, 09 Dec 2015 23:22:01"

View File

@@ -284,6 +284,27 @@ def test_notification_validation(client, live_server, measure_memory_usage):
)
def test_notification_urls_jinja2_apprise_integration(client, live_server, measure_memory_usage):
#
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation
test_notification_url = "hassio://127.0.0.1/longaccesstoken?verify=no&nid={{watch_uuid}}"
res = client.post(
url_for("settings.settings_page"),
data={
"application-fetch_backend": "html_requests",
"application-minutes_between_check": 180,
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了" }',
"application-notification_format": default_notification_format,
"application-notification_urls": test_notification_url,
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} ",
},
follow_redirects=True
)
assert b'Settings updated' in res.data
def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_memory_usage):
@@ -294,7 +315,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
# CUSTOM JSON BODY CHECK for POST://
set_original_response()
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&xxx={{ watch_url }}&+custom-header=123&+second=hello+world%20%22space%22"
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&watch_uuid={{ watch_uuid }}&xxx={{ watch_url }}&now={% now 'Europe/London', '%Y-%m-%d' %}&+custom-header=123&+second=hello+world%20%22space%22"
res = client.post(
url_for("settings.settings_page"),
@@ -320,6 +341,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
)
assert b"Watch added" in res.data
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
wait_for_all_checks(client)
set_modified_response()
@@ -349,6 +371,11 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
assert 'xxx=http' in notification_url
# apprise style headers should be stripped
assert 'custom-header' not in notification_url
# Check jinja2 custom arrow/jinja2-time replace worked
assert 'now=2' in notification_url
# Check our watch_uuid appeared
assert f'watch_uuid={watch_uuid}' in notification_url
with open("test-datastore/notification-headers.txt", 'r') as f:
notification_headers = f.read()
@@ -416,7 +443,6 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
assert res.status_code != 400
assert res.status_code != 500
with open("test-datastore/notification.txt", 'r') as f:
x = f.read()
assert test_body in x

View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
def set_original_cdata_xml():
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>Security Bulletins on wetscale</title>
<link>https://wetscale.com/security-bulletins/</link>
<description>Recent security bulletins from wetscale</description>
<lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate>
<docs>https://validator.w3.org/feed/docs/rss2.html</docs>
<generator>wetscale.com</generator>
<language>en-US</language>
<copyright>© 2025 wetscale Inc. All rights reserved.</copyright>
<atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/>
<item>
<title>TS-2025-005</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid>
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
<description><p>Wet noodles escape<br><p>they also found themselves outside</p> </description>
</item>
<item>
<title>TS-2025-004</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-004</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid>
<pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate>
<description>
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
</description>
</item>
</channel>
</rss>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def test_rss_reader_mode(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents
delete_all_watches(client)
def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' not in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
delete_all_watches(client)

View File

@@ -24,7 +24,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
url_for("settings.settings_page"),
data={"application-empty_pages_are_a_change": "",
"requests-time_between_check-seconds": 1,
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
@@ -119,7 +119,7 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
data = {
"application-empty_pages_are_a_change": "",
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
'application-fetch_backend': "html_requests",
"requests-time_between_check-hours": 0,
"requests-time_between_check-minutes": 0,

View File

@@ -4,7 +4,7 @@
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
import unittest
from changedetectionio import safe_jinja
from changedetectionio import jinja2_custom as safe_jinja
# mostly

View File

@@ -6,7 +6,7 @@
import unittest
import os
from changedetectionio.processors import restock_diff
import changedetectionio.processors.restock_diff.processor as restock_diff
# mostly
class TestDiffBuilder(unittest.TestCase):

View File

@@ -1,11 +1,10 @@
#!/usr/bin/env python3
# run from dir above changedetectionio/ dir
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
# python3 -m unittest changedetectionio.tests.unit.test_scheduler
import unittest
from datetime import datetime, timedelta
from zoneinfo import ZoneInfo
import arrow
class TestScheduler(unittest.TestCase):
@@ -13,12 +12,13 @@ class TestScheduler(unittest.TestCase):
# UTC-12:00 (Baker Island, Howland Island) is the farthest behind, always one calendar day behind UTC.
def test_timezone_basic_time_within_schedule(self):
"""Test that current time is detected as within schedule window."""
from changedetectionio import time_handler
timezone_str = 'Europe/Berlin'
debug_datetime = datetime.now(ZoneInfo(timezone_str))
day_of_week = debug_datetime.strftime('%A')
time_str = str(debug_datetime.hour)+':00'
debug_datetime = arrow.now(timezone_str)
day_of_week = debug_datetime.format('dddd')
time_str = debug_datetime.format('HH:00')
duration = 60 # minutes
# The current time should always be within 60 minutes of [time_hour]:00
@@ -30,16 +30,17 @@ class TestScheduler(unittest.TestCase):
self.assertEqual(result, True, f"{debug_datetime} is within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
def test_timezone_basic_time_outside_schedule(self):
"""Test that time from yesterday is outside current schedule."""
from changedetectionio import time_handler
timezone_str = 'Europe/Berlin'
# We try a date in the future..
debug_datetime = datetime.now(ZoneInfo(timezone_str))+ timedelta(days=-1)
day_of_week = debug_datetime.strftime('%A')
time_str = str(debug_datetime.hour) + ':00'
duration = 60*24 # minutes
# We try a date in the past (yesterday)
debug_datetime = arrow.now(timezone_str).shift(days=-1)
day_of_week = debug_datetime.format('dddd')
time_str = debug_datetime.format('HH:00')
duration = 60 * 24 # minutes
# The current time should always be within 60 minutes of [time_hour]:00
# The current time should NOT be within yesterday's schedule
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
@@ -48,6 +49,58 @@ class TestScheduler(unittest.TestCase):
self.assertNotEqual(result, True,
f"{debug_datetime} is NOT within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
def test_timezone_utc_within_schedule(self):
"""Test UTC timezone works correctly."""
from changedetectionio import time_handler
timezone_str = 'UTC'
debug_datetime = arrow.now(timezone_str)
day_of_week = debug_datetime.format('dddd')
time_str = debug_datetime.format('HH:00')
duration = 120 # minutes
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration)
self.assertTrue(result, "Current time should be within UTC schedule")
def test_timezone_extreme_ahead(self):
"""Test with UTC+14 timezone (Line Islands, Kiribati)."""
from changedetectionio import time_handler
timezone_str = 'Pacific/Kiritimati' # UTC+14
debug_datetime = arrow.now(timezone_str)
day_of_week = debug_datetime.format('dddd')
time_str = debug_datetime.format('HH:00')
duration = 60
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration)
self.assertTrue(result, "Should work with extreme ahead timezone")
def test_timezone_extreme_behind(self):
"""Test with UTC-12 timezone (Baker Island)."""
from changedetectionio import time_handler
# Using Etc/GMT+12 which is UTC-12 (confusing, but that's how it works)
timezone_str = 'Etc/GMT+12' # UTC-12
debug_datetime = arrow.now(timezone_str)
day_of_week = debug_datetime.format('dddd')
time_str = debug_datetime.format('HH:00')
duration = 60
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration)
self.assertTrue(result, "Should work with extreme behind timezone")
if __name__ == '__main__':
unittest.main()

View File

@@ -0,0 +1,138 @@
#!/usr/bin/env python3
"""
Simple unit tests for TimeExtension that mimic how safe_jinja.py uses it.
These tests demonstrate that the environment.default_timezone override works
exactly as intended in the actual application code.
"""
import arrow
from jinja2.sandbox import ImmutableSandboxedEnvironment
from changedetectionio.jinja2_custom.extensions.TimeExtension import TimeExtension
def test_default_timezone_override_like_safe_jinja(mocker):
"""
Test that mirrors exactly how safe_jinja.py uses the TimeExtension.
This is the simplest demonstration that environment.default_timezone works.
"""
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
# Override the default timezone - exactly like safe_jinja.py does
jinja2_env.default_timezone = 'America/New_York'
# Mock arrow.now to return a fixed time
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='America/New_York')
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Use empty string timezone - should use the overridden default
template_str = "{% now '' %}"
output = jinja2_env.from_string(template_str).render()
# Verify arrow.now was called with the overridden timezone
mock.assert_called_with('America/New_York')
assert '2025' in output
assert 'Jan' in output
def test_default_timezone_not_overridden(mocker):
"""
Test that without override, the default 'UTC' from __init__ is used.
"""
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
# DON'T override - should use 'UTC' default
# Mock arrow.now
fixed_time = arrow.Arrow(2025, 1, 15, 17, 0, 0, tzinfo='UTC')
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Use empty string timezone - should use 'UTC' default
template_str = "{% now '' %}"
output = jinja2_env.from_string(template_str).render()
# Verify arrow.now was called with 'UTC'
mock.assert_called_with('UTC')
assert '2025' in output
def test_datetime_format_override_like_safe_jinja(mocker):
"""
Test that environment.datetime_format can be overridden after creation.
"""
# Create environment (default format is '%a, %d %b %Y %H:%M:%S')
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
# Override the datetime format
jinja2_env.datetime_format = '%Y-%m-%d %H:%M:%S'
# Mock arrow.now
fixed_time = arrow.Arrow(2025, 1, 15, 14, 30, 45, tzinfo='UTC')
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Don't specify format - should use overridden default
template_str = "{% now 'UTC' %}"
output = jinja2_env.from_string(template_str).render()
# Should use custom format YYYY-MM-DD HH:MM:SS
assert output == '2025-01-15 14:30:45'
def test_offset_with_overridden_timezone(mocker):
"""
Test that offset operations also respect the overridden default_timezone.
"""
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
# Override to use Europe/London
jinja2_env.default_timezone = 'Europe/London'
fixed_time = arrow.Arrow(2025, 1, 15, 10, 0, 0, tzinfo='Europe/London')
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Use offset with empty timezone string
template_str = "{% now '' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}"
output = jinja2_env.from_string(template_str).render()
# Should have called arrow.now with Europe/London
mock.assert_called_with('Europe/London')
# Should be 10:00 + 2 hours = 12:00
assert output == '2025-01-15 12:00:00'
def test_weekday_parameter_converted_to_int(mocker):
"""
Test that weekday parameter is properly converted from float to int.
This is important because arrow.shift() requires weekday as int, not float.
"""
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
# Wednesday, Jan 15, 2025
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='UTC')
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Add offset to next Monday (weekday=0)
template_str = "{% now 'UTC' + 'weekday=0', '%A' %}"
output = jinja2_env.from_string(template_str).render()
# Should be Monday
assert output == 'Monday'
def test_multiple_offset_parameters(mocker):
"""
Test that multiple offset parameters can be combined in one expression.
"""
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
fixed_time = arrow.Arrow(2025, 1, 15, 10, 30, 45, tzinfo='UTC')
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
# Test multiple parameters: days, hours, minutes, seconds
template_str = "{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10', '%Y-%m-%d %H:%M:%S' %}"
output = jinja2_env.from_string(template_str).render()
# 2025-01-15 10:30:45 + 1 day + 2 hours + 15 minutes + 10 seconds
# = 2025-01-16 12:45:55
assert output == '2025-01-16 12:45:55'

View File

@@ -0,0 +1,429 @@
#!/usr/bin/env python3
"""
Comprehensive tests for time_handler module refactored to use arrow.
Run from project root:
python3 -m pytest changedetectionio/tests/unit/test_time_handler.py -v
"""
import unittest
import arrow
from changedetectionio import time_handler
class TestAmIInsideTime(unittest.TestCase):
"""Tests for the am_i_inside_time function."""
def test_current_time_within_schedule(self):
"""Test that current time is detected as within schedule."""
# Get current time in a specific timezone
timezone_str = 'Europe/Berlin'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = now.format('HH:00') # Current hour, 0 minutes
duration = 60 # 60 minutes
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result, f"Current time should be within {duration} minute window starting at {time_str}")
def test_current_time_outside_schedule(self):
"""Test that time in the past is not within current schedule."""
timezone_str = 'Europe/Berlin'
# Get yesterday's date
yesterday = arrow.now(timezone_str).shift(days=-1)
day_of_week = yesterday.format('dddd')
time_str = yesterday.format('HH:mm')
duration = 30 # Only 30 minutes
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertFalse(result, "Yesterday's time should not be within current schedule")
def test_timezone_pacific_within_schedule(self):
"""Test with US/Pacific timezone."""
timezone_str = 'US/Pacific'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = now.format('HH:00')
duration = 120 # 2 hours
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result)
def test_timezone_tokyo_within_schedule(self):
"""Test with Asia/Tokyo timezone."""
timezone_str = 'Asia/Tokyo'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = now.format('HH:00')
duration = 90 # 1.5 hours
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result)
def test_schedule_crossing_midnight(self):
"""Test schedule that crosses midnight."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
# Set schedule to start 23:30 with 120 minute duration (crosses midnight)
day_of_week = now.format('dddd')
time_str = "23:30"
duration = 120 # 2 hours - goes into next day
# If we're at 00:15 the next day, we should still be in the schedule
if now.hour == 0 and now.minute < 30:
# We're in the time window that spilled over from yesterday
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
# This might be true or false depending on exact time
self.assertIsInstance(result, bool)
def test_invalid_day_of_week(self):
"""Test that invalid day raises ValueError."""
with self.assertRaises(ValueError) as context:
time_handler.am_i_inside_time(
day_of_week="Funday",
time_str="12:00",
timezone_str="UTC",
duration=60
)
self.assertIn("Invalid day_of_week", str(context.exception))
def test_invalid_time_format(self):
"""Test that invalid time format raises ValueError."""
with self.assertRaises(ValueError) as context:
time_handler.am_i_inside_time(
day_of_week="Monday",
time_str="25:99",
timezone_str="UTC",
duration=60
)
self.assertIn("Invalid time_str", str(context.exception))
def test_invalid_time_format_non_numeric(self):
"""Test that non-numeric time raises ValueError."""
with self.assertRaises(ValueError) as context:
time_handler.am_i_inside_time(
day_of_week="Monday",
time_str="twelve:thirty",
timezone_str="UTC",
duration=60
)
self.assertIn("Invalid time_str", str(context.exception))
def test_invalid_timezone(self):
"""Test that invalid timezone raises ValueError."""
with self.assertRaises(ValueError) as context:
time_handler.am_i_inside_time(
day_of_week="Monday",
time_str="12:00",
timezone_str="Invalid/Timezone",
duration=60
)
self.assertIn("Invalid timezone_str", str(context.exception))
def test_short_duration(self):
"""Test with very short duration (15 minutes default)."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = now.format('HH:mm')
duration = 15 # Default duration
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result, "Current time should be within 15 minute window")
def test_long_duration(self):
"""Test with long duration (24 hours)."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
# Set time to current hour
time_str = now.format('HH:00')
duration = 1440 # 24 hours in minutes
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result, "Current time should be within 24 hour window")
def test_case_insensitive_day(self):
"""Test that day of week is case insensitive."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd').lower() # lowercase day
time_str = now.format('HH:00')
duration = 60
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
self.assertTrue(result, "Lowercase day should work")
def test_edge_case_midnight(self):
"""Test edge case at exactly midnight."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = "00:00"
duration = 60
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
# Should be true if we're in the first hour of the day
if now.hour == 0:
self.assertTrue(result)
def test_edge_case_end_of_day(self):
"""Test edge case near end of day."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
day_of_week = now.format('dddd')
time_str = "23:45"
duration = 30 # 30 minutes crosses midnight
result = time_handler.am_i_inside_time(
day_of_week=day_of_week,
time_str=time_str,
timezone_str=timezone_str,
duration=duration
)
# Result depends on current time
self.assertIsInstance(result, bool)
class TestIsWithinSchedule(unittest.TestCase):
"""Tests for the is_within_schedule function."""
def test_schedule_disabled(self):
"""Test that disabled schedule returns False."""
time_schedule_limit = {'enabled': False}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertFalse(result)
def test_schedule_none(self):
"""Test that None schedule returns False."""
result = time_handler.is_within_schedule(None)
self.assertFalse(result)
def test_schedule_empty_dict(self):
"""Test that empty dict returns False."""
result = time_handler.is_within_schedule({})
self.assertFalse(result)
def test_schedule_enabled_but_day_disabled(self):
"""Test schedule enabled but current day disabled."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
current_day = now.format('dddd').lower()
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': False,
'start_time': '09:00',
'duration': {'hours': 8, 'minutes': 0}
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertFalse(result, "Disabled day should return False")
def test_schedule_enabled_within_time(self):
"""Test schedule enabled and within time window."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
current_day = now.format('dddd').lower()
current_hour = now.format('HH:00')
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': True,
'start_time': current_hour,
'duration': {'hours': 2, 'minutes': 0}
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertTrue(result, "Current time should be within schedule")
def test_schedule_enabled_outside_time(self):
"""Test schedule enabled but outside time window."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
current_day = now.format('dddd').lower()
# Set time to 3 hours ago
past_time = now.shift(hours=-3).format('HH:mm')
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': True,
'start_time': past_time,
'duration': {'hours': 1, 'minutes': 0} # Only 1 hour duration
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertFalse(result, "3 hours ago with 1 hour duration should be False")
def test_schedule_with_default_timezone(self):
"""Test schedule without timezone uses default."""
now = arrow.now('America/New_York')
current_day = now.format('dddd').lower()
current_hour = now.format('HH:00')
time_schedule_limit = {
'enabled': True,
# No timezone specified
current_day: {
'enabled': True,
'start_time': current_hour,
'duration': {'hours': 2, 'minutes': 0}
}
}
# Should use default UTC, but since we're testing with NY time,
# the result depends on time difference
result = time_handler.is_within_schedule(
time_schedule_limit,
default_tz='America/New_York'
)
self.assertTrue(result, "Should work with default timezone")
def test_schedule_different_timezones(self):
"""Test schedule works correctly across different timezones."""
# Test with Tokyo timezone
timezone_str = 'Asia/Tokyo'
now = arrow.now(timezone_str)
current_day = now.format('dddd').lower()
current_hour = now.format('HH:00')
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': True,
'start_time': current_hour,
'duration': {'hours': 1, 'minutes': 30}
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertTrue(result)
def test_schedule_with_minutes_in_duration(self):
"""Test schedule with minutes specified in duration."""
timezone_str = 'UTC'
now = arrow.now(timezone_str)
current_day = now.format('dddd').lower()
current_time = now.format('HH:mm')
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': True,
'start_time': current_time,
'duration': {'hours': 0, 'minutes': 45}
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertTrue(result, "Should handle minutes in duration")
def test_schedule_with_timezone_whitespace(self):
"""Test that timezone with whitespace is handled."""
timezone_str = ' UTC '
now = arrow.now('UTC')
current_day = now.format('dddd').lower()
current_hour = now.format('HH:00')
time_schedule_limit = {
'enabled': True,
'timezone': timezone_str,
current_day: {
'enabled': True,
'start_time': current_hour,
'duration': {'hours': 1, 'minutes': 0}
}
}
result = time_handler.is_within_schedule(time_schedule_limit)
self.assertTrue(result, "Should handle timezone with whitespace")
class TestWeekdayEnum(unittest.TestCase):
"""Tests for the Weekday enum."""
def test_weekday_values(self):
"""Test that weekday enum has correct values."""
self.assertEqual(time_handler.Weekday.Monday, 0)
self.assertEqual(time_handler.Weekday.Tuesday, 1)
self.assertEqual(time_handler.Weekday.Wednesday, 2)
self.assertEqual(time_handler.Weekday.Thursday, 3)
self.assertEqual(time_handler.Weekday.Friday, 4)
self.assertEqual(time_handler.Weekday.Saturday, 5)
self.assertEqual(time_handler.Weekday.Sunday, 6)
def test_weekday_string_access(self):
"""Test accessing weekday enum by string."""
self.assertEqual(time_handler.Weekday['Monday'], 0)
self.assertEqual(time_handler.Weekday['Sunday'], 6)
if __name__ == '__main__':
unittest.main()

View File

@@ -188,6 +188,10 @@ def new_live_server_setup(live_server):
ctype = request.args.get('content_type')
status_code = request.args.get('status_code')
content = request.args.get('content') or None
delay = int(request.args.get('delay', 0))
if delay:
time.sleep(delay)
# Used to just try to break the header detection
uppercase_headers = request.args.get('uppercase_headers')

View File

@@ -1,6 +1,5 @@
from datetime import timedelta, datetime
import arrow
from enum import IntEnum
from zoneinfo import ZoneInfo
class Weekday(IntEnum):
@@ -40,54 +39,65 @@ def am_i_inside_time(
# Parse the start time
try:
target_time = datetime.strptime(time_str, '%H:%M').time()
except ValueError:
hour, minute = map(int, time_str.split(':'))
if not (0 <= hour <= 23 and 0 <= minute <= 59):
raise ValueError
except (ValueError, AttributeError):
raise ValueError(f"Invalid time_str: '{time_str}'. Must be in 'HH:MM' format.")
# Define the timezone
try:
tz = ZoneInfo(timezone_str)
except Exception:
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
# Get the current time in the specified timezone
now_tz = datetime.now(tz)
try:
now_tz = arrow.now(timezone_str.strip())
except Exception as e:
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
# Check if the current day matches the target day or overlaps due to duration
current_weekday = now_tz.weekday()
start_datetime_tz = datetime.combine(now_tz.date(), target_time, tzinfo=tz)
# Create start datetime for today in target timezone
start_datetime_tz = now_tz.replace(hour=hour, minute=minute, second=0, microsecond=0)
# Handle previous day's overlap
if target_weekday == (current_weekday - 1) % 7:
# Calculate start and end times for the overlap from the previous day
start_datetime_tz -= timedelta(days=1)
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
start_datetime_tz = start_datetime_tz.shift(days=-1)
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
if start_datetime_tz <= now_tz < end_datetime_tz:
return True
# Handle current day's range
if target_weekday == current_weekday:
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
if start_datetime_tz <= now_tz < end_datetime_tz:
return True
# Handle next day's overlap
if target_weekday == (current_weekday + 1) % 7:
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
if now_tz < start_datetime_tz and now_tz + timedelta(days=1) < end_datetime_tz:
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
if now_tz < start_datetime_tz and now_tz.shift(days=1) < end_datetime_tz:
return True
return False
def is_within_schedule(time_schedule_limit, default_tz="UTC"):
"""
Check if the current time is within a scheduled time window.
Parameters:
time_schedule_limit (dict): Schedule configuration with timezone, day settings, etc.
default_tz (str): Default timezone to use if not specified. Default is 'UTC'.
Returns:
bool: True if current time is within the schedule, False otherwise.
"""
if time_schedule_limit and time_schedule_limit.get('enabled'):
# Get the timezone the time schedule is in, so we know what day it is there
tz_name = time_schedule_limit.get('timezone')
if not tz_name:
tz_name = default_tz
now_day_name_in_tz = datetime.now(ZoneInfo(tz_name.strip())).strftime('%A')
# Get current day name in the target timezone
now_day_name_in_tz = arrow.now(tz_name.strip()).format('dddd')
selected_day_schedule = time_schedule_limit.get(now_day_name_in_tz.lower())
if not selected_day_schedule.get('enabled'):
return False

View File

@@ -1,5 +1,6 @@
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
feedgen~=0.9
feedparser~=6.0 # For parsing RSS/Atom feeds
flask-compress
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
flask-login>=0.6.3
@@ -72,7 +73,7 @@ werkzeug==3.0.6
# Templating, so far just in the URLs but in the future can be for the notifications also
jinja2~=3.1
jinja2-time
arrow
openpyxl
# https://peps.python.org/pep-0508/#environment-markers
# https://github.com/dgtlmoon/changedetection.io/pull/1009
@@ -88,6 +89,7 @@ pyppeteerstealth>=0.0.4
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
pytest ~=7.2
pytest-flask ~=1.2
pytest-mock ~=3.15
# Anything 4.0 and up but not 5.0
jsonschema ~= 4.0
@@ -124,8 +126,9 @@ price-parser
# flask_socket_io - incorrect package name, already have flask-socketio above
# So far for detecting correct favicon type, but for other things in the future
python-magic
# Lightweight MIME type detection (saves ~14MB memory vs python-magic/libmagic)
# Used for detecting correct favicon type and content-type detection
puremagic
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata