mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 14:47:21 +00:00 
			
		
		
		
	Compare commits
	
		
			14 Commits
		
	
	
		
			rss-reader
			...
			notificati
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | d450262d5b | ||
|   | 6f926ed595 | ||
|   | 249dc55212 | ||
|   | 46252bc6f3 | ||
|   | 64350a2e78 | ||
|   | 2902c63a3b | ||
|   | 55b8588f1f | ||
|   | 02ecc4ae9a | ||
|   | 3ee50b7832 | ||
|   | 66ddd87ee4 | ||
|   | 233189e4f7 | ||
|   | b237fd7201 | ||
|   | 3c81efe2f4 | ||
|   | 0fcfb94690 | 
| @@ -253,6 +253,30 @@ jobs: | ||||
|           docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt | ||||
|           docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt | ||||
|  | ||||
|       - name: Extract and display memory test report | ||||
|         if: always() | ||||
|         run: | | ||||
|           # Extract test-memory.log from the container | ||||
|           echo "Extracting test-memory.log from container..." | ||||
|           docker cp test-cdio-basic-tests:/app/changedetectionio/test-memory.log output-logs/test-memory-${{ env.PYTHON_VERSION }}.log || echo "test-memory.log not found in container" | ||||
|  | ||||
|           # Display the memory log contents for immediate visibility in workflow output | ||||
|           echo "=== Top 10 Highest Peak Memory Tests ===" | ||||
|           if [ -f output-logs/test-memory-${{ env.PYTHON_VERSION }}.log ]; then | ||||
|             # Sort by peak memory value (extract number before MB and sort numerically, reverse order) | ||||
|             grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | \ | ||||
|               sed 's/.*Peak memory: //' | \ | ||||
|               paste -d'|' - <(grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log) | \ | ||||
|               sort -t'|' -k1 -nr | \ | ||||
|               cut -d'|' -f2 | \ | ||||
|               head -10 | ||||
|             echo "" | ||||
|             echo "=== Full Memory Test Report ===" | ||||
|             cat output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | ||||
|           else | ||||
|             echo "No memory log available" | ||||
|           fi | ||||
|  | ||||
|       - name: Store everything including test-datastore | ||||
|         if: always() | ||||
|         uses: actions/upload-artifact@v4 | ||||
|   | ||||
| @@ -2,6 +2,7 @@ recursive-include changedetectionio/api * | ||||
| recursive-include changedetectionio/blueprint * | ||||
| recursive-include changedetectionio/conditions * | ||||
| recursive-include changedetectionio/content_fetchers * | ||||
| recursive-include changedetectionio/jinja2_custom * | ||||
| recursive-include changedetectionio/model * | ||||
| recursive-include changedetectionio/notification * | ||||
| recursive-include changedetectionio/processors * | ||||
|   | ||||
| @@ -2,7 +2,7 @@ | ||||
|  | ||||
| # Read more https://github.com/dgtlmoon/changedetection.io/wiki | ||||
|  | ||||
| __version__ = '0.50.20' | ||||
| __version__ = '0.50.24' | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from json.decoder import JSONDecodeError | ||||
|   | ||||
| @@ -334,6 +334,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): | ||||
|                             if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change): | ||||
|                                 watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time)) | ||||
|  | ||||
|                             # Explicitly delete large content variables to free memory IMMEDIATELY after saving | ||||
|                             # These are no longer needed after being saved to history | ||||
|                             del contents | ||||
|  | ||||
|                             # Send notifications on second+ check | ||||
|                             if watch.history_n >= 2: | ||||
|                                 logger.info(f"Change detected in UUID {uuid} - {watch['url']}") | ||||
| @@ -372,6 +376,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): | ||||
|                 datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3), | ||||
|                                                                'check_count': count}) | ||||
|  | ||||
|                 # NOW clear fetcher content - after all processing is complete | ||||
|                 # This is the last point where we need the fetcher data | ||||
|                 if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher: | ||||
|                     update_handler.fetcher.clear_content() | ||||
|                     logger.debug(f"Cleared fetcher content for UUID {uuid}") | ||||
|  | ||||
|         except Exception as e: | ||||
|             logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}") | ||||
|             logger.error(f"Worker {worker_id} traceback:", exc_info=True) | ||||
| @@ -392,7 +402,28 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore): | ||||
|                         #logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}") | ||||
|                         watch_check_update.send(watch_uuid=watch['uuid']) | ||||
|  | ||||
|                     update_handler = None | ||||
|                     # Explicitly clean up update_handler and all its references | ||||
|                     if update_handler: | ||||
|                         # Clear fetcher content using the proper method | ||||
|                         if hasattr(update_handler, 'fetcher') and update_handler.fetcher: | ||||
|                             update_handler.fetcher.clear_content() | ||||
|  | ||||
|                         # Clear processor references | ||||
|                         if hasattr(update_handler, 'content_processor'): | ||||
|                             update_handler.content_processor = None | ||||
|  | ||||
|                         update_handler = None | ||||
|  | ||||
|                     # Clear local contents variable if it still exists | ||||
|                     if 'contents' in locals(): | ||||
|                         del contents | ||||
|  | ||||
|                     # Note: We don't set watch = None here because: | ||||
|                     # 1. watch is just a local reference to datastore.data['watching'][uuid] | ||||
|                     # 2. Setting it to None doesn't affect the datastore | ||||
|                     # 3. GC can't collect the object anyway (still referenced by datastore) | ||||
|                     # 4. It would just cause confusion | ||||
|  | ||||
|                     logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s") | ||||
|                 except Exception as cleanup_error: | ||||
|                     logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}") | ||||
|   | ||||
| @@ -6,7 +6,7 @@ from loguru import logger | ||||
|  | ||||
| from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT | ||||
| from changedetectionio.content_fetchers.base import manage_user_agent | ||||
| from changedetectionio.safe_jinja import render as jinja_render | ||||
| from changedetectionio.jinja2_custom import render as jinja_render | ||||
|  | ||||
|  | ||||
|  | ||||
|   | ||||
| @@ -33,7 +33,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): | ||||
|     def long_task(uuid, preferred_proxy): | ||||
|         import time | ||||
|         from changedetectionio.content_fetchers import exceptions as content_fetcher_exceptions | ||||
|         from changedetectionio.safe_jinja import render as jinja_render | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|  | ||||
|         status = {'status': '', 'length': 0, 'text': ''} | ||||
|  | ||||
|   | ||||
| @@ -1,5 +1,5 @@ | ||||
|  | ||||
| from changedetectionio.safe_jinja import render as jinja_render | ||||
| from changedetectionio.jinja2_custom import render as jinja_render | ||||
| from changedetectionio.store import ChangeDetectionStore | ||||
| from feedgen.feed import FeedGenerator | ||||
| from flask import Blueprint, make_response, request, url_for, redirect | ||||
|   | ||||
| @@ -119,7 +119,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): | ||||
|                                 hide_remove_pass=os.getenv("SALTED_PASS", False), | ||||
|                                 min_system_recheck_seconds=int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)), | ||||
|                                 settings_application=datastore.data['settings']['application'], | ||||
|                                 timezone_default_config=datastore.data['settings']['application'].get('timezone'), | ||||
|                                 timezone_default_config=datastore.data['settings']['application'].get('scheduler_timezone_default'), | ||||
|                                 utc_time=utc_time, | ||||
|                                 ) | ||||
|  | ||||
|   | ||||
| @@ -1,7 +1,7 @@ | ||||
| {% extends 'base.html' %} | ||||
|  | ||||
| {% block content %} | ||||
| {% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %} | ||||
| {% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %} | ||||
| {% from '_common_fields.html' import render_common_settings_form %} | ||||
| <script> | ||||
|     const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}"; | ||||
| @@ -72,25 +72,23 @@ | ||||
|                         <span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page) | ||||
|                         </span> | ||||
|                     </div> | ||||
|                     <div class="pure-control-group"> | ||||
|                         {{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }} | ||||
|                     </div> | ||||
|                     <div class="pure-control-group"> | ||||
|                         {{ render_field(form.application.form.rss_content_format) }} | ||||
|                         <span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span> | ||||
|                     </div> | ||||
|                     <div class="pure-control-group"> | ||||
|                         {{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }} | ||||
|                         <span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span> | ||||
|                     </div> | ||||
|                 {% if form.requests.proxy %} | ||||
|                     <div class="pure-control-group inline-radio"> | ||||
|                         {{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }} | ||||
|                         <span class="pure-form-message-inline"> | ||||
|                         Choose a default proxy for all watches | ||||
|                         </span> | ||||
|                     <div class="grey-form-border"> | ||||
|                         <div class="pure-control-group"> | ||||
|                             {{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }} | ||||
|                         </div> | ||||
|                         <div class="pure-control-group"> | ||||
|                             {{ render_field(form.application.form.rss_content_format) }} | ||||
|                             <span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span> | ||||
|                         </div> | ||||
|                         <div class="pure-control-group"> | ||||
|                             {{ render_checkbox_field(form.application.form.rss_reader_mode) }} | ||||
|                             <span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span> | ||||
|                         </div> | ||||
|                     </div> | ||||
|                 {% endif %} | ||||
|                 </fieldset> | ||||
|             </div> | ||||
|  | ||||
| @@ -133,6 +131,10 @@ | ||||
|                     <span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br> | ||||
|                     Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span> | ||||
|                 </div> | ||||
|                 <div class="pure-control-group"> | ||||
|                     {{ render_field(form.requests.form.timeout) }} | ||||
|                     <span class="pure-form-message-inline">For regular plain requests (not chrome based), maximum number of seconds until timeout, 1-999.<br> | ||||
|                 </div> | ||||
|                 <div class="pure-control-group inline-radio"> | ||||
|                     {{ render_field(form.requests.form.default_ua) }} | ||||
|                     <span class="pure-form-message-inline"> | ||||
| @@ -236,7 +238,7 @@ nav | ||||
|                     <p><strong>UTC Time & Date from Server:</strong> <span id="utc-time" >{{ utc_time }}</span></p> | ||||
|                     <p><strong>Local Time & Date in Browser:</strong> <span class="local-time" data-utc="{{ utc_time }}"></span></p> | ||||
|                     <p> | ||||
|                        {{ render_field(form.application.form.timezone) }} | ||||
|                        {{ render_field(form.application.form.scheduler_timezone_default) }} | ||||
|                         <datalist id="timezones" style="display: none;"> | ||||
|                             {% for tz_name in available_timezones %} | ||||
|                                 <option value="{{ tz_name }}">{{ tz_name }}</option> | ||||
| @@ -314,17 +316,27 @@ nav | ||||
|                <p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites. | ||||
|  | ||||
|                 <div class="pure-control-group" id="extra-proxies-setting"> | ||||
|                 {{ render_field(form.requests.form.extra_proxies) }} | ||||
|                 {{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }} | ||||
|                 <span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br> | ||||
|                 <span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span> | ||||
|                 {% if form.requests.proxy %} | ||||
|                 <div> | ||||
|                 <br> | ||||
|                     <div class="inline-radio"> | ||||
|                         {{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }} | ||||
|                         <span class="pure-form-message-inline">Choose a default proxy for all watches</span> | ||||
|                     </div> | ||||
|                 </div> | ||||
|                 {% endif %} | ||||
|                 </div> | ||||
|                 <div class="pure-control-group" id="extra-browsers-setting"> | ||||
|                     <p> | ||||
|                     <span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br> | ||||
|                     <span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span> | ||||
|                     </p> | ||||
|                     {{ render_field(form.requests.form.extra_browsers) }} | ||||
|                     {{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }} | ||||
|                 </div> | ||||
|              | ||||
|             </div> | ||||
|             <div id="actions"> | ||||
|                 <div class="pure-control-group"> | ||||
|   | ||||
| @@ -187,7 +187,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|  | ||||
|             tz_name = time_schedule_limit.get('timezone') | ||||
|             if not tz_name: | ||||
|                 tz_name = datastore.data['settings']['application'].get('timezone', 'UTC') | ||||
|                 tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip()) | ||||
|  | ||||
|             if time_schedule_limit and time_schedule_limit.get('enabled'): | ||||
|                 try: | ||||
| @@ -257,7 +257,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe | ||||
|                 'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'), | ||||
|                 'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch), | ||||
|                 'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid), | ||||
|                 'timezone_default_config': datastore.data['settings']['application'].get('timezone'), | ||||
|                 'timezone_default_config': datastore.data['settings']['application'].get('scheduler_timezone_default'), | ||||
|                 'using_global_webdriver_wait': not default['webdriver_delay'], | ||||
|                 'uuid': uuid, | ||||
|                 'watch': watch, | ||||
|   | ||||
| @@ -2,6 +2,7 @@ from flask import Blueprint, request, make_response | ||||
| import random | ||||
| from loguru import logger | ||||
|  | ||||
| from changedetectionio.notification_service import NotificationContextData | ||||
| from changedetectionio.store import ChangeDetectionStore | ||||
| from changedetectionio.auth_decorator import login_optionally_required | ||||
|  | ||||
| @@ -19,6 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore): | ||||
|         import apprise | ||||
|         from changedetectionio.notification.handler import process_notification | ||||
|         from changedetectionio.notification.apprise_plugin.assets import apprise_asset | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|  | ||||
|         from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler | ||||
|  | ||||
| @@ -61,16 +63,20 @@ def construct_blueprint(datastore: ChangeDetectionStore): | ||||
|             return 'Error: No Notification URLs set/found' | ||||
|  | ||||
|         for n_url in notification_urls: | ||||
|             # We are ONLY validating the apprise:// part here, convert all tags to something so as not to break apprise URLs | ||||
|             generic_notification_context_data = NotificationContextData() | ||||
|             generic_notification_context_data.set_random_for_validation() | ||||
|             n_url = jinja_render(template_str=n_url, **generic_notification_context_data).strip() | ||||
|             if len(n_url.strip()): | ||||
|                 if not apobj.add(n_url): | ||||
|                     return f'Error:  {n_url} is not a valid AppRise URL.' | ||||
|  | ||||
|         try: | ||||
|             # use the same as when it is triggered, but then override it with the form test values | ||||
|             n_object = { | ||||
|             n_object = NotificationContextData({ | ||||
|                 'watch_url': request.form.get('window_url', "https://changedetection.io"), | ||||
|                 'notification_urls': notification_urls | ||||
|             } | ||||
|             }) | ||||
|  | ||||
|             # Only use if present, if not set in n_object it should use the default system value | ||||
|             if 'notification_format' in request.form and request.form['notification_format'].strip(): | ||||
|   | ||||
| @@ -64,6 +64,19 @@ class Fetcher(): | ||||
|     # Time ONTOP of the system defined env minimum time | ||||
|     render_extract_delay = 0 | ||||
|  | ||||
|     def clear_content(self): | ||||
|         """ | ||||
|         Explicitly clear all content from memory to free up heap space. | ||||
|         Call this after content has been saved to disk. | ||||
|         """ | ||||
|         self.content = None | ||||
|         if hasattr(self, 'raw_content'): | ||||
|             self.raw_content = None | ||||
|         self.screenshot = None | ||||
|         self.xpath_data = None | ||||
|         # Keep headers and status_code as they're small | ||||
|         logger.trace("Fetcher content cleared from memory") | ||||
|  | ||||
|     @abstractmethod | ||||
|     def get_error(self): | ||||
|         return self.error | ||||
| @@ -128,7 +141,7 @@ class Fetcher(): | ||||
|     async def iterate_browser_steps(self, start_url=None): | ||||
|         from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface | ||||
|         from playwright._impl._errors import TimeoutError, Error | ||||
|         from changedetectionio.safe_jinja import render as jinja_render | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|         step_n = 0 | ||||
|  | ||||
|         if self.browser_steps is not None and len(self.browser_steps): | ||||
|   | ||||
| @@ -51,6 +51,7 @@ class fetcher(Fetcher): | ||||
|  | ||||
|         session = requests.Session() | ||||
|  | ||||
|  | ||||
|         if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'): | ||||
|             from requests_file import FileAdapter | ||||
|             session.mount('file://', FileAdapter()) | ||||
|   | ||||
| @@ -795,7 +795,7 @@ def ticker_thread_check_time_launch_checks(): | ||||
|             else: | ||||
|                 time_schedule_limit = watch.get('time_schedule_limit') | ||||
|                 logger.trace(f"{uuid} Time scheduler - Using watch settings (not global settings)") | ||||
|             tz_name = datastore.data['settings']['application'].get('timezone', 'UTC') | ||||
|             tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip()) | ||||
|  | ||||
|             if time_schedule_limit and time_schedule_limit.get('enabled'): | ||||
|                 try: | ||||
|   | ||||
| @@ -5,6 +5,7 @@ from wtforms.widgets.core import TimeInput | ||||
|  | ||||
| from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES | ||||
| from changedetectionio.conditions.form import ConditionFormRow | ||||
| from changedetectionio.notification_service import NotificationContextData | ||||
| from changedetectionio.strtobool import strtobool | ||||
|  | ||||
| from wtforms import ( | ||||
| @@ -469,11 +470,16 @@ class ValidateAppRiseServers(object): | ||||
|         import apprise | ||||
|         from .notification.apprise_plugin.assets import apprise_asset | ||||
|         from .notification.apprise_plugin.custom_handlers import apprise_http_custom_handler  # noqa: F401 | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|  | ||||
|         apobj = apprise.Apprise(asset=apprise_asset) | ||||
|  | ||||
|         for server_url in field.data: | ||||
|             url = server_url.strip() | ||||
|             generic_notification_context_data = NotificationContextData() | ||||
|             # Make sure something is atleast in all those regular token fields | ||||
|             generic_notification_context_data.set_random_for_validation() | ||||
|  | ||||
|             url = jinja_render(template_str=server_url.strip(), **generic_notification_context_data).strip() | ||||
|             if url.startswith("#"): | ||||
|                 continue | ||||
|  | ||||
| @@ -487,9 +493,8 @@ class ValidateJinja2Template(object): | ||||
|     """ | ||||
|     def __call__(self, form, field): | ||||
|         from changedetectionio import notification | ||||
|  | ||||
|         from changedetectionio.jinja2_custom import create_jinja_env | ||||
|         from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError | ||||
|         from jinja2.sandbox import ImmutableSandboxedEnvironment | ||||
|         from jinja2.meta import find_undeclared_variables | ||||
|         import jinja2.exceptions | ||||
|  | ||||
| @@ -497,9 +502,11 @@ class ValidateJinja2Template(object): | ||||
|         joined_data = ' '.join(map(str, field.data)) if isinstance(field.data, list) else f"{field.data}" | ||||
|  | ||||
|         try: | ||||
|             jinja2_env = ImmutableSandboxedEnvironment(loader=BaseLoader, extensions=['jinja2_time.TimeExtension']) | ||||
|             jinja2_env.globals.update(notification.valid_tokens) | ||||
|             # Extra validation tokens provided on the form_class(... extra_tokens={}) setup | ||||
|             # Use the shared helper to create a properly configured environment | ||||
|             jinja2_env = create_jinja_env(loader=BaseLoader) | ||||
|  | ||||
|             # Add notification tokens for validation | ||||
|             jinja2_env.globals.update(NotificationContextData()) | ||||
|             if hasattr(field, 'extra_notification_tokens'): | ||||
|                 jinja2_env.globals.update(field.extra_notification_tokens) | ||||
|  | ||||
| @@ -511,6 +518,7 @@ class ValidateJinja2Template(object): | ||||
|         except jinja2.exceptions.SecurityError as e: | ||||
|             raise ValidationError(f"This is not a valid Jinja2 template: {e}") from e | ||||
|  | ||||
|         # Check for undeclared variables | ||||
|         ast = jinja2_env.parse(joined_data) | ||||
|         undefined = ", ".join(find_undeclared_variables(ast)) | ||||
|         if undefined: | ||||
| @@ -678,6 +686,51 @@ class ValidateCSSJSONXPATHInput(object): | ||||
|                 except: | ||||
|                     raise ValidationError("A system-error occurred when validating your jq expression") | ||||
|  | ||||
| class ValidateSimpleURL: | ||||
|     """Validate that the value can be parsed by urllib.parse.urlparse() and has a scheme/netloc.""" | ||||
|     def __init__(self, message=None): | ||||
|         self.message = message or "Invalid URL." | ||||
|  | ||||
|     def __call__(self, form, field): | ||||
|         data = (field.data or "").strip() | ||||
|         if not data: | ||||
|             return  # empty is OK — pair with validators.Optional() | ||||
|         from urllib.parse import urlparse | ||||
|  | ||||
|         parsed = urlparse(data) | ||||
|         if not parsed.scheme or not parsed.netloc: | ||||
|             raise ValidationError(self.message) | ||||
|  | ||||
| class ValidateStartsWithRegex(object): | ||||
|     def __init__(self, regex, *, flags=0, message=None, allow_empty=True, split_lines=True): | ||||
|         # compile with given flags (we’ll pass re.IGNORECASE below) | ||||
|         self.pattern = re.compile(regex, flags) if isinstance(regex, str) else regex | ||||
|         self.message = message | ||||
|         self.allow_empty = allow_empty | ||||
|         self.split_lines = split_lines | ||||
|  | ||||
|     def __call__(self, form, field): | ||||
|         data = field.data | ||||
|         if not data: | ||||
|             return | ||||
|  | ||||
|         # normalize into list of lines | ||||
|         if isinstance(data, str) and self.split_lines: | ||||
|             lines = data.splitlines() | ||||
|         elif isinstance(data, (list, tuple)): | ||||
|             lines = data | ||||
|         else: | ||||
|             lines = [data] | ||||
|  | ||||
|         for line in lines: | ||||
|             stripped = line.strip() | ||||
|             if not stripped: | ||||
|                 if self.allow_empty: | ||||
|                     continue | ||||
|                 raise ValidationError(self.message or "Empty value not allowed.") | ||||
|             if not self.pattern.match(stripped): | ||||
|                 raise ValidationError(self.message or "Invalid value.") | ||||
|  | ||||
| class quickWatchForm(Form): | ||||
|     from . import processors | ||||
|  | ||||
| @@ -705,7 +758,7 @@ class commonSettingsForm(Form): | ||||
|     notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()]) | ||||
|     notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()]) | ||||
|     processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff") | ||||
|     timezone = StringField("Timezone for watch schedule", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()]) | ||||
|     scheduler_timezone_default = StringField("Default timezone for watch check scheduler", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()]) | ||||
|     webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")]) | ||||
|  | ||||
|  | ||||
| @@ -795,7 +848,7 @@ class processor_text_json_diff_form(commonSettingsForm): | ||||
|         if not super().validate(): | ||||
|             return False | ||||
|  | ||||
|         from changedetectionio.safe_jinja import render as jinja_render | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|         result = True | ||||
|  | ||||
|         # Fail form validation when a body is set for a GET | ||||
| @@ -858,23 +911,36 @@ class processor_text_json_diff_form(commonSettingsForm): | ||||
|     ): | ||||
|         super().__init__(formdata, obj, prefix, data, meta, **kwargs) | ||||
|         if kwargs and kwargs.get('default_system_settings'): | ||||
|             default_tz = kwargs.get('default_system_settings').get('application', {}).get('timezone') | ||||
|             default_tz = kwargs.get('default_system_settings').get('application', {}).get('scheduler_timezone_default') | ||||
|             if default_tz: | ||||
|                 self.time_schedule_limit.form.timezone.render_kw['placeholder'] = default_tz | ||||
|  | ||||
|  | ||||
|  | ||||
| class SingleExtraProxy(Form): | ||||
|  | ||||
|     # maybe better to set some <script>var.. | ||||
|     proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"}) | ||||
|     proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50}) | ||||
|     # @todo do the validation here instead | ||||
|     proxy_url = StringField('Proxy URL', [ | ||||
|         validators.Optional(), | ||||
|         ValidateStartsWithRegex( | ||||
|             regex=r'^(https?|socks5)://',  # ✅ main pattern | ||||
|             flags=re.IGNORECASE,  # ✅ makes it case-insensitive | ||||
|             message='Proxy URLs must start with http://, https:// or socks5://', | ||||
|         ), | ||||
|         ValidateSimpleURL() | ||||
|     ], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50}) | ||||
|  | ||||
| class SingleExtraBrowser(Form): | ||||
|     browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"}) | ||||
|     browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50}) | ||||
|     # @todo do the validation here instead | ||||
|     browser_connection_url = StringField('Browser connection URL', [ | ||||
|         validators.Optional(), | ||||
|         ValidateStartsWithRegex( | ||||
|             regex=r'^(wss?|ws)://', | ||||
|             flags=re.IGNORECASE, | ||||
|             message='Browser URLs must start with wss:// or ws://' | ||||
|         ), | ||||
|         ValidateSimpleURL() | ||||
|     ], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50}) | ||||
|  | ||||
| class DefaultUAInputForm(Form): | ||||
|     html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"}) | ||||
| @@ -885,7 +951,7 @@ class DefaultUAInputForm(Form): | ||||
| class globalSettingsRequestForm(Form): | ||||
|     time_between_check = RequiredFormField(TimeBetweenCheckForm) | ||||
|     time_schedule_limit = FormField(ScheduleLimitForm) | ||||
|     proxy = RadioField('Proxy') | ||||
|     proxy = RadioField('Default proxy') | ||||
|     jitter_seconds = IntegerField('Random jitter seconds ± check', | ||||
|                                   render_kw={"style": "width: 5em;"}, | ||||
|                                   validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")]) | ||||
| @@ -894,7 +960,12 @@ class globalSettingsRequestForm(Form): | ||||
|                           render_kw={"style": "width: 5em;"}, | ||||
|                           validators=[validators.NumberRange(min=1, max=50, | ||||
|                                                              message="Should be between 1 and 50")]) | ||||
|      | ||||
|  | ||||
|     timeout = IntegerField('Requests timeout in seconds', | ||||
|                            render_kw={"style": "width: 5em;"}, | ||||
|                            validators=[validators.NumberRange(min=1, max=999, | ||||
|                                                               message="Should be between 1 and 999")]) | ||||
|  | ||||
|     extra_proxies = FieldList(FormField(SingleExtraProxy), min_entries=5) | ||||
|     extra_browsers = FieldList(FormField(SingleExtraBrowser), min_entries=5) | ||||
|  | ||||
| @@ -940,6 +1011,10 @@ class globalSettingsApplicationForm(commonSettingsForm): | ||||
|     strip_ignored_lines = BooleanField('Strip ignored lines') | ||||
|     rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True, | ||||
|                                       validators=[validators.Optional()]) | ||||
|  | ||||
|     rss_reader_mode = BooleanField('RSS reader mode ', default=False, | ||||
|                                       validators=[validators.Optional()]) | ||||
|  | ||||
|     filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification', | ||||
|                                                                   render_kw={"style": "width: 5em;"}, | ||||
|                                                                   validators=[validators.NumberRange(min=0, | ||||
|   | ||||
							
								
								
									
										20
									
								
								changedetectionio/jinja2_custom/__init__.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										20
									
								
								changedetectionio/jinja2_custom/__init__.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,20 @@ | ||||
| """ | ||||
| Jinja2 custom extensions and safe rendering utilities. | ||||
| """ | ||||
| from .extensions.TimeExtension import TimeExtension | ||||
| from .safe_jinja import ( | ||||
|     render, | ||||
|     render_fully_escaped, | ||||
|     create_jinja_env, | ||||
|     JINJA2_MAX_RETURN_PAYLOAD_SIZE, | ||||
|     DEFAULT_JINJA2_EXTENSIONS, | ||||
| ) | ||||
|  | ||||
| __all__ = [ | ||||
|     'TimeExtension', | ||||
|     'render', | ||||
|     'render_fully_escaped', | ||||
|     'create_jinja_env', | ||||
|     'JINJA2_MAX_RETURN_PAYLOAD_SIZE', | ||||
|     'DEFAULT_JINJA2_EXTENSIONS', | ||||
| ] | ||||
							
								
								
									
										221
									
								
								changedetectionio/jinja2_custom/extensions/TimeExtension.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										221
									
								
								changedetectionio/jinja2_custom/extensions/TimeExtension.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,221 @@ | ||||
| """ | ||||
| Jinja2 TimeExtension - Custom date/time handling for templates. | ||||
|  | ||||
| This extension provides the {% now %} tag for Jinja2 templates, offering timezone-aware | ||||
| date/time formatting with support for time offsets. | ||||
|  | ||||
| Why This Extension Exists: | ||||
|     The Arrow library has a now() function (arrow.now()), but Jinja2 templates cannot | ||||
|     directly call Python functions - they need extensions or filters to expose functionality. | ||||
|  | ||||
|     This TimeExtension serves as a Jinja2-to-Arrow bridge that: | ||||
|  | ||||
|     1. Makes Arrow accessible in templates - Jinja2 requires registering functions/tags | ||||
|        through extensions. You cannot use arrow.now() directly in a template. | ||||
|  | ||||
|     2. Provides template-friendly syntax - Instead of complex Python code, you get clean tags: | ||||
|        {% now 'UTC' %} | ||||
|        {% now 'UTC' + 'hours=2' %} | ||||
|        {% now 'Europe/London', '%Y-%m-%d' %} | ||||
|  | ||||
|     3. Adds convenience features on top of Arrow: | ||||
|        - Default timezone from environment variable (TZ) or config | ||||
|        - Default datetime format configuration | ||||
|        - Offset syntax parsing: 'hours=2,minutes=30' → shift(hours=2, minutes=30) | ||||
|        - Empty string timezone support to use configured defaults | ||||
|  | ||||
|     4. Maintains security - Works within Jinja2's sandboxed environment so users | ||||
|        cannot access arbitrary Python code or objects. | ||||
|  | ||||
|     Essentially, this is a Jinja2 wrapper around arrow.now() and arrow.shift() that | ||||
|     provides user-friendly template syntax while maintaining security. | ||||
|  | ||||
| Basic Usage: | ||||
|     {% now 'UTC' %} | ||||
|     # Output: Wed, 09 Dec 2015 23:33:01 | ||||
|  | ||||
| Custom Format: | ||||
|     {% now 'UTC', '%Y-%m-%d %H:%M:%S' %} | ||||
|     # Output: 2015-12-09 23:33:01 | ||||
|  | ||||
| Timezone Support: | ||||
|     {% now 'America/New_York' %} | ||||
|     {% now 'Europe/London' %} | ||||
|     {% now '' %}  # Uses default timezone from environment.default_timezone | ||||
|  | ||||
| Time Offsets (Addition): | ||||
|     {% now 'UTC' + 'hours=2' %} | ||||
|     {% now 'UTC' + 'hours=2,minutes=30' %} | ||||
|     {% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10' %} | ||||
|  | ||||
| Time Offsets (Subtraction): | ||||
|     {% now 'UTC' - 'minutes=11' %} | ||||
|     {% now 'UTC' - 'days=2,minutes=33,seconds=1' %} | ||||
|  | ||||
| Time Offsets with Custom Format: | ||||
|     {% now 'UTC' + 'hours=2', '%Y-%m-%d %H:%M:%S' %} | ||||
|     # Output: 2015-12-10 01:33:01 | ||||
|  | ||||
| Weekday Support (for finding next/previous weekday): | ||||
|     {% now 'UTC' + 'weekday=0' %}  # Next Monday (0=Monday, 6=Sunday) | ||||
|     {% now 'UTC' + 'weekday=4' %}  # Next Friday | ||||
|  | ||||
| Configuration: | ||||
|     - Default timezone: Set via TZ environment variable or override environment.default_timezone | ||||
|     - Default format: '%a, %d %b %Y %H:%M:%S' (can be overridden via environment.datetime_format) | ||||
|  | ||||
| Environment Customization: | ||||
|     from changedetectionio.jinja2_custom import create_jinja_env | ||||
|  | ||||
|     jinja2_env = create_jinja_env() | ||||
|     jinja2_env.default_timezone = 'America/New_York'  # Override default timezone | ||||
|     jinja2_env.datetime_format = '%Y-%m-%d %H:%M'      # Override default format | ||||
|  | ||||
| Supported Offset Parameters: | ||||
|     - years, months, weeks, days | ||||
|     - hours, minutes, seconds, microseconds | ||||
|     - weekday (0=Monday through 6=Sunday, must be integer) | ||||
|  | ||||
| Note: | ||||
|     This extension uses the Arrow library for timezone-aware datetime handling. | ||||
|     All timezone names should be valid IANA timezone identifiers (e.g., 'America/New_York'). | ||||
| """ | ||||
| import arrow | ||||
|  | ||||
| from jinja2 import nodes | ||||
| from jinja2.ext import Extension | ||||
| import os | ||||
|  | ||||
| class TimeExtension(Extension): | ||||
|     """ | ||||
|     Jinja2 Extension providing the {% now %} tag for timezone-aware date/time rendering. | ||||
|  | ||||
|     This extension adds two attributes to the Jinja2 environment: | ||||
|     - datetime_format: Default strftime format string (default: '%a, %d %b %Y %H:%M:%S') | ||||
|     - default_timezone: Default timezone for rendering (default: TZ env var or 'UTC') | ||||
|  | ||||
|     Both can be overridden after environment creation by setting the attributes directly. | ||||
|     """ | ||||
|  | ||||
|     tags = {'now'} | ||||
|  | ||||
|     def __init__(self, environment): | ||||
|         """Jinja2 Extension constructor.""" | ||||
|         super().__init__(environment) | ||||
|  | ||||
|         environment.extend( | ||||
|             datetime_format='%a, %d %b %Y %H:%M:%S', | ||||
|             default_timezone=os.getenv('TZ', 'UTC').strip() | ||||
|         ) | ||||
|  | ||||
|     def _datetime(self, timezone, operator, offset, datetime_format): | ||||
|         """ | ||||
|         Get current datetime with time offset applied. | ||||
|  | ||||
|         Args: | ||||
|             timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default | ||||
|             operator: '+' for addition or '-' for subtraction | ||||
|             offset: Comma-separated offset parameters (e.g., 'hours=2,minutes=30') | ||||
|             datetime_format: strftime format string or None to use environment default | ||||
|  | ||||
|         Returns: | ||||
|             Formatted datetime string with offset applied | ||||
|  | ||||
|         Example: | ||||
|             _datetime('UTC', '+', 'hours=2,minutes=30', '%Y-%m-%d %H:%M:%S') | ||||
|             # Returns current time + 2.5 hours | ||||
|         """ | ||||
|         # Use default timezone if none specified | ||||
|         if not timezone or timezone == '': | ||||
|             timezone = self.environment.default_timezone | ||||
|  | ||||
|         d = arrow.now(timezone) | ||||
|  | ||||
|         # parse shift params from offset and include operator | ||||
|         shift_params = {} | ||||
|         for param in offset.split(','): | ||||
|             interval, value = param.split('=') | ||||
|             shift_params[interval.strip()] = float(operator + value.strip()) | ||||
|  | ||||
|         # Fix weekday parameter can not be float | ||||
|         if 'weekday' in shift_params: | ||||
|             shift_params['weekday'] = int(shift_params['weekday']) | ||||
|  | ||||
|         d = d.shift(**shift_params) | ||||
|  | ||||
|         if datetime_format is None: | ||||
|             datetime_format = self.environment.datetime_format | ||||
|         return d.strftime(datetime_format) | ||||
|  | ||||
|     def _now(self, timezone, datetime_format): | ||||
|         """ | ||||
|         Get current datetime without any offset. | ||||
|  | ||||
|         Args: | ||||
|             timezone: IANA timezone identifier (e.g., 'UTC', 'America/New_York') or empty string for default | ||||
|             datetime_format: strftime format string or None to use environment default | ||||
|  | ||||
|         Returns: | ||||
|             Formatted datetime string for current time | ||||
|  | ||||
|         Example: | ||||
|             _now('America/New_York', '%Y-%m-%d %H:%M:%S') | ||||
|             # Returns current time in New York timezone | ||||
|         """ | ||||
|         # Use default timezone if none specified | ||||
|         if not timezone or timezone == '': | ||||
|             timezone = self.environment.default_timezone | ||||
|  | ||||
|         if datetime_format is None: | ||||
|             datetime_format = self.environment.datetime_format | ||||
|         return arrow.now(timezone).strftime(datetime_format) | ||||
|  | ||||
|     def parse(self, parser): | ||||
|         """ | ||||
|         Parse the {% now %} tag and generate appropriate AST nodes. | ||||
|  | ||||
|         This method is called by Jinja2 when it encounters a {% now %} tag. | ||||
|         It parses the tag syntax and determines whether to call _now() or _datetime() | ||||
|         based on whether offset operations (+ or -) are present. | ||||
|  | ||||
|         Supported syntax: | ||||
|             {% now 'timezone' %}                              -> calls _now() | ||||
|             {% now 'timezone', 'format' %}                    -> calls _now() | ||||
|             {% now 'timezone' + 'offset' %}                   -> calls _datetime() | ||||
|             {% now 'timezone' + 'offset', 'format' %}         -> calls _datetime() | ||||
|             {% now 'timezone' - 'offset', 'format' %}         -> calls _datetime() | ||||
|  | ||||
|         Args: | ||||
|             parser: Jinja2 parser instance | ||||
|  | ||||
|         Returns: | ||||
|             nodes.Output: AST output node containing the formatted datetime string | ||||
|         """ | ||||
|         lineno = next(parser.stream).lineno | ||||
|  | ||||
|         node = parser.parse_expression() | ||||
|  | ||||
|         if parser.stream.skip_if('comma'): | ||||
|             datetime_format = parser.parse_expression() | ||||
|         else: | ||||
|             datetime_format = nodes.Const(None) | ||||
|  | ||||
|         if isinstance(node, nodes.Add): | ||||
|             call_method = self.call_method( | ||||
|                 '_datetime', | ||||
|                 [node.left, nodes.Const('+'), node.right, datetime_format], | ||||
|                 lineno=lineno, | ||||
|             ) | ||||
|         elif isinstance(node, nodes.Sub): | ||||
|             call_method = self.call_method( | ||||
|                 '_datetime', | ||||
|                 [node.left, nodes.Const('-'), node.right, datetime_format], | ||||
|                 lineno=lineno, | ||||
|             ) | ||||
|         else: | ||||
|             call_method = self.call_method( | ||||
|                 '_now', | ||||
|                 [node, datetime_format], | ||||
|                 lineno=lineno, | ||||
|             ) | ||||
|         return nodes.Output([call_method], lineno=lineno) | ||||
							
								
								
									
										55
									
								
								changedetectionio/jinja2_custom/safe_jinja.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										55
									
								
								changedetectionio/jinja2_custom/safe_jinja.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,55 @@ | ||||
| """ | ||||
| Safe Jinja2 render with max payload sizes | ||||
|  | ||||
| See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations | ||||
| """ | ||||
|  | ||||
| import jinja2.sandbox | ||||
| import typing as t | ||||
| import os | ||||
| from .extensions.TimeExtension import TimeExtension | ||||
|  | ||||
| JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10)) | ||||
|  | ||||
| # Default extensions - can be overridden in create_jinja_env() | ||||
| DEFAULT_JINJA2_EXTENSIONS = [TimeExtension] | ||||
|  | ||||
|  | ||||
| def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment: | ||||
|     """ | ||||
|     Create a sandboxed Jinja2 environment with our custom extensions and default timezone. | ||||
|  | ||||
|     Args: | ||||
|         extensions: List of extension classes to use (defaults to DEFAULT_JINJA2_EXTENSIONS) | ||||
|         **kwargs: Additional arguments to pass to ImmutableSandboxedEnvironment | ||||
|  | ||||
|     Returns: | ||||
|         Configured Jinja2 environment | ||||
|     """ | ||||
|     if extensions is None: | ||||
|         extensions = DEFAULT_JINJA2_EXTENSIONS | ||||
|  | ||||
|     jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment( | ||||
|         extensions=extensions, | ||||
|         **kwargs | ||||
|     ) | ||||
|  | ||||
|     # Get default timezone from environment variable | ||||
|     default_timezone = os.getenv('TZ', 'UTC').strip() | ||||
|     jinja2_env.default_timezone = default_timezone | ||||
|  | ||||
|     return jinja2_env | ||||
|  | ||||
|  | ||||
| # This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available. | ||||
| # (Which also limits available functions that could be called) | ||||
| def render(template_str, **args: t.Any) -> str: | ||||
|     jinja2_env = create_jinja_env() | ||||
|     output = jinja2_env.from_string(template_str).render(args) | ||||
|     return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE] | ||||
|  | ||||
| def render_fully_escaped(content): | ||||
|     env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True) | ||||
|     template = env.from_string("{{ some_html|e }}") | ||||
|     return template.render(some_html=content) | ||||
|  | ||||
| @@ -55,11 +55,12 @@ class model(dict): | ||||
|                     'rss_access_token': None, | ||||
|                     'rss_content_format': RSS_FORMAT_TYPES[0][0], | ||||
|                     'rss_hide_muted_watches': True, | ||||
|                     'rss_reader_mode': False, | ||||
|                     'scheduler_timezone_default': None,  # Default IANA timezone name | ||||
|                     'schema_version' : 0, | ||||
|                     'shared_diff_access': False, | ||||
|                     'strip_ignored_lines': False, | ||||
|                     'tags': {}, #@todo use Tag.model initialisers | ||||
|                     'timezone': None, # Default IANA timezone name | ||||
|                     'webdriver_delay': None , # Extra delay in seconds before extracting text | ||||
|                     'ui': { | ||||
|                         'use_page_title_in_list': True, | ||||
|   | ||||
| @@ -1,14 +1,14 @@ | ||||
| from blinker import signal | ||||
|  | ||||
| from changedetectionio.strtobool import strtobool | ||||
| from changedetectionio.safe_jinja import render as jinja_render | ||||
| from changedetectionio.jinja2_custom import render as jinja_render | ||||
| from . import watch_base | ||||
| import os | ||||
| import re | ||||
| from pathlib import Path | ||||
| from loguru import logger | ||||
|  | ||||
| from .. import safe_jinja | ||||
| from .. import jinja2_custom as safe_jinja | ||||
| from ..html_tools import TRANSLATE_WHITESPACE_TABLE | ||||
|  | ||||
| # Allowable protocols, protects against javascript: etc | ||||
|   | ||||
| @@ -16,20 +16,3 @@ valid_notification_formats = { | ||||
|     default_notification_format_for_watch: default_notification_format_for_watch | ||||
| } | ||||
|  | ||||
|  | ||||
| valid_tokens = { | ||||
|     'base_url': '', | ||||
|     'current_snapshot': '', | ||||
|     'diff': '', | ||||
|     'diff_added': '', | ||||
|     'diff_full': '', | ||||
|     'diff_patch': '', | ||||
|     'diff_removed': '', | ||||
|     'diff_url': '', | ||||
|     'preview_url': '', | ||||
|     'triggered_text': '', | ||||
|     'watch_tag': '', | ||||
|     'watch_title': '', | ||||
|     'watch_url': '', | ||||
|     'watch_uuid': '', | ||||
| } | ||||
|   | ||||
| @@ -3,16 +3,22 @@ import time | ||||
| import apprise | ||||
| from loguru import logger | ||||
| from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL | ||||
| from ..notification_service import NotificationContextData | ||||
|  | ||||
| def process_notification(n_object, datastore): | ||||
|     from changedetectionio.safe_jinja import render as jinja_render | ||||
|  | ||||
| def process_notification(n_object: NotificationContextData, datastore): | ||||
|     from changedetectionio.jinja2_custom import render as jinja_render | ||||
|     from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats | ||||
|     # be sure its registered | ||||
|     from .apprise_plugin.custom_handlers import apprise_http_custom_handler | ||||
|  | ||||
|     if not isinstance(n_object, NotificationContextData): | ||||
|         raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") | ||||
|  | ||||
|     now = time.time() | ||||
|     if n_object.get('notification_timestamp'): | ||||
|         logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s") | ||||
|  | ||||
|     # Insert variables into the notification content | ||||
|     notification_parameters = create_notification_parameters(n_object, datastore) | ||||
|  | ||||
| @@ -47,7 +53,7 @@ def process_notification(n_object, datastore): | ||||
|  | ||||
|             # Get the notification body from datastore | ||||
|             n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters) | ||||
|             if n_object.get('notification_format', '').startswith('HTML'): | ||||
|             if n_format.lower().startswith('html'): | ||||
|                 n_body = n_body.replace("\n", '<br>') | ||||
|  | ||||
|             n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters) | ||||
| @@ -141,17 +147,15 @@ def process_notification(n_object, datastore): | ||||
|  | ||||
| # Notification title + body content parameters get created here. | ||||
| # ( Where we prepare the tokens in the notification to be replaced with actual values ) | ||||
| def create_notification_parameters(n_object, datastore): | ||||
|     from copy import deepcopy | ||||
|     from . import valid_tokens | ||||
| def create_notification_parameters(n_object: NotificationContextData, datastore): | ||||
|     if not isinstance(n_object, NotificationContextData): | ||||
|         raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") | ||||
|  | ||||
|     # in the case we send a test notification from the main settings, there is no UUID. | ||||
|     uuid = n_object['uuid'] if 'uuid' in n_object else '' | ||||
|  | ||||
|     if uuid: | ||||
|         watch_title = datastore.data['watching'][uuid].label | ||||
|     watch = datastore.data['watching'].get(n_object['uuid']) | ||||
|     if watch: | ||||
|         watch_title = datastore.data['watching'][n_object['uuid']].label | ||||
|         tag_list = [] | ||||
|         tags = datastore.get_all_tags_for_watch(uuid) | ||||
|         tags = datastore.get_all_tags_for_watch(n_object['uuid']) | ||||
|         if tags: | ||||
|             for tag_uuid, tag in tags.items(): | ||||
|                 tag_list.append(tag.get('title')) | ||||
| @@ -166,14 +170,10 @@ def create_notification_parameters(n_object, datastore): | ||||
|  | ||||
|     watch_url = n_object['watch_url'] | ||||
|  | ||||
|     diff_url = "{}/diff/{}".format(base_url, uuid) | ||||
|     preview_url = "{}/preview/{}".format(base_url, uuid) | ||||
|     diff_url = "{}/diff/{}".format(base_url, n_object['uuid']) | ||||
|     preview_url = "{}/preview/{}".format(base_url, n_object['uuid']) | ||||
|  | ||||
|     # Not sure deepcopy is needed here, but why not | ||||
|     tokens = deepcopy(valid_tokens) | ||||
|  | ||||
|     # Valid_tokens also used as a field validator | ||||
|     tokens.update( | ||||
|     n_object.update( | ||||
|         { | ||||
|             'base_url': base_url, | ||||
|             'diff_url': diff_url, | ||||
| @@ -181,13 +181,10 @@ def create_notification_parameters(n_object, datastore): | ||||
|             'watch_tag': watch_tag if watch_tag is not None else '', | ||||
|             'watch_title': watch_title if watch_title is not None else '', | ||||
|             'watch_url': watch_url, | ||||
|             'watch_uuid': uuid, | ||||
|             'watch_uuid': n_object['uuid'], | ||||
|         }) | ||||
|  | ||||
|     # n_object will contain diff, diff_added etc etc | ||||
|     tokens.update(n_object) | ||||
|     if watch: | ||||
|         n_object.update(datastore.data['watching'].get(n_object['uuid']).extra_notification_token_values()) | ||||
|  | ||||
|     if uuid: | ||||
|         tokens.update(datastore.data['watching'].get(uuid).extra_notification_token_values()) | ||||
|  | ||||
|     return tokens | ||||
|     return n_object | ||||
|   | ||||
| @@ -6,9 +6,48 @@ Extracted from update_worker.py to provide standalone notification functionality | ||||
| for both sync and async workers | ||||
| """ | ||||
|  | ||||
| import time | ||||
| from loguru import logger | ||||
| import time | ||||
|  | ||||
| # What is passed around as notification context, also used as the complete list of valid {{ tokens }} | ||||
| class NotificationContextData(dict): | ||||
|     def __init__(self, initial_data=None, **kwargs): | ||||
|         super().__init__({ | ||||
|             'current_snapshot': None, | ||||
|             'diff': None, | ||||
|             'diff_added': None, | ||||
|             'diff_full': None, | ||||
|             'diff_patch': None, | ||||
|             'diff_removed': None, | ||||
|             'notification_timestamp': time.time(), | ||||
|             'screenshot': None, | ||||
|             'triggered_text': None, | ||||
|             'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX',  # Converted to 'watch_uuid' in create_notification_parameters | ||||
|             'watch_url': 'https://WATCH-PLACE-HOLDER/', | ||||
|             'base_url': None, | ||||
|             'diff_url': None, | ||||
|             'preview_url': None, | ||||
|             'watch_tag': None, | ||||
|             'watch_title': None | ||||
|         }) | ||||
|  | ||||
|         # Apply any initial data passed in | ||||
|         self.update({'watch_uuid': self.get('uuid')}) | ||||
|         if initial_data: | ||||
|             self.update(initial_data) | ||||
|  | ||||
|         # Apply any keyword arguments | ||||
|         if kwargs: | ||||
|             self.update(kwargs) | ||||
|  | ||||
|     def set_random_for_validation(self): | ||||
|         import random, string | ||||
|         """Randomly fills all dict keys with random strings (for validation/testing).""" | ||||
|         for key in self.keys(): | ||||
|             if key in ['uuid', 'time', 'watch_uuid']: | ||||
|                 continue | ||||
|             rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12)) | ||||
|             self[key] = rand_str | ||||
|  | ||||
| class NotificationService: | ||||
|     """ | ||||
| @@ -20,13 +59,16 @@ class NotificationService: | ||||
|         self.datastore = datastore | ||||
|         self.notification_q = notification_q | ||||
|      | ||||
|     def queue_notification_for_watch(self, n_object, watch): | ||||
|     def queue_notification_for_watch(self, n_object: NotificationContextData, watch): | ||||
|         """ | ||||
|         Queue a notification for a watch with full diff rendering and template variables | ||||
|         """ | ||||
|         from changedetectionio import diff | ||||
|         from changedetectionio.notification import default_notification_format_for_watch | ||||
|  | ||||
|         if not isinstance(n_object, NotificationContextData): | ||||
|             raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") | ||||
|  | ||||
|         dates = [] | ||||
|         trigger_text = '' | ||||
|  | ||||
| @@ -79,15 +121,15 @@ class NotificationService: | ||||
|         n_object.update({ | ||||
|             'current_snapshot': snapshot_contents, | ||||
|             'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), | ||||
|             'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep), | ||||
|             'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), | ||||
|             'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), | ||||
|             'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True), | ||||
|             'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep), | ||||
|             'notification_timestamp': now, | ||||
|             'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable), | ||||
|             'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None, | ||||
|             'triggered_text': triggered_text, | ||||
|             'uuid': watch.get('uuid') if watch else None, | ||||
|             'watch_url': watch.get('url') if watch else None, | ||||
|             'watch_uuid': watch.get('uuid') if watch else None, | ||||
|         }) | ||||
|  | ||||
|         if watch: | ||||
| @@ -140,7 +182,7 @@ class NotificationService: | ||||
|         """ | ||||
|         Send notification when content changes are detected | ||||
|         """ | ||||
|         n_object = {} | ||||
|         n_object = NotificationContextData() | ||||
|         watch = self.datastore.data['watching'].get(watch_uuid) | ||||
|         if not watch: | ||||
|             return | ||||
| @@ -183,11 +225,13 @@ class NotificationService: | ||||
|         if not watch: | ||||
|             return | ||||
|  | ||||
|         n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', | ||||
|                     'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format( | ||||
|                         ", ".join(watch['include_filters']), | ||||
|                         threshold), | ||||
|                     'notification_format': 'text'} | ||||
|         n_object = NotificationContextData({ | ||||
|             'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', | ||||
|             'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format( | ||||
|                 ", ".join(watch['include_filters']), | ||||
|                 threshold), | ||||
|             'notification_format': 'text' | ||||
|         }) | ||||
|  | ||||
|         if len(watch['notification_urls']): | ||||
|             n_object['notification_urls'] = watch['notification_urls'] | ||||
| @@ -215,12 +259,14 @@ class NotificationService: | ||||
|         if not watch: | ||||
|             return | ||||
|         threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') | ||||
|         n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1), | ||||
|                     'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} " | ||||
|                                          "did not appear on the page after {} attempts, did the page change layout? " | ||||
|                                          "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n" | ||||
|                                          "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold), | ||||
|                     'notification_format': 'text'} | ||||
|         n_object = NotificationContextData({ | ||||
|             'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1), | ||||
|             'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} " | ||||
|                                  "did not appear on the page after {} attempts, did the page change layout? " | ||||
|                                  "Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n" | ||||
|                                  "Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold), | ||||
|             'notification_format': 'text' | ||||
|         }) | ||||
|  | ||||
|         if len(watch['notification_urls']): | ||||
|             n_object['notification_urls'] = watch['notification_urls'] | ||||
|   | ||||
| @@ -102,7 +102,7 @@ class difference_detection_processor(): | ||||
|             self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid')) | ||||
|  | ||||
|         # Tweak the base config with the per-watch ones | ||||
|         from changedetectionio.safe_jinja import render as jinja_render | ||||
|         from changedetectionio.jinja2_custom import render as jinja_render | ||||
|         request_headers = CaseInsensitiveDict() | ||||
|  | ||||
|         ua = self.datastore.data['settings']['requests'].get('default_ua') | ||||
|   | ||||
| @@ -64,24 +64,31 @@ class guess_stream_type(): | ||||
|         # Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.) | ||||
|         test_content_normalized = re.sub(r'<\s+', '<', test_content) | ||||
|  | ||||
|         # Magic will sometimes call text/plain as text/html! | ||||
|         # Use puremagic for lightweight MIME detection (saves ~14MB vs python-magic) | ||||
|         magic_result = None | ||||
|         try: | ||||
|             import magic | ||||
|             import puremagic | ||||
|  | ||||
|             mime = magic.from_buffer(content[:200], mime=True) # Send the original content | ||||
|             logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'") | ||||
|             if mime and "/" in mime: | ||||
|                 magic_result = mime | ||||
|                 # Ignore generic/fallback mime types from magic | ||||
|                 if mime in ['application/octet-stream', 'application/x-empty', 'binary']: | ||||
|                     logger.debug(f"Ignoring generic mime type '{mime}' from magic library") | ||||
|                 # Trust magic for non-text types immediately | ||||
|                 elif mime not in ['text/html', 'text/plain']: | ||||
|                     magic_content_header = mime | ||||
|             # puremagic needs bytes, so encode if we have a string | ||||
|             content_bytes = content[:200].encode('utf-8') if isinstance(content, str) else content[:200] | ||||
|  | ||||
|             # puremagic returns a list of PureMagic objects with confidence scores | ||||
|             detections = puremagic.magic_string(content_bytes) | ||||
|             if detections: | ||||
|                 # Get the highest confidence detection | ||||
|                 mime = detections[0].mime_type | ||||
|                 logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'") | ||||
|                 if mime and "/" in mime: | ||||
|                     magic_result = mime | ||||
|                     # Ignore generic/fallback mime types | ||||
|                     if mime in ['application/octet-stream', 'application/x-empty', 'binary']: | ||||
|                         logger.debug(f"Ignoring generic mime type '{mime}' from puremagic library") | ||||
|                     # Trust puremagic for non-text types immediately | ||||
|                     elif mime not in ['text/html', 'text/plain']: | ||||
|                         magic_content_header = mime | ||||
|  | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection") | ||||
|             logger.error(f"Error getting a more precise mime type from 'puremagic' library ({str(e)}), using content-based detection") | ||||
|  | ||||
|         # Content-based detection (most reliable for text formats) | ||||
|         # Check for HTML patterns first - if found, override magic's text/plain | ||||
| @@ -103,7 +110,7 @@ class guess_stream_type(): | ||||
|         # magic will call a rss document 'xml' | ||||
|         # Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss | ||||
|         # This also triggers the automatic CDATA text parser so the RSS goes back a nice content list | ||||
|         elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES): | ||||
|         elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized: | ||||
|             self.is_rss = True | ||||
|         elif any(s in http_content_header for s in XML_CONTENT_TYPES): | ||||
|             # Only mark as generic XML if not already detected as RSS | ||||
|   | ||||
| @@ -228,8 +228,21 @@ class ContentProcessor: | ||||
|         self.datastore = datastore | ||||
|  | ||||
|     def preprocess_rss(self, content): | ||||
|         """Convert CDATA/comments in RSS to usable text.""" | ||||
|         return cdata_in_document_to_text(html_content=content) | ||||
|         """ | ||||
|         Convert CDATA/comments in RSS to usable text. | ||||
|  | ||||
|         Supports two RSS processing modes: | ||||
|         - 'default': Inline CDATA replacement (original behavior) | ||||
|         - 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked) | ||||
|         """ | ||||
|         from changedetectionio import rss_tools | ||||
|         rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode") | ||||
|         if rss_mode: | ||||
|             # Format RSS items nicely with CDATA content unmarked and converted to text | ||||
|             return rss_tools.format_rss_items(content) | ||||
|         else: | ||||
|             # Default: Original inline CDATA replacement | ||||
|             return cdata_in_document_to_text(html_content=content) | ||||
|  | ||||
|     def preprocess_pdf(self, raw_content): | ||||
|         """Convert PDF to HTML using external tool.""" | ||||
| @@ -384,6 +397,11 @@ class perform_site_check(difference_detection_processor): | ||||
|         # RSS preprocessing | ||||
|         if stream_content_type.is_rss: | ||||
|             content = content_processor.preprocess_rss(content) | ||||
|             if self.datastore.data["settings"]["application"].get("rss_reader_mode"): | ||||
|                 # Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc) | ||||
|                 stream_content_type.is_rss = False | ||||
|                 stream_content_type.is_html = True | ||||
|                 self.fetcher.content = content | ||||
|  | ||||
|         # PDF preprocessing | ||||
|         if watch.is_pdf or stream_content_type.is_pdf: | ||||
| @@ -538,6 +556,20 @@ class perform_site_check(difference_detection_processor): | ||||
|             else: | ||||
|                 logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content") | ||||
|  | ||||
|         # Note: Explicit cleanup is only needed here because text_json_diff handles | ||||
|         # large strings (100KB-300KB for RSS/HTML). The other processors work with | ||||
|         # small strings and don't need this. | ||||
|         # | ||||
|         # Python would clean these up automatically, but explicit `del` frees memory | ||||
|         # immediately rather than waiting for function return, reducing peak memory usage. | ||||
|         del content | ||||
|         if 'html_content' in locals() and html_content is not stripped_text: | ||||
|             del html_content | ||||
|         if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text: | ||||
|             del text_content_before_ignored_filter | ||||
|         if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text: | ||||
|             del text_for_checksuming | ||||
|  | ||||
|         return changed_detected, update_obj, stripped_text | ||||
|  | ||||
|     def _apply_diff_filtering(self, watch, stripped_text, text_before_filter): | ||||
|   | ||||
							
								
								
									
										130
									
								
								changedetectionio/rss_tools.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										130
									
								
								changedetectionio/rss_tools.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,130 @@ | ||||
| """ | ||||
| RSS/Atom feed processing tools for changedetection.io | ||||
| """ | ||||
|  | ||||
| from loguru import logger | ||||
| import re | ||||
|  | ||||
|  | ||||
| def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str: | ||||
|     """ | ||||
|     Process CDATA sections in HTML/XML content - inline replacement. | ||||
|  | ||||
|     Args: | ||||
|         html_content: The HTML/XML content to process | ||||
|         render_anchor_tag_content: Whether to render anchor tag content | ||||
|  | ||||
|     Returns: | ||||
|         Processed HTML/XML content with CDATA sections replaced inline | ||||
|     """ | ||||
|     from xml.sax.saxutils import escape as xml_escape | ||||
|     from .html_tools import html_to_text | ||||
|  | ||||
|     pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>' | ||||
|  | ||||
|     def repl(m): | ||||
|         text = m.group(1) | ||||
|         return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip() | ||||
|  | ||||
|     return re.sub(pattern, repl, html_content) | ||||
|  | ||||
|  | ||||
| def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str: | ||||
|     """ | ||||
|     Format RSS/Atom feed items in a readable text format using feedparser. | ||||
|  | ||||
|     Converts RSS <item> or Atom <entry> elements to formatted text with: | ||||
|     - <title> → <h1>Title</h1> | ||||
|     - <link> → Link: [url] | ||||
|     - <guid> → Guid: [id] | ||||
|     - <pubDate> → PubDate: [date] | ||||
|     - <description> or <content> → Raw HTML content (CDATA and entities automatically handled) | ||||
|  | ||||
|     Args: | ||||
|         rss_content: The RSS/Atom feed content | ||||
|         render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility) | ||||
|  | ||||
|     Returns: | ||||
|         Formatted HTML content ready for html_to_text conversion | ||||
|     """ | ||||
|     try: | ||||
|         import feedparser | ||||
|         from xml.sax.saxutils import escape as xml_escape | ||||
|  | ||||
|         # Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc. | ||||
|         feed = feedparser.parse(rss_content) | ||||
|  | ||||
|         formatted_items = [] | ||||
|  | ||||
|         # Determine feed type for appropriate labels when fields are missing | ||||
|         # feedparser sets feed.version to things like 'rss20', 'atom10', etc. | ||||
|         is_atom = feed.version and 'atom' in feed.version | ||||
|  | ||||
|         for entry in feed.entries: | ||||
|             item_parts = [] | ||||
|  | ||||
|             # Title - feedparser handles CDATA and entity unescaping automatically | ||||
|             if hasattr(entry, 'title') and entry.title: | ||||
|                 item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>') | ||||
|  | ||||
|             # Link | ||||
|             if hasattr(entry, 'link') and entry.link: | ||||
|                 item_parts.append(f'Link: {xml_escape(entry.link)}<br>') | ||||
|  | ||||
|             # GUID/ID | ||||
|             if hasattr(entry, 'id') and entry.id: | ||||
|                 item_parts.append(f'Guid: {xml_escape(entry.id)}<br>') | ||||
|  | ||||
|             # Date - feedparser normalizes all date field names to 'published' | ||||
|             if hasattr(entry, 'published') and entry.published: | ||||
|                 item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>') | ||||
|  | ||||
|             # Description/Content - feedparser handles CDATA and entity unescaping automatically | ||||
|             # Only add "Summary:" label for Atom <summary> tags | ||||
|             content = None | ||||
|             add_label = False | ||||
|  | ||||
|             if hasattr(entry, 'content') and entry.content: | ||||
|                 # Atom <content> - no label, just content | ||||
|                 content = entry.content[0].value if entry.content[0].value else None | ||||
|             elif hasattr(entry, 'summary'): | ||||
|                 # Could be RSS <description> or Atom <summary> | ||||
|                 # feedparser maps both to entry.summary | ||||
|                 content = entry.summary if entry.summary else None | ||||
|                 # Only add "Summary:" label for Atom feeds (which use <summary> tag) | ||||
|                 if is_atom: | ||||
|                     add_label = True | ||||
|  | ||||
|             # Add content with or without label | ||||
|             if content: | ||||
|                 if add_label: | ||||
|                     item_parts.append(f'Summary:<br>{content}') | ||||
|                 else: | ||||
|                     item_parts.append(content) | ||||
|             else: | ||||
|                 # No content - just show <none> | ||||
|                 item_parts.append('<none>') | ||||
|  | ||||
|             # Join all parts of this item | ||||
|             if item_parts: | ||||
|                 formatted_items.append('\n'.join(item_parts)) | ||||
|  | ||||
|         # Wrap each item in a div with classes (first, last, item-N) | ||||
|         items_html = [] | ||||
|         total_items = len(formatted_items) | ||||
|         for idx, item in enumerate(formatted_items): | ||||
|             classes = ['rss-item'] | ||||
|             if idx == 0: | ||||
|                 classes.append('first') | ||||
|             if idx == total_items - 1: | ||||
|                 classes.append('last') | ||||
|             classes.append(f'item-{idx + 1}') | ||||
|  | ||||
|             class_str = ' '.join(classes) | ||||
|             items_html.append(f'<div class="{class_str}">{item}</div>') | ||||
|         return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>' | ||||
|  | ||||
|     except Exception as e: | ||||
|         logger.warning(f"Error formatting RSS items: {str(e)}") | ||||
|         # Fall back to original content | ||||
|         return rss_content | ||||
| @@ -1,24 +0,0 @@ | ||||
| """ | ||||
| Safe Jinja2 render with max payload sizes | ||||
|  | ||||
| See https://jinja.palletsprojects.com/en/3.1.x/sandbox/#security-considerations | ||||
| """ | ||||
|  | ||||
| import jinja2.sandbox | ||||
| import typing as t | ||||
| import os | ||||
|  | ||||
| JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10)) | ||||
|  | ||||
| # This is used for notifications etc, so actually it's OK to send custom HTML such as <a href> etc, but it should limit what data is available. | ||||
| # (Which also limits available functions that could be called) | ||||
| def render(template_str, **args: t.Any) -> str: | ||||
|     jinja2_env = jinja2.sandbox.ImmutableSandboxedEnvironment(extensions=['jinja2_time.TimeExtension']) | ||||
|     output = jinja2_env.from_string(template_str).render(args) | ||||
|     return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE] | ||||
|  | ||||
| def render_fully_escaped(content): | ||||
|     env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True) | ||||
|     template = env.from_string("{{ some_html|e }}") | ||||
|     return template.render(some_html=content) | ||||
|  | ||||
| @@ -29,7 +29,7 @@ $(document).ready(function () { | ||||
|         $(this).text(new Date($(this).data("utc")).toLocaleString()); | ||||
|     }) | ||||
|  | ||||
|     const timezoneInput = $('#application-timezone'); | ||||
|     const timezoneInput = $('#application-scheduler_timezone_default'); | ||||
|     if(timezoneInput.length) { | ||||
|         const timezone = Intl.DateTimeFormat().resolvedOptions().timeZone; | ||||
|         if (!timezoneInput.val().trim()) { | ||||
|   | ||||
| @@ -344,7 +344,7 @@ label { | ||||
|  }   | ||||
| } | ||||
|  | ||||
| #notification-customisation { | ||||
| .grey-form-border { | ||||
|   border: 1px solid var(--color-border-notification); | ||||
|   padding: 0.5rem; | ||||
|   border-radius: 5px; | ||||
|   | ||||
										
											
												File diff suppressed because one or more lines are too long
											
										
									
								
							| @@ -976,6 +976,10 @@ class ChangeDetectionStore: | ||||
|         if self.data['settings']['application'].get('extract_title_as_title'): | ||||
|             self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title') | ||||
|  | ||||
|     def update_21(self): | ||||
|         self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone') | ||||
|         del self.data['settings']['application']['timezone'] | ||||
|  | ||||
|  | ||||
|     def add_notification_url(self, notification_url): | ||||
|          | ||||
|   | ||||
| @@ -33,7 +33,7 @@ | ||||
|                                 <div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div> | ||||
|                             </div> | ||||
|                         </div> | ||||
|                         <div id="notification-customisation" class="pure-control-group"> | ||||
|                         <div class="pure-control-group grey-form-border"> | ||||
|                             <div class="pure-control-group"> | ||||
|                                 {{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }} | ||||
|                                 <span class="pure-form-message-inline">Title for all notifications</span> | ||||
|   | ||||
| @@ -14,13 +14,31 @@ | ||||
|                 {% if field.errors is mapping and 'form' in field.errors %} | ||||
|                     {#  and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #} | ||||
|                     {% set errors = field.errors['form'] %} | ||||
|                     {% for error in errors %} | ||||
|                         <li>{{ error }}</li> | ||||
|                     {% endfor %} | ||||
|                 {% elif field.type == 'FieldList' %} | ||||
|                     {# Handle FieldList of FormFields - errors is a list of dicts, one per entry #} | ||||
|                     {% for idx, entry_errors in field.errors|enumerate %} | ||||
|                         {% if entry_errors is mapping and entry_errors %} | ||||
|                             {# Only show entries that have actual errors #} | ||||
|                             <li><strong>Entry {{ idx + 1 }}:</strong> | ||||
|                                 <ul> | ||||
|                                     {% for field_name, messages in entry_errors.items() %} | ||||
|                                         {% for message in messages %} | ||||
|                                             <li>{{ field_name }}: {{ message }}</li> | ||||
|                                         {% endfor %} | ||||
|                                     {% endfor %} | ||||
|                                 </ul> | ||||
|                             </li> | ||||
|                         {% endif %} | ||||
|                     {% endfor %} | ||||
|                 {% else %} | ||||
|                     {#  regular list of errors with this field #} | ||||
|                     {% set errors = field.errors %} | ||||
|                     {% for error in field.errors %} | ||||
|                         <li>{{ error }}</li> | ||||
|                     {% endfor %} | ||||
|                 {% endif %} | ||||
|                 {% for error in errors %} | ||||
|                     <li>{{ error }}</li> | ||||
|                 {% endfor %} | ||||
|             </ul> | ||||
|         {% endif %} | ||||
|     </div> | ||||
| @@ -93,6 +111,39 @@ | ||||
|   {{ field(**kwargs)|safe }} | ||||
| {% endmacro %} | ||||
|  | ||||
| {% macro render_fieldlist_with_inline_errors(fieldlist) %} | ||||
|   {# Specialized macro for FieldList(FormField(...)) that renders errors inline with each field #} | ||||
|   <div {% if fieldlist.errors %} class="error" {% endif %}>{{ fieldlist.label }}</div> | ||||
|   <div {% if fieldlist.errors %} class="error" {% endif %}> | ||||
|     <ul id="{{ fieldlist.id }}"> | ||||
|       {% for entry in fieldlist %} | ||||
|         <li {% if entry.errors %} class="error" {% endif %}> | ||||
|           <label for="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>{{ fieldlist.label.text }}-{{ loop.index0 }}</label> | ||||
|           <table id="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}> | ||||
|             <tbody> | ||||
|               {% for subfield in entry %} | ||||
|                 <tr {% if subfield.errors %} class="error" {% endif %}> | ||||
|                   <th {% if subfield.errors %} class="error" {% endif %}><label for="{{ subfield.id }}" {% if subfield.errors %} class="error" {% endif %}>{{ subfield.label.text }}</label></th> | ||||
|                   <td {% if subfield.errors %} class="error" {% endif %}> | ||||
|                     {{ subfield(**kwargs)|safe }} | ||||
|                     {% if subfield.errors %} | ||||
|                       <ul class="errors"> | ||||
|                         {% for error in subfield.errors %} | ||||
|                           <li class="error">{{ error }}</li> | ||||
|                         {% endfor %} | ||||
|                       </ul> | ||||
|                     {% endif %} | ||||
|                   </td> | ||||
|                 </tr> | ||||
|               {% endfor %} | ||||
|             </tbody> | ||||
|           </table> | ||||
|         </li> | ||||
|       {% endfor %} | ||||
|     </ul> | ||||
|   </div> | ||||
| {% endmacro %} | ||||
|  | ||||
| {% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %} | ||||
|   <div class="fieldlist_formfields" id="{{ table_id }}"> | ||||
|     <div class="fieldlist-header"> | ||||
|   | ||||
| @@ -4,6 +4,7 @@ import time | ||||
| from threading import Thread | ||||
|  | ||||
| import pytest | ||||
| import arrow | ||||
| from changedetectionio import changedetection_app | ||||
| from changedetectionio import store | ||||
| import os | ||||
| @@ -29,6 +30,17 @@ def reportlog(pytestconfig): | ||||
|     logger.remove(handler_id) | ||||
|  | ||||
|  | ||||
| @pytest.fixture | ||||
| def environment(mocker): | ||||
|     """Mock arrow.now() to return a fixed datetime for testing jinja2 time extension.""" | ||||
|     # Fixed datetime: Wed, 09 Dec 2015 23:33:01 UTC | ||||
|     # This is calculated to match the test expectations when offsets are applied | ||||
|     fixed_datetime = arrow.Arrow(2015, 12, 9, 23, 33, 1, tzinfo='UTC') | ||||
|     # Patch arrow.now in the TimeExtension module where it's actually used | ||||
|     mocker.patch('changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now', return_value=fixed_datetime) | ||||
|     return fixed_datetime | ||||
|  | ||||
|  | ||||
| def format_memory_human(bytes_value): | ||||
|     """Format memory in human-readable units (KB, MB, GB)""" | ||||
|     if bytes_value < 1024: | ||||
|   | ||||
| @@ -49,3 +49,39 @@ def test_select_custom(client, live_server, measure_memory_usage): | ||||
|     # | ||||
|     # Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default | ||||
|  | ||||
|  | ||||
| def test_custom_proxy_validation(client, live_server, measure_memory_usage): | ||||
|     #  live_server_setup(live_server) # Setup on conftest per function | ||||
|  | ||||
|     # Goto settings, add our custom one | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={ | ||||
|             "requests-time_between_check-minutes": 180, | ||||
|             "application-ignore_whitespace": "y", | ||||
|             "application-fetch_backend": 'html_requests', | ||||
|             "requests-extra_proxies-0-proxy_name": "custom-test-proxy", | ||||
|             "requests-extra_proxies-0-proxy_url": "xxxxhtt/333??p://test:awesome@squid-custom:3128", | ||||
|         }, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b"Settings updated." not in res.data | ||||
|     assert b'Proxy URLs must start with' in res.data | ||||
|  | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={ | ||||
|             "requests-time_between_check-minutes": 180, | ||||
|             "application-ignore_whitespace": "y", | ||||
|             "application-fetch_backend": 'html_requests', | ||||
|             "requests-extra_proxies-0-proxy_name": "custom-test-proxy", | ||||
|             "requests-extra_proxies-0-proxy_url": "https://", | ||||
|         }, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b"Settings updated." not in res.data | ||||
|     assert b"Invalid URL." in res.data | ||||
|      | ||||
| @@ -165,6 +165,46 @@ def test_check_basic_change_detection_functionality(client, live_server, measure | ||||
|     # Cleanup everything | ||||
|     delete_all_watches(client) | ||||
|  | ||||
|  | ||||
| # Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that | ||||
| def test_requests_timeout(client, live_server, measure_memory_usage): | ||||
|     delay = 2 | ||||
|     test_url = url_for('test_endpoint', delay=delay, _external=True) | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-ui-use_page_title_in_list": "", | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               "requests-timeout": delay - 1, | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # requests takes >2 sec but we timeout at 1 second | ||||
|     res = client.get(url_for("watchlist.index")) | ||||
|     assert b'Read timed out. (read timeout=1)' in res.data | ||||
|  | ||||
|     ##### Now set a longer timeout | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-ui-use_page_title_in_list": "", | ||||
|               "requests-time_between_check-minutes": 180, | ||||
|               "requests-timeout": delay + 1, # timeout should be a second more than the reply time | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     res = client.get(url_for("watchlist.index")) | ||||
|     assert b'Read timed out' not in res.data | ||||
|  | ||||
| def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage): | ||||
|     """ | ||||
|  | ||||
| @@ -338,4 +378,4 @@ def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server | ||||
|     assert b'<string name="feed_update_receiver_name"' in res.data | ||||
|     assert b'<foobar' not in res.data | ||||
|  | ||||
|     res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) | ||||
|     res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) | ||||
|   | ||||
| @@ -1,8 +1,10 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import time | ||||
| import arrow | ||||
| from flask import url_for | ||||
| from .util import live_server_setup, wait_for_all_checks | ||||
| from ..jinja2_custom import render | ||||
|  | ||||
|  | ||||
| # def test_setup(client, live_server, measure_memory_usage): | ||||
| @@ -33,6 +35,35 @@ def test_jinja2_in_url_query(client, live_server, measure_memory_usage): | ||||
|     ) | ||||
|     assert b'date=2' in res.data | ||||
|  | ||||
| # Test for issue #1493 - jinja2-time offset functionality | ||||
| def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usage): | ||||
|     """Test that jinja2 time offset expressions work in watch URLs (issue #1493).""" | ||||
|  | ||||
|     # Add our URL to the import page with time offset expression | ||||
|     test_url = url_for('test_return_query', _external=True) | ||||
|  | ||||
|     # Test the exact syntax from issue #1493 that was broken in jinja2-time | ||||
|     # This should work now with our custom TimeExtension | ||||
|     full_url = "{}?{}".format(test_url, | ||||
|                               "timestamp={% now 'utc' - 'minutes=11', '%Y-%m-%d %H:%M' %}", ) | ||||
|     res = client.post( | ||||
|         url_for("ui.ui_views.form_quick_watch_add"), | ||||
|         data={"url": full_url, "tags": "test"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b"Watch added" in res.data | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     # Verify the URL was processed correctly (should not have errors) | ||||
|     res = client.get( | ||||
|         url_for("ui.ui_views.preview_page", uuid="first"), | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     # Should have a valid timestamp in the response | ||||
|     assert b'timestamp=' in res.data | ||||
|     # Should not have template error | ||||
|     assert b'Invalid template' not in res.data | ||||
|  | ||||
| # https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456 | ||||
| def test_jinja2_security_url_query(client, live_server, measure_memory_usage): | ||||
|      | ||||
| @@ -56,3 +87,86 @@ def test_jinja2_security_url_query(client, live_server, measure_memory_usage): | ||||
|     assert b'is invalid and cannot be used' in res.data | ||||
|     # Some of the spewed output from the subclasses | ||||
|     assert b'dict_values' not in res.data | ||||
|  | ||||
| def test_timezone(mocker): | ||||
|     """Verify that timezone is parsed.""" | ||||
|  | ||||
|     timezone = 'America/Buenos_Aires' | ||||
|     currentDate = arrow.now(timezone) | ||||
|     arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now") | ||||
|     arrowNowMock.return_value = currentDate | ||||
|     finalRender = render(f"{{% now '{timezone}' %}}") | ||||
|  | ||||
|     assert finalRender == currentDate.strftime('%a, %d %b %Y %H:%M:%S') | ||||
|  | ||||
| def test_format(mocker): | ||||
|     """Verify that format is parsed.""" | ||||
|  | ||||
|     timezone = 'utc' | ||||
|     format = '%d %b %Y %H:%M:%S' | ||||
|     currentDate = arrow.now(timezone) | ||||
|     arrowNowMock = mocker.patch("arrow.now") | ||||
|     arrowNowMock.return_value = currentDate | ||||
|     finalRender = render(f"{{% now '{timezone}', '{format}' %}}") | ||||
|  | ||||
|     assert finalRender == currentDate.strftime(format) | ||||
|  | ||||
| def test_add_time(environment): | ||||
|     """Verify that added time offset can be parsed.""" | ||||
|  | ||||
|     finalRender = render("{% now 'utc' + 'hours=2,seconds=30' %}") | ||||
|  | ||||
|     assert finalRender == "Thu, 10 Dec 2015 01:33:31" | ||||
|  | ||||
| def test_add_weekday(mocker): | ||||
|     """Verify that added weekday offset can be parsed.""" | ||||
|  | ||||
|     timezone = 'utc' | ||||
|     currentDate = arrow.now(timezone) | ||||
|     arrowNowMock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now") | ||||
|     arrowNowMock.return_value = currentDate | ||||
|     finalRender = render(f"{{% now '{timezone}' + 'weekday=1' %}}") | ||||
|  | ||||
|     assert finalRender == currentDate.shift(weekday=1).strftime('%a, %d %b %Y %H:%M:%S') | ||||
|  | ||||
|  | ||||
| def test_substract_time(environment): | ||||
|     """Verify that substracted time offset can be parsed.""" | ||||
|  | ||||
|     finalRender = render("{% now 'utc' - 'minutes=11' %}") | ||||
|  | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
|  | ||||
|  | ||||
| def test_offset_with_format(environment): | ||||
|     """Verify that offset works together with datetime format.""" | ||||
|  | ||||
|     finalRender = render( | ||||
|         "{% now 'utc' - 'days=2,minutes=33,seconds=1', '%d %b %Y %H:%M:%S' %}" | ||||
|     ) | ||||
|  | ||||
|     assert finalRender == "07 Dec 2015 23:00:00" | ||||
|  | ||||
| def test_default_timezone_empty_string(environment): | ||||
|     """Verify that empty timezone string uses the default timezone (UTC in test environment).""" | ||||
|  | ||||
|     # Empty string should use the default timezone which is 'UTC' (or from application settings) | ||||
|     finalRender = render("{% now '' %}") | ||||
|  | ||||
|     # Should render with default format and UTC timezone (matches environment fixture) | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:33:01" | ||||
|  | ||||
| def test_default_timezone_with_offset(environment): | ||||
|     """Verify that empty timezone works with offset operations.""" | ||||
|  | ||||
|     # Empty string with offset should use default timezone | ||||
|     finalRender = render("{% now '' + 'hours=2', '%d %b %Y %H:%M:%S' %}") | ||||
|  | ||||
|     assert finalRender == "10 Dec 2015 01:33:01" | ||||
|  | ||||
| def test_default_timezone_subtraction(environment): | ||||
|     """Verify that empty timezone works with subtraction offset.""" | ||||
|  | ||||
|     finalRender = render("{% now '' - 'minutes=11' %}") | ||||
|  | ||||
|     assert finalRender == "Wed, 09 Dec 2015 23:22:01" | ||||
| @@ -284,6 +284,27 @@ def test_notification_validation(client, live_server, measure_memory_usage): | ||||
|     ) | ||||
|  | ||||
|  | ||||
| def test_notification_urls_jinja2_apprise_integration(client, live_server, measure_memory_usage): | ||||
|  | ||||
|     # | ||||
|     # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation | ||||
|     test_notification_url = "hassio://127.0.0.1/longaccesstoken?verify=no&nid={{watch_uuid}}" | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
|         data={ | ||||
|               "application-fetch_backend": "html_requests", | ||||
|               "application-minutes_between_check": 180, | ||||
|               "application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了" }', | ||||
|               "application-notification_format": default_notification_format, | ||||
|               "application-notification_urls": test_notification_url, | ||||
|               # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation | ||||
|               "application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} ", | ||||
|               }, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b'Settings updated' in res.data | ||||
|  | ||||
|  | ||||
| def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_memory_usage): | ||||
|      | ||||
| @@ -294,7 +315,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me | ||||
|     # CUSTOM JSON BODY CHECK for POST:// | ||||
|     set_original_response() | ||||
|     # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation | ||||
|     test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&xxx={{ watch_url }}&+custom-header=123&+second=hello+world%20%22space%22" | ||||
|     test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&watch_uuid={{ watch_uuid }}&xxx={{ watch_url }}&now={% now 'Europe/London', '%Y-%m-%d' %}&+custom-header=123&+second=hello+world%20%22space%22" | ||||
|  | ||||
|     res = client.post( | ||||
|         url_for("settings.settings_page"), | ||||
| @@ -320,6 +341,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me | ||||
|     ) | ||||
|  | ||||
|     assert b"Watch added" in res.data | ||||
|     watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching'])) | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|     set_modified_response() | ||||
| @@ -349,6 +371,11 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me | ||||
|         assert 'xxx=http' in notification_url | ||||
|         # apprise style headers should be stripped | ||||
|         assert 'custom-header' not in notification_url | ||||
|         # Check jinja2 custom arrow/jinja2-time replace worked | ||||
|         assert 'now=2' in notification_url | ||||
|         # Check our watch_uuid appeared | ||||
|         assert f'watch_uuid={watch_uuid}' in notification_url | ||||
|  | ||||
|  | ||||
|     with open("test-datastore/notification-headers.txt", 'r') as f: | ||||
|         notification_headers = f.read() | ||||
| @@ -416,7 +443,6 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage | ||||
|     assert res.status_code != 400 | ||||
|     assert res.status_code != 500 | ||||
|  | ||||
|  | ||||
|     with open("test-datastore/notification.txt", 'r') as f: | ||||
|         x = f.read() | ||||
|         assert test_body in x | ||||
|   | ||||
							
								
								
									
										98
									
								
								changedetectionio/tests/test_rss_reader_mode.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										98
									
								
								changedetectionio/tests/test_rss_reader_mode.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,98 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| import time | ||||
| from flask import url_for | ||||
| from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \ | ||||
|     extract_UUID_from_client, delete_all_watches | ||||
|  | ||||
|  | ||||
| def set_original_cdata_xml(): | ||||
|     test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0"> | ||||
| <channel> | ||||
| <title>Security Bulletins on wetscale</title> | ||||
| <link>https://wetscale.com/security-bulletins/</link> | ||||
| <description>Recent security bulletins from wetscale</description> | ||||
| <lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate> | ||||
| <docs>https://validator.w3.org/feed/docs/rss2.html</docs> | ||||
| <generator>wetscale.com</generator> | ||||
| <language>en-US</language> | ||||
| <copyright>© 2025 wetscale Inc. All rights reserved.</copyright> | ||||
| <atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/> | ||||
| <item> | ||||
| <title>TS-2025-005</title> | ||||
| <link>https://wetscale.com/security-bulletins/#ts-2025-005</link> | ||||
| <guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid> | ||||
| <pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate> | ||||
| <description><p>Wet noodles escape<br><p>they also found themselves outside</p> </description> | ||||
| </item> | ||||
|  | ||||
|  | ||||
| <item> | ||||
| <title>TS-2025-004</title> | ||||
| <link>https://wetscale.com/security-bulletins/#ts-2025-004</link> | ||||
| <guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid> | ||||
| <pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate> | ||||
| <description> | ||||
|     <![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]> | ||||
| </description> | ||||
| </item> | ||||
|     </channel> | ||||
|     </rss> | ||||
|             """ | ||||
|  | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write(test_return_data) | ||||
|  | ||||
|  | ||||
|  | ||||
| def test_rss_reader_mode(client, live_server, measure_memory_usage): | ||||
|     set_original_cdata_xml() | ||||
|  | ||||
|     # Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss | ||||
|     # This also triggers the automatic CDATA text parser so the RSS goes back a nice content list | ||||
|     test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True) | ||||
|     live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True | ||||
|  | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|  | ||||
|     watch = live_server.app.config['DATASTORE'].data['watching'][uuid] | ||||
|     dates = list(watch.history.keys()) | ||||
|     snapshot_contents = watch.get_history_snapshot(dates[0]) | ||||
|     assert 'Wet noodles escape' in snapshot_contents | ||||
|     assert '<br>' not in snapshot_contents | ||||
|     assert '<' not in snapshot_contents | ||||
|     assert 'The days of Terminator and The Matrix' in snapshot_contents | ||||
|     assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage): | ||||
|     set_original_cdata_xml() | ||||
|  | ||||
|     # Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss | ||||
|     # This also triggers the automatic CDATA text parser so the RSS goes back a nice content list | ||||
|     test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True) | ||||
|     live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True | ||||
|  | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]}) | ||||
|     client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|  | ||||
|     watch = live_server.app.config['DATASTORE'].data['watching'][uuid] | ||||
|     dates = list(watch.history.keys()) | ||||
|     snapshot_contents = watch.get_history_snapshot(dates[0]) | ||||
|     assert 'Wet noodles escape' not in snapshot_contents | ||||
|     assert '<br>' not in snapshot_contents | ||||
|     assert '<' not in snapshot_contents | ||||
|     assert 'The days of Terminator and The Matrix' in snapshot_contents | ||||
|     delete_all_watches(client) | ||||
|  | ||||
| @@ -24,7 +24,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory | ||||
|         url_for("settings.settings_page"), | ||||
|         data={"application-empty_pages_are_a_change": "", | ||||
|               "requests-time_between_check-seconds": 1, | ||||
|               "application-timezone": "Pacific/Kiritimati",  # Most Forward Time Zone (UTC+14:00) | ||||
|               "application-scheduler_timezone_default": "Pacific/Kiritimati",  # Most Forward Time Zone (UTC+14:00) | ||||
|               'application-fetch_backend': "html_requests"}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
| @@ -119,7 +119,7 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure | ||||
|  | ||||
|     data = { | ||||
|         "application-empty_pages_are_a_change": "", | ||||
|         "application-timezone": "Pacific/Kiritimati",  # Most Forward Time Zone (UTC+14:00) | ||||
|         "application-scheduler_timezone_default": "Pacific/Kiritimati",  # Most Forward Time Zone (UTC+14:00) | ||||
|         'application-fetch_backend': "html_requests", | ||||
|         "requests-time_between_check-hours": 0, | ||||
|         "requests-time_between_check-minutes": 0, | ||||
|   | ||||
| @@ -4,7 +4,7 @@ | ||||
| # python3 -m unittest changedetectionio.tests.unit.test_jinja2_security | ||||
|  | ||||
| import unittest | ||||
| from changedetectionio import safe_jinja | ||||
| from changedetectionio import jinja2_custom as safe_jinja | ||||
|  | ||||
|  | ||||
| # mostly | ||||
|   | ||||
| @@ -6,7 +6,7 @@ | ||||
| import unittest | ||||
| import os | ||||
|  | ||||
| from changedetectionio.processors import restock_diff | ||||
| import changedetectionio.processors.restock_diff.processor as restock_diff | ||||
|  | ||||
| # mostly | ||||
| class TestDiffBuilder(unittest.TestCase): | ||||
|   | ||||
| @@ -1,11 +1,10 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| # run from dir above changedetectionio/ dir | ||||
| # python3 -m unittest changedetectionio.tests.unit.test_jinja2_security | ||||
| # python3 -m unittest changedetectionio.tests.unit.test_scheduler | ||||
|  | ||||
| import unittest | ||||
| from datetime import datetime, timedelta | ||||
| from zoneinfo import ZoneInfo | ||||
| import arrow | ||||
|  | ||||
| class TestScheduler(unittest.TestCase): | ||||
|  | ||||
| @@ -13,12 +12,13 @@ class TestScheduler(unittest.TestCase): | ||||
|     # UTC-12:00 (Baker Island, Howland Island) is the farthest behind, always one calendar day behind UTC. | ||||
|  | ||||
|     def test_timezone_basic_time_within_schedule(self): | ||||
|         """Test that current time is detected as within schedule window.""" | ||||
|         from changedetectionio import time_handler | ||||
|  | ||||
|         timezone_str = 'Europe/Berlin' | ||||
|         debug_datetime = datetime.now(ZoneInfo(timezone_str)) | ||||
|         day_of_week = debug_datetime.strftime('%A') | ||||
|         time_str = str(debug_datetime.hour)+':00' | ||||
|         debug_datetime = arrow.now(timezone_str) | ||||
|         day_of_week = debug_datetime.format('dddd') | ||||
|         time_str = debug_datetime.format('HH:00') | ||||
|         duration = 60  # minutes | ||||
|  | ||||
|         # The current time should always be within 60 minutes of [time_hour]:00 | ||||
| @@ -30,16 +30,17 @@ class TestScheduler(unittest.TestCase): | ||||
|         self.assertEqual(result, True, f"{debug_datetime} is within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes") | ||||
|  | ||||
|     def test_timezone_basic_time_outside_schedule(self): | ||||
|         """Test that time from yesterday is outside current schedule.""" | ||||
|         from changedetectionio import time_handler | ||||
|  | ||||
|         timezone_str = 'Europe/Berlin' | ||||
|         # We try a date in the future.. | ||||
|         debug_datetime = datetime.now(ZoneInfo(timezone_str))+ timedelta(days=-1) | ||||
|         day_of_week = debug_datetime.strftime('%A') | ||||
|         time_str = str(debug_datetime.hour) + ':00' | ||||
|         duration = 60*24  # minutes | ||||
|         # We try a date in the past (yesterday) | ||||
|         debug_datetime = arrow.now(timezone_str).shift(days=-1) | ||||
|         day_of_week = debug_datetime.format('dddd') | ||||
|         time_str = debug_datetime.format('HH:00') | ||||
|         duration = 60 * 24  # minutes | ||||
|  | ||||
|         # The current time should always be within 60 minutes of [time_hour]:00 | ||||
|         # The current time should NOT be within yesterday's schedule | ||||
|         result = time_handler.am_i_inside_time(day_of_week=day_of_week, | ||||
|                                                time_str=time_str, | ||||
|                                                timezone_str=timezone_str, | ||||
| @@ -48,6 +49,58 @@ class TestScheduler(unittest.TestCase): | ||||
|         self.assertNotEqual(result, True, | ||||
|                          f"{debug_datetime} is NOT within time scheduler {day_of_week} {time_str} in {timezone_str} for {duration} minutes") | ||||
|  | ||||
|     def test_timezone_utc_within_schedule(self): | ||||
|         """Test UTC timezone works correctly.""" | ||||
|         from changedetectionio import time_handler | ||||
|  | ||||
|         timezone_str = 'UTC' | ||||
|         debug_datetime = arrow.now(timezone_str) | ||||
|         day_of_week = debug_datetime.format('dddd') | ||||
|         time_str = debug_datetime.format('HH:00') | ||||
|         duration = 120  # minutes | ||||
|  | ||||
|         result = time_handler.am_i_inside_time(day_of_week=day_of_week, | ||||
|                                                time_str=time_str, | ||||
|                                                timezone_str=timezone_str, | ||||
|                                                duration=duration) | ||||
|  | ||||
|         self.assertTrue(result, "Current time should be within UTC schedule") | ||||
|  | ||||
|     def test_timezone_extreme_ahead(self): | ||||
|         """Test with UTC+14 timezone (Line Islands, Kiribati).""" | ||||
|         from changedetectionio import time_handler | ||||
|  | ||||
|         timezone_str = 'Pacific/Kiritimati'  # UTC+14 | ||||
|         debug_datetime = arrow.now(timezone_str) | ||||
|         day_of_week = debug_datetime.format('dddd') | ||||
|         time_str = debug_datetime.format('HH:00') | ||||
|         duration = 60 | ||||
|  | ||||
|         result = time_handler.am_i_inside_time(day_of_week=day_of_week, | ||||
|                                                time_str=time_str, | ||||
|                                                timezone_str=timezone_str, | ||||
|                                                duration=duration) | ||||
|  | ||||
|         self.assertTrue(result, "Should work with extreme ahead timezone") | ||||
|  | ||||
|     def test_timezone_extreme_behind(self): | ||||
|         """Test with UTC-12 timezone (Baker Island).""" | ||||
|         from changedetectionio import time_handler | ||||
|  | ||||
|         # Using Etc/GMT+12 which is UTC-12 (confusing, but that's how it works) | ||||
|         timezone_str = 'Etc/GMT+12'  # UTC-12 | ||||
|         debug_datetime = arrow.now(timezone_str) | ||||
|         day_of_week = debug_datetime.format('dddd') | ||||
|         time_str = debug_datetime.format('HH:00') | ||||
|         duration = 60 | ||||
|  | ||||
|         result = time_handler.am_i_inside_time(day_of_week=day_of_week, | ||||
|                                                time_str=time_str, | ||||
|                                                timezone_str=timezone_str, | ||||
|                                                duration=duration) | ||||
|  | ||||
|         self.assertTrue(result, "Should work with extreme behind timezone") | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
|   | ||||
							
								
								
									
										138
									
								
								changedetectionio/tests/unit/test_time_extension.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										138
									
								
								changedetectionio/tests/unit/test_time_extension.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,138 @@ | ||||
| #!/usr/bin/env python3 | ||||
| """ | ||||
| Simple unit tests for TimeExtension that mimic how safe_jinja.py uses it. | ||||
| These tests demonstrate that the environment.default_timezone override works | ||||
| exactly as intended in the actual application code. | ||||
| """ | ||||
|  | ||||
| import arrow | ||||
| from jinja2.sandbox import ImmutableSandboxedEnvironment | ||||
| from changedetectionio.jinja2_custom.extensions.TimeExtension import TimeExtension | ||||
|  | ||||
|  | ||||
| def test_default_timezone_override_like_safe_jinja(mocker): | ||||
|     """ | ||||
|     Test that mirrors exactly how safe_jinja.py uses the TimeExtension. | ||||
|     This is the simplest demonstration that environment.default_timezone works. | ||||
|     """ | ||||
|     # Create environment (TimeExtension.__init__ sets default_timezone='UTC') | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     # Override the default timezone - exactly like safe_jinja.py does | ||||
|     jinja2_env.default_timezone = 'America/New_York' | ||||
|  | ||||
|     # Mock arrow.now to return a fixed time | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='America/New_York') | ||||
|     mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Use empty string timezone - should use the overridden default | ||||
|     template_str = "{% now '' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # Verify arrow.now was called with the overridden timezone | ||||
|     mock.assert_called_with('America/New_York') | ||||
|     assert '2025' in output | ||||
|     assert 'Jan' in output | ||||
|  | ||||
|  | ||||
| def test_default_timezone_not_overridden(mocker): | ||||
|     """ | ||||
|     Test that without override, the default 'UTC' from __init__ is used. | ||||
|     """ | ||||
|     # Create environment (TimeExtension.__init__ sets default_timezone='UTC') | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     # DON'T override - should use 'UTC' default | ||||
|  | ||||
|     # Mock arrow.now | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 17, 0, 0, tzinfo='UTC') | ||||
|     mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Use empty string timezone - should use 'UTC' default | ||||
|     template_str = "{% now '' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # Verify arrow.now was called with 'UTC' | ||||
|     mock.assert_called_with('UTC') | ||||
|     assert '2025' in output | ||||
|  | ||||
|  | ||||
| def test_datetime_format_override_like_safe_jinja(mocker): | ||||
|     """ | ||||
|     Test that environment.datetime_format can be overridden after creation. | ||||
|     """ | ||||
|     # Create environment (default format is '%a, %d %b %Y %H:%M:%S') | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     # Override the datetime format | ||||
|     jinja2_env.datetime_format = '%Y-%m-%d %H:%M:%S' | ||||
|  | ||||
|     # Mock arrow.now | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 14, 30, 45, tzinfo='UTC') | ||||
|     mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Don't specify format - should use overridden default | ||||
|     template_str = "{% now 'UTC' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # Should use custom format YYYY-MM-DD HH:MM:SS | ||||
|     assert output == '2025-01-15 14:30:45' | ||||
|  | ||||
|  | ||||
| def test_offset_with_overridden_timezone(mocker): | ||||
|     """ | ||||
|     Test that offset operations also respect the overridden default_timezone. | ||||
|     """ | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     # Override to use Europe/London | ||||
|     jinja2_env.default_timezone = 'Europe/London' | ||||
|  | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 10, 0, 0, tzinfo='Europe/London') | ||||
|     mock = mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Use offset with empty timezone string | ||||
|     template_str = "{% now '' + 'hours=2', '%Y-%m-%d %H:%M:%S' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # Should have called arrow.now with Europe/London | ||||
|     mock.assert_called_with('Europe/London') | ||||
|     # Should be 10:00 + 2 hours = 12:00 | ||||
|     assert output == '2025-01-15 12:00:00' | ||||
|  | ||||
|  | ||||
| def test_weekday_parameter_converted_to_int(mocker): | ||||
|     """ | ||||
|     Test that weekday parameter is properly converted from float to int. | ||||
|     This is important because arrow.shift() requires weekday as int, not float. | ||||
|     """ | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     # Wednesday, Jan 15, 2025 | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 12, 0, 0, tzinfo='UTC') | ||||
|     mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Add offset to next Monday (weekday=0) | ||||
|     template_str = "{% now 'UTC' + 'weekday=0', '%A' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # Should be Monday | ||||
|     assert output == 'Monday' | ||||
|  | ||||
|  | ||||
| def test_multiple_offset_parameters(mocker): | ||||
|     """ | ||||
|     Test that multiple offset parameters can be combined in one expression. | ||||
|     """ | ||||
|     jinja2_env = ImmutableSandboxedEnvironment(extensions=[TimeExtension]) | ||||
|  | ||||
|     fixed_time = arrow.Arrow(2025, 1, 15, 10, 30, 45, tzinfo='UTC') | ||||
|     mocker.patch("changedetectionio.jinja2_custom.extensions.TimeExtension.arrow.now", return_value=fixed_time) | ||||
|  | ||||
|     # Test multiple parameters: days, hours, minutes, seconds | ||||
|     template_str = "{% now 'UTC' + 'days=1,hours=2,minutes=15,seconds=10', '%Y-%m-%d %H:%M:%S' %}" | ||||
|     output = jinja2_env.from_string(template_str).render() | ||||
|  | ||||
|     # 2025-01-15 10:30:45 + 1 day + 2 hours + 15 minutes + 10 seconds | ||||
|     # = 2025-01-16 12:45:55 | ||||
|     assert output == '2025-01-16 12:45:55' | ||||
							
								
								
									
										429
									
								
								changedetectionio/tests/unit/test_time_handler.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										429
									
								
								changedetectionio/tests/unit/test_time_handler.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,429 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| """ | ||||
| Comprehensive tests for time_handler module refactored to use arrow. | ||||
|  | ||||
| Run from project root: | ||||
| python3 -m pytest changedetectionio/tests/unit/test_time_handler.py -v | ||||
| """ | ||||
|  | ||||
| import unittest | ||||
| import arrow | ||||
| from changedetectionio import time_handler | ||||
|  | ||||
|  | ||||
| class TestAmIInsideTime(unittest.TestCase): | ||||
|     """Tests for the am_i_inside_time function.""" | ||||
|  | ||||
|     def test_current_time_within_schedule(self): | ||||
|         """Test that current time is detected as within schedule.""" | ||||
|         # Get current time in a specific timezone | ||||
|         timezone_str = 'Europe/Berlin' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = now.format('HH:00')  # Current hour, 0 minutes | ||||
|         duration = 60  # 60 minutes | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result, f"Current time should be within {duration} minute window starting at {time_str}") | ||||
|  | ||||
|     def test_current_time_outside_schedule(self): | ||||
|         """Test that time in the past is not within current schedule.""" | ||||
|         timezone_str = 'Europe/Berlin' | ||||
|         # Get yesterday's date | ||||
|         yesterday = arrow.now(timezone_str).shift(days=-1) | ||||
|         day_of_week = yesterday.format('dddd') | ||||
|         time_str = yesterday.format('HH:mm') | ||||
|         duration = 30  # Only 30 minutes | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertFalse(result, "Yesterday's time should not be within current schedule") | ||||
|  | ||||
|     def test_timezone_pacific_within_schedule(self): | ||||
|         """Test with US/Pacific timezone.""" | ||||
|         timezone_str = 'US/Pacific' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = now.format('HH:00') | ||||
|         duration = 120  # 2 hours | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result) | ||||
|  | ||||
|     def test_timezone_tokyo_within_schedule(self): | ||||
|         """Test with Asia/Tokyo timezone.""" | ||||
|         timezone_str = 'Asia/Tokyo' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = now.format('HH:00') | ||||
|         duration = 90  # 1.5 hours | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result) | ||||
|  | ||||
|     def test_schedule_crossing_midnight(self): | ||||
|         """Test schedule that crosses midnight.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|  | ||||
|         # Set schedule to start 23:30 with 120 minute duration (crosses midnight) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = "23:30" | ||||
|         duration = 120  # 2 hours - goes into next day | ||||
|  | ||||
|         # If we're at 00:15 the next day, we should still be in the schedule | ||||
|         if now.hour == 0 and now.minute < 30: | ||||
|             # We're in the time window that spilled over from yesterday | ||||
|             result = time_handler.am_i_inside_time( | ||||
|                 day_of_week=day_of_week, | ||||
|                 time_str=time_str, | ||||
|                 timezone_str=timezone_str, | ||||
|                 duration=duration | ||||
|             ) | ||||
|             # This might be true or false depending on exact time | ||||
|             self.assertIsInstance(result, bool) | ||||
|  | ||||
|     def test_invalid_day_of_week(self): | ||||
|         """Test that invalid day raises ValueError.""" | ||||
|         with self.assertRaises(ValueError) as context: | ||||
|             time_handler.am_i_inside_time( | ||||
|                 day_of_week="Funday", | ||||
|                 time_str="12:00", | ||||
|                 timezone_str="UTC", | ||||
|                 duration=60 | ||||
|             ) | ||||
|         self.assertIn("Invalid day_of_week", str(context.exception)) | ||||
|  | ||||
|     def test_invalid_time_format(self): | ||||
|         """Test that invalid time format raises ValueError.""" | ||||
|         with self.assertRaises(ValueError) as context: | ||||
|             time_handler.am_i_inside_time( | ||||
|                 day_of_week="Monday", | ||||
|                 time_str="25:99", | ||||
|                 timezone_str="UTC", | ||||
|                 duration=60 | ||||
|             ) | ||||
|         self.assertIn("Invalid time_str", str(context.exception)) | ||||
|  | ||||
|     def test_invalid_time_format_non_numeric(self): | ||||
|         """Test that non-numeric time raises ValueError.""" | ||||
|         with self.assertRaises(ValueError) as context: | ||||
|             time_handler.am_i_inside_time( | ||||
|                 day_of_week="Monday", | ||||
|                 time_str="twelve:thirty", | ||||
|                 timezone_str="UTC", | ||||
|                 duration=60 | ||||
|             ) | ||||
|         self.assertIn("Invalid time_str", str(context.exception)) | ||||
|  | ||||
|     def test_invalid_timezone(self): | ||||
|         """Test that invalid timezone raises ValueError.""" | ||||
|         with self.assertRaises(ValueError) as context: | ||||
|             time_handler.am_i_inside_time( | ||||
|                 day_of_week="Monday", | ||||
|                 time_str="12:00", | ||||
|                 timezone_str="Invalid/Timezone", | ||||
|                 duration=60 | ||||
|             ) | ||||
|         self.assertIn("Invalid timezone_str", str(context.exception)) | ||||
|  | ||||
|     def test_short_duration(self): | ||||
|         """Test with very short duration (15 minutes default).""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = now.format('HH:mm') | ||||
|         duration = 15  # Default duration | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result, "Current time should be within 15 minute window") | ||||
|  | ||||
|     def test_long_duration(self): | ||||
|         """Test with long duration (24 hours).""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         # Set time to current hour | ||||
|         time_str = now.format('HH:00') | ||||
|         duration = 1440  # 24 hours in minutes | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result, "Current time should be within 24 hour window") | ||||
|  | ||||
|     def test_case_insensitive_day(self): | ||||
|         """Test that day of week is case insensitive.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd').lower()  # lowercase day | ||||
|         time_str = now.format('HH:00') | ||||
|         duration = 60 | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         self.assertTrue(result, "Lowercase day should work") | ||||
|  | ||||
|     def test_edge_case_midnight(self): | ||||
|         """Test edge case at exactly midnight.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = "00:00" | ||||
|         duration = 60 | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         # Should be true if we're in the first hour of the day | ||||
|         if now.hour == 0: | ||||
|             self.assertTrue(result) | ||||
|  | ||||
|     def test_edge_case_end_of_day(self): | ||||
|         """Test edge case near end of day.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         day_of_week = now.format('dddd') | ||||
|         time_str = "23:45" | ||||
|         duration = 30  # 30 minutes crosses midnight | ||||
|  | ||||
|         result = time_handler.am_i_inside_time( | ||||
|             day_of_week=day_of_week, | ||||
|             time_str=time_str, | ||||
|             timezone_str=timezone_str, | ||||
|             duration=duration | ||||
|         ) | ||||
|  | ||||
|         # Result depends on current time | ||||
|         self.assertIsInstance(result, bool) | ||||
|  | ||||
|  | ||||
| class TestIsWithinSchedule(unittest.TestCase): | ||||
|     """Tests for the is_within_schedule function.""" | ||||
|  | ||||
|     def test_schedule_disabled(self): | ||||
|         """Test that disabled schedule returns False.""" | ||||
|         time_schedule_limit = {'enabled': False} | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertFalse(result) | ||||
|  | ||||
|     def test_schedule_none(self): | ||||
|         """Test that None schedule returns False.""" | ||||
|         result = time_handler.is_within_schedule(None) | ||||
|         self.assertFalse(result) | ||||
|  | ||||
|     def test_schedule_empty_dict(self): | ||||
|         """Test that empty dict returns False.""" | ||||
|         result = time_handler.is_within_schedule({}) | ||||
|         self.assertFalse(result) | ||||
|  | ||||
|     def test_schedule_enabled_but_day_disabled(self): | ||||
|         """Test schedule enabled but current day disabled.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         current_day = now.format('dddd').lower() | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': False, | ||||
|                 'start_time': '09:00', | ||||
|                 'duration': {'hours': 8, 'minutes': 0} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertFalse(result, "Disabled day should return False") | ||||
|  | ||||
|     def test_schedule_enabled_within_time(self): | ||||
|         """Test schedule enabled and within time window.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         current_day = now.format('dddd').lower() | ||||
|         current_hour = now.format('HH:00') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': current_hour, | ||||
|                 'duration': {'hours': 2, 'minutes': 0} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertTrue(result, "Current time should be within schedule") | ||||
|  | ||||
|     def test_schedule_enabled_outside_time(self): | ||||
|         """Test schedule enabled but outside time window.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         current_day = now.format('dddd').lower() | ||||
|         # Set time to 3 hours ago | ||||
|         past_time = now.shift(hours=-3).format('HH:mm') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': past_time, | ||||
|                 'duration': {'hours': 1, 'minutes': 0}  # Only 1 hour duration | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertFalse(result, "3 hours ago with 1 hour duration should be False") | ||||
|  | ||||
|     def test_schedule_with_default_timezone(self): | ||||
|         """Test schedule without timezone uses default.""" | ||||
|         now = arrow.now('America/New_York') | ||||
|         current_day = now.format('dddd').lower() | ||||
|         current_hour = now.format('HH:00') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             # No timezone specified | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': current_hour, | ||||
|                 'duration': {'hours': 2, 'minutes': 0} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         # Should use default UTC, but since we're testing with NY time, | ||||
|         # the result depends on time difference | ||||
|         result = time_handler.is_within_schedule( | ||||
|             time_schedule_limit, | ||||
|             default_tz='America/New_York' | ||||
|         ) | ||||
|         self.assertTrue(result, "Should work with default timezone") | ||||
|  | ||||
|     def test_schedule_different_timezones(self): | ||||
|         """Test schedule works correctly across different timezones.""" | ||||
|         # Test with Tokyo timezone | ||||
|         timezone_str = 'Asia/Tokyo' | ||||
|         now = arrow.now(timezone_str) | ||||
|         current_day = now.format('dddd').lower() | ||||
|         current_hour = now.format('HH:00') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': current_hour, | ||||
|                 'duration': {'hours': 1, 'minutes': 30} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertTrue(result) | ||||
|  | ||||
|     def test_schedule_with_minutes_in_duration(self): | ||||
|         """Test schedule with minutes specified in duration.""" | ||||
|         timezone_str = 'UTC' | ||||
|         now = arrow.now(timezone_str) | ||||
|         current_day = now.format('dddd').lower() | ||||
|         current_time = now.format('HH:mm') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': current_time, | ||||
|                 'duration': {'hours': 0, 'minutes': 45} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertTrue(result, "Should handle minutes in duration") | ||||
|  | ||||
|     def test_schedule_with_timezone_whitespace(self): | ||||
|         """Test that timezone with whitespace is handled.""" | ||||
|         timezone_str = '  UTC  ' | ||||
|         now = arrow.now('UTC') | ||||
|         current_day = now.format('dddd').lower() | ||||
|         current_hour = now.format('HH:00') | ||||
|  | ||||
|         time_schedule_limit = { | ||||
|             'enabled': True, | ||||
|             'timezone': timezone_str, | ||||
|             current_day: { | ||||
|                 'enabled': True, | ||||
|                 'start_time': current_hour, | ||||
|                 'duration': {'hours': 1, 'minutes': 0} | ||||
|             } | ||||
|         } | ||||
|  | ||||
|         result = time_handler.is_within_schedule(time_schedule_limit) | ||||
|         self.assertTrue(result, "Should handle timezone with whitespace") | ||||
|  | ||||
|  | ||||
| class TestWeekdayEnum(unittest.TestCase): | ||||
|     """Tests for the Weekday enum.""" | ||||
|  | ||||
|     def test_weekday_values(self): | ||||
|         """Test that weekday enum has correct values.""" | ||||
|         self.assertEqual(time_handler.Weekday.Monday, 0) | ||||
|         self.assertEqual(time_handler.Weekday.Tuesday, 1) | ||||
|         self.assertEqual(time_handler.Weekday.Wednesday, 2) | ||||
|         self.assertEqual(time_handler.Weekday.Thursday, 3) | ||||
|         self.assertEqual(time_handler.Weekday.Friday, 4) | ||||
|         self.assertEqual(time_handler.Weekday.Saturday, 5) | ||||
|         self.assertEqual(time_handler.Weekday.Sunday, 6) | ||||
|  | ||||
|     def test_weekday_string_access(self): | ||||
|         """Test accessing weekday enum by string.""" | ||||
|         self.assertEqual(time_handler.Weekday['Monday'], 0) | ||||
|         self.assertEqual(time_handler.Weekday['Sunday'], 6) | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
| @@ -188,6 +188,10 @@ def new_live_server_setup(live_server): | ||||
|         ctype = request.args.get('content_type') | ||||
|         status_code = request.args.get('status_code') | ||||
|         content = request.args.get('content') or None | ||||
|         delay = int(request.args.get('delay', 0)) | ||||
|  | ||||
|         if delay: | ||||
|             time.sleep(delay) | ||||
|  | ||||
|         # Used to just try to break the header detection | ||||
|         uppercase_headers = request.args.get('uppercase_headers') | ||||
|   | ||||
| @@ -1,6 +1,5 @@ | ||||
| from datetime import timedelta, datetime | ||||
| import arrow | ||||
| from enum import IntEnum | ||||
| from zoneinfo import ZoneInfo | ||||
|  | ||||
|  | ||||
| class Weekday(IntEnum): | ||||
| @@ -40,54 +39,65 @@ def am_i_inside_time( | ||||
|  | ||||
|     # Parse the start time | ||||
|     try: | ||||
|         target_time = datetime.strptime(time_str, '%H:%M').time() | ||||
|     except ValueError: | ||||
|         hour, minute = map(int, time_str.split(':')) | ||||
|         if not (0 <= hour <= 23 and 0 <= minute <= 59): | ||||
|             raise ValueError | ||||
|     except (ValueError, AttributeError): | ||||
|         raise ValueError(f"Invalid time_str: '{time_str}'. Must be in 'HH:MM' format.") | ||||
|  | ||||
|     # Define the timezone | ||||
|     try: | ||||
|         tz = ZoneInfo(timezone_str) | ||||
|     except Exception: | ||||
|         raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.") | ||||
|  | ||||
|     # Get the current time in the specified timezone | ||||
|     now_tz = datetime.now(tz) | ||||
|     try: | ||||
|         now_tz = arrow.now(timezone_str.strip()) | ||||
|     except Exception as e: | ||||
|         raise ValueError(f"Invalid timezone_str: '{timezone_str}'. Must be a valid timezone identifier.") | ||||
|  | ||||
|     # Check if the current day matches the target day or overlaps due to duration | ||||
|     current_weekday = now_tz.weekday() | ||||
|     start_datetime_tz = datetime.combine(now_tz.date(), target_time, tzinfo=tz) | ||||
|     # Create start datetime for today in target timezone | ||||
|     start_datetime_tz = now_tz.replace(hour=hour, minute=minute, second=0, microsecond=0) | ||||
|  | ||||
|     # Handle previous day's overlap | ||||
|     if target_weekday == (current_weekday - 1) % 7: | ||||
|         # Calculate start and end times for the overlap from the previous day | ||||
|         start_datetime_tz -= timedelta(days=1) | ||||
|         end_datetime_tz = start_datetime_tz + timedelta(minutes=duration) | ||||
|         start_datetime_tz = start_datetime_tz.shift(days=-1) | ||||
|         end_datetime_tz = start_datetime_tz.shift(minutes=duration) | ||||
|         if start_datetime_tz <= now_tz < end_datetime_tz: | ||||
|             return True | ||||
|  | ||||
|     # Handle current day's range | ||||
|     if target_weekday == current_weekday: | ||||
|         end_datetime_tz = start_datetime_tz + timedelta(minutes=duration) | ||||
|         end_datetime_tz = start_datetime_tz.shift(minutes=duration) | ||||
|         if start_datetime_tz <= now_tz < end_datetime_tz: | ||||
|             return True | ||||
|  | ||||
|     # Handle next day's overlap | ||||
|     if target_weekday == (current_weekday + 1) % 7: | ||||
|         end_datetime_tz = start_datetime_tz + timedelta(minutes=duration) | ||||
|         if now_tz < start_datetime_tz and now_tz + timedelta(days=1) < end_datetime_tz: | ||||
|         end_datetime_tz = start_datetime_tz.shift(minutes=duration) | ||||
|         if now_tz < start_datetime_tz and now_tz.shift(days=1) < end_datetime_tz: | ||||
|             return True | ||||
|  | ||||
|     return False | ||||
|  | ||||
|  | ||||
| def is_within_schedule(time_schedule_limit, default_tz="UTC"): | ||||
|     """ | ||||
|     Check if the current time is within a scheduled time window. | ||||
|  | ||||
|     Parameters: | ||||
|         time_schedule_limit (dict): Schedule configuration with timezone, day settings, etc. | ||||
|         default_tz (str): Default timezone to use if not specified. Default is 'UTC'. | ||||
|  | ||||
|     Returns: | ||||
|         bool: True if current time is within the schedule, False otherwise. | ||||
|     """ | ||||
|     if time_schedule_limit and time_schedule_limit.get('enabled'): | ||||
|         # Get the timezone the time schedule is in, so we know what day it is there | ||||
|         tz_name = time_schedule_limit.get('timezone') | ||||
|         if not tz_name: | ||||
|             tz_name = default_tz | ||||
|  | ||||
|         now_day_name_in_tz = datetime.now(ZoneInfo(tz_name.strip())).strftime('%A') | ||||
|         # Get current day name in the target timezone | ||||
|         now_day_name_in_tz = arrow.now(tz_name.strip()).format('dddd') | ||||
|         selected_day_schedule = time_schedule_limit.get(now_day_name_in_tz.lower()) | ||||
|         if not selected_day_schedule.get('enabled'): | ||||
|             return False | ||||
|   | ||||
| @@ -1,5 +1,6 @@ | ||||
| # eventlet>=0.38.0  # Removed - replaced with threading mode for better Python 3.12+ compatibility | ||||
| feedgen~=0.9 | ||||
| feedparser~=6.0  # For parsing RSS/Atom feeds | ||||
| flask-compress | ||||
| # 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers) | ||||
| flask-login>=0.6.3 | ||||
| @@ -72,7 +73,7 @@ werkzeug==3.0.6 | ||||
|  | ||||
| # Templating, so far just in the URLs but in the future can be for the notifications also | ||||
| jinja2~=3.1 | ||||
| jinja2-time | ||||
| arrow | ||||
| openpyxl | ||||
| # https://peps.python.org/pep-0508/#environment-markers | ||||
| # https://github.com/dgtlmoon/changedetection.io/pull/1009 | ||||
| @@ -88,6 +89,7 @@ pyppeteerstealth>=0.0.4 | ||||
| # Include pytest, so if theres a support issue we can ask them to run these tests on their setup | ||||
| pytest ~=7.2 | ||||
| pytest-flask ~=1.2 | ||||
| pytest-mock ~=3.15 | ||||
|  | ||||
| # Anything 4.0 and up but not 5.0 | ||||
| jsonschema ~= 4.0 | ||||
| @@ -124,8 +126,9 @@ price-parser | ||||
|  | ||||
| # flask_socket_io - incorrect package name, already have flask-socketio above | ||||
|  | ||||
| # So far for detecting correct favicon type, but for other things in the future | ||||
| python-magic | ||||
| # Lightweight MIME type detection (saves ~14MB memory vs python-magic/libmagic) | ||||
| # Used for detecting correct favicon type and content-type detection | ||||
| puremagic | ||||
|  | ||||
| # Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !) | ||||
| tzdata | ||||
|   | ||||
		Reference in New Issue
	
	Block a user