mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-01 15:17:20 +00:00
Compare commits
14 Commits
rss-reader
...
flask-upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2f07adc59 | ||
|
|
804e8c249b | ||
|
|
b26333b01c | ||
|
|
46252bc6f3 | ||
|
|
64350a2e78 | ||
|
|
2902c63a3b | ||
|
|
55b8588f1f | ||
|
|
02ecc4ae9a | ||
|
|
3ee50b7832 | ||
|
|
66ddd87ee4 | ||
|
|
233189e4f7 | ||
|
|
b237fd7201 | ||
|
|
3c81efe2f4 | ||
|
|
0fcfb94690 |
@@ -253,6 +253,30 @@ jobs:
|
||||
docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt
|
||||
docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt
|
||||
|
||||
- name: Extract and display memory test report
|
||||
if: always()
|
||||
run: |
|
||||
# Extract test-memory.log from the container
|
||||
echo "Extracting test-memory.log from container..."
|
||||
docker cp test-cdio-basic-tests:/app/changedetectionio/test-memory.log output-logs/test-memory-${{ env.PYTHON_VERSION }}.log || echo "test-memory.log not found in container"
|
||||
|
||||
# Display the memory log contents for immediate visibility in workflow output
|
||||
echo "=== Top 10 Highest Peak Memory Tests ==="
|
||||
if [ -f output-logs/test-memory-${{ env.PYTHON_VERSION }}.log ]; then
|
||||
# Sort by peak memory value (extract number before MB and sort numerically, reverse order)
|
||||
grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | \
|
||||
sed 's/.*Peak memory: //' | \
|
||||
paste -d'|' - <(grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log) | \
|
||||
sort -t'|' -k1 -nr | \
|
||||
cut -d'|' -f2 | \
|
||||
head -10
|
||||
echo ""
|
||||
echo "=== Full Memory Test Report ==="
|
||||
cat output-logs/test-memory-${{ env.PYTHON_VERSION }}.log
|
||||
else
|
||||
echo "No memory log available"
|
||||
fi
|
||||
|
||||
- name: Store everything including test-datastore
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
|
||||
@@ -2,6 +2,7 @@ recursive-include changedetectionio/api *
|
||||
recursive-include changedetectionio/blueprint *
|
||||
recursive-include changedetectionio/conditions *
|
||||
recursive-include changedetectionio/content_fetchers *
|
||||
recursive-include changedetectionio/jinja2_custom *
|
||||
recursive-include changedetectionio/model *
|
||||
recursive-include changedetectionio/notification *
|
||||
recursive-include changedetectionio/processors *
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.20'
|
||||
__version__ = '0.50.23'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -334,6 +334,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
|
||||
watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))
|
||||
|
||||
# Explicitly delete large content variables to free memory IMMEDIATELY after saving
|
||||
# These are no longer needed after being saved to history
|
||||
del contents
|
||||
|
||||
# Send notifications on second+ check
|
||||
if watch.history_n >= 2:
|
||||
logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
|
||||
@@ -372,6 +376,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
|
||||
'check_count': count})
|
||||
|
||||
# NOW clear fetcher content - after all processing is complete
|
||||
# This is the last point where we need the fetcher data
|
||||
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||
update_handler.fetcher.clear_content()
|
||||
logger.debug(f"Cleared fetcher content for UUID {uuid}")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
|
||||
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
|
||||
@@ -392,7 +402,28 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
||||
#logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}")
|
||||
watch_check_update.send(watch_uuid=watch['uuid'])
|
||||
|
||||
update_handler = None
|
||||
# Explicitly clean up update_handler and all its references
|
||||
if update_handler:
|
||||
# Clear fetcher content using the proper method
|
||||
if hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||
update_handler.fetcher.clear_content()
|
||||
|
||||
# Clear processor references
|
||||
if hasattr(update_handler, 'content_processor'):
|
||||
update_handler.content_processor = None
|
||||
|
||||
update_handler = None
|
||||
|
||||
# Clear local contents variable if it still exists
|
||||
if 'contents' in locals():
|
||||
del contents
|
||||
|
||||
# Note: We don't set watch = None here because:
|
||||
# 1. watch is just a local reference to datastore.data['watching'][uuid]
|
||||
# 2. Setting it to None doesn't affect the datastore
|
||||
# 3. GC can't collect the object anyway (still referenced by datastore)
|
||||
# 4. It would just cause confusion
|
||||
|
||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||
except Exception as cleanup_error:
|
||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||
|
||||
@@ -6,7 +6,7 @@ from loguru import logger
|
||||
|
||||
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT
|
||||
from changedetectionio.content_fetchers.base import manage_user_agent
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
|
||||
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
def long_task(uuid, preferred_proxy):
|
||||
import time
|
||||
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
|
||||
status = {'status': '', 'length': 0, 'text': ''}
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from feedgen.feed import FeedGenerator
|
||||
from flask import Blueprint, make_response, request, url_for, redirect
|
||||
|
||||
@@ -119,7 +119,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
hide_remove_pass=os.getenv("SALTED_PASS", False),
|
||||
min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)),
|
||||
settings_application=datastore.data['settings']['application'],
|
||||
timezone_default_config=datastore.data['settings']['application'].get('timezone'),
|
||||
timezone_default_config=datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||
utc_time=utc_time,
|
||||
)
|
||||
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
{% extends 'base.html' %}
|
||||
|
||||
{% block content %}
|
||||
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %}
|
||||
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %}
|
||||
{% from '_common_fields.html' import render_common_settings_form %}
|
||||
<script>
|
||||
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
|
||||
@@ -72,25 +72,23 @@
|
||||
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
|
||||
</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_content_format) }}
|
||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
|
||||
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
|
||||
</div>
|
||||
{% if form.requests.proxy %}
|
||||
<div class="pure-control-group inline-radio">
|
||||
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
||||
<span class="pure-form-message-inline">
|
||||
Choose a default proxy for all watches
|
||||
</span>
|
||||
<div class="grey-form-border">
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.rss_content_format) }}
|
||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
|
||||
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</fieldset>
|
||||
</div>
|
||||
|
||||
@@ -133,6 +131,10 @@
|
||||
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
|
||||
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.requests.form.timeout) }}
|
||||
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
|
||||
</div>
|
||||
<div class="pure-control-group inline-radio">
|
||||
{{ render_field(form.requests.form.default_ua) }}
|
||||
<span class="pure-form-message-inline">
|
||||
@@ -236,7 +238,7 @@ nav
|
||||
<p><strong>UTC Time & Date from Server:</strong> <span id="utc-time" >{{ utc_time }}</span></p>
|
||||
<p><strong>Local Time & Date in Browser:</strong> <span class="local-time" data-utc="{{ utc_time }}"></span></p>
|
||||
<p>
|
||||
{{ render_field(form.application.form.timezone) }}
|
||||
{{ render_field(form.application.form.scheduler_timezone_default) }}
|
||||
<datalist id="timezones" style="display: none;">
|
||||
{% for tz_name in available_timezones %}
|
||||
<option value="{{ tz_name }}">{{ tz_name }}</option>
|
||||
@@ -314,17 +316,27 @@ nav
|
||||
<p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites.
|
||||
|
||||
<div class="pure-control-group" id="extra-proxies-setting">
|
||||
{{ render_field(form.requests.form.extra_proxies) }}
|
||||
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }}
|
||||
<span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br>
|
||||
<span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span>
|
||||
{% if form.requests.proxy %}
|
||||
<div>
|
||||
<br>
|
||||
<div class="inline-radio">
|
||||
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
||||
<span class="pure-form-message-inline">Choose a default proxy for all watches</span>
|
||||
</div>
|
||||
</div>
|
||||
{% endif %}
|
||||
</div>
|
||||
<div class="pure-control-group" id="extra-browsers-setting">
|
||||
<p>
|
||||
<span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br>
|
||||
<span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span>
|
||||
</p>
|
||||
{{ render_field(form.requests.form.extra_browsers) }}
|
||||
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }}
|
||||
</div>
|
||||
|
||||
</div>
|
||||
<div id="actions">
|
||||
<div class="pure-control-group">
|
||||
|
||||
@@ -187,7 +187,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
tz_name = time_schedule_limit.get('timezone')
|
||||
if not tz_name:
|
||||
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
|
||||
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
|
||||
|
||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||
try:
|
||||
@@ -257,7 +257,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
|
||||
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||
'using_global_webdriver_wait': not default['webdriver_delay'],
|
||||
'uuid': uuid,
|
||||
'watch': watch,
|
||||
|
||||
@@ -64,6 +64,19 @@ class Fetcher():
|
||||
# Time ONTOP of the system defined env minimum time
|
||||
render_extract_delay = 0
|
||||
|
||||
def clear_content(self):
|
||||
"""
|
||||
Explicitly clear all content from memory to free up heap space.
|
||||
Call this after content has been saved to disk.
|
||||
"""
|
||||
self.content = None
|
||||
if hasattr(self, 'raw_content'):
|
||||
self.raw_content = None
|
||||
self.screenshot = None
|
||||
self.xpath_data = None
|
||||
# Keep headers and status_code as they're small
|
||||
logger.trace("Fetcher content cleared from memory")
|
||||
|
||||
@abstractmethod
|
||||
def get_error(self):
|
||||
return self.error
|
||||
@@ -128,7 +141,7 @@ class Fetcher():
|
||||
async def iterate_browser_steps(self, start_url=None):
|
||||
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
|
||||
from playwright._impl._errors import TimeoutError, Error
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
step_n = 0
|
||||
|
||||
if self.browser_steps is not None and len(self.browser_steps):
|
||||
|
||||
@@ -51,6 +51,7 @@ class fetcher(Fetcher):
|
||||
|
||||
session = requests.Session()
|
||||
|
||||
|
||||
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
|
||||
from requests_file import FileAdapter
|
||||
session.mount('file://', FileAdapter())
|
||||
|
||||
@@ -795,7 +795,7 @@ def ticker_thread_check_time_launch_checks():
|
||||
else:
|
||||
time_schedule_limit = watch.get('time_schedule_limit')
|
||||
logger.trace(f"{uuid} Time scheduler - Using watch settings (not global settings)")
|
||||
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
|
||||
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
|
||||
|
||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||
try:
|
||||
|
||||
@@ -487,9 +487,8 @@ class ValidateJinja2Template(object):
|
||||
"""
|
||||
def __call__(self, form, field):
|
||||
from changedetectionio import notification
|
||||
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
|
||||
from jinja2.sandbox import ImmutableSandboxedEnvironment
|
||||
from jinja2.meta import find_undeclared_variables
|
||||
import jinja2.exceptions
|
||||
|
||||
@@ -497,9 +496,11 @@ class ValidateJinja2Template(object):
|
||||
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
|
||||
|
||||
try:
|
||||
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader, extensions=['jinja2_time.TimeExtension'])
|
||||
# Use the shared helper to create a properly configured environment
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
|
||||
# Add notification tokens for validation
|
||||
jinja2_env.globals.update(notification.valid_tokens)
|
||||
# Extra validation tokens provided on the form_class(... extra_tokens={}) setup
|
||||
if hasattr(field, 'extra_notification_tokens'):
|
||||
jinja2_env.globals.update(field.extra_notification_tokens)
|
||||
|
||||
@@ -511,6 +512,7 @@ class ValidateJinja2Template(object):
|
||||
except jinja2.exceptions.SecurityError as e:
|
||||
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
|
||||
|
||||
# Check for undeclared variables
|
||||
ast = jinja2_env.parse(joined_data)
|
||||
undefined = ", ".join(find_undeclared_variables(ast))
|
||||
if undefined:
|
||||
@@ -678,6 +680,51 @@ class ValidateCSSJSONXPATHInput(object):
|
||||
except:
|
||||
raise ValidationError("A system-error occurred when validating your jq expression")
|
||||
|
||||
class ValidateSimpleURL:
|
||||
"""Validate that the value can be parsed by urllib.parse.urlparse() and has a scheme/netloc."""
|
||||
def __init__(self, message=None):
|
||||
self.message = message or "Invalid URL."
|
||||
|
||||
def __call__(self, form, field):
|
||||
data = (field.data or "").strip()
|
||||
if not data:
|
||||
return # empty is OK — pair with validators.Optional()
|
||||
from urllib.parse import urlparse
|
||||
|
||||
parsed = urlparse(data)
|
||||
if not parsed.scheme or not parsed.netloc:
|
||||
raise ValidationError(self.message)
|
||||
|
||||
class ValidateStartsWithRegex(object):
|
||||
def __init__(self, regex, *, flags=0, message=None, allow_empty=True, split_lines=True):
|
||||
# compile with given flags (we’ll pass re.IGNORECASE below)
|
||||
self.pattern = re.compile(regex, flags) if isinstance(regex, str) else regex
|
||||
self.message = message
|
||||
self.allow_empty = allow_empty
|
||||
self.split_lines = split_lines
|
||||
|
||||
def __call__(self, form, field):
|
||||
data = field.data
|
||||
if not data:
|
||||
return
|
||||
|
||||
# normalize into list of lines
|
||||
if isinstance(data, str) and self.split_lines:
|
||||
lines = data.splitlines()
|
||||
elif isinstance(data, (list, tuple)):
|
||||
lines = data
|
||||
else:
|
||||
lines = [data]
|
||||
|
||||
for line in lines:
|
||||
stripped = line.strip()
|
||||
if not stripped:
|
||||
if self.allow_empty:
|
||||
continue
|
||||
raise ValidationError(self.message or "Empty value not allowed.")
|
||||
if not self.pattern.match(stripped):
|
||||
raise ValidationError(self.message or "Invalid value.")
|
||||
|
||||
class quickWatchForm(Form):
|
||||
from . import processors
|
||||
|
||||
@@ -705,7 +752,7 @@ class commonSettingsForm(Form):
|
||||
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
|
||||
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
|
||||
timezone = StringField("Timezone for watch schedule", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
|
||||
scheduler_timezone_default = StringField("Default timezone for watch check scheduler", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
|
||||
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
|
||||
|
||||
|
||||
@@ -795,7 +842,7 @@ class processor_text_json_diff_form(commonSettingsForm):
|
||||
if not super().validate():
|
||||
return False
|
||||
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
result = True
|
||||
|
||||
# Fail form validation when a body is set for a GET
|
||||
@@ -858,23 +905,36 @@ class processor_text_json_diff_form(commonSettingsForm):
|
||||
):
|
||||
super().__init__(formdata, obj, prefix, data, meta, **kwargs)
|
||||
if kwargs and kwargs.get('default_system_settings'):
|
||||
default_tz = kwargs.get('default_system_settings').get('application', {}).get('timezone')
|
||||
default_tz = kwargs.get('default_system_settings').get('application', {}).get('scheduler_timezone_default')
|
||||
if default_tz:
|
||||
self.time_schedule_limit.form.timezone.render_kw['placeholder'] = default_tz
|
||||
|
||||
|
||||
|
||||
class SingleExtraProxy(Form):
|
||||
|
||||
# maybe better to set some <script>var..
|
||||
proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
||||
proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
|
||||
# @todo do the validation here instead
|
||||
proxy_url = StringField('Proxy URL', [
|
||||
validators.Optional(),
|
||||
ValidateStartsWithRegex(
|
||||
regex=r'^(https?|socks5)://', # ✅ main pattern
|
||||
flags=re.IGNORECASE, # ✅ makes it case-insensitive
|
||||
message='Proxy URLs must start with http://, https:// or socks5://',
|
||||
),
|
||||
ValidateSimpleURL()
|
||||
], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
|
||||
|
||||
class SingleExtraBrowser(Form):
|
||||
browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
||||
browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
|
||||
# @todo do the validation here instead
|
||||
browser_connection_url = StringField('Browser connection URL', [
|
||||
validators.Optional(),
|
||||
ValidateStartsWithRegex(
|
||||
regex=r'^(wss?|ws)://',
|
||||
flags=re.IGNORECASE,
|
||||
message='Browser URLs must start with wss:// or ws://'
|
||||
),
|
||||
ValidateSimpleURL()
|
||||
], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
|
||||
|
||||
class DefaultUAInputForm(Form):
|
||||
html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
|
||||
@@ -885,7 +945,7 @@ class DefaultUAInputForm(Form):
|
||||
class globalSettingsRequestForm(Form):
|
||||
time_between_check = RequiredFormField(TimeBetweenCheckForm)
|
||||
time_schedule_limit = FormField(ScheduleLimitForm)
|
||||
proxy = RadioField('Proxy')
|
||||
proxy = RadioField('Default proxy')
|
||||
jitter_seconds = IntegerField('Random jitter seconds ± check',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||
@@ -894,7 +954,12 @@ class globalSettingsRequestForm(Form):
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=1, max=50,
|
||||
message="Should be between 1 and 50")])
|
||||
|
||||
|
||||
timeout = IntegerField('Requests timeout in seconds',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=1, max=999,
|
||||
message="Should be between 1 and 999")])
|
||||
|
||||
extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
|
||||
extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)
|
||||
|
||||
@@ -940,6 +1005,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
||||
strip_ignored_lines = BooleanField('Strip ignored lines')
|
||||
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
|
||||
validators=[validators.Optional()])
|
||||
|
||||
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
|
||||
validators=[validators.Optional()])
|
||||
|
||||
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0,
|
||||
|
||||
20
changedetectionio/jinja2_custom/__init__.py
Normal file
20
changedetectionio/jinja2_custom/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
||||
"""
|
||||
Jinja2 custom extensions and safe rendering utilities.
|
||||
"""
|
||||
from .extensions.TimeExtension import TimeExtension
|
||||
from .safe_jinja import (
|
||||
render,
|
||||
render_fully_escaped,
|
||||
create_jinja_env,
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE,
|
||||
DEFAULT_JINJA2_EXTENSIONS,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
'TimeExtension',
|
||||
'render',
|
||||
'render_fully_escaped',
|
||||
'create_jinja_env',
|
||||
'JINJA2_MAX_RETURN_PAYLOAD_SIZE',
|
||||
'DEFAULT_JINJA2_EXTENSIONS',
|
||||
]
|
||||
221
changedetectionio/jinja2_custom/extensions/TimeExtension.py
Normal file
221
changedetectionio/jinja2_custom/extensions/TimeExtension.py
Normal file
@@ -0,0 +1,221 @@
|
||||
"""
|
||||
Jinja2 TimeExtension - Custom date/time handling for templates.
|
||||
|
||||
This extension provides the {% now %} tag for Jinja2 templates, offering timezone-aware
|
||||
date/time formatting with support for time offsets.
|
||||
|
||||
Why This Extension Exists:
|
||||
The Arrow library has a now() function (arrow.now()), but Jinja2 templates cannot
|
||||
directly call Python functions - they need extensions or filters to expose functionality.
|
||||
|
||||
This TimeExtension serves as a Jinja2-to-Arrow bridge that:
|
||||
|
||||
1. Makes Arrow accessible in templates - Jinja2 requires registering functions/tags
|
||||
through extensions. You cannot use arrow.now() directly in a template.
|
||||
|
||||
2. Provides template-friendly syntax - Instead of complex Python code, you get clean tags:
|
||||
{% now 'UTC' %}
|
||||
{% now 'UTC' + 'hours=2' %}
|
||||
{% now 'Europe/London', '%Y-%m-%d' %}
|
||||
|
||||
3. Adds convenience features on top of Arrow:
|
||||
- Default timezone from environment variable (TZ) or config
|
||||
- Default datetime format configuration
|
||||
- Offset syntax parsing: 'hours=2,minutes=30' → shift(hours=2, minutes=30)
|
||||
- Empty string timezone support to use configured defaults
|
||||
|
||||
4. Maintains security - Works within Jinja2's sandboxed environment so users
|
||||
cannot access arbitrary Python code or objects.
|
||||
|
||||
Essentially, this is a Jinja2 wrapper around arrow.now() and arrow.shift() that
|
||||
provides user-friendly template syntax while maintaining security.
|
||||
|
||||
Basic Usage:
|
||||
{% now 'UTC' %}
|
||||
# Output: Wed, 09 Dec 2015 23:33:01
|
||||
|
||||
Custom Format:
|
||||
{% now 'UTC', '%Y-%m-%d %H:%M:%S' %}
|
||||
# Output: 2015-12-09 23:33:01
|
||||
|
||||
Timezone Support:
|
||||
{% now 'America/New_York' %}
|
||||
{% now 'Europe/London' %}
|
||||
{% now '' %} # Uses default timezone from environment.default_timezone
|
||||
|
||||
Time Offsets (Addition):
|
||||
{% now 'UTC' + 'hours=2' %}
|
||||
{% now 'UTC' + 'hours=2,minutes=30' %}
|
||||
{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10' %}
|
||||
|
||||
Time Offsets (Subtraction):
|
||||
{% now 'UTC' - 'minutes=11' %}
|
||||
{% now 'UTC' - 'days=2,minutes=33,seconds=1' %}
|
||||
|
||||
Time Offsets with Custom Format:
|
||||
{% now 'UTC' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}
|
||||
# Output: 2015-12-10 01:33:01
|
||||
|
||||
Weekday Support (for finding next/previous weekday):
|
||||
{% now 'UTC' + 'weekday=0' %} # Next Monday (0=Monday, 6=Sunday)
|
||||
{% now 'UTC' + 'weekday=4' %} # Next Friday
|
||||
|
||||
Configuration:
|
||||
- Default timezone: Set via TZ environment variable or override environment.default_timezone
|
||||
- Default format: '%a, %d %b %Y %H:%M:%S' (can be overridden via environment.datetime_format)
|
||||
|
||||
Environment Customization:
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
|
||||
jinja2_env = create_jinja_env()
|
||||
jinja2_env.default_timezone = 'America/New_York' # Override default timezone
|
||||
jinja2_env.datetime_format = '%Y-%m-%d %H:%M' # Override default format
|
||||
|
||||
Supported Offset Parameters:
|
||||
- years, months, weeks, days
|
||||
- hours, minutes, seconds, microseconds
|
||||
- weekday (0=Monday through 6=Sunday, must be integer)
|
||||
|
||||
Note:
|
||||
This extension uses the Arrow library for timezone-aware datetime handling.
|
||||
All timezone names should be valid IANA timezone identifiers (e.g., 'America/New_York').
|
||||
"""
|
||||
import arrow
|
||||
|
||||
from jinja2 import nodes
|
||||
from jinja2.ext import Extension
|
||||
import os
|
||||
|
||||
class TimeExtension(Extension):
|
||||
"""
|
||||
Jinja2 Extension providing the {% now %} tag for timezone-aware date/time rendering.
|
||||
|
||||
This extension adds two attributes to the Jinja2 environment:
|
||||
- datetime_format: Default strftime format string (default: '%a, %d %b %Y %H:%M:%S')
|
||||
- default_timezone: Default timezone for rendering (default: TZ env var or 'UTC')
|
||||
|
||||
Both can be overridden after environment creation by setting the attributes directly.
|
||||
"""
|
||||
|
||||
tags = {'now'}
|
||||
|
||||
def __init__(self, environment):
|
||||
"""Jinja2 Extension constructor."""
|
||||
super().__init__(environment)
|
||||
|
||||
environment.extend(
|
||||
datetime_format='%a, %d %b %Y %H:%M:%S',
|
||||
default_timezone=os.getenv('TZ', 'UTC').strip()
|
||||
)
|
||||
|
||||
def _datetime(self, timezone, operator, offset, datetime_format):
|
||||
"""
|
||||
Get current datetime with time offset applied.
|
||||
|
||||
Args:
|
||||
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
|
||||
operator: '+' for addition or '-' for subtraction
|
||||
offset: Comma-separated offset parameters (e.g., 'hours=2,minutes=30')
|
||||
datetime_format: strftime format string or None to use environment default
|
||||
|
||||
Returns:
|
||||
Formatted datetime string with offset applied
|
||||
|
||||
Example:
|
||||
_datetime('UTC', '+', 'hours=2,minutes=30', '%Y-%m-%d %H:%M:%S')
|
||||
# Returns current time + 2.5 hours
|
||||
"""
|
||||
# Use default timezone if none specified
|
||||
if not timezone or timezone == '':
|
||||
timezone = self.environment.default_timezone
|
||||
|
||||
d = arrow.now(timezone)
|
||||
|
||||
# parse shift params from offset and include operator
|
||||
shift_params = {}
|
||||
for param in offset.split(','):
|
||||
interval, value = param.split('=')
|
||||
shift_params[interval.strip()] = float(operator + value.strip())
|
||||
|
||||
# Fix weekday parameter can not be float
|
||||
if 'weekday' in shift_params:
|
||||
shift_params['weekday'] = int(shift_params['weekday'])
|
||||
|
||||
d = d.shift(**shift_params)
|
||||
|
||||
if datetime_format is None:
|
||||
datetime_format = self.environment.datetime_format
|
||||
return d.strftime(datetime_format)
|
||||
|
||||
def _now(self, timezone, datetime_format):
|
||||
"""
|
||||
Get current datetime without any offset.
|
||||
|
||||
Args:
|
||||
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
|
||||
datetime_format: strftime format string or None to use environment default
|
||||
|
||||
Returns:
|
||||
Formatted datetime string for current time
|
||||
|
||||
Example:
|
||||
_now('America/New_York', '%Y-%m-%d %H:%M:%S')
|
||||
# Returns current time in New York timezone
|
||||
"""
|
||||
# Use default timezone if none specified
|
||||
if not timezone or timezone == '':
|
||||
timezone = self.environment.default_timezone
|
||||
|
||||
if datetime_format is None:
|
||||
datetime_format = self.environment.datetime_format
|
||||
return arrow.now(timezone).strftime(datetime_format)
|
||||
|
||||
def parse(self, parser):
|
||||
"""
|
||||
Parse the {% now %} tag and generate appropriate AST nodes.
|
||||
|
||||
This method is called by Jinja2 when it encounters a {% now %} tag.
|
||||
It parses the tag syntax and determines whether to call _now() or _datetime()
|
||||
based on whether offset operations (+ or -) are present.
|
||||
|
||||
Supported syntax:
|
||||
{% now 'timezone' %} -> calls _now()
|
||||
{% now 'timezone', 'format' %} -> calls _now()
|
||||
{% now 'timezone' + 'offset' %} -> calls _datetime()
|
||||
{% now 'timezone' + 'offset', 'format' %} -> calls _datetime()
|
||||
{% now 'timezone' - 'offset', 'format' %} -> calls _datetime()
|
||||
|
||||
Args:
|
||||
parser: Jinja2 parser instance
|
||||
|
||||
Returns:
|
||||
nodes.Output: AST output node containing the formatted datetime string
|
||||
"""
|
||||
lineno = next(parser.stream).lineno
|
||||
|
||||
node = parser.parse_expression()
|
||||
|
||||
if parser.stream.skip_if('comma'):
|
||||
datetime_format = parser.parse_expression()
|
||||
else:
|
||||
datetime_format = nodes.Const(None)
|
||||
|
||||
if isinstance(node, nodes.Add):
|
||||
call_method = self.call_method(
|
||||
'_datetime',
|
||||
[node.left, nodes.Const('+'), node.right, datetime_format],
|
||||
lineno=lineno,
|
||||
)
|
||||
elif isinstance(node, nodes.Sub):
|
||||
call_method = self.call_method(
|
||||
'_datetime',
|
||||
[node.left, nodes.Const('-'), node.right, datetime_format],
|
||||
lineno=lineno,
|
||||
)
|
||||
else:
|
||||
call_method = self.call_method(
|
||||
'_now',
|
||||
[node, datetime_format],
|
||||
lineno=lineno,
|
||||
)
|
||||
return nodes.Output([call_method], lineno=lineno)
|
||||
55
changedetectionio/jinja2_custom/safe_jinja.py
Normal file
55
changedetectionio/jinja2_custom/safe_jinja.py
Normal file
@@ -0,0 +1,55 @@
|
||||
"""
|
||||
Safe Jinja2 render with max payload sizes
|
||||
|
||||
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
|
||||
"""
|
||||
|
||||
import jinja2.sandbox
|
||||
import typing as t
|
||||
import os
|
||||
from .extensions.TimeExtension import TimeExtension
|
||||
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
||||
|
||||
# Default extensions - can be overridden in create_jinja_env()
|
||||
DEFAULT_JINJA2_EXTENSIONS = [TimeExtension]
|
||||
|
||||
|
||||
def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment:
|
||||
"""
|
||||
Create a sandboxed Jinja2 environment with our custom extensions and default timezone.
|
||||
|
||||
Args:
|
||||
extensions: List of extension classes to use (defaults to DEFAULT_JINJA2_EXTENSIONS)
|
||||
**kwargs: Additional arguments to pass to ImmutableSandboxedEnvironment
|
||||
|
||||
Returns:
|
||||
Configured Jinja2 environment
|
||||
"""
|
||||
if extensions is None:
|
||||
extensions = DEFAULT_JINJA2_EXTENSIONS
|
||||
|
||||
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(
|
||||
extensions=extensions,
|
||||
**kwargs
|
||||
)
|
||||
|
||||
# Get default timezone from environment variable
|
||||
default_timezone = os.getenv('TZ', 'UTC').strip()
|
||||
jinja2_env.default_timezone = default_timezone
|
||||
|
||||
return jinja2_env
|
||||
|
||||
|
||||
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
|
||||
# (Which also limits available functions that could be called)
|
||||
def render(template_str, **args: t.Any) -> str:
|
||||
jinja2_env = create_jinja_env()
|
||||
output = jinja2_env.from_string(template_str).render(args)
|
||||
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
|
||||
|
||||
def render_fully_escaped(content):
|
||||
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
|
||||
template = env.from_string("{{ some_html|e }}")
|
||||
return template.render(some_html=content)
|
||||
|
||||
@@ -55,11 +55,12 @@ class model(dict):
|
||||
'rss_access_token': None,
|
||||
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
||||
'rss_hide_muted_watches': True,
|
||||
'rss_reader_mode': False,
|
||||
'scheduler_timezone_default': None, # Default IANA timezone name
|
||||
'schema_version' : 0,
|
||||
'shared_diff_access': False,
|
||||
'strip_ignored_lines': False,
|
||||
'tags': {}, #@todo use Tag.model initialisers
|
||||
'timezone': None, # Default IANA timezone name
|
||||
'webdriver_delay': None , # Extra delay in seconds before extracting text
|
||||
'ui': {
|
||||
'use_page_title_in_list': True,
|
||||
|
||||
@@ -1,14 +1,14 @@
|
||||
from blinker import signal
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import watch_base
|
||||
import os
|
||||
import re
|
||||
from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
from .. import safe_jinja
|
||||
from .. import jinja2_custom as safe_jinja
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
# Allowable protocols, protects against javascript: etc
|
||||
@@ -89,9 +89,8 @@ class model(watch_base):
|
||||
ready_url = jinja_render(template_str=url)
|
||||
except Exception as e:
|
||||
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
|
||||
from flask import (
|
||||
flash, Markup, url_for
|
||||
)
|
||||
from flask import flash, url_for
|
||||
from markupsafe import Markup
|
||||
message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
|
||||
url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
|
||||
flash(message, 'error')
|
||||
|
||||
@@ -5,7 +5,7 @@ from loguru import logger
|
||||
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
||||
|
||||
def process_notification(n_object, datastore):
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
||||
# be sure its registered
|
||||
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||
|
||||
@@ -102,7 +102,7 @@ class difference_detection_processor():
|
||||
self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid'))
|
||||
|
||||
# Tweak the base config with the per-watch ones
|
||||
from changedetectionio.safe_jinja import render as jinja_render
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
request_headers = CaseInsensitiveDict()
|
||||
|
||||
ua = self.datastore.data['settings']['requests'].get('default_ua')
|
||||
|
||||
@@ -64,24 +64,31 @@ class guess_stream_type():
|
||||
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
|
||||
test_content_normalized = re.sub(r'<\s+', '<', test_content)
|
||||
|
||||
# Magic will sometimes call text/plain as text/html!
|
||||
# Use puremagic for lightweight MIME detection (saves ~14MB vs python-magic)
|
||||
magic_result = None
|
||||
try:
|
||||
import magic
|
||||
import puremagic
|
||||
|
||||
mime = magic.from_buffer(content[:200], mime=True) # Send the original content
|
||||
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
|
||||
if mime and "/" in mime:
|
||||
magic_result = mime
|
||||
# Ignore generic/fallback mime types from magic
|
||||
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
|
||||
logger.debug(f"Ignoring generic mime type '{mime}' from magic library")
|
||||
# Trust magic for non-text types immediately
|
||||
elif mime not in ['text/html', 'text/plain']:
|
||||
magic_content_header = mime
|
||||
# puremagic needs bytes, so encode if we have a string
|
||||
content_bytes = content[:200].encode('utf-8') if isinstance(content, str) else content[:200]
|
||||
|
||||
# puremagic returns a list of PureMagic objects with confidence scores
|
||||
detections = puremagic.magic_string(content_bytes)
|
||||
if detections:
|
||||
# Get the highest confidence detection
|
||||
mime = detections[0].mime_type
|
||||
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
|
||||
if mime and "/" in mime:
|
||||
magic_result = mime
|
||||
# Ignore generic/fallback mime types
|
||||
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
|
||||
logger.debug(f"Ignoring generic mime type '{mime}' from puremagic library")
|
||||
# Trust puremagic for non-text types immediately
|
||||
elif mime not in ['text/html', 'text/plain']:
|
||||
magic_content_header = mime
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection")
|
||||
logger.error(f"Error getting a more precise mime type from 'puremagic' library ({str(e)}), using content-based detection")
|
||||
|
||||
# Content-based detection (most reliable for text formats)
|
||||
# Check for HTML patterns first - if found, override magic's text/plain
|
||||
@@ -103,7 +110,7 @@ class guess_stream_type():
|
||||
# magic will call a rss document 'xml'
|
||||
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
|
||||
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
|
||||
self.is_rss = True
|
||||
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
|
||||
# Only mark as generic XML if not already detected as RSS
|
||||
|
||||
@@ -228,8 +228,21 @@ class ContentProcessor:
|
||||
self.datastore = datastore
|
||||
|
||||
def preprocess_rss(self, content):
|
||||
"""Convert CDATA/comments in RSS to usable text."""
|
||||
return cdata_in_document_to_text(html_content=content)
|
||||
"""
|
||||
Convert CDATA/comments in RSS to usable text.
|
||||
|
||||
Supports two RSS processing modes:
|
||||
- 'default': Inline CDATA replacement (original behavior)
|
||||
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
|
||||
"""
|
||||
from changedetectionio import rss_tools
|
||||
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
|
||||
if rss_mode:
|
||||
# Format RSS items nicely with CDATA content unmarked and converted to text
|
||||
return rss_tools.format_rss_items(content)
|
||||
else:
|
||||
# Default: Original inline CDATA replacement
|
||||
return cdata_in_document_to_text(html_content=content)
|
||||
|
||||
def preprocess_pdf(self, raw_content):
|
||||
"""Convert PDF to HTML using external tool."""
|
||||
@@ -384,6 +397,11 @@ class perform_site_check(difference_detection_processor):
|
||||
# RSS preprocessing
|
||||
if stream_content_type.is_rss:
|
||||
content = content_processor.preprocess_rss(content)
|
||||
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
|
||||
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
|
||||
stream_content_type.is_rss = False
|
||||
stream_content_type.is_html = True
|
||||
self.fetcher.content = content
|
||||
|
||||
# PDF preprocessing
|
||||
if watch.is_pdf or stream_content_type.is_pdf:
|
||||
@@ -538,6 +556,20 @@ class perform_site_check(difference_detection_processor):
|
||||
else:
|
||||
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
||||
|
||||
# Note: Explicit cleanup is only needed here because text_json_diff handles
|
||||
# large strings (100KB-300KB for RSS/HTML). The other processors work with
|
||||
# small strings and don't need this.
|
||||
#
|
||||
# Python would clean these up automatically, but explicit `del` frees memory
|
||||
# immediately rather than waiting for function return, reducing peak memory usage.
|
||||
del content
|
||||
if 'html_content' in locals() and html_content is not stripped_text:
|
||||
del html_content
|
||||
if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text:
|
||||
del text_content_before_ignored_filter
|
||||
if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text:
|
||||
del text_for_checksuming
|
||||
|
||||
return changed_detected, update_obj, stripped_text
|
||||
|
||||
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
|
||||
|
||||
130
changedetectionio/rss_tools.py
Normal file
130
changedetectionio/rss_tools.py
Normal file
@@ -0,0 +1,130 @@
|
||||
"""
|
||||
RSS/Atom feed processing tools for changedetection.io
|
||||
"""
|
||||
|
||||
from loguru import logger
|
||||
import re
|
||||
|
||||
|
||||
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
|
||||
"""
|
||||
Process CDATA sections in HTML/XML content - inline replacement.
|
||||
|
||||
Args:
|
||||
html_content: The HTML/XML content to process
|
||||
render_anchor_tag_content: Whether to render anchor tag content
|
||||
|
||||
Returns:
|
||||
Processed HTML/XML content with CDATA sections replaced inline
|
||||
"""
|
||||
from xml.sax.saxutils import escape as xml_escape
|
||||
from .html_tools import html_to_text
|
||||
|
||||
pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'
|
||||
|
||||
def repl(m):
|
||||
text = m.group(1)
|
||||
return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip()
|
||||
|
||||
return re.sub(pattern, repl, html_content)
|
||||
|
||||
|
||||
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
|
||||
"""
|
||||
Format RSS/Atom feed items in a readable text format using feedparser.
|
||||
|
||||
Converts RSS <item> or Atom <entry> elements to formatted text with:
|
||||
- <title> → <h1>Title</h1>
|
||||
- <link> → Link: [url]
|
||||
- <guid> → Guid: [id]
|
||||
- <pubDate> → PubDate: [date]
|
||||
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
|
||||
|
||||
Args:
|
||||
rss_content: The RSS/Atom feed content
|
||||
render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility)
|
||||
|
||||
Returns:
|
||||
Formatted HTML content ready for html_to_text conversion
|
||||
"""
|
||||
try:
|
||||
import feedparser
|
||||
from xml.sax.saxutils import escape as xml_escape
|
||||
|
||||
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
|
||||
feed = feedparser.parse(rss_content)
|
||||
|
||||
formatted_items = []
|
||||
|
||||
# Determine feed type for appropriate labels when fields are missing
|
||||
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
|
||||
is_atom = feed.version and 'atom' in feed.version
|
||||
|
||||
for entry in feed.entries:
|
||||
item_parts = []
|
||||
|
||||
# Title - feedparser handles CDATA and entity unescaping automatically
|
||||
if hasattr(entry, 'title') and entry.title:
|
||||
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
|
||||
|
||||
# Link
|
||||
if hasattr(entry, 'link') and entry.link:
|
||||
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
|
||||
|
||||
# GUID/ID
|
||||
if hasattr(entry, 'id') and entry.id:
|
||||
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
|
||||
|
||||
# Date - feedparser normalizes all date field names to 'published'
|
||||
if hasattr(entry, 'published') and entry.published:
|
||||
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
|
||||
|
||||
# Description/Content - feedparser handles CDATA and entity unescaping automatically
|
||||
# Only add "Summary:" label for Atom <summary> tags
|
||||
content = None
|
||||
add_label = False
|
||||
|
||||
if hasattr(entry, 'content') and entry.content:
|
||||
# Atom <content> - no label, just content
|
||||
content = entry.content[0].value if entry.content[0].value else None
|
||||
elif hasattr(entry, 'summary'):
|
||||
# Could be RSS <description> or Atom <summary>
|
||||
# feedparser maps both to entry.summary
|
||||
content = entry.summary if entry.summary else None
|
||||
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
|
||||
if is_atom:
|
||||
add_label = True
|
||||
|
||||
# Add content with or without label
|
||||
if content:
|
||||
if add_label:
|
||||
item_parts.append(f'Summary:<br>{content}')
|
||||
else:
|
||||
item_parts.append(content)
|
||||
else:
|
||||
# No content - just show <none>
|
||||
item_parts.append('<none>')
|
||||
|
||||
# Join all parts of this item
|
||||
if item_parts:
|
||||
formatted_items.append('\n'.join(item_parts))
|
||||
|
||||
# Wrap each item in a div with classes (first, last, item-N)
|
||||
items_html = []
|
||||
total_items = len(formatted_items)
|
||||
for idx, item in enumerate(formatted_items):
|
||||
classes = ['rss-item']
|
||||
if idx == 0:
|
||||
classes.append('first')
|
||||
if idx == total_items - 1:
|
||||
classes.append('last')
|
||||
classes.append(f'item-{idx + 1}')
|
||||
|
||||
class_str = ' '.join(classes)
|
||||
items_html.append(f'<div class="{class_str}">{item}</div>')
|
||||
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Error formatting RSS items: {str(e)}")
|
||||
# Fall back to original content
|
||||
return rss_content
|
||||
@@ -1,24 +0,0 @@
|
||||
"""
|
||||
Safe Jinja2 render with max payload sizes
|
||||
|
||||
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
|
||||
"""
|
||||
|
||||
import jinja2.sandbox
|
||||
import typing as t
|
||||
import os
|
||||
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
||||
|
||||
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
|
||||
# (Which also limits available functions that could be called)
|
||||
def render(template_str, **args: t.Any) -> str:
|
||||
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
|
||||
output = jinja2_env.from_string(template_str).render(args)
|
||||
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
|
||||
|
||||
def render_fully_escaped(content):
|
||||
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
|
||||
template = env.from_string("{{ some_html|e }}")
|
||||
return template.render(some_html=content)
|
||||
|
||||
@@ -29,7 +29,7 @@ $(document).ready(function () {
|
||||
$(this).text(new Date($(this).data("utc")).toLocaleString());
|
||||
})
|
||||
|
||||
const timezoneInput = $('#application-timezone');
|
||||
const timezoneInput = $('#application-scheduler_timezone_default');
|
||||
if(timezoneInput.length) {
|
||||
const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
|
||||
if (!timezoneInput.val().trim()) {
|
||||
|
||||
@@ -344,7 +344,7 @@ label {
|
||||
}
|
||||
}
|
||||
|
||||
#notification-customisation {
|
||||
.grey-form-border {
|
||||
border: 1px solid var(--color-border-notification);
|
||||
padding: 0.5rem;
|
||||
border-radius: 5px;
|
||||
|
||||
File diff suppressed because one or more lines are too long
@@ -976,6 +976,10 @@ class ChangeDetectionStore:
|
||||
if self.data['settings']['application'].get('extract_title_as_title'):
|
||||
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
|
||||
|
||||
def update_21(self):
|
||||
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
|
||||
del self.data['settings']['application']['timezone']
|
||||
|
||||
|
||||
def add_notification_url(self, notification_url):
|
||||
|
||||
|
||||
@@ -33,7 +33,7 @@
|
||||
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
|
||||
</div>
|
||||
</div>
|
||||
<div id="notification-customisation" class="pure-control-group">
|
||||
<div class="pure-control-group grey-form-border">
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
|
||||
<span class="pure-form-message-inline">Title for all notifications</span>
|
||||
|
||||
@@ -14,13 +14,31 @@
|
||||
{% if field.errors is mapping and 'form' in field.errors %}
|
||||
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
|
||||
{% set errors = field.errors['form'] %}
|
||||
{% for error in errors %}
|
||||
<li>{{ error }}</li>
|
||||
{% endfor %}
|
||||
{% elif field.type == 'FieldList' %}
|
||||
{# Handle FieldList of FormFields - errors is a list of dicts, one per entry #}
|
||||
{% for idx, entry_errors in field.errors|enumerate %}
|
||||
{% if entry_errors is mapping and entry_errors %}
|
||||
{# Only show entries that have actual errors #}
|
||||
<li><strong>Entry {{ idx + 1 }}:</strong>
|
||||
<ul>
|
||||
{% for field_name, messages in entry_errors.items() %}
|
||||
{% for message in messages %}
|
||||
<li>{{ field_name }}: {{ message }}</li>
|
||||
{% endfor %}
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</li>
|
||||
{% endif %}
|
||||
{% endfor %}
|
||||
{% else %}
|
||||
{# regular list of errors with this field #}
|
||||
{% set errors = field.errors %}
|
||||
{% for error in field.errors %}
|
||||
<li>{{ error }}</li>
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
{% for error in errors %}
|
||||
<li>{{ error }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% endif %}
|
||||
</div>
|
||||
@@ -93,6 +111,39 @@
|
||||
{{ field(**kwargs)|safe }}
|
||||
{% endmacro %}
|
||||
|
||||
{% macro render_fieldlist_with_inline_errors(fieldlist) %}
|
||||
{# Specialized macro for FieldList(FormField(...)) that renders errors inline with each field #}
|
||||
<div {% if fieldlist.errors %} class="error" {% endif %}>{{ fieldlist.label }}</div>
|
||||
<div {% if fieldlist.errors %} class="error" {% endif %}>
|
||||
<ul id="{{ fieldlist.id }}">
|
||||
{% for entry in fieldlist %}
|
||||
<li {% if entry.errors %} class="error" {% endif %}>
|
||||
<label for="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>{{ fieldlist.label.text }}-{{ loop.index0 }}</label>
|
||||
<table id="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>
|
||||
<tbody>
|
||||
{% for subfield in entry %}
|
||||
<tr {% if subfield.errors %} class="error" {% endif %}>
|
||||
<th {% if subfield.errors %} class="error" {% endif %}><label for="{{ subfield.id }}" {% if subfield.errors %} class="error" {% endif %}>{{ subfield.label.text }}</label></th>
|
||||
<td {% if subfield.errors %} class="error" {% endif %}>
|
||||
{{ subfield(**kwargs)|safe }}
|
||||
{% if subfield.errors %}
|
||||
<ul class="errors">
|
||||
{% for error in subfield.errors %}
|
||||
<li class="error">{{ error }}</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
{% endif %}
|
||||
</td>
|
||||
</tr>
|
||||
{% endfor %}
|
||||
</tbody>
|
||||
</table>
|
||||
</li>
|
||||
{% endfor %}
|
||||
</ul>
|
||||
</div>
|
||||
{% endmacro %}
|
||||
|
||||
{% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
|
||||
<div class="fieldlist_formfields" id="{{ table_id }}">
|
||||
<div class="fieldlist-header">
|
||||
|
||||
@@ -4,6 +4,7 @@ import time
|
||||
from threading import Thread
|
||||
|
||||
import pytest
|
||||
import arrow
|
||||
from changedetectionio import changedetection_app
|
||||
from changedetectionio import store
|
||||
import os
|
||||
@@ -29,6 +30,17 @@ def reportlog(pytestconfig):
|
||||
logger.remove(handler_id)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def environment(mocker):
|
||||
"""Mock arrow.now() to return a fixed datetime for testing jinja2 time extension."""
|
||||
# Fixed datetime: Wed, 09 Dec 2015 23:33:01 UTC
|
||||
# This is calculated to match the test expectations when offsets are applied
|
||||
fixed_datetime = arrow.Arrow(2015, 12, 9, 23, 33, 1, tzinfo='UTC')
|
||||
# Patch arrow.now in the TimeExtension module where it's actually used
|
||||
mocker.patch('changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now', return_value=fixed_datetime)
|
||||
return fixed_datetime
|
||||
|
||||
|
||||
def format_memory_human(bytes_value):
|
||||
"""Format memory in human-readable units (KB, MB, GB)"""
|
||||
if bytes_value < 1024:
|
||||
|
||||
@@ -49,3 +49,39 @@ def test_select_custom(client, live_server, measure_memory_usage):
|
||||
#
|
||||
# Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default
|
||||
|
||||
|
||||
def test_custom_proxy_validation(client, live_server, measure_memory_usage):
|
||||
# live_server_setup(live_server) # Setup on conftest per function
|
||||
|
||||
# Goto settings, add our custom one
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={
|
||||
"requests-time_between_check-minutes": 180,
|
||||
"application-ignore_whitespace": "y",
|
||||
"application-fetch_backend": 'html_requests',
|
||||
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
|
||||
"requests-extra_proxies-0-proxy_url": "xxxxhtt/333??p://test:awesome@squid-custom:3128",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." not in res.data
|
||||
assert b'Proxy URLs must start with' in res.data
|
||||
|
||||
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={
|
||||
"requests-time_between_check-minutes": 180,
|
||||
"application-ignore_whitespace": "y",
|
||||
"application-fetch_backend": 'html_requests',
|
||||
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
|
||||
"requests-extra_proxies-0-proxy_url": "https://",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." not in res.data
|
||||
assert b"Invalid URL." in res.data
|
||||
|
||||
@@ -165,6 +165,46 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
||||
# Cleanup everything
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
|
||||
def test_requests_timeout(client, live_server, measure_memory_usage):
|
||||
delay = 2
|
||||
test_url = url_for('test_endpoint', delay=delay, _external=True)
|
||||
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-ui-use_page_title_in_list": "",
|
||||
"requests-time_between_check-minutes": 180,
|
||||
"requests-timeout": delay - 1,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Add our URL to the import page
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# requests takes >2 sec but we timeout at 1 second
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'Read timed out. (read timeout=1)' in res.data
|
||||
|
||||
##### Now set a longer timeout
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-ui-use_page_title_in_list": "",
|
||||
"requests-time_between_check-minutes": 180,
|
||||
"requests-timeout": delay + 1, # timeout should be a second more than the reply time
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert b'Read timed out' not in res.data
|
||||
|
||||
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
|
||||
"""
|
||||
|
||||
@@ -338,4 +378,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
|
||||
assert b'<string name="feed_update_receiver_name"' in res.data
|
||||
assert b'<foobar' not in res.data
|
||||
|
||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||
|
||||
@@ -1,8 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import time
|
||||
import arrow
|
||||
from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks
|
||||
from ..jinja2_custom import render
|
||||
|
||||
|
||||
# def test_setup(client, live_server, measure_memory_usage):
|
||||
@@ -33,6 +35,35 @@ def test_jinja2_in_url_query(client, live_server, measure_memory_usage):
|
||||
)
|
||||
assert b'date=2' in res.data
|
||||
|
||||
# Test for issue #1493 - jinja2-time offset functionality
|
||||
def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usage):
|
||||
"""Test that jinja2 time offset expressions work in watch URLs (issue #1493)."""
|
||||
|
||||
# Add our URL to the import page with time offset expression
|
||||
test_url = url_for('test_return_query', _external=True)
|
||||
|
||||
# Test the exact syntax from issue #1493 that was broken in jinja2-time
|
||||
# This should work now with our custom TimeExtension
|
||||
full_url = "{}?{}".format(test_url,
|
||||
"timestamp={% now 'utc' - 'minutes=11', '%Y-%m-%d %H:%M' %}", )
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": full_url, "tags": "test"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Watch added" in res.data
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Verify the URL was processed correctly (should not have errors)
|
||||
res = client.get(
|
||||
url_for("ui.ui_views.preview_page", uuid="first"),
|
||||
follow_redirects=True
|
||||
)
|
||||
# Should have a valid timestamp in the response
|
||||
assert b'timestamp=' in res.data
|
||||
# Should not have template error
|
||||
assert b'Invalid template' not in res.data
|
||||
|
||||
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
|
||||
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
||||
|
||||
@@ -56,3 +87,86 @@ def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
||||
assert b'is invalid and cannot be used' in res.data
|
||||
# Some of the spewed output from the subclasses
|
||||
assert b'dict_values' not in res.data
|
||||
|
||||
def test_timezone(mocker):
|
||||
"""Verify that timezone is parsed."""
|
||||
|
||||
timezone = 'America/Buenos_Aires'
|
||||
currentDate = arrow.now(timezone)
|
||||
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
|
||||
arrowNowMock.return_value = currentDate
|
||||
finalRender = render(f"{{% now '{timezone}' %}}")
|
||||
|
||||
assert finalRender == currentDate.strftime('%a, %d %b %Y %H:%M:%S')
|
||||
|
||||
def test_format(mocker):
|
||||
"""Verify that format is parsed."""
|
||||
|
||||
timezone = 'utc'
|
||||
format = '%d %b %Y %H:%M:%S'
|
||||
currentDate = arrow.now(timezone)
|
||||
arrowNowMock = mocker.patch("arrow.now")
|
||||
arrowNowMock.return_value = currentDate
|
||||
finalRender = render(f"{{% now '{timezone}', '{format}' %}}")
|
||||
|
||||
assert finalRender == currentDate.strftime(format)
|
||||
|
||||
def test_add_time(environment):
|
||||
"""Verify that added time offset can be parsed."""
|
||||
|
||||
finalRender = render("{% now 'utc' + 'hours=2,seconds=30' %}")
|
||||
|
||||
assert finalRender == "Thu, 10 Dec 2015 01:33:31"
|
||||
|
||||
def test_add_weekday(mocker):
|
||||
"""Verify that added weekday offset can be parsed."""
|
||||
|
||||
timezone = 'utc'
|
||||
currentDate = arrow.now(timezone)
|
||||
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
|
||||
arrowNowMock.return_value = currentDate
|
||||
finalRender = render(f"{{% now '{timezone}' + 'weekday=1' %}}")
|
||||
|
||||
assert finalRender == currentDate.shift(weekday=1).strftime('%a, %d %b %Y %H:%M:%S')
|
||||
|
||||
|
||||
def test_substract_time(environment):
|
||||
"""Verify that substracted time offset can be parsed."""
|
||||
|
||||
finalRender = render("{% now 'utc' - 'minutes=11' %}")
|
||||
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||
|
||||
|
||||
def test_offset_with_format(environment):
|
||||
"""Verify that offset works together with datetime format."""
|
||||
|
||||
finalRender = render(
|
||||
"{% now 'utc' - 'days=2,minutes=33,seconds=1', '%d %b %Y %H:%M:%S' %}"
|
||||
)
|
||||
|
||||
assert finalRender == "07 Dec 2015 23:00:00"
|
||||
|
||||
def test_default_timezone_empty_string(environment):
|
||||
"""Verify that empty timezone string uses the default timezone (UTC in test environment)."""
|
||||
|
||||
# Empty string should use the default timezone which is 'UTC' (or from application settings)
|
||||
finalRender = render("{% now '' %}")
|
||||
|
||||
# Should render with default format and UTC timezone (matches environment fixture)
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:33:01"
|
||||
|
||||
def test_default_timezone_with_offset(environment):
|
||||
"""Verify that empty timezone works with offset operations."""
|
||||
|
||||
# Empty string with offset should use default timezone
|
||||
finalRender = render("{% now '' + 'hours=2', '%d %b %Y %H:%M:%S' %}")
|
||||
|
||||
assert finalRender == "10 Dec 2015 01:33:01"
|
||||
|
||||
def test_default_timezone_subtraction(environment):
|
||||
"""Verify that empty timezone works with subtraction offset."""
|
||||
|
||||
finalRender = render("{% now '' - 'minutes=11' %}")
|
||||
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||
@@ -2,7 +2,8 @@
|
||||
# coding=utf-8
|
||||
|
||||
import time
|
||||
from flask import url_for, escape
|
||||
from flask import url_for
|
||||
from markupsafe import escape
|
||||
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
import pytest
|
||||
jq_support = True
|
||||
|
||||
98
changedetectionio/tests/test_rss_reader_mode.py
Normal file
98
changedetectionio/tests/test_rss_reader_mode.py
Normal file
@@ -0,0 +1,98 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
import time
|
||||
from flask import url_for
|
||||
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
|
||||
extract_UUID_from_client, delete_all_watches
|
||||
|
||||
|
||||
def set_original_cdata_xml():
|
||||
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
|
||||
<channel>
|
||||
<title>Security Bulletins on wetscale</title>
|
||||
<link>https://wetscale.com/security-bulletins/</link>
|
||||
<description>Recent security bulletins from wetscale</description>
|
||||
<lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate>
|
||||
<docs>https://validator.w3.org/feed/docs/rss2.html</docs>
|
||||
<generator>wetscale.com</generator>
|
||||
<language>en-US</language>
|
||||
<copyright>© 2025 wetscale Inc. All rights reserved.</copyright>
|
||||
<atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/>
|
||||
<item>
|
||||
<title>TS-2025-005</title>
|
||||
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
|
||||
<guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid>
|
||||
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
|
||||
<description><p>Wet noodles escape<br><p>they also found themselves outside</p> </description>
|
||||
</item>
|
||||
|
||||
|
||||
<item>
|
||||
<title>TS-2025-004</title>
|
||||
<link>https://wetscale.com/security-bulletins/#ts-2025-004</link>
|
||||
<guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid>
|
||||
<pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate>
|
||||
<description>
|
||||
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
|
||||
</description>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>
|
||||
"""
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write(test_return_data)
|
||||
|
||||
|
||||
|
||||
def test_rss_reader_mode(client, live_server, measure_memory_usage):
|
||||
set_original_cdata_xml()
|
||||
|
||||
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
|
||||
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
|
||||
|
||||
|
||||
# Add our URL to the import page
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
assert 'Wet noodles escape' in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
assert 'The days of Terminator and The Matrix' in snapshot_contents
|
||||
assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage):
|
||||
set_original_cdata_xml()
|
||||
|
||||
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
|
||||
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
|
||||
|
||||
|
||||
# Add our URL to the import page
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
assert 'Wet noodles escape' not in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
assert 'The days of Terminator and The Matrix' in snapshot_contents
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -24,7 +24,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-empty_pages_are_a_change": "",
|
||||
"requests-time_between_check-seconds": 1,
|
||||
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
@@ -119,7 +119,7 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
|
||||
|
||||
data = {
|
||||
"application-empty_pages_are_a_change": "",
|
||||
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||
'application-fetch_backend': "html_requests",
|
||||
"requests-time_between_check-hours": 0,
|
||||
"requests-time_between_check-minutes": 0,
|
||||
|
||||
@@ -4,7 +4,7 @@
|
||||
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
|
||||
|
||||
import unittest
|
||||
from changedetectionio import safe_jinja
|
||||
from changedetectionio import jinja2_custom as safe_jinja
|
||||
|
||||
|
||||
# mostly
|
||||
|
||||
@@ -6,7 +6,7 @@
|
||||
import unittest
|
||||
import os
|
||||
|
||||
from changedetectionio.processors import restock_diff
|
||||
import changedetectionio.processors.restock_diff.processor as restock_diff
|
||||
|
||||
# mostly
|
||||
class TestDiffBuilder(unittest.TestCase):
|
||||
|
||||
@@ -1,11 +1,10 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
# run from dir above changedetectionio/ dir
|
||||
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
|
||||
# python3 -m unittest changedetectionio.tests.unit.test_scheduler
|
||||
|
||||
import unittest
|
||||
from datetime import datetime, timedelta
|
||||
from zoneinfo import ZoneInfo
|
||||
import arrow
|
||||
|
||||
class TestScheduler(unittest.TestCase):
|
||||
|
||||
@@ -13,12 +12,13 @@ class TestScheduler(unittest.TestCase):
|
||||
# UTC-12:00 (Baker Island, Howland Island) is the farthest behind, always one calendar day behind UTC.
|
||||
|
||||
def test_timezone_basic_time_within_schedule(self):
|
||||
"""Test that current time is detected as within schedule window."""
|
||||
from changedetectionio import time_handler
|
||||
|
||||
timezone_str = 'Europe/Berlin'
|
||||
debug_datetime = datetime.now(ZoneInfo(timezone_str))
|
||||
day_of_week = debug_datetime.strftime('%A')
|
||||
time_str = str(debug_datetime.hour)+':00'
|
||||
debug_datetime = arrow.now(timezone_str)
|
||||
day_of_week = debug_datetime.format('dddd')
|
||||
time_str = debug_datetime.format('HH:00')
|
||||
duration = 60 # minutes
|
||||
|
||||
# The current time should always be within 60 minutes of [time_hour]:00
|
||||
@@ -30,16 +30,17 @@ class TestScheduler(unittest.TestCase):
|
||||
self.assertEqual(result, True, f"{debug_datetime} is within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
||||
|
||||
def test_timezone_basic_time_outside_schedule(self):
|
||||
"""Test that time from yesterday is outside current schedule."""
|
||||
from changedetectionio import time_handler
|
||||
|
||||
timezone_str = 'Europe/Berlin'
|
||||
# We try a date in the future..
|
||||
debug_datetime = datetime.now(ZoneInfo(timezone_str))+ timedelta(days=-1)
|
||||
day_of_week = debug_datetime.strftime('%A')
|
||||
time_str = str(debug_datetime.hour) + ':00'
|
||||
duration = 60*24 # minutes
|
||||
# We try a date in the past (yesterday)
|
||||
debug_datetime = arrow.now(timezone_str).shift(days=-1)
|
||||
day_of_week = debug_datetime.format('dddd')
|
||||
time_str = debug_datetime.format('HH:00')
|
||||
duration = 60 * 24 # minutes
|
||||
|
||||
# The current time should always be within 60 minutes of [time_hour]:00
|
||||
# The current time should NOT be within yesterday's schedule
|
||||
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
@@ -48,6 +49,58 @@ class TestScheduler(unittest.TestCase):
|
||||
self.assertNotEqual(result, True,
|
||||
f"{debug_datetime} is NOT within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
||||
|
||||
def test_timezone_utc_within_schedule(self):
|
||||
"""Test UTC timezone works correctly."""
|
||||
from changedetectionio import time_handler
|
||||
|
||||
timezone_str = 'UTC'
|
||||
debug_datetime = arrow.now(timezone_str)
|
||||
day_of_week = debug_datetime.format('dddd')
|
||||
time_str = debug_datetime.format('HH:00')
|
||||
duration = 120 # minutes
|
||||
|
||||
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration)
|
||||
|
||||
self.assertTrue(result, "Current time should be within UTC schedule")
|
||||
|
||||
def test_timezone_extreme_ahead(self):
|
||||
"""Test with UTC+14 timezone (Line Islands, Kiribati)."""
|
||||
from changedetectionio import time_handler
|
||||
|
||||
timezone_str = 'Pacific/Kiritimati' # UTC+14
|
||||
debug_datetime = arrow.now(timezone_str)
|
||||
day_of_week = debug_datetime.format('dddd')
|
||||
time_str = debug_datetime.format('HH:00')
|
||||
duration = 60
|
||||
|
||||
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration)
|
||||
|
||||
self.assertTrue(result, "Should work with extreme ahead timezone")
|
||||
|
||||
def test_timezone_extreme_behind(self):
|
||||
"""Test with UTC-12 timezone (Baker Island)."""
|
||||
from changedetectionio import time_handler
|
||||
|
||||
# Using Etc/GMT+12 which is UTC-12 (confusing, but that's how it works)
|
||||
timezone_str = 'Etc/GMT+12' # UTC-12
|
||||
debug_datetime = arrow.now(timezone_str)
|
||||
day_of_week = debug_datetime.format('dddd')
|
||||
time_str = debug_datetime.format('HH:00')
|
||||
duration = 60
|
||||
|
||||
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration)
|
||||
|
||||
self.assertTrue(result, "Should work with extreme behind timezone")
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
||||
138
changedetectionio/tests/unit/test_time_extension.py
Normal file
138
changedetectionio/tests/unit/test_time_extension.py
Normal file
@@ -0,0 +1,138 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Simple unit tests for TimeExtension that mimic how safe_jinja.py uses it.
|
||||
These tests demonstrate that the environment.default_timezone override works
|
||||
exactly as intended in the actual application code.
|
||||
"""
|
||||
|
||||
import arrow
|
||||
from jinja2.sandbox import ImmutableSandboxedEnvironment
|
||||
from changedetectionio.jinja2_custom.extensions.TimeExtension import TimeExtension
|
||||
|
||||
|
||||
def test_default_timezone_override_like_safe_jinja(mocker):
|
||||
"""
|
||||
Test that mirrors exactly how safe_jinja.py uses the TimeExtension.
|
||||
This is the simplest demonstration that environment.default_timezone works.
|
||||
"""
|
||||
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
# Override the default timezone - exactly like safe_jinja.py does
|
||||
jinja2_env.default_timezone = 'America/New_York'
|
||||
|
||||
# Mock arrow.now to return a fixed time
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='America/New_York')
|
||||
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Use empty string timezone - should use the overridden default
|
||||
template_str = "{% now '' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# Verify arrow.now was called with the overridden timezone
|
||||
mock.assert_called_with('America/New_York')
|
||||
assert '2025' in output
|
||||
assert 'Jan' in output
|
||||
|
||||
|
||||
def test_default_timezone_not_overridden(mocker):
|
||||
"""
|
||||
Test that without override, the default 'UTC' from __init__ is used.
|
||||
"""
|
||||
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
# DON'T override - should use 'UTC' default
|
||||
|
||||
# Mock arrow.now
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 17, 0, 0, tzinfo='UTC')
|
||||
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Use empty string timezone - should use 'UTC' default
|
||||
template_str = "{% now '' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# Verify arrow.now was called with 'UTC'
|
||||
mock.assert_called_with('UTC')
|
||||
assert '2025' in output
|
||||
|
||||
|
||||
def test_datetime_format_override_like_safe_jinja(mocker):
|
||||
"""
|
||||
Test that environment.datetime_format can be overridden after creation.
|
||||
"""
|
||||
# Create environment (default format is '%a, %d %b %Y %H:%M:%S')
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
# Override the datetime format
|
||||
jinja2_env.datetime_format = '%Y-%m-%d %H:%M:%S'
|
||||
|
||||
# Mock arrow.now
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 14, 30, 45, tzinfo='UTC')
|
||||
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Don't specify format - should use overridden default
|
||||
template_str = "{% now 'UTC' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# Should use custom format YYYY-MM-DD HH:MM:SS
|
||||
assert output == '2025-01-15 14:30:45'
|
||||
|
||||
|
||||
def test_offset_with_overridden_timezone(mocker):
|
||||
"""
|
||||
Test that offset operations also respect the overridden default_timezone.
|
||||
"""
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
# Override to use Europe/London
|
||||
jinja2_env.default_timezone = 'Europe/London'
|
||||
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 10, 0, 0, tzinfo='Europe/London')
|
||||
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Use offset with empty timezone string
|
||||
template_str = "{% now '' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# Should have called arrow.now with Europe/London
|
||||
mock.assert_called_with('Europe/London')
|
||||
# Should be 10:00 + 2 hours = 12:00
|
||||
assert output == '2025-01-15 12:00:00'
|
||||
|
||||
|
||||
def test_weekday_parameter_converted_to_int(mocker):
|
||||
"""
|
||||
Test that weekday parameter is properly converted from float to int.
|
||||
This is important because arrow.shift() requires weekday as int, not float.
|
||||
"""
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
# Wednesday, Jan 15, 2025
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='UTC')
|
||||
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Add offset to next Monday (weekday=0)
|
||||
template_str = "{% now 'UTC' + 'weekday=0', '%A' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# Should be Monday
|
||||
assert output == 'Monday'
|
||||
|
||||
|
||||
def test_multiple_offset_parameters(mocker):
|
||||
"""
|
||||
Test that multiple offset parameters can be combined in one expression.
|
||||
"""
|
||||
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||
|
||||
fixed_time = arrow.Arrow(2025, 1, 15, 10, 30, 45, tzinfo='UTC')
|
||||
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||
|
||||
# Test multiple parameters: days, hours, minutes, seconds
|
||||
template_str = "{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10', '%Y-%m-%d %H:%M:%S' %}"
|
||||
output = jinja2_env.from_string(template_str).render()
|
||||
|
||||
# 2025-01-15 10:30:45 + 1 day + 2 hours + 15 minutes + 10 seconds
|
||||
# = 2025-01-16 12:45:55
|
||||
assert output == '2025-01-16 12:45:55'
|
||||
429
changedetectionio/tests/unit/test_time_handler.py
Normal file
429
changedetectionio/tests/unit/test_time_handler.py
Normal file
@@ -0,0 +1,429 @@
|
||||
#!/usr/bin/env python3
|
||||
|
||||
"""
|
||||
Comprehensive tests for time_handler module refactored to use arrow.
|
||||
|
||||
Run from project root:
|
||||
python3 -m pytest changedetectionio/tests/unit/test_time_handler.py -v
|
||||
"""
|
||||
|
||||
import unittest
|
||||
import arrow
|
||||
from changedetectionio import time_handler
|
||||
|
||||
|
||||
class TestAmIInsideTime(unittest.TestCase):
|
||||
"""Tests for the am_i_inside_time function."""
|
||||
|
||||
def test_current_time_within_schedule(self):
|
||||
"""Test that current time is detected as within schedule."""
|
||||
# Get current time in a specific timezone
|
||||
timezone_str = 'Europe/Berlin'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = now.format('HH:00') # Current hour, 0 minutes
|
||||
duration = 60 # 60 minutes
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result, f"Current time should be within {duration} minute window starting at {time_str}")
|
||||
|
||||
def test_current_time_outside_schedule(self):
|
||||
"""Test that time in the past is not within current schedule."""
|
||||
timezone_str = 'Europe/Berlin'
|
||||
# Get yesterday's date
|
||||
yesterday = arrow.now(timezone_str).shift(days=-1)
|
||||
day_of_week = yesterday.format('dddd')
|
||||
time_str = yesterday.format('HH:mm')
|
||||
duration = 30 # Only 30 minutes
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertFalse(result, "Yesterday's time should not be within current schedule")
|
||||
|
||||
def test_timezone_pacific_within_schedule(self):
|
||||
"""Test with US/Pacific timezone."""
|
||||
timezone_str = 'US/Pacific'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = now.format('HH:00')
|
||||
duration = 120 # 2 hours
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_timezone_tokyo_within_schedule(self):
|
||||
"""Test with Asia/Tokyo timezone."""
|
||||
timezone_str = 'Asia/Tokyo'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = now.format('HH:00')
|
||||
duration = 90 # 1.5 hours
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_schedule_crossing_midnight(self):
|
||||
"""Test schedule that crosses midnight."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
|
||||
# Set schedule to start 23:30 with 120 minute duration (crosses midnight)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = "23:30"
|
||||
duration = 120 # 2 hours - goes into next day
|
||||
|
||||
# If we're at 00:15 the next day, we should still be in the schedule
|
||||
if now.hour == 0 and now.minute < 30:
|
||||
# We're in the time window that spilled over from yesterday
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
# This might be true or false depending on exact time
|
||||
self.assertIsInstance(result, bool)
|
||||
|
||||
def test_invalid_day_of_week(self):
|
||||
"""Test that invalid day raises ValueError."""
|
||||
with self.assertRaises(ValueError) as context:
|
||||
time_handler.am_i_inside_time(
|
||||
day_of_week="Funday",
|
||||
time_str="12:00",
|
||||
timezone_str="UTC",
|
||||
duration=60
|
||||
)
|
||||
self.assertIn("Invalid day_of_week", str(context.exception))
|
||||
|
||||
def test_invalid_time_format(self):
|
||||
"""Test that invalid time format raises ValueError."""
|
||||
with self.assertRaises(ValueError) as context:
|
||||
time_handler.am_i_inside_time(
|
||||
day_of_week="Monday",
|
||||
time_str="25:99",
|
||||
timezone_str="UTC",
|
||||
duration=60
|
||||
)
|
||||
self.assertIn("Invalid time_str", str(context.exception))
|
||||
|
||||
def test_invalid_time_format_non_numeric(self):
|
||||
"""Test that non-numeric time raises ValueError."""
|
||||
with self.assertRaises(ValueError) as context:
|
||||
time_handler.am_i_inside_time(
|
||||
day_of_week="Monday",
|
||||
time_str="twelve:thirty",
|
||||
timezone_str="UTC",
|
||||
duration=60
|
||||
)
|
||||
self.assertIn("Invalid time_str", str(context.exception))
|
||||
|
||||
def test_invalid_timezone(self):
|
||||
"""Test that invalid timezone raises ValueError."""
|
||||
with self.assertRaises(ValueError) as context:
|
||||
time_handler.am_i_inside_time(
|
||||
day_of_week="Monday",
|
||||
time_str="12:00",
|
||||
timezone_str="Invalid/Timezone",
|
||||
duration=60
|
||||
)
|
||||
self.assertIn("Invalid timezone_str", str(context.exception))
|
||||
|
||||
def test_short_duration(self):
|
||||
"""Test with very short duration (15 minutes default)."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = now.format('HH:mm')
|
||||
duration = 15 # Default duration
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result, "Current time should be within 15 minute window")
|
||||
|
||||
def test_long_duration(self):
|
||||
"""Test with long duration (24 hours)."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
# Set time to current hour
|
||||
time_str = now.format('HH:00')
|
||||
duration = 1440 # 24 hours in minutes
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result, "Current time should be within 24 hour window")
|
||||
|
||||
def test_case_insensitive_day(self):
|
||||
"""Test that day of week is case insensitive."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd').lower() # lowercase day
|
||||
time_str = now.format('HH:00')
|
||||
duration = 60
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
self.assertTrue(result, "Lowercase day should work")
|
||||
|
||||
def test_edge_case_midnight(self):
|
||||
"""Test edge case at exactly midnight."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = "00:00"
|
||||
duration = 60
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
# Should be true if we're in the first hour of the day
|
||||
if now.hour == 0:
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_edge_case_end_of_day(self):
|
||||
"""Test edge case near end of day."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
day_of_week = now.format('dddd')
|
||||
time_str = "23:45"
|
||||
duration = 30 # 30 minutes crosses midnight
|
||||
|
||||
result = time_handler.am_i_inside_time(
|
||||
day_of_week=day_of_week,
|
||||
time_str=time_str,
|
||||
timezone_str=timezone_str,
|
||||
duration=duration
|
||||
)
|
||||
|
||||
# Result depends on current time
|
||||
self.assertIsInstance(result, bool)
|
||||
|
||||
|
||||
class TestIsWithinSchedule(unittest.TestCase):
|
||||
"""Tests for the is_within_schedule function."""
|
||||
|
||||
def test_schedule_disabled(self):
|
||||
"""Test that disabled schedule returns False."""
|
||||
time_schedule_limit = {'enabled': False}
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_schedule_none(self):
|
||||
"""Test that None schedule returns False."""
|
||||
result = time_handler.is_within_schedule(None)
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_schedule_empty_dict(self):
|
||||
"""Test that empty dict returns False."""
|
||||
result = time_handler.is_within_schedule({})
|
||||
self.assertFalse(result)
|
||||
|
||||
def test_schedule_enabled_but_day_disabled(self):
|
||||
"""Test schedule enabled but current day disabled."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
current_day = now.format('dddd').lower()
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': False,
|
||||
'start_time': '09:00',
|
||||
'duration': {'hours': 8, 'minutes': 0}
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertFalse(result, "Disabled day should return False")
|
||||
|
||||
def test_schedule_enabled_within_time(self):
|
||||
"""Test schedule enabled and within time window."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
current_day = now.format('dddd').lower()
|
||||
current_hour = now.format('HH:00')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': current_hour,
|
||||
'duration': {'hours': 2, 'minutes': 0}
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertTrue(result, "Current time should be within schedule")
|
||||
|
||||
def test_schedule_enabled_outside_time(self):
|
||||
"""Test schedule enabled but outside time window."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
current_day = now.format('dddd').lower()
|
||||
# Set time to 3 hours ago
|
||||
past_time = now.shift(hours=-3).format('HH:mm')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': past_time,
|
||||
'duration': {'hours': 1, 'minutes': 0} # Only 1 hour duration
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertFalse(result, "3 hours ago with 1 hour duration should be False")
|
||||
|
||||
def test_schedule_with_default_timezone(self):
|
||||
"""Test schedule without timezone uses default."""
|
||||
now = arrow.now('America/New_York')
|
||||
current_day = now.format('dddd').lower()
|
||||
current_hour = now.format('HH:00')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
# No timezone specified
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': current_hour,
|
||||
'duration': {'hours': 2, 'minutes': 0}
|
||||
}
|
||||
}
|
||||
|
||||
# Should use default UTC, but since we're testing with NY time,
|
||||
# the result depends on time difference
|
||||
result = time_handler.is_within_schedule(
|
||||
time_schedule_limit,
|
||||
default_tz='America/New_York'
|
||||
)
|
||||
self.assertTrue(result, "Should work with default timezone")
|
||||
|
||||
def test_schedule_different_timezones(self):
|
||||
"""Test schedule works correctly across different timezones."""
|
||||
# Test with Tokyo timezone
|
||||
timezone_str = 'Asia/Tokyo'
|
||||
now = arrow.now(timezone_str)
|
||||
current_day = now.format('dddd').lower()
|
||||
current_hour = now.format('HH:00')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': current_hour,
|
||||
'duration': {'hours': 1, 'minutes': 30}
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertTrue(result)
|
||||
|
||||
def test_schedule_with_minutes_in_duration(self):
|
||||
"""Test schedule with minutes specified in duration."""
|
||||
timezone_str = 'UTC'
|
||||
now = arrow.now(timezone_str)
|
||||
current_day = now.format('dddd').lower()
|
||||
current_time = now.format('HH:mm')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': current_time,
|
||||
'duration': {'hours': 0, 'minutes': 45}
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertTrue(result, "Should handle minutes in duration")
|
||||
|
||||
def test_schedule_with_timezone_whitespace(self):
|
||||
"""Test that timezone with whitespace is handled."""
|
||||
timezone_str = ' UTC '
|
||||
now = arrow.now('UTC')
|
||||
current_day = now.format('dddd').lower()
|
||||
current_hour = now.format('HH:00')
|
||||
|
||||
time_schedule_limit = {
|
||||
'enabled': True,
|
||||
'timezone': timezone_str,
|
||||
current_day: {
|
||||
'enabled': True,
|
||||
'start_time': current_hour,
|
||||
'duration': {'hours': 1, 'minutes': 0}
|
||||
}
|
||||
}
|
||||
|
||||
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||
self.assertTrue(result, "Should handle timezone with whitespace")
|
||||
|
||||
|
||||
class TestWeekdayEnum(unittest.TestCase):
|
||||
"""Tests for the Weekday enum."""
|
||||
|
||||
def test_weekday_values(self):
|
||||
"""Test that weekday enum has correct values."""
|
||||
self.assertEqual(time_handler.Weekday.Monday, 0)
|
||||
self.assertEqual(time_handler.Weekday.Tuesday, 1)
|
||||
self.assertEqual(time_handler.Weekday.Wednesday, 2)
|
||||
self.assertEqual(time_handler.Weekday.Thursday, 3)
|
||||
self.assertEqual(time_handler.Weekday.Friday, 4)
|
||||
self.assertEqual(time_handler.Weekday.Saturday, 5)
|
||||
self.assertEqual(time_handler.Weekday.Sunday, 6)
|
||||
|
||||
def test_weekday_string_access(self):
|
||||
"""Test accessing weekday enum by string."""
|
||||
self.assertEqual(time_handler.Weekday['Monday'], 0)
|
||||
self.assertEqual(time_handler.Weekday['Sunday'], 6)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
@@ -188,6 +188,10 @@ def new_live_server_setup(live_server):
|
||||
ctype = request.args.get('content_type')
|
||||
status_code = request.args.get('status_code')
|
||||
content = request.args.get('content') or None
|
||||
delay = int(request.args.get('delay', 0))
|
||||
|
||||
if delay:
|
||||
time.sleep(delay)
|
||||
|
||||
# Used to just try to break the header detection
|
||||
uppercase_headers = request.args.get('uppercase_headers')
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from datetime import timedelta, datetime
|
||||
import arrow
|
||||
from enum import IntEnum
|
||||
from zoneinfo import ZoneInfo
|
||||
|
||||
|
||||
class Weekday(IntEnum):
|
||||
@@ -40,54 +39,65 @@ def am_i_inside_time(
|
||||
|
||||
# Parse the start time
|
||||
try:
|
||||
target_time = datetime.strptime(time_str, '%H:%M').time()
|
||||
except ValueError:
|
||||
hour, minute = map(int, time_str.split(':'))
|
||||
if not (0 <= hour <= 23 and 0 <= minute <= 59):
|
||||
raise ValueError
|
||||
except (ValueError, AttributeError):
|
||||
raise ValueError(f"Invalid time_str: '{time_str}'. Must be in 'HH:MM' format.")
|
||||
|
||||
# Define the timezone
|
||||
try:
|
||||
tz = ZoneInfo(timezone_str)
|
||||
except Exception:
|
||||
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
|
||||
|
||||
# Get the current time in the specified timezone
|
||||
now_tz = datetime.now(tz)
|
||||
try:
|
||||
now_tz = arrow.now(timezone_str.strip())
|
||||
except Exception as e:
|
||||
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
|
||||
|
||||
# Check if the current day matches the target day or overlaps due to duration
|
||||
current_weekday = now_tz.weekday()
|
||||
start_datetime_tz = datetime.combine(now_tz.date(), target_time, tzinfo=tz)
|
||||
# Create start datetime for today in target timezone
|
||||
start_datetime_tz = now_tz.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
||||
|
||||
# Handle previous day's overlap
|
||||
if target_weekday == (current_weekday - 1) % 7:
|
||||
# Calculate start and end times for the overlap from the previous day
|
||||
start_datetime_tz -= timedelta(days=1)
|
||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
||||
start_datetime_tz = start_datetime_tz.shift(days=-1)
|
||||
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||
if start_datetime_tz <= now_tz < end_datetime_tz:
|
||||
return True
|
||||
|
||||
# Handle current day's range
|
||||
if target_weekday == current_weekday:
|
||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
||||
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||
if start_datetime_tz <= now_tz < end_datetime_tz:
|
||||
return True
|
||||
|
||||
# Handle next day's overlap
|
||||
if target_weekday == (current_weekday + 1) % 7:
|
||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
||||
if now_tz < start_datetime_tz and now_tz + timedelta(days=1) < end_datetime_tz:
|
||||
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||
if now_tz < start_datetime_tz and now_tz.shift(days=1) < end_datetime_tz:
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def is_within_schedule(time_schedule_limit, default_tz="UTC"):
|
||||
"""
|
||||
Check if the current time is within a scheduled time window.
|
||||
|
||||
Parameters:
|
||||
time_schedule_limit (dict): Schedule configuration with timezone, day settings, etc.
|
||||
default_tz (str): Default timezone to use if not specified. Default is 'UTC'.
|
||||
|
||||
Returns:
|
||||
bool: True if current time is within the schedule, False otherwise.
|
||||
"""
|
||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||
# Get the timezone the time schedule is in, so we know what day it is there
|
||||
tz_name = time_schedule_limit.get('timezone')
|
||||
if not tz_name:
|
||||
tz_name = default_tz
|
||||
|
||||
now_day_name_in_tz = datetime.now(ZoneInfo(tz_name.strip())).strftime('%A')
|
||||
# Get current day name in the target timezone
|
||||
now_day_name_in_tz = arrow.now(tz_name.strip()).format('dddd')
|
||||
selected_day_schedule = time_schedule_limit.get(now_day_name_in_tz.lower())
|
||||
if not selected_day_schedule.get('enabled'):
|
||||
return False
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
|
||||
feedgen~=0.9
|
||||
feedparser~=6.0 # For parsing RSS/Atom feeds
|
||||
flask-compress
|
||||
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
|
||||
flask-login>=0.6.3
|
||||
@@ -9,7 +10,7 @@ flask_restful
|
||||
flask_cors # For the Chrome extension to operate
|
||||
janus # Thread-safe async/sync queue bridge
|
||||
flask_wtf~=1.2
|
||||
flask~=2.3
|
||||
flask~=3.1
|
||||
flask-socketio~=5.5.1
|
||||
python-socketio~=5.13.0
|
||||
python-engineio~=4.12.3
|
||||
@@ -66,13 +67,9 @@ elementpath==4.1.5
|
||||
|
||||
selenium~=4.31.0
|
||||
|
||||
# https://github.com/pallets/werkzeug/issues/2985
|
||||
# Maybe related to pytest?
|
||||
werkzeug==3.0.6
|
||||
|
||||
# Templating, so far just in the URLs but in the future can be for the notifications also
|
||||
jinja2~=3.1
|
||||
jinja2-time
|
||||
arrow
|
||||
openpyxl
|
||||
# https://peps.python.org/pep-0508/#environment-markers
|
||||
# https://github.com/dgtlmoon/changedetection.io/pull/1009
|
||||
@@ -88,6 +85,7 @@ pyppeteerstealth>=0.0.4
|
||||
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
|
||||
pytest ~=7.2
|
||||
pytest-flask ~=1.2
|
||||
pytest-mock ~=3.15
|
||||
|
||||
# Anything 4.0 and up but not 5.0
|
||||
jsonschema ~= 4.0
|
||||
@@ -124,8 +122,9 @@ price-parser
|
||||
|
||||
# flask_socket_io - incorrect package name, already have flask-socketio above
|
||||
|
||||
# So far for detecting correct favicon type, but for other things in the future
|
||||
python-magic
|
||||
# Lightweight MIME type detection (saves ~14MB memory vs python-magic/libmagic)
|
||||
# Used for detecting correct favicon type and content-type detection
|
||||
puremagic
|
||||
|
||||
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
|
||||
tzdata
|
||||
|
||||
Reference in New Issue
Block a user