mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-27 11:53:21 +00:00
Compare commits
12 Commits
0.50.21
...
flask-upda
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b2f07adc59 | ||
|
|
804e8c249b | ||
|
|
b26333b01c | ||
|
|
46252bc6f3 | ||
|
|
64350a2e78 | ||
|
|
2902c63a3b | ||
|
|
55b8588f1f | ||
|
|
02ecc4ae9a | ||
|
|
3ee50b7832 | ||
|
|
66ddd87ee4 | ||
|
|
233189e4f7 | ||
|
|
b237fd7201 |
@@ -253,6 +253,30 @@ jobs:
|
|||||||
docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt
|
docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt
|
||||||
docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt
|
docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt
|
||||||
|
|
||||||
|
- name: Extract and display memory test report
|
||||||
|
if: always()
|
||||||
|
run: |
|
||||||
|
# Extract test-memory.log from the container
|
||||||
|
echo "Extracting test-memory.log from container..."
|
||||||
|
docker cp test-cdio-basic-tests:/app/changedetectionio/test-memory.log output-logs/test-memory-${{ env.PYTHON_VERSION }}.log || echo "test-memory.log not found in container"
|
||||||
|
|
||||||
|
# Display the memory log contents for immediate visibility in workflow output
|
||||||
|
echo "=== Top 10 Highest Peak Memory Tests ==="
|
||||||
|
if [ -f output-logs/test-memory-${{ env.PYTHON_VERSION }}.log ]; then
|
||||||
|
# Sort by peak memory value (extract number before MB and sort numerically, reverse order)
|
||||||
|
grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | \
|
||||||
|
sed 's/.*Peak memory: //' | \
|
||||||
|
paste -d'|' - <(grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log) | \
|
||||||
|
sort -t'|' -k1 -nr | \
|
||||||
|
cut -d'|' -f2 | \
|
||||||
|
head -10
|
||||||
|
echo ""
|
||||||
|
echo "=== Full Memory Test Report ==="
|
||||||
|
cat output-logs/test-memory-${{ env.PYTHON_VERSION }}.log
|
||||||
|
else
|
||||||
|
echo "No memory log available"
|
||||||
|
fi
|
||||||
|
|
||||||
- name: Store everything including test-datastore
|
- name: Store everything including test-datastore
|
||||||
if: always()
|
if: always()
|
||||||
uses: actions/upload-artifact@v4
|
uses: actions/upload-artifact@v4
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ recursive-include changedetectionio/api *
|
|||||||
recursive-include changedetectionio/blueprint *
|
recursive-include changedetectionio/blueprint *
|
||||||
recursive-include changedetectionio/conditions *
|
recursive-include changedetectionio/conditions *
|
||||||
recursive-include changedetectionio/content_fetchers *
|
recursive-include changedetectionio/content_fetchers *
|
||||||
|
recursive-include changedetectionio/jinja2_custom *
|
||||||
recursive-include changedetectionio/model *
|
recursive-include changedetectionio/model *
|
||||||
recursive-include changedetectionio/notification *
|
recursive-include changedetectionio/notification *
|
||||||
recursive-include changedetectionio/processors *
|
recursive-include changedetectionio/processors *
|
||||||
|
|||||||
@@ -2,7 +2,7 @@
|
|||||||
|
|
||||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||||
|
|
||||||
__version__ = '0.50.21'
|
__version__ = '0.50.23'
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from json.decoder import JSONDecodeError
|
from json.decoder import JSONDecodeError
|
||||||
|
|||||||
@@ -334,6 +334,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
|
if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
|
||||||
watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))
|
watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))
|
||||||
|
|
||||||
|
# Explicitly delete large content variables to free memory IMMEDIATELY after saving
|
||||||
|
# These are no longer needed after being saved to history
|
||||||
|
del contents
|
||||||
|
|
||||||
# Send notifications on second+ check
|
# Send notifications on second+ check
|
||||||
if watch.history_n >= 2:
|
if watch.history_n >= 2:
|
||||||
logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
|
logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
|
||||||
@@ -372,6 +376,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
|
datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
|
||||||
'check_count': count})
|
'check_count': count})
|
||||||
|
|
||||||
|
# NOW clear fetcher content - after all processing is complete
|
||||||
|
# This is the last point where we need the fetcher data
|
||||||
|
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||||
|
update_handler.fetcher.clear_content()
|
||||||
|
logger.debug(f"Cleared fetcher content for UUID {uuid}")
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
|
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
|
||||||
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
|
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
|
||||||
@@ -392,7 +402,28 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
|
|||||||
#logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}")
|
#logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}")
|
||||||
watch_check_update.send(watch_uuid=watch['uuid'])
|
watch_check_update.send(watch_uuid=watch['uuid'])
|
||||||
|
|
||||||
update_handler = None
|
# Explicitly clean up update_handler and all its references
|
||||||
|
if update_handler:
|
||||||
|
# Clear fetcher content using the proper method
|
||||||
|
if hasattr(update_handler, 'fetcher') and update_handler.fetcher:
|
||||||
|
update_handler.fetcher.clear_content()
|
||||||
|
|
||||||
|
# Clear processor references
|
||||||
|
if hasattr(update_handler, 'content_processor'):
|
||||||
|
update_handler.content_processor = None
|
||||||
|
|
||||||
|
update_handler = None
|
||||||
|
|
||||||
|
# Clear local contents variable if it still exists
|
||||||
|
if 'contents' in locals():
|
||||||
|
del contents
|
||||||
|
|
||||||
|
# Note: We don't set watch = None here because:
|
||||||
|
# 1. watch is just a local reference to datastore.data['watching'][uuid]
|
||||||
|
# 2. Setting it to None doesn't affect the datastore
|
||||||
|
# 3. GC can't collect the object anyway (still referenced by datastore)
|
||||||
|
# 4. It would just cause confusion
|
||||||
|
|
||||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||||
except Exception as cleanup_error:
|
except Exception as cleanup_error:
|
||||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||||
|
|||||||
@@ -6,7 +6,7 @@ from loguru import logger
|
|||||||
|
|
||||||
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT
|
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT
|
||||||
from changedetectionio.content_fetchers.base import manage_user_agent
|
from changedetectionio.content_fetchers.base import manage_user_agent
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
def long_task(uuid, preferred_proxy):
|
def long_task(uuid, preferred_proxy):
|
||||||
import time
|
import time
|
||||||
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
|
from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
|
|
||||||
status = {'status': '', 'length': 0, 'text': ''}
|
status = {'status': '', 'length': 0, 'text': ''}
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,5 @@
|
|||||||
|
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
from changedetectionio.store import ChangeDetectionStore
|
from changedetectionio.store import ChangeDetectionStore
|
||||||
from feedgen.feed import FeedGenerator
|
from feedgen.feed import FeedGenerator
|
||||||
from flask import Blueprint, make_response, request, url_for, redirect
|
from flask import Blueprint, make_response, request, url_for, redirect
|
||||||
|
|||||||
@@ -119,7 +119,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
hide_remove_pass=os.getenv("SALTED_PASS", False),
|
hide_remove_pass=os.getenv("SALTED_PASS", False),
|
||||||
min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)),
|
min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)),
|
||||||
settings_application=datastore.data['settings']['application'],
|
settings_application=datastore.data['settings']['application'],
|
||||||
timezone_default_config=datastore.data['settings']['application'].get('timezone'),
|
timezone_default_config=datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||||
utc_time=utc_time,
|
utc_time=utc_time,
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
@@ -1,7 +1,7 @@
|
|||||||
{% extends 'base.html' %}
|
{% extends 'base.html' %}
|
||||||
|
|
||||||
{% block content %}
|
{% block content %}
|
||||||
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %}
|
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %}
|
||||||
{% from '_common_fields.html' import render_common_settings_form %}
|
{% from '_common_fields.html' import render_common_settings_form %}
|
||||||
<script>
|
<script>
|
||||||
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
|
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
|
||||||
@@ -89,15 +89,6 @@
|
|||||||
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
|
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
{% if form.requests.proxy %}
|
|
||||||
<div class="pure-control-group inline-radio">
|
|
||||||
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
|
||||||
<span class="pure-form-message-inline">
|
|
||||||
Choose a default proxy for all watches
|
|
||||||
</span>
|
|
||||||
</div>
|
|
||||||
{% endif %}
|
|
||||||
</fieldset>
|
</fieldset>
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
@@ -140,6 +131,10 @@
|
|||||||
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
|
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
|
||||||
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
|
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="pure-control-group">
|
||||||
|
{{ render_field(form.requests.form.timeout) }}
|
||||||
|
<span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br>
|
||||||
|
</div>
|
||||||
<div class="pure-control-group inline-radio">
|
<div class="pure-control-group inline-radio">
|
||||||
{{ render_field(form.requests.form.default_ua) }}
|
{{ render_field(form.requests.form.default_ua) }}
|
||||||
<span class="pure-form-message-inline">
|
<span class="pure-form-message-inline">
|
||||||
@@ -243,7 +238,7 @@ nav
|
|||||||
<p><strong>UTC Time & Date from Server:</strong> <span id="utc-time" >{{ utc_time }}</span></p>
|
<p><strong>UTC Time & Date from Server:</strong> <span id="utc-time" >{{ utc_time }}</span></p>
|
||||||
<p><strong>Local Time & Date in Browser:</strong> <span class="local-time" data-utc="{{ utc_time }}"></span></p>
|
<p><strong>Local Time & Date in Browser:</strong> <span class="local-time" data-utc="{{ utc_time }}"></span></p>
|
||||||
<p>
|
<p>
|
||||||
{{ render_field(form.application.form.timezone) }}
|
{{ render_field(form.application.form.scheduler_timezone_default) }}
|
||||||
<datalist id="timezones" style="display: none;">
|
<datalist id="timezones" style="display: none;">
|
||||||
{% for tz_name in available_timezones %}
|
{% for tz_name in available_timezones %}
|
||||||
<option value="{{ tz_name }}">{{ tz_name }}</option>
|
<option value="{{ tz_name }}">{{ tz_name }}</option>
|
||||||
@@ -321,17 +316,27 @@ nav
|
|||||||
<p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites.
|
<p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites.
|
||||||
|
|
||||||
<div class="pure-control-group" id="extra-proxies-setting">
|
<div class="pure-control-group" id="extra-proxies-setting">
|
||||||
{{ render_field(form.requests.form.extra_proxies) }}
|
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }}
|
||||||
<span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br>
|
<span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br>
|
||||||
<span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span>
|
<span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span>
|
||||||
|
{% if form.requests.proxy %}
|
||||||
|
<div>
|
||||||
|
<br>
|
||||||
|
<div class="inline-radio">
|
||||||
|
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
||||||
|
<span class="pure-form-message-inline">Choose a default proxy for all watches</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
<div class="pure-control-group" id="extra-browsers-setting">
|
<div class="pure-control-group" id="extra-browsers-setting">
|
||||||
<p>
|
<p>
|
||||||
<span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br>
|
<span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br>
|
||||||
<span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span>
|
<span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span>
|
||||||
</p>
|
</p>
|
||||||
{{ render_field(form.requests.form.extra_browsers) }}
|
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }}
|
||||||
</div>
|
</div>
|
||||||
|
|
||||||
</div>
|
</div>
|
||||||
<div id="actions">
|
<div id="actions">
|
||||||
<div class="pure-control-group">
|
<div class="pure-control-group">
|
||||||
|
|||||||
@@ -187,7 +187,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
|
|
||||||
tz_name = time_schedule_limit.get('timezone')
|
tz_name = time_schedule_limit.get('timezone')
|
||||||
if not tz_name:
|
if not tz_name:
|
||||||
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
|
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
|
||||||
|
|
||||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||||
try:
|
try:
|
||||||
@@ -257,7 +257,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||||
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||||
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
|
'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'),
|
||||||
'using_global_webdriver_wait': not default['webdriver_delay'],
|
'using_global_webdriver_wait': not default['webdriver_delay'],
|
||||||
'uuid': uuid,
|
'uuid': uuid,
|
||||||
'watch': watch,
|
'watch': watch,
|
||||||
|
|||||||
@@ -64,6 +64,19 @@ class Fetcher():
|
|||||||
# Time ONTOP of the system defined env minimum time
|
# Time ONTOP of the system defined env minimum time
|
||||||
render_extract_delay = 0
|
render_extract_delay = 0
|
||||||
|
|
||||||
|
def clear_content(self):
|
||||||
|
"""
|
||||||
|
Explicitly clear all content from memory to free up heap space.
|
||||||
|
Call this after content has been saved to disk.
|
||||||
|
"""
|
||||||
|
self.content = None
|
||||||
|
if hasattr(self, 'raw_content'):
|
||||||
|
self.raw_content = None
|
||||||
|
self.screenshot = None
|
||||||
|
self.xpath_data = None
|
||||||
|
# Keep headers and status_code as they're small
|
||||||
|
logger.trace("Fetcher content cleared from memory")
|
||||||
|
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def get_error(self):
|
def get_error(self):
|
||||||
return self.error
|
return self.error
|
||||||
@@ -128,7 +141,7 @@ class Fetcher():
|
|||||||
async def iterate_browser_steps(self, start_url=None):
|
async def iterate_browser_steps(self, start_url=None):
|
||||||
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
|
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
|
||||||
from playwright._impl._errors import TimeoutError, Error
|
from playwright._impl._errors import TimeoutError, Error
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
step_n = 0
|
step_n = 0
|
||||||
|
|
||||||
if self.browser_steps is not None and len(self.browser_steps):
|
if self.browser_steps is not None and len(self.browser_steps):
|
||||||
|
|||||||
@@ -51,6 +51,7 @@ class fetcher(Fetcher):
|
|||||||
|
|
||||||
session = requests.Session()
|
session = requests.Session()
|
||||||
|
|
||||||
|
|
||||||
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
|
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
|
||||||
from requests_file import FileAdapter
|
from requests_file import FileAdapter
|
||||||
session.mount('file://', FileAdapter())
|
session.mount('file://', FileAdapter())
|
||||||
|
|||||||
@@ -795,7 +795,7 @@ def ticker_thread_check_time_launch_checks():
|
|||||||
else:
|
else:
|
||||||
time_schedule_limit = watch.get('time_schedule_limit')
|
time_schedule_limit = watch.get('time_schedule_limit')
|
||||||
logger.trace(f"{uuid} Time scheduler - Using watch settings (not global settings)")
|
logger.trace(f"{uuid} Time scheduler - Using watch settings (not global settings)")
|
||||||
tz_name = datastore.data['settings']['application'].get('timezone', 'UTC')
|
tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip())
|
||||||
|
|
||||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||||
try:
|
try:
|
||||||
|
|||||||
@@ -487,9 +487,8 @@ class ValidateJinja2Template(object):
|
|||||||
"""
|
"""
|
||||||
def __call__(self, form, field):
|
def __call__(self, form, field):
|
||||||
from changedetectionio import notification
|
from changedetectionio import notification
|
||||||
|
from changedetectionio.jinja2_custom import create_jinja_env
|
||||||
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
|
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
|
||||||
from jinja2.sandbox import ImmutableSandboxedEnvironment
|
|
||||||
from jinja2.meta import find_undeclared_variables
|
from jinja2.meta import find_undeclared_variables
|
||||||
import jinja2.exceptions
|
import jinja2.exceptions
|
||||||
|
|
||||||
@@ -497,9 +496,11 @@ class ValidateJinja2Template(object):
|
|||||||
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
|
joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader, extensions=['jinja2_time.TimeExtension'])
|
# Use the shared helper to create a properly configured environment
|
||||||
|
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||||
|
|
||||||
|
# Add notification tokens for validation
|
||||||
jinja2_env.globals.update(notification.valid_tokens)
|
jinja2_env.globals.update(notification.valid_tokens)
|
||||||
# Extra validation tokens provided on the form_class(... extra_tokens={}) setup
|
|
||||||
if hasattr(field, 'extra_notification_tokens'):
|
if hasattr(field, 'extra_notification_tokens'):
|
||||||
jinja2_env.globals.update(field.extra_notification_tokens)
|
jinja2_env.globals.update(field.extra_notification_tokens)
|
||||||
|
|
||||||
@@ -511,6 +512,7 @@ class ValidateJinja2Template(object):
|
|||||||
except jinja2.exceptions.SecurityError as e:
|
except jinja2.exceptions.SecurityError as e:
|
||||||
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
|
raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e
|
||||||
|
|
||||||
|
# Check for undeclared variables
|
||||||
ast = jinja2_env.parse(joined_data)
|
ast = jinja2_env.parse(joined_data)
|
||||||
undefined = ", ".join(find_undeclared_variables(ast))
|
undefined = ", ".join(find_undeclared_variables(ast))
|
||||||
if undefined:
|
if undefined:
|
||||||
@@ -678,6 +680,51 @@ class ValidateCSSJSONXPATHInput(object):
|
|||||||
except:
|
except:
|
||||||
raise ValidationError("A system-error occurred when validating your jq expression")
|
raise ValidationError("A system-error occurred when validating your jq expression")
|
||||||
|
|
||||||
|
class ValidateSimpleURL:
|
||||||
|
"""Validate that the value can be parsed by urllib.parse.urlparse() and has a scheme/netloc."""
|
||||||
|
def __init__(self, message=None):
|
||||||
|
self.message = message or "Invalid URL."
|
||||||
|
|
||||||
|
def __call__(self, form, field):
|
||||||
|
data = (field.data or "").strip()
|
||||||
|
if not data:
|
||||||
|
return # empty is OK — pair with validators.Optional()
|
||||||
|
from urllib.parse import urlparse
|
||||||
|
|
||||||
|
parsed = urlparse(data)
|
||||||
|
if not parsed.scheme or not parsed.netloc:
|
||||||
|
raise ValidationError(self.message)
|
||||||
|
|
||||||
|
class ValidateStartsWithRegex(object):
|
||||||
|
def __init__(self, regex, *, flags=0, message=None, allow_empty=True, split_lines=True):
|
||||||
|
# compile with given flags (we’ll pass re.IGNORECASE below)
|
||||||
|
self.pattern = re.compile(regex, flags) if isinstance(regex, str) else regex
|
||||||
|
self.message = message
|
||||||
|
self.allow_empty = allow_empty
|
||||||
|
self.split_lines = split_lines
|
||||||
|
|
||||||
|
def __call__(self, form, field):
|
||||||
|
data = field.data
|
||||||
|
if not data:
|
||||||
|
return
|
||||||
|
|
||||||
|
# normalize into list of lines
|
||||||
|
if isinstance(data, str) and self.split_lines:
|
||||||
|
lines = data.splitlines()
|
||||||
|
elif isinstance(data, (list, tuple)):
|
||||||
|
lines = data
|
||||||
|
else:
|
||||||
|
lines = [data]
|
||||||
|
|
||||||
|
for line in lines:
|
||||||
|
stripped = line.strip()
|
||||||
|
if not stripped:
|
||||||
|
if self.allow_empty:
|
||||||
|
continue
|
||||||
|
raise ValidationError(self.message or "Empty value not allowed.")
|
||||||
|
if not self.pattern.match(stripped):
|
||||||
|
raise ValidationError(self.message or "Invalid value.")
|
||||||
|
|
||||||
class quickWatchForm(Form):
|
class quickWatchForm(Form):
|
||||||
from . import processors
|
from . import processors
|
||||||
|
|
||||||
@@ -705,7 +752,7 @@ class commonSettingsForm(Form):
|
|||||||
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
|
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||||
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
|
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
|
||||||
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
|
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
|
||||||
timezone = StringField("Timezone for watch schedule", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
|
scheduler_timezone_default = StringField("Default timezone for watch check scheduler", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
|
||||||
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
|
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
|
||||||
|
|
||||||
|
|
||||||
@@ -795,7 +842,7 @@ class processor_text_json_diff_form(commonSettingsForm):
|
|||||||
if not super().validate():
|
if not super().validate():
|
||||||
return False
|
return False
|
||||||
|
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
result = True
|
result = True
|
||||||
|
|
||||||
# Fail form validation when a body is set for a GET
|
# Fail form validation when a body is set for a GET
|
||||||
@@ -858,23 +905,36 @@ class processor_text_json_diff_form(commonSettingsForm):
|
|||||||
):
|
):
|
||||||
super().__init__(formdata, obj, prefix, data, meta, **kwargs)
|
super().__init__(formdata, obj, prefix, data, meta, **kwargs)
|
||||||
if kwargs and kwargs.get('default_system_settings'):
|
if kwargs and kwargs.get('default_system_settings'):
|
||||||
default_tz = kwargs.get('default_system_settings').get('application', {}).get('timezone')
|
default_tz = kwargs.get('default_system_settings').get('application', {}).get('scheduler_timezone_default')
|
||||||
if default_tz:
|
if default_tz:
|
||||||
self.time_schedule_limit.form.timezone.render_kw['placeholder'] = default_tz
|
self.time_schedule_limit.form.timezone.render_kw['placeholder'] = default_tz
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
class SingleExtraProxy(Form):
|
class SingleExtraProxy(Form):
|
||||||
|
|
||||||
# maybe better to set some <script>var..
|
# maybe better to set some <script>var..
|
||||||
proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
||||||
proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
|
proxy_url = StringField('Proxy URL', [
|
||||||
# @todo do the validation here instead
|
validators.Optional(),
|
||||||
|
ValidateStartsWithRegex(
|
||||||
|
regex=r'^(https?|socks5)://', # ✅ main pattern
|
||||||
|
flags=re.IGNORECASE, # ✅ makes it case-insensitive
|
||||||
|
message='Proxy URLs must start with http://, https:// or socks5://',
|
||||||
|
),
|
||||||
|
ValidateSimpleURL()
|
||||||
|
], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
|
||||||
|
|
||||||
class SingleExtraBrowser(Form):
|
class SingleExtraBrowser(Form):
|
||||||
browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
|
||||||
browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
|
browser_connection_url = StringField('Browser connection URL', [
|
||||||
# @todo do the validation here instead
|
validators.Optional(),
|
||||||
|
ValidateStartsWithRegex(
|
||||||
|
regex=r'^(wss?|ws)://',
|
||||||
|
flags=re.IGNORECASE,
|
||||||
|
message='Browser URLs must start with wss:// or ws://'
|
||||||
|
),
|
||||||
|
ValidateSimpleURL()
|
||||||
|
], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
|
||||||
|
|
||||||
class DefaultUAInputForm(Form):
|
class DefaultUAInputForm(Form):
|
||||||
html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
|
html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
|
||||||
@@ -885,7 +945,7 @@ class DefaultUAInputForm(Form):
|
|||||||
class globalSettingsRequestForm(Form):
|
class globalSettingsRequestForm(Form):
|
||||||
time_between_check = RequiredFormField(TimeBetweenCheckForm)
|
time_between_check = RequiredFormField(TimeBetweenCheckForm)
|
||||||
time_schedule_limit = FormField(ScheduleLimitForm)
|
time_schedule_limit = FormField(ScheduleLimitForm)
|
||||||
proxy = RadioField('Proxy')
|
proxy = RadioField('Default proxy')
|
||||||
jitter_seconds = IntegerField('Random jitter seconds ± check',
|
jitter_seconds = IntegerField('Random jitter seconds ± check',
|
||||||
render_kw={"style": "width: 5em;"},
|
render_kw={"style": "width: 5em;"},
|
||||||
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
|
||||||
@@ -894,7 +954,12 @@ class globalSettingsRequestForm(Form):
|
|||||||
render_kw={"style": "width: 5em;"},
|
render_kw={"style": "width: 5em;"},
|
||||||
validators=[validators.NumberRange(min=1, max=50,
|
validators=[validators.NumberRange(min=1, max=50,
|
||||||
message="Should be between 1 and 50")])
|
message="Should be between 1 and 50")])
|
||||||
|
|
||||||
|
timeout = IntegerField('Requests timeout in seconds',
|
||||||
|
render_kw={"style": "width: 5em;"},
|
||||||
|
validators=[validators.NumberRange(min=1, max=999,
|
||||||
|
message="Should be between 1 and 999")])
|
||||||
|
|
||||||
extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
|
extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5)
|
||||||
extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)
|
extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5)
|
||||||
|
|
||||||
|
|||||||
20
changedetectionio/jinja2_custom/__init__.py
Normal file
20
changedetectionio/jinja2_custom/__init__.py
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
"""
|
||||||
|
Jinja2 custom extensions and safe rendering utilities.
|
||||||
|
"""
|
||||||
|
from .extensions.TimeExtension import TimeExtension
|
||||||
|
from .safe_jinja import (
|
||||||
|
render,
|
||||||
|
render_fully_escaped,
|
||||||
|
create_jinja_env,
|
||||||
|
JINJA2_MAX_RETURN_PAYLOAD_SIZE,
|
||||||
|
DEFAULT_JINJA2_EXTENSIONS,
|
||||||
|
)
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
'TimeExtension',
|
||||||
|
'render',
|
||||||
|
'render_fully_escaped',
|
||||||
|
'create_jinja_env',
|
||||||
|
'JINJA2_MAX_RETURN_PAYLOAD_SIZE',
|
||||||
|
'DEFAULT_JINJA2_EXTENSIONS',
|
||||||
|
]
|
||||||
221
changedetectionio/jinja2_custom/extensions/TimeExtension.py
Normal file
221
changedetectionio/jinja2_custom/extensions/TimeExtension.py
Normal file
@@ -0,0 +1,221 @@
|
|||||||
|
"""
|
||||||
|
Jinja2 TimeExtension - Custom date/time handling for templates.
|
||||||
|
|
||||||
|
This extension provides the {% now %} tag for Jinja2 templates, offering timezone-aware
|
||||||
|
date/time formatting with support for time offsets.
|
||||||
|
|
||||||
|
Why This Extension Exists:
|
||||||
|
The Arrow library has a now() function (arrow.now()), but Jinja2 templates cannot
|
||||||
|
directly call Python functions - they need extensions or filters to expose functionality.
|
||||||
|
|
||||||
|
This TimeExtension serves as a Jinja2-to-Arrow bridge that:
|
||||||
|
|
||||||
|
1. Makes Arrow accessible in templates - Jinja2 requires registering functions/tags
|
||||||
|
through extensions. You cannot use arrow.now() directly in a template.
|
||||||
|
|
||||||
|
2. Provides template-friendly syntax - Instead of complex Python code, you get clean tags:
|
||||||
|
{% now 'UTC' %}
|
||||||
|
{% now 'UTC' + 'hours=2' %}
|
||||||
|
{% now 'Europe/London', '%Y-%m-%d' %}
|
||||||
|
|
||||||
|
3. Adds convenience features on top of Arrow:
|
||||||
|
- Default timezone from environment variable (TZ) or config
|
||||||
|
- Default datetime format configuration
|
||||||
|
- Offset syntax parsing: 'hours=2,minutes=30' → shift(hours=2, minutes=30)
|
||||||
|
- Empty string timezone support to use configured defaults
|
||||||
|
|
||||||
|
4. Maintains security - Works within Jinja2's sandboxed environment so users
|
||||||
|
cannot access arbitrary Python code or objects.
|
||||||
|
|
||||||
|
Essentially, this is a Jinja2 wrapper around arrow.now() and arrow.shift() that
|
||||||
|
provides user-friendly template syntax while maintaining security.
|
||||||
|
|
||||||
|
Basic Usage:
|
||||||
|
{% now 'UTC' %}
|
||||||
|
# Output: Wed, 09 Dec 2015 23:33:01
|
||||||
|
|
||||||
|
Custom Format:
|
||||||
|
{% now 'UTC', '%Y-%m-%d %H:%M:%S' %}
|
||||||
|
# Output: 2015-12-09 23:33:01
|
||||||
|
|
||||||
|
Timezone Support:
|
||||||
|
{% now 'America/New_York' %}
|
||||||
|
{% now 'Europe/London' %}
|
||||||
|
{% now '' %} # Uses default timezone from environment.default_timezone
|
||||||
|
|
||||||
|
Time Offsets (Addition):
|
||||||
|
{% now 'UTC' + 'hours=2' %}
|
||||||
|
{% now 'UTC' + 'hours=2,minutes=30' %}
|
||||||
|
{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10' %}
|
||||||
|
|
||||||
|
Time Offsets (Subtraction):
|
||||||
|
{% now 'UTC' - 'minutes=11' %}
|
||||||
|
{% now 'UTC' - 'days=2,minutes=33,seconds=1' %}
|
||||||
|
|
||||||
|
Time Offsets with Custom Format:
|
||||||
|
{% now 'UTC' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}
|
||||||
|
# Output: 2015-12-10 01:33:01
|
||||||
|
|
||||||
|
Weekday Support (for finding next/previous weekday):
|
||||||
|
{% now 'UTC' + 'weekday=0' %} # Next Monday (0=Monday, 6=Sunday)
|
||||||
|
{% now 'UTC' + 'weekday=4' %} # Next Friday
|
||||||
|
|
||||||
|
Configuration:
|
||||||
|
- Default timezone: Set via TZ environment variable or override environment.default_timezone
|
||||||
|
- Default format: '%a, %d %b %Y %H:%M:%S' (can be overridden via environment.datetime_format)
|
||||||
|
|
||||||
|
Environment Customization:
|
||||||
|
from changedetectionio.jinja2_custom import create_jinja_env
|
||||||
|
|
||||||
|
jinja2_env = create_jinja_env()
|
||||||
|
jinja2_env.default_timezone = 'America/New_York' # Override default timezone
|
||||||
|
jinja2_env.datetime_format = '%Y-%m-%d %H:%M' # Override default format
|
||||||
|
|
||||||
|
Supported Offset Parameters:
|
||||||
|
- years, months, weeks, days
|
||||||
|
- hours, minutes, seconds, microseconds
|
||||||
|
- weekday (0=Monday through 6=Sunday, must be integer)
|
||||||
|
|
||||||
|
Note:
|
||||||
|
This extension uses the Arrow library for timezone-aware datetime handling.
|
||||||
|
All timezone names should be valid IANA timezone identifiers (e.g., 'America/New_York').
|
||||||
|
"""
|
||||||
|
import arrow
|
||||||
|
|
||||||
|
from jinja2 import nodes
|
||||||
|
from jinja2.ext import Extension
|
||||||
|
import os
|
||||||
|
|
||||||
|
class TimeExtension(Extension):
|
||||||
|
"""
|
||||||
|
Jinja2 Extension providing the {% now %} tag for timezone-aware date/time rendering.
|
||||||
|
|
||||||
|
This extension adds two attributes to the Jinja2 environment:
|
||||||
|
- datetime_format: Default strftime format string (default: '%a, %d %b %Y %H:%M:%S')
|
||||||
|
- default_timezone: Default timezone for rendering (default: TZ env var or 'UTC')
|
||||||
|
|
||||||
|
Both can be overridden after environment creation by setting the attributes directly.
|
||||||
|
"""
|
||||||
|
|
||||||
|
tags = {'now'}
|
||||||
|
|
||||||
|
def __init__(self, environment):
|
||||||
|
"""Jinja2 Extension constructor."""
|
||||||
|
super().__init__(environment)
|
||||||
|
|
||||||
|
environment.extend(
|
||||||
|
datetime_format='%a, %d %b %Y %H:%M:%S',
|
||||||
|
default_timezone=os.getenv('TZ', 'UTC').strip()
|
||||||
|
)
|
||||||
|
|
||||||
|
def _datetime(self, timezone, operator, offset, datetime_format):
|
||||||
|
"""
|
||||||
|
Get current datetime with time offset applied.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
|
||||||
|
operator: '+' for addition or '-' for subtraction
|
||||||
|
offset: Comma-separated offset parameters (e.g., 'hours=2,minutes=30')
|
||||||
|
datetime_format: strftime format string or None to use environment default
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted datetime string with offset applied
|
||||||
|
|
||||||
|
Example:
|
||||||
|
_datetime('UTC', '+', 'hours=2,minutes=30', '%Y-%m-%d %H:%M:%S')
|
||||||
|
# Returns current time + 2.5 hours
|
||||||
|
"""
|
||||||
|
# Use default timezone if none specified
|
||||||
|
if not timezone or timezone == '':
|
||||||
|
timezone = self.environment.default_timezone
|
||||||
|
|
||||||
|
d = arrow.now(timezone)
|
||||||
|
|
||||||
|
# parse shift params from offset and include operator
|
||||||
|
shift_params = {}
|
||||||
|
for param in offset.split(','):
|
||||||
|
interval, value = param.split('=')
|
||||||
|
shift_params[interval.strip()] = float(operator + value.strip())
|
||||||
|
|
||||||
|
# Fix weekday parameter can not be float
|
||||||
|
if 'weekday' in shift_params:
|
||||||
|
shift_params['weekday'] = int(shift_params['weekday'])
|
||||||
|
|
||||||
|
d = d.shift(**shift_params)
|
||||||
|
|
||||||
|
if datetime_format is None:
|
||||||
|
datetime_format = self.environment.datetime_format
|
||||||
|
return d.strftime(datetime_format)
|
||||||
|
|
||||||
|
def _now(self, timezone, datetime_format):
|
||||||
|
"""
|
||||||
|
Get current datetime without any offset.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default
|
||||||
|
datetime_format: strftime format string or None to use environment default
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Formatted datetime string for current time
|
||||||
|
|
||||||
|
Example:
|
||||||
|
_now('America/New_York', '%Y-%m-%d %H:%M:%S')
|
||||||
|
# Returns current time in New York timezone
|
||||||
|
"""
|
||||||
|
# Use default timezone if none specified
|
||||||
|
if not timezone or timezone == '':
|
||||||
|
timezone = self.environment.default_timezone
|
||||||
|
|
||||||
|
if datetime_format is None:
|
||||||
|
datetime_format = self.environment.datetime_format
|
||||||
|
return arrow.now(timezone).strftime(datetime_format)
|
||||||
|
|
||||||
|
def parse(self, parser):
|
||||||
|
"""
|
||||||
|
Parse the {% now %} tag and generate appropriate AST nodes.
|
||||||
|
|
||||||
|
This method is called by Jinja2 when it encounters a {% now %} tag.
|
||||||
|
It parses the tag syntax and determines whether to call _now() or _datetime()
|
||||||
|
based on whether offset operations (+ or -) are present.
|
||||||
|
|
||||||
|
Supported syntax:
|
||||||
|
{% now 'timezone' %} -> calls _now()
|
||||||
|
{% now 'timezone', 'format' %} -> calls _now()
|
||||||
|
{% now 'timezone' + 'offset' %} -> calls _datetime()
|
||||||
|
{% now 'timezone' + 'offset', 'format' %} -> calls _datetime()
|
||||||
|
{% now 'timezone' - 'offset', 'format' %} -> calls _datetime()
|
||||||
|
|
||||||
|
Args:
|
||||||
|
parser: Jinja2 parser instance
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
nodes.Output: AST output node containing the formatted datetime string
|
||||||
|
"""
|
||||||
|
lineno = next(parser.stream).lineno
|
||||||
|
|
||||||
|
node = parser.parse_expression()
|
||||||
|
|
||||||
|
if parser.stream.skip_if('comma'):
|
||||||
|
datetime_format = parser.parse_expression()
|
||||||
|
else:
|
||||||
|
datetime_format = nodes.Const(None)
|
||||||
|
|
||||||
|
if isinstance(node, nodes.Add):
|
||||||
|
call_method = self.call_method(
|
||||||
|
'_datetime',
|
||||||
|
[node.left, nodes.Const('+'), node.right, datetime_format],
|
||||||
|
lineno=lineno,
|
||||||
|
)
|
||||||
|
elif isinstance(node, nodes.Sub):
|
||||||
|
call_method = self.call_method(
|
||||||
|
'_datetime',
|
||||||
|
[node.left, nodes.Const('-'), node.right, datetime_format],
|
||||||
|
lineno=lineno,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
call_method = self.call_method(
|
||||||
|
'_now',
|
||||||
|
[node, datetime_format],
|
||||||
|
lineno=lineno,
|
||||||
|
)
|
||||||
|
return nodes.Output([call_method], lineno=lineno)
|
||||||
55
changedetectionio/jinja2_custom/safe_jinja.py
Normal file
55
changedetectionio/jinja2_custom/safe_jinja.py
Normal file
@@ -0,0 +1,55 @@
|
|||||||
|
"""
|
||||||
|
Safe Jinja2 render with max payload sizes
|
||||||
|
|
||||||
|
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
|
||||||
|
"""
|
||||||
|
|
||||||
|
import jinja2.sandbox
|
||||||
|
import typing as t
|
||||||
|
import os
|
||||||
|
from .extensions.TimeExtension import TimeExtension
|
||||||
|
|
||||||
|
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
||||||
|
|
||||||
|
# Default extensions - can be overridden in create_jinja_env()
|
||||||
|
DEFAULT_JINJA2_EXTENSIONS = [TimeExtension]
|
||||||
|
|
||||||
|
|
||||||
|
def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment:
|
||||||
|
"""
|
||||||
|
Create a sandboxed Jinja2 environment with our custom extensions and default timezone.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
extensions: List of extension classes to use (defaults to DEFAULT_JINJA2_EXTENSIONS)
|
||||||
|
**kwargs: Additional arguments to pass to ImmutableSandboxedEnvironment
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
Configured Jinja2 environment
|
||||||
|
"""
|
||||||
|
if extensions is None:
|
||||||
|
extensions = DEFAULT_JINJA2_EXTENSIONS
|
||||||
|
|
||||||
|
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(
|
||||||
|
extensions=extensions,
|
||||||
|
**kwargs
|
||||||
|
)
|
||||||
|
|
||||||
|
# Get default timezone from environment variable
|
||||||
|
default_timezone = os.getenv('TZ', 'UTC').strip()
|
||||||
|
jinja2_env.default_timezone = default_timezone
|
||||||
|
|
||||||
|
return jinja2_env
|
||||||
|
|
||||||
|
|
||||||
|
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
|
||||||
|
# (Which also limits available functions that could be called)
|
||||||
|
def render(template_str, **args: t.Any) -> str:
|
||||||
|
jinja2_env = create_jinja_env()
|
||||||
|
output = jinja2_env.from_string(template_str).render(args)
|
||||||
|
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
|
||||||
|
|
||||||
|
def render_fully_escaped(content):
|
||||||
|
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
|
||||||
|
template = env.from_string("{{ some_html|e }}")
|
||||||
|
return template.render(some_html=content)
|
||||||
|
|
||||||
@@ -56,11 +56,11 @@ class model(dict):
|
|||||||
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
||||||
'rss_hide_muted_watches': True,
|
'rss_hide_muted_watches': True,
|
||||||
'rss_reader_mode': False,
|
'rss_reader_mode': False,
|
||||||
|
'scheduler_timezone_default': None, # Default IANA timezone name
|
||||||
'schema_version' : 0,
|
'schema_version' : 0,
|
||||||
'shared_diff_access': False,
|
'shared_diff_access': False,
|
||||||
'strip_ignored_lines': False,
|
'strip_ignored_lines': False,
|
||||||
'tags': {}, #@todo use Tag.model initialisers
|
'tags': {}, #@todo use Tag.model initialisers
|
||||||
'timezone': None, # Default IANA timezone name
|
|
||||||
'webdriver_delay': None , # Extra delay in seconds before extracting text
|
'webdriver_delay': None , # Extra delay in seconds before extracting text
|
||||||
'ui': {
|
'ui': {
|
||||||
'use_page_title_in_list': True,
|
'use_page_title_in_list': True,
|
||||||
|
|||||||
@@ -1,14 +1,14 @@
|
|||||||
from blinker import signal
|
from blinker import signal
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
from . import watch_base
|
from . import watch_base
|
||||||
import os
|
import os
|
||||||
import re
|
import re
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
from .. import safe_jinja
|
from .. import jinja2_custom as safe_jinja
|
||||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||||
|
|
||||||
# Allowable protocols, protects against javascript: etc
|
# Allowable protocols, protects against javascript: etc
|
||||||
@@ -89,9 +89,8 @@ class model(watch_base):
|
|||||||
ready_url = jinja_render(template_str=url)
|
ready_url = jinja_render(template_str=url)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
|
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
|
||||||
from flask import (
|
from flask import flash, url_for
|
||||||
flash, Markup, url_for
|
from markupsafe import Markup
|
||||||
)
|
|
||||||
message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
|
message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
|
||||||
url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
|
url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
|
||||||
flash(message, 'error')
|
flash(message, 'error')
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ from loguru import logger
|
|||||||
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
||||||
|
|
||||||
def process_notification(n_object, datastore):
|
def process_notification(n_object, datastore):
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
||||||
# be sure its registered
|
# be sure its registered
|
||||||
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||||
|
|||||||
@@ -102,7 +102,7 @@ class difference_detection_processor():
|
|||||||
self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid'))
|
self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid'))
|
||||||
|
|
||||||
# Tweak the base config with the per-watch ones
|
# Tweak the base config with the per-watch ones
|
||||||
from changedetectionio.safe_jinja import render as jinja_render
|
from changedetectionio.jinja2_custom import render as jinja_render
|
||||||
request_headers = CaseInsensitiveDict()
|
request_headers = CaseInsensitiveDict()
|
||||||
|
|
||||||
ua = self.datastore.data['settings']['requests'].get('default_ua')
|
ua = self.datastore.data['settings']['requests'].get('default_ua')
|
||||||
|
|||||||
@@ -64,24 +64,31 @@ class guess_stream_type():
|
|||||||
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
|
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
|
||||||
test_content_normalized = re.sub(r'<\s+', '<', test_content)
|
test_content_normalized = re.sub(r'<\s+', '<', test_content)
|
||||||
|
|
||||||
# Magic will sometimes call text/plain as text/html!
|
# Use puremagic for lightweight MIME detection (saves ~14MB vs python-magic)
|
||||||
magic_result = None
|
magic_result = None
|
||||||
try:
|
try:
|
||||||
import magic
|
import puremagic
|
||||||
|
|
||||||
mime = magic.from_buffer(content[:200], mime=True) # Send the original content
|
# puremagic needs bytes, so encode if we have a string
|
||||||
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
|
content_bytes = content[:200].encode('utf-8') if isinstance(content, str) else content[:200]
|
||||||
if mime and "/" in mime:
|
|
||||||
magic_result = mime
|
# puremagic returns a list of PureMagic objects with confidence scores
|
||||||
# Ignore generic/fallback mime types from magic
|
detections = puremagic.magic_string(content_bytes)
|
||||||
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
|
if detections:
|
||||||
logger.debug(f"Ignoring generic mime type '{mime}' from magic library")
|
# Get the highest confidence detection
|
||||||
# Trust magic for non-text types immediately
|
mime = detections[0].mime_type
|
||||||
elif mime not in ['text/html', 'text/plain']:
|
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
|
||||||
magic_content_header = mime
|
if mime and "/" in mime:
|
||||||
|
magic_result = mime
|
||||||
|
# Ignore generic/fallback mime types
|
||||||
|
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
|
||||||
|
logger.debug(f"Ignoring generic mime type '{mime}' from puremagic library")
|
||||||
|
# Trust puremagic for non-text types immediately
|
||||||
|
elif mime not in ['text/html', 'text/plain']:
|
||||||
|
magic_content_header = mime
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection")
|
logger.error(f"Error getting a more precise mime type from 'puremagic' library ({str(e)}), using content-based detection")
|
||||||
|
|
||||||
# Content-based detection (most reliable for text formats)
|
# Content-based detection (most reliable for text formats)
|
||||||
# Check for HTML patterns first - if found, override magic's text/plain
|
# Check for HTML patterns first - if found, override magic's text/plain
|
||||||
|
|||||||
@@ -556,6 +556,20 @@ class perform_site_check(difference_detection_processor):
|
|||||||
else:
|
else:
|
||||||
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
||||||
|
|
||||||
|
# Note: Explicit cleanup is only needed here because text_json_diff handles
|
||||||
|
# large strings (100KB-300KB for RSS/HTML). The other processors work with
|
||||||
|
# small strings and don't need this.
|
||||||
|
#
|
||||||
|
# Python would clean these up automatically, but explicit `del` frees memory
|
||||||
|
# immediately rather than waiting for function return, reducing peak memory usage.
|
||||||
|
del content
|
||||||
|
if 'html_content' in locals() and html_content is not stripped_text:
|
||||||
|
del html_content
|
||||||
|
if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text:
|
||||||
|
del text_content_before_ignored_filter
|
||||||
|
if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text:
|
||||||
|
del text_for_checksuming
|
||||||
|
|
||||||
return changed_detected, update_obj, stripped_text
|
return changed_detected, update_obj, stripped_text
|
||||||
|
|
||||||
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
|
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
|
||||||
|
|||||||
@@ -1,24 +0,0 @@
|
|||||||
"""
|
|
||||||
Safe Jinja2 render with max payload sizes
|
|
||||||
|
|
||||||
See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations
|
|
||||||
"""
|
|
||||||
|
|
||||||
import jinja2.sandbox
|
|
||||||
import typing as t
|
|
||||||
import os
|
|
||||||
|
|
||||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
|
||||||
|
|
||||||
# This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available.
|
|
||||||
# (Which also limits available functions that could be called)
|
|
||||||
def render(template_str, **args: t.Any) -> str:
|
|
||||||
jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension'])
|
|
||||||
output = jinja2_env.from_string(template_str).render(args)
|
|
||||||
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
|
|
||||||
|
|
||||||
def render_fully_escaped(content):
|
|
||||||
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
|
|
||||||
template = env.from_string("{{ some_html|e }}")
|
|
||||||
return template.render(some_html=content)
|
|
||||||
|
|
||||||
@@ -29,7 +29,7 @@ $(document).ready(function () {
|
|||||||
$(this).text(new Date($(this).data("utc")).toLocaleString());
|
$(this).text(new Date($(this).data("utc")).toLocaleString());
|
||||||
})
|
})
|
||||||
|
|
||||||
const timezoneInput = $('#application-timezone');
|
const timezoneInput = $('#application-scheduler_timezone_default');
|
||||||
if(timezoneInput.length) {
|
if(timezoneInput.length) {
|
||||||
const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
|
const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone;
|
||||||
if (!timezoneInput.val().trim()) {
|
if (!timezoneInput.val().trim()) {
|
||||||
|
|||||||
@@ -976,6 +976,10 @@ class ChangeDetectionStore:
|
|||||||
if self.data['settings']['application'].get('extract_title_as_title'):
|
if self.data['settings']['application'].get('extract_title_as_title'):
|
||||||
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
|
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
|
||||||
|
|
||||||
|
def update_21(self):
|
||||||
|
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
|
||||||
|
del self.data['settings']['application']['timezone']
|
||||||
|
|
||||||
|
|
||||||
def add_notification_url(self, notification_url):
|
def add_notification_url(self, notification_url):
|
||||||
|
|
||||||
|
|||||||
@@ -14,13 +14,31 @@
|
|||||||
{% if field.errors is mapping and 'form' in field.errors %}
|
{% if field.errors is mapping and 'form' in field.errors %}
|
||||||
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
|
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
|
||||||
{% set errors = field.errors['form'] %}
|
{% set errors = field.errors['form'] %}
|
||||||
|
{% for error in errors %}
|
||||||
|
<li>{{ error }}</li>
|
||||||
|
{% endfor %}
|
||||||
|
{% elif field.type == 'FieldList' %}
|
||||||
|
{# Handle FieldList of FormFields - errors is a list of dicts, one per entry #}
|
||||||
|
{% for idx, entry_errors in field.errors|enumerate %}
|
||||||
|
{% if entry_errors is mapping and entry_errors %}
|
||||||
|
{# Only show entries that have actual errors #}
|
||||||
|
<li><strong>Entry {{ idx + 1 }}:</strong>
|
||||||
|
<ul>
|
||||||
|
{% for field_name, messages in entry_errors.items() %}
|
||||||
|
{% for message in messages %}
|
||||||
|
<li>{{ field_name }}: {{ message }}</li>
|
||||||
|
{% endfor %}
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</li>
|
||||||
|
{% endif %}
|
||||||
|
{% endfor %}
|
||||||
{% else %}
|
{% else %}
|
||||||
{# regular list of errors with this field #}
|
{# regular list of errors with this field #}
|
||||||
{% set errors = field.errors %}
|
{% for error in field.errors %}
|
||||||
|
<li>{{ error }}</li>
|
||||||
|
{% endfor %}
|
||||||
{% endif %}
|
{% endif %}
|
||||||
{% for error in errors %}
|
|
||||||
<li>{{ error }}</li>
|
|
||||||
{% endfor %}
|
|
||||||
</ul>
|
</ul>
|
||||||
{% endif %}
|
{% endif %}
|
||||||
</div>
|
</div>
|
||||||
@@ -93,6 +111,39 @@
|
|||||||
{{ field(**kwargs)|safe }}
|
{{ field(**kwargs)|safe }}
|
||||||
{% endmacro %}
|
{% endmacro %}
|
||||||
|
|
||||||
|
{% macro render_fieldlist_with_inline_errors(fieldlist) %}
|
||||||
|
{# Specialized macro for FieldList(FormField(...)) that renders errors inline with each field #}
|
||||||
|
<div {% if fieldlist.errors %} class="error" {% endif %}>{{ fieldlist.label }}</div>
|
||||||
|
<div {% if fieldlist.errors %} class="error" {% endif %}>
|
||||||
|
<ul id="{{ fieldlist.id }}">
|
||||||
|
{% for entry in fieldlist %}
|
||||||
|
<li {% if entry.errors %} class="error" {% endif %}>
|
||||||
|
<label for="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>{{ fieldlist.label.text }}-{{ loop.index0 }}</label>
|
||||||
|
<table id="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>
|
||||||
|
<tbody>
|
||||||
|
{% for subfield in entry %}
|
||||||
|
<tr {% if subfield.errors %} class="error" {% endif %}>
|
||||||
|
<th {% if subfield.errors %} class="error" {% endif %}><label for="{{ subfield.id }}" {% if subfield.errors %} class="error" {% endif %}>{{ subfield.label.text }}</label></th>
|
||||||
|
<td {% if subfield.errors %} class="error" {% endif %}>
|
||||||
|
{{ subfield(**kwargs)|safe }}
|
||||||
|
{% if subfield.errors %}
|
||||||
|
<ul class="errors">
|
||||||
|
{% for error in subfield.errors %}
|
||||||
|
<li class="error">{{ error }}</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
{% endif %}
|
||||||
|
</td>
|
||||||
|
</tr>
|
||||||
|
{% endfor %}
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</li>
|
||||||
|
{% endfor %}
|
||||||
|
</ul>
|
||||||
|
</div>
|
||||||
|
{% endmacro %}
|
||||||
|
|
||||||
{% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
|
{% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
|
||||||
<div class="fieldlist_formfields" id="{{ table_id }}">
|
<div class="fieldlist_formfields" id="{{ table_id }}">
|
||||||
<div class="fieldlist-header">
|
<div class="fieldlist-header">
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import time
|
|||||||
from threading import Thread
|
from threading import Thread
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
|
import arrow
|
||||||
from changedetectionio import changedetection_app
|
from changedetectionio import changedetection_app
|
||||||
from changedetectionio import store
|
from changedetectionio import store
|
||||||
import os
|
import os
|
||||||
@@ -29,6 +30,17 @@ def reportlog(pytestconfig):
|
|||||||
logger.remove(handler_id)
|
logger.remove(handler_id)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def environment(mocker):
|
||||||
|
"""Mock arrow.now() to return a fixed datetime for testing jinja2 time extension."""
|
||||||
|
# Fixed datetime: Wed, 09 Dec 2015 23:33:01 UTC
|
||||||
|
# This is calculated to match the test expectations when offsets are applied
|
||||||
|
fixed_datetime = arrow.Arrow(2015, 12, 9, 23, 33, 1, tzinfo='UTC')
|
||||||
|
# Patch arrow.now in the TimeExtension module where it's actually used
|
||||||
|
mocker.patch('changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now', return_value=fixed_datetime)
|
||||||
|
return fixed_datetime
|
||||||
|
|
||||||
|
|
||||||
def format_memory_human(bytes_value):
|
def format_memory_human(bytes_value):
|
||||||
"""Format memory in human-readable units (KB, MB, GB)"""
|
"""Format memory in human-readable units (KB, MB, GB)"""
|
||||||
if bytes_value < 1024:
|
if bytes_value < 1024:
|
||||||
|
|||||||
@@ -49,3 +49,39 @@ def test_select_custom(client, live_server, measure_memory_usage):
|
|||||||
#
|
#
|
||||||
# Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default
|
# Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default
|
||||||
|
|
||||||
|
|
||||||
|
def test_custom_proxy_validation(client, live_server, measure_memory_usage):
|
||||||
|
# live_server_setup(live_server) # Setup on conftest per function
|
||||||
|
|
||||||
|
# Goto settings, add our custom one
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings.settings_page"),
|
||||||
|
data={
|
||||||
|
"requests-time_between_check-minutes": 180,
|
||||||
|
"application-ignore_whitespace": "y",
|
||||||
|
"application-fetch_backend": 'html_requests',
|
||||||
|
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
|
||||||
|
"requests-extra_proxies-0-proxy_url": "xxxxhtt/333??p://test:awesome@squid-custom:3128",
|
||||||
|
},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"Settings updated." not in res.data
|
||||||
|
assert b'Proxy URLs must start with' in res.data
|
||||||
|
|
||||||
|
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings.settings_page"),
|
||||||
|
data={
|
||||||
|
"requests-time_between_check-minutes": 180,
|
||||||
|
"application-ignore_whitespace": "y",
|
||||||
|
"application-fetch_backend": 'html_requests',
|
||||||
|
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
|
||||||
|
"requests-extra_proxies-0-proxy_url": "https://",
|
||||||
|
},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
assert b"Settings updated." not in res.data
|
||||||
|
assert b"Invalid URL." in res.data
|
||||||
|
|
||||||
@@ -165,6 +165,46 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
|||||||
# Cleanup everything
|
# Cleanup everything
|
||||||
delete_all_watches(client)
|
delete_all_watches(client)
|
||||||
|
|
||||||
|
|
||||||
|
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
|
||||||
|
def test_requests_timeout(client, live_server, measure_memory_usage):
|
||||||
|
delay = 2
|
||||||
|
test_url = url_for('test_endpoint', delay=delay, _external=True)
|
||||||
|
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings.settings_page"),
|
||||||
|
data={"application-ui-use_page_title_in_list": "",
|
||||||
|
"requests-time_between_check-minutes": 180,
|
||||||
|
"requests-timeout": delay - 1,
|
||||||
|
'application-fetch_backend': "html_requests"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# requests takes >2 sec but we timeout at 1 second
|
||||||
|
res = client.get(url_for("watchlist.index"))
|
||||||
|
assert b'Read timed out. (read timeout=1)' in res.data
|
||||||
|
|
||||||
|
##### Now set a longer timeout
|
||||||
|
res = client.post(
|
||||||
|
url_for("settings.settings_page"),
|
||||||
|
data={"application-ui-use_page_title_in_list": "",
|
||||||
|
"requests-time_between_check-minutes": 180,
|
||||||
|
"requests-timeout": delay + 1, # timeout should be a second more than the reply time
|
||||||
|
'application-fetch_backend': "html_requests"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
res = client.get(url_for("watchlist.index"))
|
||||||
|
assert b'Read timed out' not in res.data
|
||||||
|
|
||||||
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
|
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
|
||||||
"""
|
"""
|
||||||
|
|
||||||
@@ -338,4 +378,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server
|
|||||||
assert b'<string name="feed_update_receiver_name"' in res.data
|
assert b'<string name="feed_update_receiver_name"' in res.data
|
||||||
assert b'<foobar' not in res.data
|
assert b'<foobar' not in res.data
|
||||||
|
|
||||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||||
|
|||||||
@@ -1,8 +1,10 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
import time
|
import time
|
||||||
|
import arrow
|
||||||
from flask import url_for
|
from flask import url_for
|
||||||
from .util import live_server_setup, wait_for_all_checks
|
from .util import live_server_setup, wait_for_all_checks
|
||||||
|
from ..jinja2_custom import render
|
||||||
|
|
||||||
|
|
||||||
# def test_setup(client, live_server, measure_memory_usage):
|
# def test_setup(client, live_server, measure_memory_usage):
|
||||||
@@ -33,6 +35,35 @@ def test_jinja2_in_url_query(client, live_server, measure_memory_usage):
|
|||||||
)
|
)
|
||||||
assert b'date=2' in res.data
|
assert b'date=2' in res.data
|
||||||
|
|
||||||
|
# Test for issue #1493 - jinja2-time offset functionality
|
||||||
|
def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usage):
|
||||||
|
"""Test that jinja2 time offset expressions work in watch URLs (issue #1493)."""
|
||||||
|
|
||||||
|
# Add our URL to the import page with time offset expression
|
||||||
|
test_url = url_for('test_return_query', _external=True)
|
||||||
|
|
||||||
|
# Test the exact syntax from issue #1493 that was broken in jinja2-time
|
||||||
|
# This should work now with our custom TimeExtension
|
||||||
|
full_url = "{}?{}".format(test_url,
|
||||||
|
"timestamp={% now 'utc' - 'minutes=11', '%Y-%m-%d %H:%M' %}", )
|
||||||
|
res = client.post(
|
||||||
|
url_for("ui.ui_views.form_quick_watch_add"),
|
||||||
|
data={"url": full_url, "tags": "test"},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"Watch added" in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Verify the URL was processed correctly (should not have errors)
|
||||||
|
res = client.get(
|
||||||
|
url_for("ui.ui_views.preview_page", uuid="first"),
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
# Should have a valid timestamp in the response
|
||||||
|
assert b'timestamp=' in res.data
|
||||||
|
# Should not have template error
|
||||||
|
assert b'Invalid template' not in res.data
|
||||||
|
|
||||||
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
|
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
|
||||||
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
||||||
|
|
||||||
@@ -56,3 +87,86 @@ def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
|
|||||||
assert b'is invalid and cannot be used' in res.data
|
assert b'is invalid and cannot be used' in res.data
|
||||||
# Some of the spewed output from the subclasses
|
# Some of the spewed output from the subclasses
|
||||||
assert b'dict_values' not in res.data
|
assert b'dict_values' not in res.data
|
||||||
|
|
||||||
|
def test_timezone(mocker):
|
||||||
|
"""Verify that timezone is parsed."""
|
||||||
|
|
||||||
|
timezone = 'America/Buenos_Aires'
|
||||||
|
currentDate = arrow.now(timezone)
|
||||||
|
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
|
||||||
|
arrowNowMock.return_value = currentDate
|
||||||
|
finalRender = render(f"{{% now '{timezone}' %}}")
|
||||||
|
|
||||||
|
assert finalRender == currentDate.strftime('%a, %d %b %Y %H:%M:%S')
|
||||||
|
|
||||||
|
def test_format(mocker):
|
||||||
|
"""Verify that format is parsed."""
|
||||||
|
|
||||||
|
timezone = 'utc'
|
||||||
|
format = '%d %b %Y %H:%M:%S'
|
||||||
|
currentDate = arrow.now(timezone)
|
||||||
|
arrowNowMock = mocker.patch("arrow.now")
|
||||||
|
arrowNowMock.return_value = currentDate
|
||||||
|
finalRender = render(f"{{% now '{timezone}', '{format}' %}}")
|
||||||
|
|
||||||
|
assert finalRender == currentDate.strftime(format)
|
||||||
|
|
||||||
|
def test_add_time(environment):
|
||||||
|
"""Verify that added time offset can be parsed."""
|
||||||
|
|
||||||
|
finalRender = render("{% now 'utc' + 'hours=2,seconds=30' %}")
|
||||||
|
|
||||||
|
assert finalRender == "Thu, 10 Dec 2015 01:33:31"
|
||||||
|
|
||||||
|
def test_add_weekday(mocker):
|
||||||
|
"""Verify that added weekday offset can be parsed."""
|
||||||
|
|
||||||
|
timezone = 'utc'
|
||||||
|
currentDate = arrow.now(timezone)
|
||||||
|
arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now")
|
||||||
|
arrowNowMock.return_value = currentDate
|
||||||
|
finalRender = render(f"{{% now '{timezone}' + 'weekday=1' %}}")
|
||||||
|
|
||||||
|
assert finalRender == currentDate.shift(weekday=1).strftime('%a, %d %b %Y %H:%M:%S')
|
||||||
|
|
||||||
|
|
||||||
|
def test_substract_time(environment):
|
||||||
|
"""Verify that substracted time offset can be parsed."""
|
||||||
|
|
||||||
|
finalRender = render("{% now 'utc' - 'minutes=11' %}")
|
||||||
|
|
||||||
|
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||||
|
|
||||||
|
|
||||||
|
def test_offset_with_format(environment):
|
||||||
|
"""Verify that offset works together with datetime format."""
|
||||||
|
|
||||||
|
finalRender = render(
|
||||||
|
"{% now 'utc' - 'days=2,minutes=33,seconds=1', '%d %b %Y %H:%M:%S' %}"
|
||||||
|
)
|
||||||
|
|
||||||
|
assert finalRender == "07 Dec 2015 23:00:00"
|
||||||
|
|
||||||
|
def test_default_timezone_empty_string(environment):
|
||||||
|
"""Verify that empty timezone string uses the default timezone (UTC in test environment)."""
|
||||||
|
|
||||||
|
# Empty string should use the default timezone which is 'UTC' (or from application settings)
|
||||||
|
finalRender = render("{% now '' %}")
|
||||||
|
|
||||||
|
# Should render with default format and UTC timezone (matches environment fixture)
|
||||||
|
assert finalRender == "Wed, 09 Dec 2015 23:33:01"
|
||||||
|
|
||||||
|
def test_default_timezone_with_offset(environment):
|
||||||
|
"""Verify that empty timezone works with offset operations."""
|
||||||
|
|
||||||
|
# Empty string with offset should use default timezone
|
||||||
|
finalRender = render("{% now '' + 'hours=2', '%d %b %Y %H:%M:%S' %}")
|
||||||
|
|
||||||
|
assert finalRender == "10 Dec 2015 01:33:01"
|
||||||
|
|
||||||
|
def test_default_timezone_subtraction(environment):
|
||||||
|
"""Verify that empty timezone works with subtraction offset."""
|
||||||
|
|
||||||
|
finalRender = render("{% now '' - 'minutes=11' %}")
|
||||||
|
|
||||||
|
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||||
@@ -2,7 +2,8 @@
|
|||||||
# coding=utf-8
|
# coding=utf-8
|
||||||
|
|
||||||
import time
|
import time
|
||||||
from flask import url_for, escape
|
from flask import url_for
|
||||||
|
from markupsafe import escape
|
||||||
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
|
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||||
import pytest
|
import pytest
|
||||||
jq_support = True
|
jq_support = True
|
||||||
|
|||||||
@@ -24,7 +24,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
|
|||||||
url_for("settings.settings_page"),
|
url_for("settings.settings_page"),
|
||||||
data={"application-empty_pages_are_a_change": "",
|
data={"application-empty_pages_are_a_change": "",
|
||||||
"requests-time_between_check-seconds": 1,
|
"requests-time_between_check-seconds": 1,
|
||||||
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||||
'application-fetch_backend': "html_requests"},
|
'application-fetch_backend': "html_requests"},
|
||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
@@ -119,7 +119,7 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
|
|||||||
|
|
||||||
data = {
|
data = {
|
||||||
"application-empty_pages_are_a_change": "",
|
"application-empty_pages_are_a_change": "",
|
||||||
"application-timezone": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
"application-scheduler_timezone_default": "Pacific/Kiritimati", # Most Forward Time Zone (UTC+14:00)
|
||||||
'application-fetch_backend': "html_requests",
|
'application-fetch_backend': "html_requests",
|
||||||
"requests-time_between_check-hours": 0,
|
"requests-time_between_check-hours": 0,
|
||||||
"requests-time_between_check-minutes": 0,
|
"requests-time_between_check-minutes": 0,
|
||||||
|
|||||||
@@ -4,7 +4,7 @@
|
|||||||
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
|
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from changedetectionio import safe_jinja
|
from changedetectionio import jinja2_custom as safe_jinja
|
||||||
|
|
||||||
|
|
||||||
# mostly
|
# mostly
|
||||||
|
|||||||
@@ -6,7 +6,7 @@
|
|||||||
import unittest
|
import unittest
|
||||||
import os
|
import os
|
||||||
|
|
||||||
from changedetectionio.processors import restock_diff
|
import changedetectionio.processors.restock_diff.processor as restock_diff
|
||||||
|
|
||||||
# mostly
|
# mostly
|
||||||
class TestDiffBuilder(unittest.TestCase):
|
class TestDiffBuilder(unittest.TestCase):
|
||||||
|
|||||||
@@ -1,11 +1,10 @@
|
|||||||
#!/usr/bin/env python3
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
# run from dir above changedetectionio/ dir
|
# run from dir above changedetectionio/ dir
|
||||||
# python3 -m unittest changedetectionio.tests.unit.test_jinja2_security
|
# python3 -m unittest changedetectionio.tests.unit.test_scheduler
|
||||||
|
|
||||||
import unittest
|
import unittest
|
||||||
from datetime import datetime, timedelta
|
import arrow
|
||||||
from zoneinfo import ZoneInfo
|
|
||||||
|
|
||||||
class TestScheduler(unittest.TestCase):
|
class TestScheduler(unittest.TestCase):
|
||||||
|
|
||||||
@@ -13,12 +12,13 @@ class TestScheduler(unittest.TestCase):
|
|||||||
# UTC-12:00 (Baker Island, Howland Island) is the farthest behind, always one calendar day behind UTC.
|
# UTC-12:00 (Baker Island, Howland Island) is the farthest behind, always one calendar day behind UTC.
|
||||||
|
|
||||||
def test_timezone_basic_time_within_schedule(self):
|
def test_timezone_basic_time_within_schedule(self):
|
||||||
|
"""Test that current time is detected as within schedule window."""
|
||||||
from changedetectionio import time_handler
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
timezone_str = 'Europe/Berlin'
|
timezone_str = 'Europe/Berlin'
|
||||||
debug_datetime = datetime.now(ZoneInfo(timezone_str))
|
debug_datetime = arrow.now(timezone_str)
|
||||||
day_of_week = debug_datetime.strftime('%A')
|
day_of_week = debug_datetime.format('dddd')
|
||||||
time_str = str(debug_datetime.hour)+':00'
|
time_str = debug_datetime.format('HH:00')
|
||||||
duration = 60 # minutes
|
duration = 60 # minutes
|
||||||
|
|
||||||
# The current time should always be within 60 minutes of [time_hour]:00
|
# The current time should always be within 60 minutes of [time_hour]:00
|
||||||
@@ -30,16 +30,17 @@ class TestScheduler(unittest.TestCase):
|
|||||||
self.assertEqual(result, True, f"{debug_datetime} is within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
self.assertEqual(result, True, f"{debug_datetime} is within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
||||||
|
|
||||||
def test_timezone_basic_time_outside_schedule(self):
|
def test_timezone_basic_time_outside_schedule(self):
|
||||||
|
"""Test that time from yesterday is outside current schedule."""
|
||||||
from changedetectionio import time_handler
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
timezone_str = 'Europe/Berlin'
|
timezone_str = 'Europe/Berlin'
|
||||||
# We try a date in the future..
|
# We try a date in the past (yesterday)
|
||||||
debug_datetime = datetime.now(ZoneInfo(timezone_str))+ timedelta(days=-1)
|
debug_datetime = arrow.now(timezone_str).shift(days=-1)
|
||||||
day_of_week = debug_datetime.strftime('%A')
|
day_of_week = debug_datetime.format('dddd')
|
||||||
time_str = str(debug_datetime.hour) + ':00'
|
time_str = debug_datetime.format('HH:00')
|
||||||
duration = 60*24 # minutes
|
duration = 60 * 24 # minutes
|
||||||
|
|
||||||
# The current time should always be within 60 minutes of [time_hour]:00
|
# The current time should NOT be within yesterday's schedule
|
||||||
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||||
time_str=time_str,
|
time_str=time_str,
|
||||||
timezone_str=timezone_str,
|
timezone_str=timezone_str,
|
||||||
@@ -48,6 +49,58 @@ class TestScheduler(unittest.TestCase):
|
|||||||
self.assertNotEqual(result, True,
|
self.assertNotEqual(result, True,
|
||||||
f"{debug_datetime} is NOT within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
f"{debug_datetime} is NOT within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes")
|
||||||
|
|
||||||
|
def test_timezone_utc_within_schedule(self):
|
||||||
|
"""Test UTC timezone works correctly."""
|
||||||
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
debug_datetime = arrow.now(timezone_str)
|
||||||
|
day_of_week = debug_datetime.format('dddd')
|
||||||
|
time_str = debug_datetime.format('HH:00')
|
||||||
|
duration = 120 # minutes
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Current time should be within UTC schedule")
|
||||||
|
|
||||||
|
def test_timezone_extreme_ahead(self):
|
||||||
|
"""Test with UTC+14 timezone (Line Islands, Kiribati)."""
|
||||||
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
|
timezone_str = 'Pacific/Kiritimati' # UTC+14
|
||||||
|
debug_datetime = arrow.now(timezone_str)
|
||||||
|
day_of_week = debug_datetime.format('dddd')
|
||||||
|
time_str = debug_datetime.format('HH:00')
|
||||||
|
duration = 60
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Should work with extreme ahead timezone")
|
||||||
|
|
||||||
|
def test_timezone_extreme_behind(self):
|
||||||
|
"""Test with UTC-12 timezone (Baker Island)."""
|
||||||
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
|
# Using Etc/GMT+12 which is UTC-12 (confusing, but that's how it works)
|
||||||
|
timezone_str = 'Etc/GMT+12' # UTC-12
|
||||||
|
debug_datetime = arrow.now(timezone_str)
|
||||||
|
day_of_week = debug_datetime.format('dddd')
|
||||||
|
time_str = debug_datetime.format('HH:00')
|
||||||
|
duration = 60
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Should work with extreme behind timezone")
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
|||||||
138
changedetectionio/tests/unit/test_time_extension.py
Normal file
138
changedetectionio/tests/unit/test_time_extension.py
Normal file
@@ -0,0 +1,138 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
Simple unit tests for TimeExtension that mimic how safe_jinja.py uses it.
|
||||||
|
These tests demonstrate that the environment.default_timezone override works
|
||||||
|
exactly as intended in the actual application code.
|
||||||
|
"""
|
||||||
|
|
||||||
|
import arrow
|
||||||
|
from jinja2.sandbox import ImmutableSandboxedEnvironment
|
||||||
|
from changedetectionio.jinja2_custom.extensions.TimeExtension import TimeExtension
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_timezone_override_like_safe_jinja(mocker):
|
||||||
|
"""
|
||||||
|
Test that mirrors exactly how safe_jinja.py uses the TimeExtension.
|
||||||
|
This is the simplest demonstration that environment.default_timezone works.
|
||||||
|
"""
|
||||||
|
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
# Override the default timezone - exactly like safe_jinja.py does
|
||||||
|
jinja2_env.default_timezone = 'America/New_York'
|
||||||
|
|
||||||
|
# Mock arrow.now to return a fixed time
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='America/New_York')
|
||||||
|
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Use empty string timezone - should use the overridden default
|
||||||
|
template_str = "{% now '' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# Verify arrow.now was called with the overridden timezone
|
||||||
|
mock.assert_called_with('America/New_York')
|
||||||
|
assert '2025' in output
|
||||||
|
assert 'Jan' in output
|
||||||
|
|
||||||
|
|
||||||
|
def test_default_timezone_not_overridden(mocker):
|
||||||
|
"""
|
||||||
|
Test that without override, the default 'UTC' from __init__ is used.
|
||||||
|
"""
|
||||||
|
# Create environment (TimeExtension.__init__ sets default_timezone='UTC')
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
# DON'T override - should use 'UTC' default
|
||||||
|
|
||||||
|
# Mock arrow.now
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 17, 0, 0, tzinfo='UTC')
|
||||||
|
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Use empty string timezone - should use 'UTC' default
|
||||||
|
template_str = "{% now '' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# Verify arrow.now was called with 'UTC'
|
||||||
|
mock.assert_called_with('UTC')
|
||||||
|
assert '2025' in output
|
||||||
|
|
||||||
|
|
||||||
|
def test_datetime_format_override_like_safe_jinja(mocker):
|
||||||
|
"""
|
||||||
|
Test that environment.datetime_format can be overridden after creation.
|
||||||
|
"""
|
||||||
|
# Create environment (default format is '%a, %d %b %Y %H:%M:%S')
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
# Override the datetime format
|
||||||
|
jinja2_env.datetime_format = '%Y-%m-%d %H:%M:%S'
|
||||||
|
|
||||||
|
# Mock arrow.now
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 14, 30, 45, tzinfo='UTC')
|
||||||
|
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Don't specify format - should use overridden default
|
||||||
|
template_str = "{% now 'UTC' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# Should use custom format YYYY-MM-DD HH:MM:SS
|
||||||
|
assert output == '2025-01-15 14:30:45'
|
||||||
|
|
||||||
|
|
||||||
|
def test_offset_with_overridden_timezone(mocker):
|
||||||
|
"""
|
||||||
|
Test that offset operations also respect the overridden default_timezone.
|
||||||
|
"""
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
# Override to use Europe/London
|
||||||
|
jinja2_env.default_timezone = 'Europe/London'
|
||||||
|
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 10, 0, 0, tzinfo='Europe/London')
|
||||||
|
mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Use offset with empty timezone string
|
||||||
|
template_str = "{% now '' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# Should have called arrow.now with Europe/London
|
||||||
|
mock.assert_called_with('Europe/London')
|
||||||
|
# Should be 10:00 + 2 hours = 12:00
|
||||||
|
assert output == '2025-01-15 12:00:00'
|
||||||
|
|
||||||
|
|
||||||
|
def test_weekday_parameter_converted_to_int(mocker):
|
||||||
|
"""
|
||||||
|
Test that weekday parameter is properly converted from float to int.
|
||||||
|
This is important because arrow.shift() requires weekday as int, not float.
|
||||||
|
"""
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
# Wednesday, Jan 15, 2025
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='UTC')
|
||||||
|
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Add offset to next Monday (weekday=0)
|
||||||
|
template_str = "{% now 'UTC' + 'weekday=0', '%A' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# Should be Monday
|
||||||
|
assert output == 'Monday'
|
||||||
|
|
||||||
|
|
||||||
|
def test_multiple_offset_parameters(mocker):
|
||||||
|
"""
|
||||||
|
Test that multiple offset parameters can be combined in one expression.
|
||||||
|
"""
|
||||||
|
jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension])
|
||||||
|
|
||||||
|
fixed_time = arrow.Arrow(2025, 1, 15, 10, 30, 45, tzinfo='UTC')
|
||||||
|
mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time)
|
||||||
|
|
||||||
|
# Test multiple parameters: days, hours, minutes, seconds
|
||||||
|
template_str = "{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10', '%Y-%m-%d %H:%M:%S' %}"
|
||||||
|
output = jinja2_env.from_string(template_str).render()
|
||||||
|
|
||||||
|
# 2025-01-15 10:30:45 + 1 day + 2 hours + 15 minutes + 10 seconds
|
||||||
|
# = 2025-01-16 12:45:55
|
||||||
|
assert output == '2025-01-16 12:45:55'
|
||||||
429
changedetectionio/tests/unit/test_time_handler.py
Normal file
429
changedetectionio/tests/unit/test_time_handler.py
Normal file
@@ -0,0 +1,429 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
"""
|
||||||
|
Comprehensive tests for time_handler module refactored to use arrow.
|
||||||
|
|
||||||
|
Run from project root:
|
||||||
|
python3 -m pytest changedetectionio/tests/unit/test_time_handler.py -v
|
||||||
|
"""
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import arrow
|
||||||
|
from changedetectionio import time_handler
|
||||||
|
|
||||||
|
|
||||||
|
class TestAmIInsideTime(unittest.TestCase):
|
||||||
|
"""Tests for the am_i_inside_time function."""
|
||||||
|
|
||||||
|
def test_current_time_within_schedule(self):
|
||||||
|
"""Test that current time is detected as within schedule."""
|
||||||
|
# Get current time in a specific timezone
|
||||||
|
timezone_str = 'Europe/Berlin'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = now.format('HH:00') # Current hour, 0 minutes
|
||||||
|
duration = 60 # 60 minutes
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result, f"Current time should be within {duration} minute window starting at {time_str}")
|
||||||
|
|
||||||
|
def test_current_time_outside_schedule(self):
|
||||||
|
"""Test that time in the past is not within current schedule."""
|
||||||
|
timezone_str = 'Europe/Berlin'
|
||||||
|
# Get yesterday's date
|
||||||
|
yesterday = arrow.now(timezone_str).shift(days=-1)
|
||||||
|
day_of_week = yesterday.format('dddd')
|
||||||
|
time_str = yesterday.format('HH:mm')
|
||||||
|
duration = 30 # Only 30 minutes
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertFalse(result, "Yesterday's time should not be within current schedule")
|
||||||
|
|
||||||
|
def test_timezone_pacific_within_schedule(self):
|
||||||
|
"""Test with US/Pacific timezone."""
|
||||||
|
timezone_str = 'US/Pacific'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = now.format('HH:00')
|
||||||
|
duration = 120 # 2 hours
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_timezone_tokyo_within_schedule(self):
|
||||||
|
"""Test with Asia/Tokyo timezone."""
|
||||||
|
timezone_str = 'Asia/Tokyo'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = now.format('HH:00')
|
||||||
|
duration = 90 # 1.5 hours
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_schedule_crossing_midnight(self):
|
||||||
|
"""Test schedule that crosses midnight."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
|
||||||
|
# Set schedule to start 23:30 with 120 minute duration (crosses midnight)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = "23:30"
|
||||||
|
duration = 120 # 2 hours - goes into next day
|
||||||
|
|
||||||
|
# If we're at 00:15 the next day, we should still be in the schedule
|
||||||
|
if now.hour == 0 and now.minute < 30:
|
||||||
|
# We're in the time window that spilled over from yesterday
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
# This might be true or false depending on exact time
|
||||||
|
self.assertIsInstance(result, bool)
|
||||||
|
|
||||||
|
def test_invalid_day_of_week(self):
|
||||||
|
"""Test that invalid day raises ValueError."""
|
||||||
|
with self.assertRaises(ValueError) as context:
|
||||||
|
time_handler.am_i_inside_time(
|
||||||
|
day_of_week="Funday",
|
||||||
|
time_str="12:00",
|
||||||
|
timezone_str="UTC",
|
||||||
|
duration=60
|
||||||
|
)
|
||||||
|
self.assertIn("Invalid day_of_week", str(context.exception))
|
||||||
|
|
||||||
|
def test_invalid_time_format(self):
|
||||||
|
"""Test that invalid time format raises ValueError."""
|
||||||
|
with self.assertRaises(ValueError) as context:
|
||||||
|
time_handler.am_i_inside_time(
|
||||||
|
day_of_week="Monday",
|
||||||
|
time_str="25:99",
|
||||||
|
timezone_str="UTC",
|
||||||
|
duration=60
|
||||||
|
)
|
||||||
|
self.assertIn("Invalid time_str", str(context.exception))
|
||||||
|
|
||||||
|
def test_invalid_time_format_non_numeric(self):
|
||||||
|
"""Test that non-numeric time raises ValueError."""
|
||||||
|
with self.assertRaises(ValueError) as context:
|
||||||
|
time_handler.am_i_inside_time(
|
||||||
|
day_of_week="Monday",
|
||||||
|
time_str="twelve:thirty",
|
||||||
|
timezone_str="UTC",
|
||||||
|
duration=60
|
||||||
|
)
|
||||||
|
self.assertIn("Invalid time_str", str(context.exception))
|
||||||
|
|
||||||
|
def test_invalid_timezone(self):
|
||||||
|
"""Test that invalid timezone raises ValueError."""
|
||||||
|
with self.assertRaises(ValueError) as context:
|
||||||
|
time_handler.am_i_inside_time(
|
||||||
|
day_of_week="Monday",
|
||||||
|
time_str="12:00",
|
||||||
|
timezone_str="Invalid/Timezone",
|
||||||
|
duration=60
|
||||||
|
)
|
||||||
|
self.assertIn("Invalid timezone_str", str(context.exception))
|
||||||
|
|
||||||
|
def test_short_duration(self):
|
||||||
|
"""Test with very short duration (15 minutes default)."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = now.format('HH:mm')
|
||||||
|
duration = 15 # Default duration
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Current time should be within 15 minute window")
|
||||||
|
|
||||||
|
def test_long_duration(self):
|
||||||
|
"""Test with long duration (24 hours)."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
# Set time to current hour
|
||||||
|
time_str = now.format('HH:00')
|
||||||
|
duration = 1440 # 24 hours in minutes
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Current time should be within 24 hour window")
|
||||||
|
|
||||||
|
def test_case_insensitive_day(self):
|
||||||
|
"""Test that day of week is case insensitive."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd').lower() # lowercase day
|
||||||
|
time_str = now.format('HH:00')
|
||||||
|
duration = 60
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
self.assertTrue(result, "Lowercase day should work")
|
||||||
|
|
||||||
|
def test_edge_case_midnight(self):
|
||||||
|
"""Test edge case at exactly midnight."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = "00:00"
|
||||||
|
duration = 60
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
# Should be true if we're in the first hour of the day
|
||||||
|
if now.hour == 0:
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_edge_case_end_of_day(self):
|
||||||
|
"""Test edge case near end of day."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
day_of_week = now.format('dddd')
|
||||||
|
time_str = "23:45"
|
||||||
|
duration = 30 # 30 minutes crosses midnight
|
||||||
|
|
||||||
|
result = time_handler.am_i_inside_time(
|
||||||
|
day_of_week=day_of_week,
|
||||||
|
time_str=time_str,
|
||||||
|
timezone_str=timezone_str,
|
||||||
|
duration=duration
|
||||||
|
)
|
||||||
|
|
||||||
|
# Result depends on current time
|
||||||
|
self.assertIsInstance(result, bool)
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsWithinSchedule(unittest.TestCase):
|
||||||
|
"""Tests for the is_within_schedule function."""
|
||||||
|
|
||||||
|
def test_schedule_disabled(self):
|
||||||
|
"""Test that disabled schedule returns False."""
|
||||||
|
time_schedule_limit = {'enabled': False}
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_schedule_none(self):
|
||||||
|
"""Test that None schedule returns False."""
|
||||||
|
result = time_handler.is_within_schedule(None)
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_schedule_empty_dict(self):
|
||||||
|
"""Test that empty dict returns False."""
|
||||||
|
result = time_handler.is_within_schedule({})
|
||||||
|
self.assertFalse(result)
|
||||||
|
|
||||||
|
def test_schedule_enabled_but_day_disabled(self):
|
||||||
|
"""Test schedule enabled but current day disabled."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': False,
|
||||||
|
'start_time': '09:00',
|
||||||
|
'duration': {'hours': 8, 'minutes': 0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertFalse(result, "Disabled day should return False")
|
||||||
|
|
||||||
|
def test_schedule_enabled_within_time(self):
|
||||||
|
"""Test schedule enabled and within time window."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
current_hour = now.format('HH:00')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': current_hour,
|
||||||
|
'duration': {'hours': 2, 'minutes': 0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertTrue(result, "Current time should be within schedule")
|
||||||
|
|
||||||
|
def test_schedule_enabled_outside_time(self):
|
||||||
|
"""Test schedule enabled but outside time window."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
# Set time to 3 hours ago
|
||||||
|
past_time = now.shift(hours=-3).format('HH:mm')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': past_time,
|
||||||
|
'duration': {'hours': 1, 'minutes': 0} # Only 1 hour duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertFalse(result, "3 hours ago with 1 hour duration should be False")
|
||||||
|
|
||||||
|
def test_schedule_with_default_timezone(self):
|
||||||
|
"""Test schedule without timezone uses default."""
|
||||||
|
now = arrow.now('America/New_York')
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
current_hour = now.format('HH:00')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
# No timezone specified
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': current_hour,
|
||||||
|
'duration': {'hours': 2, 'minutes': 0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
# Should use default UTC, but since we're testing with NY time,
|
||||||
|
# the result depends on time difference
|
||||||
|
result = time_handler.is_within_schedule(
|
||||||
|
time_schedule_limit,
|
||||||
|
default_tz='America/New_York'
|
||||||
|
)
|
||||||
|
self.assertTrue(result, "Should work with default timezone")
|
||||||
|
|
||||||
|
def test_schedule_different_timezones(self):
|
||||||
|
"""Test schedule works correctly across different timezones."""
|
||||||
|
# Test with Tokyo timezone
|
||||||
|
timezone_str = 'Asia/Tokyo'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
current_hour = now.format('HH:00')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': current_hour,
|
||||||
|
'duration': {'hours': 1, 'minutes': 30}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertTrue(result)
|
||||||
|
|
||||||
|
def test_schedule_with_minutes_in_duration(self):
|
||||||
|
"""Test schedule with minutes specified in duration."""
|
||||||
|
timezone_str = 'UTC'
|
||||||
|
now = arrow.now(timezone_str)
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
current_time = now.format('HH:mm')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': current_time,
|
||||||
|
'duration': {'hours': 0, 'minutes': 45}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertTrue(result, "Should handle minutes in duration")
|
||||||
|
|
||||||
|
def test_schedule_with_timezone_whitespace(self):
|
||||||
|
"""Test that timezone with whitespace is handled."""
|
||||||
|
timezone_str = ' UTC '
|
||||||
|
now = arrow.now('UTC')
|
||||||
|
current_day = now.format('dddd').lower()
|
||||||
|
current_hour = now.format('HH:00')
|
||||||
|
|
||||||
|
time_schedule_limit = {
|
||||||
|
'enabled': True,
|
||||||
|
'timezone': timezone_str,
|
||||||
|
current_day: {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': current_hour,
|
||||||
|
'duration': {'hours': 1, 'minutes': 0}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
result = time_handler.is_within_schedule(time_schedule_limit)
|
||||||
|
self.assertTrue(result, "Should handle timezone with whitespace")
|
||||||
|
|
||||||
|
|
||||||
|
class TestWeekdayEnum(unittest.TestCase):
|
||||||
|
"""Tests for the Weekday enum."""
|
||||||
|
|
||||||
|
def test_weekday_values(self):
|
||||||
|
"""Test that weekday enum has correct values."""
|
||||||
|
self.assertEqual(time_handler.Weekday.Monday, 0)
|
||||||
|
self.assertEqual(time_handler.Weekday.Tuesday, 1)
|
||||||
|
self.assertEqual(time_handler.Weekday.Wednesday, 2)
|
||||||
|
self.assertEqual(time_handler.Weekday.Thursday, 3)
|
||||||
|
self.assertEqual(time_handler.Weekday.Friday, 4)
|
||||||
|
self.assertEqual(time_handler.Weekday.Saturday, 5)
|
||||||
|
self.assertEqual(time_handler.Weekday.Sunday, 6)
|
||||||
|
|
||||||
|
def test_weekday_string_access(self):
|
||||||
|
"""Test accessing weekday enum by string."""
|
||||||
|
self.assertEqual(time_handler.Weekday['Monday'], 0)
|
||||||
|
self.assertEqual(time_handler.Weekday['Sunday'], 6)
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
@@ -188,6 +188,10 @@ def new_live_server_setup(live_server):
|
|||||||
ctype = request.args.get('content_type')
|
ctype = request.args.get('content_type')
|
||||||
status_code = request.args.get('status_code')
|
status_code = request.args.get('status_code')
|
||||||
content = request.args.get('content') or None
|
content = request.args.get('content') or None
|
||||||
|
delay = int(request.args.get('delay', 0))
|
||||||
|
|
||||||
|
if delay:
|
||||||
|
time.sleep(delay)
|
||||||
|
|
||||||
# Used to just try to break the header detection
|
# Used to just try to break the header detection
|
||||||
uppercase_headers = request.args.get('uppercase_headers')
|
uppercase_headers = request.args.get('uppercase_headers')
|
||||||
|
|||||||
@@ -1,6 +1,5 @@
|
|||||||
from datetime import timedelta, datetime
|
import arrow
|
||||||
from enum import IntEnum
|
from enum import IntEnum
|
||||||
from zoneinfo import ZoneInfo
|
|
||||||
|
|
||||||
|
|
||||||
class Weekday(IntEnum):
|
class Weekday(IntEnum):
|
||||||
@@ -40,54 +39,65 @@ def am_i_inside_time(
|
|||||||
|
|
||||||
# Parse the start time
|
# Parse the start time
|
||||||
try:
|
try:
|
||||||
target_time = datetime.strptime(time_str, '%H:%M').time()
|
hour, minute = map(int, time_str.split(':'))
|
||||||
except ValueError:
|
if not (0 <= hour <= 23 and 0 <= minute <= 59):
|
||||||
|
raise ValueError
|
||||||
|
except (ValueError, AttributeError):
|
||||||
raise ValueError(f"Invalid time_str: '{time_str}'. Must be in 'HH:MM' format.")
|
raise ValueError(f"Invalid time_str: '{time_str}'. Must be in 'HH:MM' format.")
|
||||||
|
|
||||||
# Define the timezone
|
|
||||||
try:
|
|
||||||
tz = ZoneInfo(timezone_str)
|
|
||||||
except Exception:
|
|
||||||
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
|
|
||||||
|
|
||||||
# Get the current time in the specified timezone
|
# Get the current time in the specified timezone
|
||||||
now_tz = datetime.now(tz)
|
try:
|
||||||
|
now_tz = arrow.now(timezone_str.strip())
|
||||||
|
except Exception as e:
|
||||||
|
raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.")
|
||||||
|
|
||||||
# Check if the current day matches the target day or overlaps due to duration
|
# Check if the current day matches the target day or overlaps due to duration
|
||||||
current_weekday = now_tz.weekday()
|
current_weekday = now_tz.weekday()
|
||||||
start_datetime_tz = datetime.combine(now_tz.date(), target_time, tzinfo=tz)
|
# Create start datetime for today in target timezone
|
||||||
|
start_datetime_tz = now_tz.replace(hour=hour, minute=minute, second=0, microsecond=0)
|
||||||
|
|
||||||
# Handle previous day's overlap
|
# Handle previous day's overlap
|
||||||
if target_weekday == (current_weekday - 1) % 7:
|
if target_weekday == (current_weekday - 1) % 7:
|
||||||
# Calculate start and end times for the overlap from the previous day
|
# Calculate start and end times for the overlap from the previous day
|
||||||
start_datetime_tz -= timedelta(days=1)
|
start_datetime_tz = start_datetime_tz.shift(days=-1)
|
||||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||||
if start_datetime_tz <= now_tz < end_datetime_tz:
|
if start_datetime_tz <= now_tz < end_datetime_tz:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Handle current day's range
|
# Handle current day's range
|
||||||
if target_weekday == current_weekday:
|
if target_weekday == current_weekday:
|
||||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||||
if start_datetime_tz <= now_tz < end_datetime_tz:
|
if start_datetime_tz <= now_tz < end_datetime_tz:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
# Handle next day's overlap
|
# Handle next day's overlap
|
||||||
if target_weekday == (current_weekday + 1) % 7:
|
if target_weekday == (current_weekday + 1) % 7:
|
||||||
end_datetime_tz = start_datetime_tz + timedelta(minutes=duration)
|
end_datetime_tz = start_datetime_tz.shift(minutes=duration)
|
||||||
if now_tz < start_datetime_tz and now_tz + timedelta(days=1) < end_datetime_tz:
|
if now_tz < start_datetime_tz and now_tz.shift(days=1) < end_datetime_tz:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
def is_within_schedule(time_schedule_limit, default_tz="UTC"):
|
def is_within_schedule(time_schedule_limit, default_tz="UTC"):
|
||||||
|
"""
|
||||||
|
Check if the current time is within a scheduled time window.
|
||||||
|
|
||||||
|
Parameters:
|
||||||
|
time_schedule_limit (dict): Schedule configuration with timezone, day settings, etc.
|
||||||
|
default_tz (str): Default timezone to use if not specified. Default is 'UTC'.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
bool: True if current time is within the schedule, False otherwise.
|
||||||
|
"""
|
||||||
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
if time_schedule_limit and time_schedule_limit.get('enabled'):
|
||||||
# Get the timezone the time schedule is in, so we know what day it is there
|
# Get the timezone the time schedule is in, so we know what day it is there
|
||||||
tz_name = time_schedule_limit.get('timezone')
|
tz_name = time_schedule_limit.get('timezone')
|
||||||
if not tz_name:
|
if not tz_name:
|
||||||
tz_name = default_tz
|
tz_name = default_tz
|
||||||
|
|
||||||
now_day_name_in_tz = datetime.now(ZoneInfo(tz_name.strip())).strftime('%A')
|
# Get current day name in the target timezone
|
||||||
|
now_day_name_in_tz = arrow.now(tz_name.strip()).format('dddd')
|
||||||
selected_day_schedule = time_schedule_limit.get(now_day_name_in_tz.lower())
|
selected_day_schedule = time_schedule_limit.get(now_day_name_in_tz.lower())
|
||||||
if not selected_day_schedule.get('enabled'):
|
if not selected_day_schedule.get('enabled'):
|
||||||
return False
|
return False
|
||||||
|
|||||||
@@ -10,7 +10,7 @@ flask_restful
|
|||||||
flask_cors # For the Chrome extension to operate
|
flask_cors # For the Chrome extension to operate
|
||||||
janus # Thread-safe async/sync queue bridge
|
janus # Thread-safe async/sync queue bridge
|
||||||
flask_wtf~=1.2
|
flask_wtf~=1.2
|
||||||
flask~=2.3
|
flask~=3.1
|
||||||
flask-socketio~=5.5.1
|
flask-socketio~=5.5.1
|
||||||
python-socketio~=5.13.0
|
python-socketio~=5.13.0
|
||||||
python-engineio~=4.12.3
|
python-engineio~=4.12.3
|
||||||
@@ -67,13 +67,9 @@ elementpath==4.1.5
|
|||||||
|
|
||||||
selenium~=4.31.0
|
selenium~=4.31.0
|
||||||
|
|
||||||
# https://github.com/pallets/werkzeug/issues/2985
|
|
||||||
# Maybe related to pytest?
|
|
||||||
werkzeug==3.0.6
|
|
||||||
|
|
||||||
# Templating, so far just in the URLs but in the future can be for the notifications also
|
# Templating, so far just in the URLs but in the future can be for the notifications also
|
||||||
jinja2~=3.1
|
jinja2~=3.1
|
||||||
jinja2-time
|
arrow
|
||||||
openpyxl
|
openpyxl
|
||||||
# https://peps.python.org/pep-0508/#environment-markers
|
# https://peps.python.org/pep-0508/#environment-markers
|
||||||
# https://github.com/dgtlmoon/changedetection.io/pull/1009
|
# https://github.com/dgtlmoon/changedetection.io/pull/1009
|
||||||
@@ -89,6 +85,7 @@ pyppeteerstealth>=0.0.4
|
|||||||
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
|
# Include pytest, so if theres a support issue we can ask them to run these tests on their setup
|
||||||
pytest ~=7.2
|
pytest ~=7.2
|
||||||
pytest-flask ~=1.2
|
pytest-flask ~=1.2
|
||||||
|
pytest-mock ~=3.15
|
||||||
|
|
||||||
# Anything 4.0 and up but not 5.0
|
# Anything 4.0 and up but not 5.0
|
||||||
jsonschema ~= 4.0
|
jsonschema ~= 4.0
|
||||||
@@ -125,8 +122,9 @@ price-parser
|
|||||||
|
|
||||||
# flask_socket_io - incorrect package name, already have flask-socketio above
|
# flask_socket_io - incorrect package name, already have flask-socketio above
|
||||||
|
|
||||||
# So far for detecting correct favicon type, but for other things in the future
|
# Lightweight MIME type detection (saves ~14MB memory vs python-magic/libmagic)
|
||||||
python-magic
|
# Used for detecting correct favicon type and content-type detection
|
||||||
|
puremagic
|
||||||
|
|
||||||
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
|
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
|
||||||
tzdata
|
tzdata
|
||||||
|
|||||||
Reference in New Issue
Block a user