Compare commits

..

1 Commits

48 changed files with 436 additions and 1190 deletions

View File

@@ -11,4 +11,6 @@ updates:
- package-ecosystem: pip
directory: /
schedule:
interval: "weekly"
interval: "daily"
allow:
- dependency-name: "apprise"

View File

@@ -74,5 +74,5 @@ jobs:
file: ${{ matrix.dockerfile }}
platforms: ${{ matrix.platform }}
cache-from: type=gha
cache-to: type=gha,mode=min
cache-to: type=gha,mode=max

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.17'
__version__ = '0.50.14'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -91,7 +91,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
try:
processor_module = importlib.import_module(processor_module_name)
except ModuleNotFoundError as e:
logger.error(f"Processor module '{processor}' not found.")
print(f"Processor module '{processor}' not found.")
raise e
update_handler = processor_module.perform_site_check(datastore=datastore,

View File

@@ -5,7 +5,6 @@
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
const notification_test_render_preview_url="{{url_for('ui.ui_notification.ajax_callback_test_render_preview', mode="global-settings")}}";
{% if emailprefix %}
const email_notification_prefix=JSON.parse('{{emailprefix|tojson}}');
{% endif %}
@@ -44,6 +43,10 @@
</div>
</div>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.filter_failure_notification_threshold_attempts, class="filter_failure_notification_threshold_attempts") }}
<span class="pure-form-message-inline">After this many consecutive times that the CSS/xPath filter is missing, send a notification
@@ -130,10 +133,6 @@
<span class="pure-form-message-inline">Number of concurrent workers to process watches. More workers = faster processing but higher memory usage.<br>
Currently running: <strong>{{ worker_info.count }}</strong> operational {{ worker_info.type }} workers{% if worker_info.active_workers > 0 %} ({{ worker_info.active_workers }} actively processing){% endif %}.</span>
</div>
<div class="pure-control-group">
{{ render_field(form.requests.form.jitter_seconds, class="jitter_seconds") }}
<span class="pure-form-message-inline">Example - 3 seconds random jitter could trigger up to 3 seconds earlier or up to 3 seconds later</span>
</div>
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.default_ua) }}
<span class="pure-form-message-inline">
@@ -192,21 +191,24 @@ nav
</ul>
</span>
</fieldset>
<fieldset class="pure-group">
{{ render_checkbox_field(form.application.form.strip_ignored_lines) }}
<span class="pure-form-message-inline">Remove any text that appears in the "Ignore text" from the output (otherwise its just ignored for change-detection)<br>
<i>Note:</i> Changing this will change the status of your existing watches, possibly trigger alerts etc.
</span>
</fieldset>
</div>
<div class="tab-pane-inner" id="api">
<p>
<strong>Chrome extension and API Access</strong><br>
</p>
<h4>API Access</h4>
<p>Drive your changedetection.io via API, More about <a href="https://changedetection.io/docs/api_v1/index.html">API access and examples here</a>.</p>
<div class="pure-control-group border-fieldset">
<strong>Chrome Extension</strong><br>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.api_access_token_enabled) }}
<div class="pure-form-message-inline">Restrict API access limit by using <code>x-api-key</code> header - required for the Chrome Extension to work</div><br>
<div class="pure-form-message-inline"><br>API Key <span id="api-key">{{api_key}}</span>
<span style="display:none;" id="api-key-copy" >copy</span>
</div>
</div>
<div class="pure-control-group">
<a href="{{url_for('settings.settings_reset_api_key')}}" class="pure-button button-small button-cancel">Regenerate API key</a>
</div>
<div class="pure-control-group">
<h4>Chrome Extension</h4>
<p>Easily add any web-page to your changedetection.io installation from within Chrome.</p>
<strong>Step 1</strong> Install the extension, <strong>Step 2</strong> Navigate to this page,
<strong>Step 3</strong> Open the extension from the toolbar and click "<i>Sync API Access</i>"
@@ -219,20 +221,6 @@ nav
</a>
</p>
</div>
<div class="pure-control-group border-fieldset">
Drive your changedetection.io via API, More about <a href="https://changedetection.io/docs/api_v1/index.html">API access and examples here</a>.<br>
<p>
{{ render_checkbox_field(form.application.form.api_access_token_enabled) }}
</p>
<div class="pure-form-message-inline">Restrict API access limit by using <code>x-api-key</code> header - required for the Chrome Extension to work</div><br>
<div class="pure-form-message-inline"><br>API Key <span id="api-key">{{api_key}}</span>
<span style="display:none;" id="api-key-copy" >copy</span>
</div>
<p>
<a href="{{url_for('settings.settings_reset_api_key')}}" class="pure-button button-small button-cancel">Regenerate API key</a>
</p>
</div>
</div>
<div class="tab-pane-inner" id="timedate">
<div class="pure-control-group">

View File

@@ -4,8 +4,6 @@
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="group-settings")}}";
const notification_test_render_preview_url="{{url_for('ui.ui_notification.ajax_callback_test_render_preview', mode="group-settings", watch_uuid=data.uuid)}}";
//alert(notification_test_render_preview_url)
</script>
<script src="{{url_for('static_content', group='js', filename='tabs.js')}}" defer></script>
@@ -21,8 +19,6 @@
<script src="{{url_for('static_content', group='js', filename='watch-settings.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='notifications.js')}}" defer></script>
<script src="{{url_for('static_content', group='js', filename='plugins.js')}}" defer></script>
<div class="edit-form monospaced-textarea">

View File

@@ -106,14 +106,14 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
for uuid in uuids:
watch_check_update.send(watch_uuid=uuid)
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update, notification_q):
def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handler, queuedWatchMetaData, watch_check_update):
ui_blueprint = Blueprint('ui', __name__, template_folder="templates")
# Register the edit blueprint
edit_blueprint = construct_edit_blueprint(datastore, update_q, queuedWatchMetaData)
ui_blueprint.register_blueprint(edit_blueprint)
# Register the notification blueprint - mostly used for sending test
# Register the notification blueprint
notification_blueprint = construct_notification_blueprint(datastore)
ui_blueprint.register_blueprint(notification_blueprint)

View File

@@ -1,85 +1,50 @@
from flask import Blueprint, request, make_response, jsonify
from flask import Blueprint, request, make_response
import random
from loguru import logger
from changedetectionio.notification.handler import process_notification
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
def construct_blueprint(datastore: ChangeDetectionStore):
notification_blueprint = Blueprint('ui_notification', __name__, template_folder="../ui/templates")
@notification_blueprint.route("/notification/render-preview/<string:watch_uuid>", methods=['POST'])
@notification_blueprint.route("/notification/render-preview", methods=['POST'])
@notification_blueprint.route("/notification/render-preview/", methods=['POST'])
@login_optionally_required
def ajax_callback_test_render_preview(watch_uuid=None):
return ajax_callback_send_notification_test(watch_uuid=watch_uuid, send_as_null_test=True)
# AJAX endpoint for sending a test
@notification_blueprint.route("/notification/send-test/<string:watch_uuid>", methods=['POST'])
@notification_blueprint.route("/notification/send-test", methods=['POST'])
@notification_blueprint.route("/notification/send-test/", methods=['POST'])
@login_optionally_required
def ajax_callback_send_notification_test(watch_uuid=None, send_as_null_test=False):
def ajax_callback_send_notification_test(watch_uuid=None):
# Watch_uuid could be unset in the case it`s used in tag editor, global settings
import apprise
from urllib.parse import urlparse
from changedetectionio.notification.handler import process_notification
from changedetectionio.notification.apprise_plugin.assets import apprise_asset
# Necessary so that we import our custom handlers
from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler, apprise_null_custom_handler
from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler
apobj = apprise.Apprise(asset=apprise_asset)
sent_obj = {}
is_global_settings_form = request.args.get('mode', '') == 'global-settings'
is_group_settings_form = request.args.get('mode', '') == 'group-settings'
# Use an existing random one on the global/main settings form
if not watch_uuid and is_global_settings_form and datastore.data.get('watching'):
if not watch_uuid and (is_global_settings_form or is_group_settings_form) \
and datastore.data.get('watching'):
logger.debug(f"Send test notification - Choosing random Watch {watch_uuid}")
watch_uuid = random.choice(list(datastore.data['watching'].keys()))
logger.debug(f"Send test notification - Chose random watch UUID: {watch_uuid}")
if is_group_settings_form and datastore.data.get('watching'):
logger.debug(f"Send test notification - Choosing random Watch from group {watch_uuid}")
matching_watches = [uuid for uuid, watch in datastore.data['watching'].items() if watch.get('tags') and watch_uuid in watch['tags']]
if matching_watches:
watch_uuid = random.choice(matching_watches)
else:
# Just fallback to any
watch_uuid = random.choice(list(datastore.data['watching'].keys()))
if not watch_uuid:
return make_response("Error: You must have atleast one watch configured for 'test notification' to work", 400)
watch = datastore.data['watching'].get(watch_uuid)
notification_urls = []
notification_urls = None
if send_as_null_test:
test_schema = ''
try:
if request.form.get('notification_urls') and '://' in request.form.get('notification_urls'):
first_test_notification_url = request.form['notification_urls'].strip().splitlines()[0]
test_schema = urlparse(first_test_notification_url).scheme.lower().strip()
except Exception as e:
logger.error(f"Error trying to get a test schema based on the first notification_url {str(e)}")
notification_urls = [
# Null lets us do the whole chain of the same code without any extra repeated code
f'null://null-test-just-to-render-everything-on-the-same-codepath-and-get-preview?test_schema={test_schema}'
]
else:
if request.form.get('notification_urls'):
notification_urls += request.form['notification_urls'].strip().splitlines()
if request.form.get('notification_urls'):
notification_urls = request.form['notification_urls'].strip().splitlines()
if not notification_urls:
logger.debug("Test notification - Trying by group/tag in the edit form if available")
# @todo this logic is not clear, omegaconf?
# On an edit page, we should also fire off to the tags if they have notifications
if request.form.get('tags') and request.form['tags'].strip():
for k in request.form['tags'].split(','):
@@ -93,28 +58,23 @@ def construct_blueprint(datastore: ChangeDetectionStore):
notification_urls = datastore.data['settings']['application']['notification_urls']
if not notification_urls:
return make_response("Error: No Notification URLs set/found.", 400)
return 'Error: No Notification URLs set/found'
for n_url in notification_urls:
if len(n_url.strip()):
if not apobj.add(n_url):
return make_response(f'Error: {n_url} is not a valid AppRise URL.', 400)
return f'Error: {n_url} is not a valid AppRise URL.'
try:
# use the same as when it is triggered, but then override it with the form test values
n_object = {
'watch_url': request.form.get('window_url', "https://changedetection.io"),
'notification_urls': notification_urls,
'uuid': watch_uuid # Ensure uuid is present so diff rendering works
'notification_urls': notification_urls
}
# Only use if present, if not set in n_object it should use the default system value
notif_format = request.form.get('notification_format', '').strip()
# Use it if provided and not "System default", otherwise fall back
if notif_format and notif_format != 'System default':
n_object['notification_format'] = notif_format
else:
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
if 'notification_format' in request.form and request.form['notification_format'].strip():
n_object['notification_format'] = request.form.get('notification_format', '').strip()
if 'notification_title' in request.form and request.form['notification_title'].strip():
n_object['notification_title'] = request.form.get('notification_title', '').strip()
@@ -132,13 +92,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
n_object['as_async'] = False
n_object.update(watch.extra_notification_token_values())
# This uses the same processor that the queue runner uses
# @todo - Split the notification URLs so we know which one worked, maybe highlight them in green in the UI
result = process_notification(n_object, datastore)
if result:
sent_obj['result'] = result[0]
sent_obj['status'] = 'OK - Sent test notifications'
sent_obj = process_notification(n_object, datastore)
except Exception as e:
e_str = str(e)
@@ -146,9 +100,9 @@ def construct_blueprint(datastore: ChangeDetectionStore):
e_str = e_str.replace(
"DEBUG - <class 'apprise.decorators.base.CustomNotifyPlugin.instantiate_plugin.<locals>.CustomNotifyPluginWrapper'>",
'')
return make_response(e_str, 400)
# it will be a list of things reached, for this purpose just the first is good so we can see the body that was sent
return make_response(sent_obj, 200)
return 'OK - Sent test notifications'
return notification_blueprint

View File

@@ -21,7 +21,6 @@
const email_notification_prefix=JSON.parse('{{ emailprefix|tojson }}');
{% endif %}
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', watch_uuid=uuid)}}";
const notification_test_render_preview_url="{{url_for('ui.ui_notification.ajax_callback_test_render_preview', watch_uuid=uuid)}}";
const playwright_enabled={% if playwright_enabled %}true{% else %}false{% endif %};
const recheck_proxy_start_url="{{url_for('check_proxies.start_check', uuid=uuid)}}";
const proxy_recheck_status_url="{{url_for('check_proxies.get_recheck_status', uuid=uuid)}}";
@@ -357,12 +356,12 @@ Math: {{ 1 + 1 }}") }}
</script>
<br>
{#<div id="text-preview-controls"><span id="text-preview-refresh" class="pure-button button-xsmall">Refresh</span></div>#}
<div class="minitabs-wrapper" id="filter-preview-minitabs">
<div class="minitabs-wrapper">
<div class="minitabs-content">
<div id="text-preview-inner" class="tab-contents-monospace-preview">
<div id="text-preview-inner" class="monospace-preview">
<p>Loading...</p>
</div>
<div id="text-preview-before-inner" style="display: none;" class="tab-contents-monospace-preview">
<div id="text-preview-before-inner" style="display: none;" class="monospace-preview">
<p>Loading...</p>
</div>
</div>

View File

@@ -1,7 +1,7 @@
import pluggy
import os
import importlib
from loguru import logger
import sys
from . import default_plugin
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager
@@ -65,7 +65,7 @@ def load_plugins_from_directory():
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
logger.critical(f"Error loading plugin {module_name}: {e}")
print(f"Error loading plugin {module_name}: {e}")
# Load plugins from the plugins directory
load_plugins_from_directory()

View File

@@ -519,7 +519,7 @@ def changedetection_app(config=None, datastore_o=None):
# watchlist UI buttons etc
import changedetectionio.blueprint.ui as ui
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update, notification_q))
app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update))
import changedetectionio.blueprint.watchlist as watchlist
app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='')

View File

@@ -759,7 +759,6 @@ class processor_text_json_diff_form(commonSettingsForm):
check_unique_lines = BooleanField('Only trigger when unique lines appear in all history', default=False)
remove_duplicate_lines = BooleanField('Remove duplicate lines of text', default=False)
sort_text_alphabetically = BooleanField('Sort text alphabetically', default=False)
strip_ignored_lines = TernaryNoneBooleanField('Strip ignored lines', default=None)
trim_text_whitespace = BooleanField('Trim whitespace before and after text', default=False)
filter_text_added = BooleanField('Added lines', default=True)
@@ -937,7 +936,6 @@ class globalSettingsApplicationForm(commonSettingsForm):
removepassword_button = SubmitField('Remove password', render_kw={"class": "pure-button pure-button-primary"})
render_anchor_tag_content = BooleanField('Render anchor tag content', default=False)
shared_diff_access = BooleanField('Allow anonymous access to watch history page when password is enabled', default=False, validators=[validators.Optional()])
strip_ignored_lines = BooleanField('Strip ignored lines')
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',

View File

@@ -57,7 +57,6 @@ class model(dict):
'rss_hide_muted_watches': True,
'schema_version' : 0,
'shared_diff_access': False,
'strip_ignored_lines': False,
'tags': {}, #@todo use Tag.model initialisers
'timezone': None, # Default IANA timezone name
'webdriver_delay': None , # Extra delay in seconds before extracting text

View File

@@ -642,10 +642,8 @@ class model(watch_base):
def extra_notification_token_placeholder_info(self):
# Used for providing extra tokens
values = []
values.append(('watch_html_link', "Link to URL as <a href>"))
values.append(('watch_url_raw', "Raw URL/link before any jinja2 macro"))
return values
# return [('widget', "Get widget amounts")]
return []
def extract_regex_from_all_history(self, regex):

View File

@@ -58,7 +58,6 @@ class watch_base(dict):
'proxy': None, # Preferred proxy connection
'remote_server_reply': None, # From 'server' reply header
'sort_text_alphabetically': False,
'strip_ignored_lines': None,
'subtractive_selectors': [],
'tag': '', # Old system of text name for a tag, to be removed
'tags': [], # list of UUIDs to App.Tags

View File

@@ -2,8 +2,8 @@ from changedetectionio.model import default_notification_format_for_watch
ult_notification_format_for_watch = 'System default'
default_notification_format = 'HTML Color'
default_notification_body = '{{watch_title}} had a change.\n---\n{{diff}}\n---\n'
default_notification_title = 'ChangeDetection.io Notification - {{watch_title}}'
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
# The values (markdown etc) are from apprise NotifyFormat,
# But to avoid importing the whole heavy module just use the same strings here.

View File

@@ -19,11 +19,6 @@ def notify_supported_methods(func):
return func
def notify_null_method(func):
func = notify(on="null")(func)
return func
def _get_auth(parsed_url: dict) -> str | tuple[str, str]:
user: str | None = parsed_url.get("user")
password: str | None = parsed_url.get("password")
@@ -115,21 +110,3 @@ def apprise_http_custom_handler(
except Exception as e:
logger.error(f"Unexpected error occurred while sending custom notification to {url}: {e}")
return False
@notify_null_method
def apprise_null_custom_handler(
body: str,
title: str,
notify_type: str,
meta: dict,
*args,
**kwargs,
) -> bool:
url: str = meta.get("url")
schema: str = meta.get("schema")
method: str = re.sub(r"s$", "", schema).upper()
logger.info(f"Processed 'null' notification")
return True

View File

@@ -1,167 +1,31 @@
import os
import time
import apprise
from loguru import logger
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
from changedetectionio.safe_jinja import render as jinja_render
from urllib.parse import urlparse
def _populate_notification_tokens(n_object, datastore):
"""
Populate notification tokens (diff, current_snapshot, etc.) if not already present.
This ensures both queued notifications and test notifications have the same data.
"""
from changedetectionio import diff
from changedetectionio.notification import default_notification_format_for_watch
from markupsafe import escape
watch_uuid = n_object.get('uuid')
if not watch_uuid:
return
watch = datastore.data['watching'].get(watch_uuid)
if not watch:
return
dates = []
trigger_text = ''
watch_html_link = ''
if watch:
watch_history = watch.history
dates = list(watch_history.keys())
trigger_text = watch.get('trigger_text', [])
# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
if n_object.get('notification_format').lower().startswith('html'):
snapshot_contents = str(escape(snapshot_contents))
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
# If we ended up here with "System default"
if n_object.get('notification_format') == default_notification_format_for_watch:
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
html_colour_enable = False
line_feed_sep = "\n"
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object.get('notification_format').lower().startswith('html'):
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
if n_object.get('notification_format') == 'HTML Color':
html_colour_enable = True
triggered_text = ''
if len(trigger_text):
from changedetectionio import html_tools
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
if triggered_text:
triggered_text = line_feed_sep.join(triggered_text)
# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
current_snapshot = "Example text: example test\nExample text: More than 1 watch change needs to exist to build a nice preview!"
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])
if n_object.get('notification_format').lower().startswith('html'):
prev_snapshot = str(escape(prev_snapshot))
current_snapshot = str(escape(current_snapshot))
if watch:
v = {'url': watch.get('url'), 'label': watch.label}
watch_html_link = jinja_render(template_str='<a href="{{ label or url | e }}" rel="noopener noreferrer">{{ url | e }}</a>', **v)
n_object.update({
'current_snapshot': snapshot_contents,
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'triggered_text': triggered_text,
'watch_html_link': watch_html_link,
'watch_url': watch.link,
'watch_url_raw': watch.get('url'),
})
if watch:
n_object.update(watch.extra_notification_token_values())
def scan_notification_file_templates(url, datastore, n_body, notification_parameters):
import glob
from urllib.parse import urlparse, parse_qs
try:
scheme = urlparse(url).scheme.lower().strip()
# schema could be overriden dynamically
if scheme == 'null' and 'test_schema=' in url:
scheme = parse_qs(urlparse(url).query).get("test_schema", [None])[0]
logger.debug(f"Looking for '{scheme}' notification wrapper templates...")
# Try exact match first, then wildcard matches
candidates = [
os.path.join(datastore.datastore_path, f"notification-wrapper-{scheme}.html"),
*[f for f in glob.glob(os.path.join(datastore.datastore_path, "notification-wrapper-*--.html"))
if scheme.startswith(os.path.basename(f).replace("notification-wrapper-", "").replace("--.html", ""))]
]
for tpl_name in candidates:
if os.path.isfile(tpl_name):
template_params = notification_parameters.copy()
template_params['notification_body'] = n_body
with open(tpl_name, 'r', encoding='utf-8') as f:
logger.info(f"Using HTML notification template wrapper from '{tpl_name}'")
return jinja_render(template_str=f.read(), **template_params)
except Exception as e:
logger.warning(f"Failed to load notification template: {e}")
return None
def process_notification(n_object, datastore):
from changedetectionio.safe_jinja import render as jinja_render
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
# be sure its registered
from .apprise_plugin.custom_handlers import apprise_http_custom_handler, apprise_null_custom_handler
n_body = ''
n_title = ''
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
now = time.time()
if n_object.get('notification_timestamp'):
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
# Insert variables into the notification content
notification_parameters = create_notification_parameters(n_object, datastore)
n_format = valid_notification_formats.get(
n_object.get('notification_format', default_notification_format),
valid_notification_formats[default_notification_format],
)
# If we arrived with 'System default' then look it up
if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
# Initially text or whatever
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
# Ensure diff rendering is done if not already present (for test notifications)
if not n_object.get('diff') and n_object.get('uuid'):
_populate_notification_tokens(n_object, datastore)
# Insert variables into the notification content
notification_parameters = create_notification_parameters(n_object, datastore)
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
# https://github.com/caronc/apprise/wiki/Development_LogCapture
@@ -180,27 +44,22 @@ def process_notification(n_object, datastore):
with apprise.LogCapture(level=apprise.logging.DEBUG) as logs:
for url in n_object['notification_urls']:
# Commented out is OK
if url.startswith('#') or not url or not url.strip():
logger.trace(f"Skipping notification URL - '{url}'")
continue
# Get the notification body from datastore
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
# hmm unsure about this, but why not
if n_object.get('notification_format', '').startswith('HTML'):
n_body = n_body.replace("\n", '<br>')
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
n_body_from_file_template = scan_notification_file_templates(url=url,
datastore=datastore,
n_body=n_body,
notification_parameters=notification_parameters)
if n_body_from_file_template:
n_body = n_body_from_file_template
url = url.strip()
if url.startswith('#'):
logger.trace(f"Skipping commented out notification URL - {url}")
continue
if not url:
logger.warning(f"Process Notification: skipping empty notification URL.")
continue
logger.info(f">> Process Notification: AppRise notifying {url}")
url = jinja_render(template_str=url, **notification_parameters)
@@ -245,7 +104,7 @@ def process_notification(n_object, datastore):
# Apprise will default to HTML, so we need to override it
# So that whats' generated in n_body is in line with what is going to be sent.
# https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
if not 'format=' in url and (n_format.lower() == 'text' or n_format.lower() == 'markdown'):
if not 'format=' in url and (n_format == 'Text' or n_format == 'Markdown'):
prefix = '?' if not '?' in url else '&'
# Apprise format is lowercase text https://github.com/caronc/apprise/issues/633
n_format = n_format.lower()
@@ -259,15 +118,14 @@ def process_notification(n_object, datastore):
'url': url,
'body_format': n_format})
if n_object.get('notification_urls'):
# Blast off the notifications tht are set in .add()
apobj.notify(
title=n_title,
body=n_body,
body_format=n_format,
# False is not an option for AppRise, must be type None
attach=n_object.get('screenshot', None)
)
# Blast off the notifications tht are set in .add()
apobj.notify(
title=n_title,
body=n_body,
body_format=n_format,
# False is not an option for AppRise, must be type None
attach=n_object.get('screenshot', None)
)
# Returns empty string if nothing found, multi-line string otherwise

View File

@@ -1,15 +0,0 @@
{# Copy this to your data-store directory if you wish to enable it for HTML style notifications, applies to all as a wrapper :) #}
<html>
<body>
Hello,<br>
<p>A change was detected on your web page watch for <p>{{ watch_html_link }}.</p>
[ view history ] [ pause checks ] [ mute notifications ]
<div>
{{ notification_body }}
</div>
</body>
</html>

View File

@@ -1,17 +0,0 @@
## Notification syntax
All notifications use the https://github.com/caronc/apprise syntax, there are some custom ones such as `posts` etc for general web-services usability.
## Template file notification wrappers
You can by default wrap all notifications by creating a `notification-wrapper-HTML-schema.html` in your datastore directory.
For example
You can use "`--`" in the filename where the _schema_ is to symbolize a wildcard. For example `notification-wrapper-HTML-mail--.html` would
apply to `mail://` `mailto://` etc etc
See is `notification-wrapper-HTML-mail--.html` which applies to `mail://`, `mailto://foobar..` etc notifications

View File

@@ -22,14 +22,70 @@ class NotificationService:
def queue_notification_for_watch(self, n_object, watch):
"""
Queue a notification for a watch. Diff rendering and template variables will be
handled by process_notification() to ensure consistency with test notifications.
Queue a notification for a watch with full diff rendering and template variables
"""
from changedetectionio import diff
from changedetectionio.notification import default_notification_format_for_watch
dates = []
trigger_text = ''
now = time.time()
# Add basic metadata for the notification
if watch:
watch_history = watch.history
dates = list(watch_history.keys())
trigger_text = watch.get('trigger_text', [])
# Add text that was triggered
if len(dates):
snapshot_contents = watch.get_history_snapshot(dates[-1])
else:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
# If we ended up here with "System default"
if n_object.get('notification_format') == default_notification_format_for_watch:
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
html_colour_enable = False
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object.get('notification_format') == 'HTML':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
elif n_object.get('notification_format') == 'HTML Color':
line_feed_sep = "<br>"
# Snapshot will be plaintext on the disk, convert to some kind of HTML
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
html_colour_enable = True
else:
line_feed_sep = "\n"
triggered_text = ''
if len(trigger_text):
from . import html_tools
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
if triggered_text:
triggered_text = line_feed_sep.join(triggered_text)
# Could be called as a 'test notification' with only 1 snapshot available
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
if len(dates) > 1:
prev_snapshot = watch.get_history_snapshot(dates[-2])
current_snapshot = watch.get_history_snapshot(dates[-1])
n_object.update({
'current_snapshot': snapshot_contents,
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
'notification_timestamp': now,
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
'triggered_text': triggered_text,
'uuid': watch.get('uuid') if watch else None,
'watch_url': watch.get('url') if watch else None,
})
@@ -37,6 +93,7 @@ class NotificationService:
if watch:
n_object.update(watch.extra_notification_token_values())
logger.trace(f"Main rendered notification placeholders (diff_added etc) calculated in {time.time()-now:.3f}s")
logger.debug("Queued notification for sending")
self.notification_q.put(n_object)

View File

@@ -1,7 +1,7 @@
import pluggy
import os
import importlib
from loguru import logger
import sys
# Global plugin namespace for changedetection.io
PLUGIN_NAMESPACE = "changedetectionio"
@@ -57,7 +57,7 @@ def load_plugins_from_directories():
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
logger.critical(f"Error loading plugin {module_name}: {e}")
print(f"Error loading plugin {module_name}: {e}")
# Load plugins
load_plugins_from_directories()

View File

@@ -1,138 +0,0 @@
"""
Content Type Detection and Stream Classification
This module provides intelligent content-type detection for changedetection.io.
It addresses the common problem where HTTP Content-Type headers are missing, incorrect,
or too generic, which would otherwise cause the wrong processor to be used.
The guess_stream_type class combines:
1. HTTP Content-Type headers (when available and reliable)
2. Python-magic library for MIME detection (analyzing actual file content)
3. Content-based pattern matching for text formats (HTML tags, XML declarations, etc.)
This multi-layered approach ensures accurate detection of RSS feeds, JSON, HTML, PDF,
plain text, CSV, YAML, and XML formats - even when servers provide misleading headers.
Used by: processors/text_json_diff/processor.py and other content processors
"""
# When to apply the 'cdata to real HTML' hack
RSS_XML_CONTENT_TYPES = [
"application/rss+xml",
"application/rdf+xml",
"text/xml",
"application/xml",
"application/atom+xml",
"text/rss+xml", # rare, non-standard
"application/x-rss+xml", # legacy (older feed software)
"application/x-atom+xml", # legacy (older Atom)
]
# JSON Content-types
JSON_CONTENT_TYPES = [
"application/activity+json",
"application/feed+json",
"application/json",
"application/ld+json",
"application/vnd.api+json",
]
# CSV Content-types
CSV_CONTENT_TYPES = [
"text/csv",
"application/csv",
]
# Generic XML Content-types (non-RSS/Atom)
XML_CONTENT_TYPES = [
"text/xml",
"application/xml",
]
# YAML Content-types
YAML_CONTENT_TYPES = [
"text/yaml",
"text/x-yaml",
"application/yaml",
"application/x-yaml",
]
HTML_PATTERNS = ['<!doctype html', '<html', '<head', '<body', '<script', '<iframe', '<div']
import re
import magic
from loguru import logger
class guess_stream_type():
is_pdf = False
is_json = False
is_html = False
is_plaintext = False
is_rss = False
is_csv = False
is_xml = False # Generic XML, not RSS/Atom
is_yaml = False
def __init__(self, http_content_header, content):
magic_content_header = http_content_header
test_content = content[:200].lower().strip()
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
test_content_normalized = re.sub(r'<\s+', '<', test_content)
# Magic will sometimes call text/plain as text/html!
magic_result = None
try:
mime = magic.from_buffer(content[:200], mime=True) # Send the original content
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
if mime and "/" in mime:
magic_result = mime
# Ignore generic/fallback mime types from magic
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
logger.debug(f"Ignoring generic mime type '{mime}' from magic library")
# Trust magic for non-text types immediately
elif mime not in ['text/html', 'text/plain']:
magic_content_header = mime
except Exception as e:
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection")
# Content-based detection (most reliable for text formats)
# Check for HTML patterns first - if found, override magic's text/plain
has_html_patterns = any(p in test_content_normalized for p in HTML_PATTERNS)
# Always trust headers first
if any(s in http_content_header for s in RSS_XML_CONTENT_TYPES) or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
self.is_rss = True
elif any(s in http_content_header for s in JSON_CONTENT_TYPES) or any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
elif any(s in http_content_header for s in CSV_CONTENT_TYPES) or any(s in magic_content_header for s in CSV_CONTENT_TYPES):
self.is_csv = True
elif any(s in http_content_header for s in XML_CONTENT_TYPES) or any(s in magic_content_header for s in XML_CONTENT_TYPES):
# Only mark as generic XML if not already detected as RSS
if not self.is_rss:
self.is_xml = True
elif any(s in http_content_header for s in YAML_CONTENT_TYPES) or any(s in magic_content_header for s in YAML_CONTENT_TYPES):
self.is_yaml = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
###
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
# If magic says text/plain and we found no HTML patterns, trust it
elif magic_result == 'text/plain':
self.is_plaintext = True
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized:
self.is_rss = True
elif test_content_normalized.startswith('<?xml'):
# Generic XML that's not RSS/Atom (RSS/Atom checked above)
self.is_xml = True
elif '%pdf-1' in test_content:
self.is_pdf = True
# Only trust magic for 'text' if no other patterns matched
elif 'text' in magic_content_header:
self.is_plaintext = True

View File

@@ -13,8 +13,6 @@ from changedetectionio import html_tools, content_fetchers
from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
from loguru import logger
from changedetectionio.processors.magic import guess_stream_type
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
name = 'Webpage Text/HTML, JSON and PDF changes'
@@ -22,9 +20,6 @@ description = 'Detects all text changes where possible'
json_filter_prefixes = ['json:', 'jq:', 'jqraw:']
# Assume it's this type if the server says nothing on content-type
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
class FilterNotFoundInResponse(ValueError):
def __init__(self, msg, screenshot=None, xpath_data=None):
self.screenshot = screenshot
@@ -50,9 +45,6 @@ class perform_site_check(difference_detection_processor):
if not watch:
raise Exception("Watch no longer exists.")
ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()
stream_content_type = guess_stream_type(http_content_header=ctype_header, content=self.fetcher.content)
# Unset any existing notification error
update_obj = {'last_notification_error': False, 'last_error': False}
@@ -62,7 +54,7 @@ class perform_site_check(difference_detection_processor):
self.xpath_data = self.fetcher.xpath_data
# Track the content type
update_obj['content_type'] = ctype_header
update_obj['content_type'] = self.fetcher.get_all_headers().get('content-type', '').lower()
# Watches added automatically in the queue manager will skip if its the same checksum as the previous run
# Saves a lot of CPU
@@ -77,12 +69,24 @@ class perform_site_check(difference_detection_processor):
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
# return content().textfilter().jsonextract().checksumcompare() ?
is_json = 'application/json' in self.fetcher.get_all_headers().get('content-type', '').lower()
is_html = not is_json
is_rss = False
ctype_header = self.fetcher.get_all_headers().get('content-type', '').lower()
# Go into RSS preprocess for converting CDATA/comment to usable text
if stream_content_type.is_rss:
self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']):
if '<rss' in self.fetcher.content[:100].lower():
self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
is_rss = True
if watch.is_pdf or stream_content_type.is_pdf:
# source: support, basically treat it as plaintext
if watch.is_source_type_url:
is_html = False
is_json = False
inline_pdf = self.fetcher.get_all_headers().get('content-disposition', '') and '%PDF-1' in self.fetcher.content[:10]
if watch.is_pdf or 'application/pdf' in self.fetcher.get_all_headers().get('content-type', '').lower() or inline_pdf:
from shutil import which
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
if not which(tool):
@@ -126,12 +130,11 @@ class perform_site_check(difference_detection_processor):
has_filter_rule = len(include_filters_rule) and len(include_filters_rule[0].strip())
has_subtractive_selectors = len(subtractive_selectors) and len(subtractive_selectors[0].strip())
if stream_content_type.is_json:
if not has_filter_rule:
# Force a reformat
include_filters_rule.append("json:$")
has_filter_rule = True
if is_json and not has_filter_rule:
include_filters_rule.append("json:$")
has_filter_rule = True
if is_json:
# Sort the JSON so we dont get false alerts when the content is just re-ordered
try:
self.fetcher.content = json.dumps(json.loads(self.fetcher.content), sort_keys=True)
@@ -139,25 +142,34 @@ class perform_site_check(difference_detection_processor):
# Might have just been a snippet, or otherwise bad JSON, continue
pass
if has_filter_rule:
for filter in include_filters_rule:
if any(prefix in filter for prefix in json_filter_prefixes):
stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
if stripped_text_from_html:
stream_content_type.is_json = True
stream_content_type.is_html = False
if has_filter_rule:
for filter in include_filters_rule:
if any(prefix in filter for prefix in json_filter_prefixes):
stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
is_html = False
# We have 'watch.is_source_type_url' because we should be able to use selectors on the raw HTML but return just that selected HTML
if stream_content_type.is_html or watch.is_source_type_url or stream_content_type.is_plaintext or stream_content_type.is_rss or stream_content_type.is_xml or stream_content_type.is_pdf:
if is_html or watch.is_source_type_url:
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
html_content = self.fetcher.content
content_type = self.fetcher.get_all_headers().get('content-type', '').lower()
is_attachment = 'attachment' in self.fetcher.get_all_headers().get('content-disposition', '').lower()
# Some kind of "text" but definitely not RSS looking
if stream_content_type.is_plaintext:
# Try to detect better mime types if its a download or not announced as HTML
if is_attachment or 'octet-stream' in content_type or not 'html' in content_type:
logger.debug(f"Got a reply that may be a download or possibly a text attachment, checking..")
try:
import magic
mime = magic.from_buffer(html_content, mime=True)
logger.debug(f"Guessing mime type, original content_type '{content_type}', mime type detected '{mime}'")
if mime and "/" in mime: # looks valid and is a valid mime type
content_type = mime
except Exception as e:
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}")
if 'text/' in content_type and not 'html' in content_type:
# Don't run get_text or xpath/css filters on plaintext
# We are not HTML, we are not any kind of RSS, doesnt even look like HTML
stripped_text_from_html = html_content
else:
# If not JSON, and if it's not text/plain..
@@ -174,13 +186,13 @@ class perform_site_check(difference_detection_processor):
html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=stream_content_type.is_rss)
is_rss=is_rss)
elif filter_rule.startswith('xpath1:'):
html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''),
html_content=self.fetcher.content,
append_pretty_line_formatting=not watch.is_source_type_url,
is_rss=stream_content_type.is_rss)
is_rss=is_rss)
else:
html_content += html_tools.include_filters(include_filters=filter_rule,
html_content=self.fetcher.content,
@@ -199,7 +211,7 @@ class perform_site_check(difference_detection_processor):
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
stripped_text_from_html = html_tools.html_to_text(html_content=html_content,
render_anchor_tag_content=do_anchor,
is_rss=stream_content_type.is_rss) # 1874 activate the <title workaround hack
is_rss=is_rss) # 1874 activate the <title workaround hack
if watch.get('trim_text_whitespace'):
stripped_text_from_html = '\n'.join(line.strip() for line in stripped_text_from_html.replace("\n\n", "\n").splitlines())
@@ -238,7 +250,7 @@ class perform_site_check(difference_detection_processor):
# Treat pages with no renderable text content as a change? No by default
empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text_from_html.strip()) == 0:
if not is_json and not empty_pages_are_a_change and len(stripped_text_from_html.strip()) == 0:
raise content_fetchers.exceptions.ReplyWithContentButNoText(url=url,
status_code=self.fetcher.get_last_status_code(),
screenshot=self.fetcher.screenshot,
@@ -303,11 +315,6 @@ class perform_site_check(difference_detection_processor):
text_for_checksuming = stripped_text_from_html
if text_to_ignore:
text_for_checksuming = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
# Some people prefer to also completely remove it
strip_ignored_lines = watch.get('strip_ignored_lines') if watch.get('strip_ignored_lines') is not None else self.datastore.data['settings']['application'].get('strip_ignored_lines')
if strip_ignored_lines:
# @todo add test in the 'preview' mode, check the widget works? compare to datastruct
stripped_text_from_html = text_for_checksuming
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
if text_for_checksuming and self.datastore.data['settings']['application'].get('ignore_whitespace', False):
@@ -324,7 +331,7 @@ class perform_site_check(difference_detection_processor):
# Filter and trigger works the same, so reuse it
# It should return the line numbers that match
# Unblock flow if the trigger was found (some text remained after stripped what didnt match)
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
result = html_tools.strip_ignore_text(content=str(text_for_checksuming),
wordlist=trigger_text,
mode="line numbers")
# Unblock if the trigger was found

View File

@@ -18,7 +18,7 @@ def render(template_str, **args: t.Any) -> str:
return output[:JINJA2_MAX_RETURN_PAYLOAD_SIZE]
def render_fully_escaped(content):
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True, extensions=['jinja2_time.TimeExtension'])
env = jinja2.sandbox.ImmutableSandboxedEnvironment(autoescape=True)
template = env.from_string("{{ some_html|e }}")
return template.render(some_html=content)

View File

@@ -1,18 +1,5 @@
$(document).ready(function () {
// Could be from 'watch' or system settings or other
function getNotificationData() {
data = {
notification_body: $('textarea.notification-body').val(),
notification_format: $('select.notification-format').val(),
notification_title: $('input.notification-title').val(),
notification_urls: $('textarea.notification-urls').val(),
tags: $('#tags').val(),
window_url: window.location.href,
}
return data
}
$('#add-email-helper').click(function (e) {
e.preventDefault();
email = prompt("Destination email");
@@ -23,82 +10,17 @@ $(document).ready(function () {
}
});
$('#notifications-minitabs').miniTabs({
"Customise": "#notification-setup",
"Preview": "#notification-preview",
});
$(document).on('click', '[data-target="#notification-preview"]', function (e) {
var data = getNotificationData();
$('#notification-iframe-html-preview').contents().find('body').html('Loading...');
$.ajax({
type: "POST",
url: notification_test_render_preview_url,
data: data,
statusCode: {
400: function (data) {
$('#notification-test-log').show().toggleClass('error', true);
$("#notification-test-log>span").text(data.responseText);
},
}
}).done(function (data) {
$('#notification-test-log').toggleClass('error', false);
setPreview(data['result']);
})
});
function setPreview(data) {
const iframe = document.getElementById("notification-iframe-html-preview");
const isDark = document.documentElement.getAttribute('data-darkmode') === 'true';
// this should come back in the data objk
const isTextFormat = $('select.notification-format').val() === 'Text';
$('#notification-preview-title-text').text(data['title']);
$('#notification-div-text-preview').text(data['body']);
return;
iframe.srcdoc = `
<html data-darkmode="${isDark}">
<head>
<style>
:root {
--color-white: #fff;
--color-grey-200: #333;
--color-grey-800: #e0e0e0;
--color-black: #000;
--color-dark-red: #a00;
--color-light-red: #dd0000;
--color-background: var(--color-grey-800);
--color-text: var(--color-grey-200);
}
html[data-darkmode="true"] {
--color-background: var(--color-grey-200);
--color-text: var(--color-white);
}
body { /* no darkmode */
background-color: var(--color-background);
color: var(--color-text);
padding: 5px;
}
body.text-format {
font-family: monospace;
white-space: pre;
overflow-wrap: normal;
overflow-x: auto;
}
</style>
</head>
<body class="${isTextFormat ? 'text-format' : ''}">${data['body']}</body>
</html>`;
}
$('#send-test-notification').click(function (e) {
e.preventDefault();
var data = getNotificationData();
data = {
notification_body: $('#notification_body').val(),
notification_format: $('#notification_format').val(),
notification_title: $('#notification_title').val(),
notification_urls: $('.notification-urls').val(),
tags: $('#tags').val(),
window_url: window.location.href,
}
$('.notifications-wrapper .spinner').fadeIn();
$('#notification-test-log').show();
@@ -108,14 +30,11 @@ $(document).ready(function () {
data: data,
statusCode: {
400: function (data) {
$("#notification-test-log").toggleClass('error', true);
$("#notification-test-log>span").text(data.responseText);
},
}
}).done(function (data) {
$("#notification-test-log").toggleClass('error', false);
$("#notification-test-log>span").text(data['status']);
$("#notification-test-log>span").text(data);
}).fail(function (jqXHR, textStatus, errorThrown) {
// Handle connection refused or other errors
if (textStatus === "error" && errorThrown === "") {
@@ -123,13 +42,11 @@ $(document).ready(function () {
$("#notification-test-log>span").text("Error: Connection refused or server is unreachable.");
} else {
console.error("Error:", textStatus, errorThrown);
$("#notification-test-log>span").text("An error occurred: " + errorThrown);
$("#notification-test-log>span").text("An error occurred: " + textStatus);
}
}).always(function () {
$('.notifications-wrapper .spinner').hide();
})
});
});

View File

@@ -1,11 +1,11 @@
// Rewrite this is a plugin.. is all this JS really 'worth it?'
window.addEventListener('hashchange', function () {
var tabs = document.querySelectorAll('.tabs .active');
tabs.forEach(function (tab) {
tab.classList.remove('active');
var tabs = document.getElementsByClassName('active');
while (tabs[0]) {
tabs[0].classList.remove('active');
document.body.classList.remove('full-width');
});
}
set_active_tab();
}, false);

View File

@@ -74,7 +74,7 @@ $(document).ready(function () {
$('#filters-and-triggers input')[method]('change', request_textpreview_update.throttle(1000));
$("#filters-and-triggers-tab")[method]('click', request_textpreview_update.throttle(1000));
});
$('#filter-preview-minitabs').miniTabs({
$('.minitabs-wrapper').miniTabs({
"Content after filters": "#text-preview-inner",
"Content raw/before filters": "#text-preview-before-inner"
});

View File

@@ -18,15 +18,8 @@ html[data-darkmode="true"] {
display: block;
}
}
.minitabs-content {
> div {
background-color: rgb(249 249 249 / 13%) !important;
}
}
}

View File

@@ -1,13 +1,6 @@
.minitabs-wrapper {
width: 100%;
.tab-contents-monospace-preview {
font-family: "Courier New", Courier, monospace; /* Sets the font to a monospace type */
font-size: 70%;
word-break: break-word;
white-space: pre-wrap; /* Preserves whitespace and line breaks like <pre> */
}
> div[id] {
padding: 20px;
border: 1px solid #ccc;
@@ -17,45 +10,38 @@
.minitabs-content {
width: 100%;
display: flex;
> div {
flex: 1 1 auto;
min-width: 0;
padding: 1rem;
border: 1px solid #ddd;
background-color: #eee;
overflow: scroll;
}
}
.minitabs {
display: flex;
border-bottom: 1px solid #ccc;
.minitab {
flex: 1;
text-align: center;
padding: 12px 0;
text-decoration: none;
color: #333;
background-color: #f1f1f1;
border: 1px solid #ccc;
border-bottom: none;
cursor: pointer;
transition: background-color 0.3s;
border-top-left-radius: 5px;
border-top-right-radius: 5px;
opacity: 0.45;
&:hover {
background-color: #ddd;
}
&.active {
background-color: #eee;
font-weight: bold;
opacity: 1.0;
}
}
}
.minitab {
flex: 1;
text-align: center;
padding: 12px 0;
text-decoration: none;
color: #333;
background-color: #f1f1f1;
border: 1px solid #ccc;
border-bottom: none;
cursor: pointer;
transition: background-color 0.3s;
}
.minitab:hover {
background-color: #ddd;
}
.minitab.active {
background-color: #fff;
font-weight: bold;
}
}

View File

@@ -1,12 +0,0 @@
#notification-preview {
resize: both;
overflow: hidden;
}
#notification-iframe-html-preview {
width: 100%;
height: 100%;
border: 0;
display: block;
overflow: auto;
}

View File

@@ -1,3 +1,5 @@
@use "minitabs";
body.preview-text-enabled {
@media (min-width: 800px) {
@@ -29,7 +31,19 @@ body.preview-text-enabled {
}
#activate-text-preview {
background-color: var(--color-grey-500);
background-color: var(--color-grey-500);
}
/* actual preview area */
.monospace-preview {
background: var(--color-background-input);
border: 1px solid var(--color-grey-600);
padding: 1rem;
color: var(--color-text-input);
font-family: "Courier New", Courier, monospace; /* Sets the font to a monospace type */
font-size: 70%;
word-break: break-word;
white-space: pre-wrap; /* Preserves whitespace and line breaks like <pre> */
}
}
@@ -39,11 +53,3 @@ body.preview-text-enabled {
z-index: 3;
box-shadow: 1px 1px 4px var(--color-shadow-jump);
}
#filter-preview-minitabs {
.minitabs-content {
> div {
overflow: scroll;
}
}
}

View File

@@ -34,6 +34,7 @@
transition: all 0.2s ease;
cursor: pointer;
display: block;
min-width: 60px;
text-align: center;
}

View File

@@ -20,8 +20,6 @@
@use "parts/lister_extra";
@use "parts/socket";
@use "parts/visualselector";
@use "parts/_minitabs";
@use "parts/_notification";
@use "parts/widgets";
body {
@@ -337,11 +335,6 @@ a.pure-button-selected {
overflow-wrap: break-word;
max-width: 100%;
box-sizing: border-box;
&.error {
> span {
color: var(--color-error) !important;
}
}
}
}
@@ -351,6 +344,11 @@ label {
}
}
#notification-customisation {
border: 1px solid var(--color-border-notification);
padding: 0.5rem;
border-radius: 5px;
}
#notification-error-log {
border: 1px solid var(--color-border-notification);

File diff suppressed because one or more lines are too long

View File

@@ -24,153 +24,121 @@
</ul>
</div>
<div class="notifications-wrapper">
<a id="send-test-notification" class="pure-button button-secondary" >Send test notification</a> <div class="spinner" style="display: none;"></div>
<a id="send-test-notification" class="pure-button button-secondary button-xsmall" >Send test notification</a> <div class="spinner" style="display: none;"></div>
{% if emailprefix %}
<a id="add-email-helper" class="pure-button button-secondary" >Add email <img style="height: 1em; display: inline-block" src="{{url_for('static_content', group='images', filename='email.svg')}}" alt="Add an email address"> </a>
<a id="add-email-helper" class="pure-button button-secondary button-xsmall" >Add email <img style="height: 1em; display: inline-block" src="{{url_for('static_content', group='images', filename='email.svg')}}" alt="Add an email address"> </a>
{% endif %}
<a href="{{url_for('settings.notification_logs')}}" class="pure-button button-secondary " >Notification debug logs</a>
<a href="{{url_for('settings.notification_logs')}}" class="pure-button button-secondary button-xsmall" >Notification debug logs</a>
<br>
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
</div>
</div>
<div class="pure-control-group">
<p>Customise the contents of the notification using the form below, this is not necessary but you can create quite interesting integrations :-)</p>
<div class="minitabs-wrapper" id="notifications-minitabs">
<div class="minitabs-content">
<div id="notification-setup">
<div class="pure-control-group">
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
</div>
<div class="pure-control-group">
{{ render_field(form.notification_body , rows=5, class="notification-body", placeholder=settings_application['notification_body']) }}
<span class="pure-form-message-inline">Body for the notification &dash; You can use <a
target="newwindow"
href="https://jinja.palletsprojects.com/en/3.0.x/templates/">Jinja2</a> templating in the notification title, body and URL, and tokens from below.
</span>
<div id="notification-customisation" class="pure-control-group">
<div class="pure-control-group">
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
<span class="pure-form-message-inline">Title for all notifications</span>
</div>
<div class="pure-control-group">
{{ render_field(form.notification_body , rows=5, class="notification-body", placeholder=settings_application['notification_body']) }}
<span class="pure-form-message-inline">Body for all notifications &dash; You can use <a target="newwindow" href="https://jinja.palletsprojects.com/en/3.0.x/templates/">Jinja2</a> templating in the notification title, body and URL, and tokens from below.
</span>
</div>
<div class="pure-controls">
<div data-target="#notification-tokens-info"
class="toggle-show pure-button button-tag button-xsmall">Show
token/placeholders
</div>
</div>
<div class="pure-controls" style="display: none;" id="notification-tokens-info">
<table class="pure-table" id="token-table">
<thead>
</div>
<div class="pure-controls">
<div data-target="#notification-tokens-info" class="toggle-show pure-button button-tag button-xsmall">Show token/placeholders</div>
</div>
<div class="pure-controls" style="display: none;" id="notification-tokens-info">
<table class="pure-table" id="token-table">
<thead>
<tr>
<th>Token</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td><code>{{ '{{base_url}}' }}</code></td>
<td>The URL of the changedetection.io instance you are running.</td>
</tr>
<tr>
<td><code>{{ '{{watch_url}}' }}</code></td>
<td>The URL being watched.</td>
</tr>
<tr>
<td><code>{{ '{{watch_uuid}}' }}</code></td>
<td>The UUID of the watch.</td>
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The page title of the watch, uses &lt;title&gt; if not set, falls back to URL</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>
<td>The watch group / tag</td>
</tr>
<tr>
<td><code>{{ '{{preview_url}}' }}</code></td>
<td>The URL of the preview page generated by changedetection.io.</td>
</tr>
<tr>
<td><code>{{ '{{diff_url}}' }}</code></td>
<td>The URL of the diff output for the watch.</td>
</tr>
<tr>
<td><code>{{ '{{diff}}' }}</code></td>
<td>The diff output - only changes, additions, and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_added}}' }}</code></td>
<td>The diff output - only changes and additions</td>
</tr>
<tr>
<td><code>{{ '{{diff_removed}}' }}</code></td>
<td>The diff output - only changes and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_full}}' }}</code></td>
<td>The diff output - full difference output</td>
</tr>
<tr>
<td><code>{{ '{{diff_patch}}' }}</code></td>
<td>The diff output - patch in unified format</td>
</tr>
<tr>
<td><code>{{ '{{current_snapshot}}' }}</code></td>
<td>The current snapshot text contents value, useful when combined with JSON or CSS filters
</td>
</tr>
<tr>
<td><code>{{ '{{triggered_text}}' }}</code></td>
<td>Text that tripped the trigger from filters</td>
{% if extra_notification_token_placeholder_info %}
{% for token in extra_notification_token_placeholder_info %}
<tr>
<th>Token</th>
<th>Description</th>
<td><code>{{ '{{' }}{{ token[0] }}{{ '}}' }}</code></td>
<td>{{ token[1] }}</td>
</tr>
</thead>
<tbody>
<tr>
<td><code>{{ '{{base_url}}' }}</code></td>
<td>The URL of the changedetection.io instance you are running.</td>
</tr>
<tr>
<td><code>{{ '{{watch_url}}' }}</code></td>
<td>The URL being watched.</td>
</tr>
<tr>
<td><code>{{ '{{watch_uuid}}' }}</code></td>
<td>The UUID of the watch.</td>
</tr>
<tr>
<td><code>{{ '{{watch_title}}' }}</code></td>
<td>The page title of the watch, uses &lt;title&gt; if not set, falls back to URL</td>
</tr>
<tr>
<td><code>{{ '{{watch_tag}}' }}</code></td>
<td>The watch group / tag</td>
</tr>
<tr>
<td><code>{{ '{{preview_url}}' }}</code></td>
<td>The URL of the preview page generated by changedetection.io.
</td>
</tr>
<tr>
<td><code>{{ '{{diff_url}}' }}</code></td>
<td>The URL of the diff output for the watch.</td>
</tr>
<tr>
<td><code>{{ '{{diff}}' }}</code></td>
<td>The diff output - only changes, additions, and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_added}}' }}</code></td>
<td>The diff output - only changes and additions</td>
</tr>
<tr>
<td><code>{{ '{{diff_removed}}' }}</code></td>
<td>The diff output - only changes and removals</td>
</tr>
<tr>
<td><code>{{ '{{diff_full}}' }}</code></td>
<td>The diff output - full difference output</td>
</tr>
<tr>
<td><code>{{ '{{diff_patch}}' }}</code></td>
<td>The diff output - patch in unified format</td>
</tr>
<tr>
<td><code>{{ '{{current_snapshot}}' }}</code></td>
<td>The current snapshot text contents value, useful when combined
with JSON or CSS filters
</td>
</tr>
<tr>
<td><code>{{ '{{triggered_text}}' }}</code></td>
<td>Text that tripped the trigger from filters</td>
</tr>
{% if extra_notification_token_placeholder_info %}
{% for token in extra_notification_token_placeholder_info %}
<tr>
<td><code>{{ '{{' }}{{ token[0] }}{{ '}}' }}</code></td>
<td>{{ token[1] }}</td>
</tr>
{% endfor %}
{% endif %}
</tbody>
</table>
<div class="pure-form-message-inline">
<p>
Warning: Contents of <code>{{ '{{diff}}' }}</code>,
<code>{{ '{{diff_removed}}' }}</code>, and
<code>{{ '{{diff_added}}' }}</code> depend on how the difference
algorithm perceives the change. <br>
For example, an addition or removal could be perceived as a change
in some cases. <a target="newwindow"
href="https://github.com/dgtlmoon/changedetection.io/wiki/Using-the-%7B%7Bdiff%7D%7D,-%7B%7Bdiff_added%7D%7D,-and-%7B%7Bdiff_removed%7D%7D-notification-tokens">More
Here</a> <br>
</p>
<p>
For JSON payloads, use <strong>|tojson</strong> without quotes for
automatic escaping, for example - <code>{
"name": {{ '{{ watch_title|tojson }}' }} }</code>
</p>
<p>
URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code>
</p>
</div>
</div>
<div class="pure-control-group">
{{ render_field(form.notification_format , class="notification-format") }}
<span class="pure-form-message-inline">Format for all notifications</span>
</div>
</div>
<div id="notification-preview" style="display: none; height:100%; display:flex; flex-direction:column;">
<p><strong>Title: </strong><span id="notification-preview-title-text">Preview loading..</span></p>
<div style="flex:1; display:flex; flex-direction:column;">
<strong>Body: </strong>
<div id="notification-div-text-preview" style="flex:1; height:95%; width:100%; border-radius:4px; margin-top:0.5rem; border:none;"></div>
<iframe id="notification-iframe-html-preview"
style="flex:1; height:95%; width:100%; border-radius:4px; margin-top:0.5rem; border:none;">
Preview loading...
</iframe>
</div>
</div>
{% endfor %}
{% endif %}
</tbody>
</table>
<div class="pure-form-message-inline">
<p>
Warning: Contents of <code>{{ '{{diff}}' }}</code>, <code>{{ '{{diff_removed}}' }}</code>, and <code>{{ '{{diff_added}}' }}</code> depend on how the difference algorithm perceives the change. <br>
For example, an addition or removal could be perceived as a change in some cases. <a target="newwindow" href="https://github.com/dgtlmoon/changedetection.io/wiki/Using-the-%7B%7Bdiff%7D%7D,-%7B%7Bdiff_added%7D%7D,-and-%7B%7Bdiff_removed%7D%7D-notification-tokens">More Here</a> <br>
</p>
<p>
For JSON payloads, use <strong>|tojson</strong> without quotes for automatic escaping, for example - <code>{ "name": {{ '{{ watch_title|tojson }}' }} }</code>
</p>
<p>
URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code>
</p>
</div>
</div>
<div class="pure-control-group">
{{ render_field(form.notification_format , class="notification-format") }}
<span class="pure-form-message-inline">Format for all notifications</span>
</div>
</div>
{% endmacro %}
{% endmacro %}

View File

@@ -27,15 +27,8 @@
{% endmacro %}
{% macro render_checkbox_field(field) %}
<div class="checkbox {% if field.errors or field.top_errors %} error {% endif %}">
<div class="checkbox {% if field.errors %} error {% endif %}">
{{ field(**kwargs)|safe }} {{ field.label }}
{% if field.top_errors %}
<ul class="errors top-errors">
{% for error in field.top_errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}
@@ -50,16 +43,9 @@
{% if BooleanField %}
{% set _ = field.__setattr__('boolean_mode', true) %}
{% endif %}
<div class="ternary-field {% if field.errors or field.top_errors %} error {% endif %}">
<div class="ternary-field {% if field.errors %} error {% endif %}">
<div class="ternary-field-label">{{ field.label }}</div>
<div class="ternary-field-widget">{{ field(**kwargs)|safe }}</div>
{% if field.top_errors %}
<ul class="errors top-errors">
{% for error in field.top_errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}
@@ -72,15 +58,8 @@
{% macro render_simple_field(field) %}
<span class="label {% if field.errors or field.top_errors %}error{% endif %}">{{ field.label }}</span>
<span {% if field.errors or field.top_errors %} class="error" {% endif %}>{{ field(**kwargs)|safe }}
{% if field.top_errors %}
<ul class="errors top-errors">
{% for error in field.top_errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
<span class="label {% if field.errors %}error{% endif %}">{{ field.label }}</span>
<span {% if field.errors %} class="error" {% endif %}>{{ field(**kwargs)|safe }}
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}
@@ -95,15 +74,8 @@
{% macro render_nolabel_field(field) %}
<span>
{{ field(**kwargs)|safe }}
{% if field.top_errors or field.errors %}
{% if field.errors %}
<span class="error">
{% if field.top_errors %}
<ul class="errors top-errors">
{% for error in field.top_errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
{% if field.errors %}
<ul class=errors>
{% for error in field.errors %}

View File

@@ -26,10 +26,7 @@
<li>Changing this will affect the comparison checksum which may trigger an alert</li>
</ul>
</span>
<br><br>
<div class="pure-control-group">
{{ render_ternary_field(form.strip_ignored_lines) }}
</div>
</fieldset>
<fieldset>

View File

@@ -1,17 +1,17 @@
import json
import os
import time
import re
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \
set_longer_modified_response
from changedetectionio.tests.util import extract_UUID_from_client
import logging
import base64
# NOTE - RELIES ON mailserver as hostname running, see github build recipes
# Should be hostname (never IP), looks for our test mailserver that repeats the content
# python3 changedetectionio/tests/smtp/smtp-test-server.py &
# mailserver=localhost pytest tests/smtp/test_notification_smtp.py::test_check_notification_email_formats_default_HTML
smtp_test_server = os.getenv('mailserver', 'mailserver')
smtp_test_server = 'mailserver'
from changedetectionio.notification import (
default_notification_body,
@@ -20,35 +20,7 @@ from changedetectionio.notification import (
valid_notification_formats,
)
from email import policy
from email.parser import BytesParser, Parser
def parse_mime(raw):
"""Return (EmailMessage, dict[str, list[str]] bodies by content-type)."""
if isinstance(raw, (bytes, bytearray)):
msg = BytesParser(policy=policy.default).parsebytes(raw)
else:
msg = Parser(policy=policy.default).parsestr(raw)
parts_by_type = {}
if msg.is_multipart():
for part in msg.walk():
if part.get_content_maintype() == "multipart":
continue
ctype = part.get_content_type() # e.g. "text/plain"
text = part.get_content() # decoded str
parts_by_type.setdefault(ctype, []).append(text)
else:
parts_by_type.setdefault(msg.get_content_type(), []).append(msg.get_content())
return msg, parts_by_type
def one_or_join(parts_dict, ctype):
"""Join multiple parts of the same type (rare but possible)."""
return "\n".join(parts_dict.get(ctype, []))
def norm_newlines(s: str) -> str:
return s.replace("\r\n", "\n").replace("\r", "\n")
def get_last_message_from_smtp_server():
import socket
@@ -105,36 +77,37 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
time.sleep(3)
raw = get_last_message_from_smtp_server()
assert raw # not empty
msg, bodies = parse_mime(raw)
plain = norm_newlines(one_or_join(bodies, "text/plain"))
html = norm_newlines(one_or_join(bodies, "text/html"))
# Now assert against the decoded bodies
assert "(added) So let's see what happens.\n" in plain # plaintext uses a literal apostrophe
assert "(added) So let&#39;s see what happens.<br>" in html # html uses &#39; and <br>
# You can also check counts, boundaries, etc.
assert html.count("So let&#39;s see what happens.") == 3
assert "modified head title had a change." in plain
assert "modified head title had a change.<br>" in html
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
# The email should have two bodies, and the text/html part should be <br>
assert 'Content-Type: text/plain' in msg
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
assert 'Content-Type: text/html' in msg
assert '(added) So let\'s see what happens.<br>' in msg # the html part
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage):
## live_server_setup(live_server) # Setup on conftest per function
# HTML problems? see this
# https://github.com/caronc/apprise/issues/633
set_original_response()
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
notification_body = f"""{default_notification_body}"""
notification_body = f"""<!DOCTYPE html>
<html lang="en">
<head>
<title>My Webpage</title>
</head>
<body>
<h1>Test</h1>
{default_notification_body}
</body>
</html>
"""
#####################
# Set this up for when we remove the notification from the watch, it should fallback with these details
@@ -143,12 +116,11 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": notification_body,
"application-notification_format": 'Text', # handler.py should be sure to add &format=text to override default html from apprise
"application-notification_format": 'Text',
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add a watch and trigger a HTTP POST
@@ -168,26 +140,18 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
wait_for_all_checks(client)
time.sleep(3)
raw = get_last_message_from_smtp_server()
assert raw
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
# with open('/tmp/m.txt', 'w') as f:
# f.write(msg)
msg, bodies = parse_mime(raw)
# The email should not have two bodies, should be TEXT only
plain = norm_newlines(one_or_join(bodies, "text/plain"))
html = norm_newlines(one_or_join(bodies, "text/html"))
assert not html # should be no HTML here
assert 'Content-Type: text/plain' in msg
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
# Expect ONLY text/plain body
assert "text/plain" in bodies
assert "text/html" not in bodies
# Assert on decoded plaintext (literal apostrophe, not &#39;)
# Should be NO markup when in text mode
assert "(added) So let's see what happens.\n" in plain
# ---------- Flip to HTML format, then expect multipart with both ----------
set_original_response()
# Now override as HTML format
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
@@ -197,28 +161,23 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
"time_between_check_use_default": "y"},
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
time.sleep(3)
msg = get_last_message_from_smtp_server()
assert len(msg) >= 1
raw = get_last_message_from_smtp_server()
assert raw
# The email should have two bodies, and the text/html part should be <br>
assert 'Content-Type: text/plain' in msg
assert '(removed) So let\'s see what happens.\r\n' in msg # The plaintext part with \n
assert 'Content-Type: text/html' in msg
assert '(removed) So let\'s see what happens.<br>' in msg # the html part
msg, bodies = parse_mime(raw)
plain = norm_newlines(one_or_join(bodies, "text/plain"))
html = norm_newlines(one_or_join(bodies, "text/html"))
# https://github.com/dgtlmoon/changedetection.io/issues/2103
assert '<h1>Test</h1>' in msg
assert '&lt;' not in msg
assert 'Content-Type: text/html' in msg
# Expect both text/plain and text/html bodies now
assert "text/plain" in bodies
assert "text/html" in bodies
# Plaintext reflects the removal line (literal apostrophe)
assert "(removed) So let's see what happens.\n" in plain
assert "(removed) So let&#39;s see what happens.<br>" in html
# Optional: ensure we got multipart/alternative (typical for dual bodies)
if msg.is_multipart():
# most senders do "multipart/alternative" for text/plain + text/html
assert msg.get_content_subtype() in ("alternative", "mixed", "related")
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data

View File

@@ -167,20 +167,6 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
assert b'Deleted' in res.data
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
"""
https://github.com/dgtlmoon/changedetection.io/issues/3434
I noticed that a watched website can be monitored fine as long as the server sends content-type: text/plain; charset=utf-8,
but once the server sends content-type: application/octet-stream (which is usually done to force the browser to show the Download dialog),
changedetection somehow ignores all line breaks and treats the document file as if everything is on one line.
WHAT THIS DOES - makes the system rely on 'magic' to determine what is it
:param client:
:param live_server:
:param measure_memory_usage:
:return:
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("""some random text that should be split by line
and not parsed with html_to_text
@@ -229,69 +215,3 @@ got it\r\n
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def test_standard_text_plain(client, live_server, measure_memory_usage):
"""
https://github.com/dgtlmoon/changedetection.io/issues/3434
I noticed that a watched website can be monitored fine as long as the server sends content-type: text/plain; charset=utf-8,
but once the server sends content-type: application/octet-stream (which is usually done to force the browser to show the Download dialog),
changedetection somehow ignores all line breaks and treats the document file as if everything is on one line.
The real bug here can be that it will try to process plain-text as HTML, losing <etc>
:param client:
:param live_server:
:param measure_memory_usage:
:return:
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("""some random text that should be split by line
and not parsed with html_to_text
<title>Even this title should stay because we are just plain text</title>
this way we know that it correctly parsed as plain text
\r\n
ok\r\n
got it\r\n
""")
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
### check the front end
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
assert b"some random text that should be split by line\n" in res.data
####
# Check the snapshot by API that it has linefeeds too
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
res = client.get(
url_for("watchhistory", uuid=watch_uuid),
headers={'x-api-key': api_key},
)
# Fetch a snapshot by timestamp, check the right one was found
res = client.get(
url_for("watchsinglehistory", uuid=watch_uuid, timestamp=list(res.json.keys())[-1]),
headers={'x-api-key': api_key},
)
assert b"some random text that should be split by line\n" in res.data
assert b"<title>Even this title should stay because we are just plain text</title>" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)

View File

@@ -264,6 +264,8 @@ def test_limit_tag_ui(client, live_server, measure_memory_usage):
client.get(url_for('ui.mark_all_viewed', tag=tag_uuid), follow_redirects=True)
wait_for_all_checks(client)
with open('/tmp/fuck.html', 'wb') as f:
f.write(res.data)
# Should be only 1 unviewed
res = client.get(url_for("watchlist.index"))
assert res.data.count(b' unviewed ') == 1

View File

@@ -3,8 +3,9 @@
import time
import os
import json
import logging
from flask import url_for
from .util import wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks
from urllib.parse import urlparse, parse_qs
def test_consistent_history(client, live_server, measure_memory_usage):

View File

@@ -58,39 +58,3 @@ def test_ignore(client, live_server, measure_memory_usage):
# Should be in base.html
assert b'csrftoken' in res.data
def test_strip_ignore_lines(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
set_original_ignore_response()
# Goto the settings page, add our ignore text
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-strip_ignored_lines": "y",
"application-global_ignore_text": "Which is across multiple",
'application-fetch_backend': "html_requests"
},
follow_redirects=True
)
assert b"Settings updated." in res.data
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
# It should not be in the preview anymore
res = client.get(url_for("ui.ui_views.preview_page", uuid=uuid))
assert b'<div class="ignored">' not in res.data
assert b'Which is across multiple' not in res.data

View File

@@ -3,7 +3,6 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from ..notification import default_notification_title, default_notification_body, default_notification_format
# def test_setup(client, live_server, measure_memory_usage):
@@ -11,6 +10,7 @@ from ..notification import default_notification_title, default_notification_body
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_jinja2_in_url_query(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_return_query', _external=True)
@@ -56,20 +56,3 @@ def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
assert b'is invalid and cannot be used' in res.data
# Some of the spewed output from the subclasses
assert b'dict_values' not in res.data
def test_jinja2_notification(client, live_server, measure_memory_usage):
res = client.post(
url_for("settings.settings_page"),
data={"application-notification_urls": "posts://127.0.0.1",
"application-notification_title": "on the {% now 'America/New_York', '%Y-%m-%d' %}",
"application-notification_body": "on the {% now 'America/New_York', '%Y-%m-%d' %}",
"application-notification_format": default_notification_format,
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
assert b"Settings updated." in res.data

View File

@@ -291,20 +291,6 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
# test_endpoint - that sends the contents of a file
# test_notification_endpoint - that takes a POST and writes it to file (test-datastore/notification.txt)
# Drop in a custom wrapping template
with open("test-datastore/notification-wrapper.html", "w" ) as f:
f.write("""<html>
<body id="notification-wrapper">
A change was detected at {{watch_html_link}}
template_params = notification_parameters.copy()
template_params['notification_body'] = n_body
template_params['notification_url_current'] = url
n_body = jinja_render(template_str=notification_template, **template_params)
</body>
""")
# CUSTOM JSON BODY CHECK for POST://
set_original_response()
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation

View File

@@ -72,7 +72,13 @@ def test_trigger_functionality(client, live_server, measure_memory_usage):
)
assert b"1 Imported" in res.data
# Trigger a check
# And set the trigger text as 'ignore text', it should then not trigger
live_server.app.config['DATASTORE'].data['settings']['application']['global_ignore_text'] = [trigger_text]
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Goto the edit page, add our ignore text
@@ -123,12 +129,23 @@ def test_trigger_functionality(client, live_server, measure_memory_usage):
# Now set the content which contains the trigger text
set_modified_with_trigger_text_response()
# There is a "ignore text" set of the change that should be also the trigger, it should not trigger
# because the ignore text should be stripped from the response, therefor, the trigger should not fire
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
live_server.app.config['DATASTORE'].data['settings']['application']['global_ignore_text'] = []
# check that the trigger fired once we stopped ignore it
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
# https://github.com/dgtlmoon/changedetection.io/issues/616
# https://github.com/dgtlmoon/changedetection.io/issues/616
# Apparently the actual snapshot that contains the trigger never shows
res = client.get(url_for("ui.ui_views.diff_history_page", uuid="first"))
assert b'Add to cart' in res.data

View File

@@ -1,42 +1,12 @@
# -*- coding: utf-8 -*-
import time
from flask import url_for
from .util import wait_for_all_checks
from ..processors.magic import RSS_XML_CONTENT_TYPES
from .util import live_server_setup, wait_for_all_checks
from ..html_tools import *
def set_rss_atom_feed_response(header=''):
test_return_data = f"""{header}<!-- Generated on Wed, 08 Oct 2025 08:42:33 -0700, really really honestly -->
<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<atom:link href="https://store.waterpowered.com/news/collection//" rel="self" type="application/rss+xml"/>
<title>RSS Feed</title>
<link>
<![CDATA[ https://store.waterpowered.com/news/collection// ]]>
</link>
<description>
<![CDATA[ Events and Announcements for ]]>
</description>
<language>en-us</language>
<generator>water News RSS</generator>
<item>
<title> 🍁 Lets go discount</title>
<description><p class="bb_paragraph">ok heres the description</p></description>
<link>
<![CDATA[ https://store.waterpowered.com/news/app/1643320/view/511845698831908921 ]]>
</link>
<pubDate>Wed, 08 Oct 2025 15:28:55 +0000</pubDate>
<guid isPermaLink="true">https://store.waterpowered.com/news/app/1643320/view/511845698831908921</guid>
<enclosure url="https://clan.fastly.waterstatic.com/images/40721482/42822e5f00b2becf520ace9500981bb56f3a89f2.jpg" length="0" type="image/jpeg"/>
</item>
</channel>
</rss>"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
return None
@@ -605,47 +575,3 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
def _subtest_xpath_rss(client, content_type='text/html'):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type=content_type, _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first", unpause_on_save=1),
data={
"url": test_url,
"include_filters": "xpath://item",
"tags": '',
"fetch_backend": "html_requests",
"time_between_check_use_default": "y",
},
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
assert b"Lets go discount" in res.data, f"When testing for Lets go discount called with content type '{content_type}'"
assert b"Events and Announcements" not in res.data, f"When testing for Lets go discount called with content type '{content_type}'" # It should not be here because thats not our selector target
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server):
for feed_header in ['', '<?xml version="1.0" encoding="utf-8"?>']:
set_rss_atom_feed_response(header=feed_header)
for content_type in RSS_XML_CONTENT_TYPES:
_subtest_xpath_rss(client, content_type=content_type)

View File

@@ -39,7 +39,7 @@ jsonpath-ng~=1.5.3
# jq not available on Windows so must be installed manually
# Notification library
apprise==1.9.5
apprise==1.9.4
# - Needed for apprise/spush, and maybe others? hopefully doesnt trigger a rust compile.
# - Requires extra wheel for rPi, adds build time for arm/v8 which is not in piwheels
@@ -135,7 +135,7 @@ tzdata
pluggy ~= 1.5
# Needed for testing, cross-platform for process and system monitoring
psutil==7.1.0
psutil==7.0.0
ruff >= 0.11.2
pre_commit >= 4.2.0