diff --git a/changedetectionio/diff.py b/changedetectionio/diff.py index 31e37efc..a1e58830 100644 --- a/changedetectionio/diff.py +++ b/changedetectionio/diff.py @@ -7,17 +7,18 @@ HTML_ADDED_STYLE = "background-color: #dafbe1; color: #116329;" HTML_CHANGED_STYLE = "background-color: #ffd8b5; color: #953800;" # These get set to html or telegram type or discord compatible or whatever in handler.py -REMOVED_PLACEMARKER_OPEN = '<< List[str]: """Return a slice of the list, or a single element if start == end.""" diff --git a/changedetectionio/notification/handler.py b/changedetectionio/notification/handler.py index 6f32eab1..f2657098 100644 --- a/changedetectionio/notification/handler.py +++ b/changedetectionio/notification/handler.py @@ -9,9 +9,8 @@ from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \ ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \ CHANGED_PLACEMARKER_CLOSED, HTML_CHANGED_STYLE -from ..notification_service import NotificationContextData +from ..notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER -CUSTOM_LINEBREAK_PLACEHOLDER='$$BR$$' def markup_text_links_to_html(body): """ @@ -200,15 +199,6 @@ def process_notification(n_object: NotificationContextData, datastore): # Register custom Discord plugin from .apprise_plugin.discord import NotifyDiscordCustom - # Create list of custom handler protocols (both http and https versions) - custom_handler_protocols = [f"{method}://" for method in SUPPORTED_HTTP_METHODS] - custom_handler_protocols += [f"{method}s://" for method in SUPPORTED_HTTP_METHODS] - - has_custom_handler = any( - url.startswith(tuple(custom_handler_protocols)) - for url in n_object['notification_urls'] - ) - if not isinstance(n_object, NotificationContextData): raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") @@ -279,6 +269,13 @@ def process_notification(n_object: NotificationContextData, datastore): logger.info(f">> Process Notification: AppRise notifying {url}") url = jinja_render(template_str=url, **notification_parameters) + # If it's a plaintext document, and they want HTML type email/alerts, so it needs to be escaped + watch_mime_type = n_object.get('watch_mime_type') + if watch_mime_type and 'text/' in watch_mime_type.lower() and not 'html' in watch_mime_type.lower(): + if 'html' in requested_output_format: + from markupsafe import escape + n_body = str(escape(n_body)) + (url, n_body, n_title) = apply_service_tweaks(url=url, n_body=n_body, n_title=n_title, requested_output_format=requested_output_format_original) apprise_input_format = "NO-THANKS-WE-WILL-MANAGE-ALL-OF-THIS" @@ -301,26 +298,18 @@ def process_notification(n_object: NotificationContextData, datastore): requested_output_format = NotifyFormat.HTML.value apprise_input_format = NotifyFormat.MARKDOWN.value - # If it's a plaintext document, and they want HTML type email/alerts, so it needs to be escaped - watch_mime_type = n_object.get('watch_mime_type', '').lower() - if watch_mime_type and 'text/' in watch_mime_type and not 'html' in watch_mime_type: - if 'html' in requested_output_format: - from markupsafe import escape - n_body = str(escape(n_body)) # Could have arrived at any stage, so we dont end up running .escape on it if 'html' in requested_output_format: - n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '
') + n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '
\n') else: # Just incase n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '') - - apobj.add(url) - sent_objs.append({'title': n_title, 'body': n_body, 'url': url}) + apobj.add(url) apobj.notify( title=n_title, diff --git a/changedetectionio/notification_service.py b/changedetectionio/notification_service.py index e19f314a..2a987bb5 100644 --- a/changedetectionio/notification_service.py +++ b/changedetectionio/notification_service.py @@ -11,27 +11,32 @@ import time from changedetectionio.notification import default_notification_format +# This gets modified on notification time (handler.py) depending on the required notification output +CUSTOM_LINEBREAK_PLACEHOLDER='@BR@' + + # What is passed around as notification context, also used as the complete list of valid {{ tokens }} class NotificationContextData(dict): def __init__(self, initial_data=None, **kwargs): super().__init__({ + 'base_url': None, 'current_snapshot': None, 'diff': None, 'diff_added': None, 'diff_full': None, 'diff_patch': None, 'diff_removed': None, + 'diff_url': None, + 'markup_text_links_to_html_links': False, # If automatic conversion of plaintext to HTML should happen 'notification_timestamp': time.time(), + 'preview_url': None, 'screenshot': None, 'triggered_text': None, 'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', # Converted to 'watch_uuid' in create_notification_parameters - 'watch_url': 'https://WATCH-PLACE-HOLDER/', - 'base_url': None, - 'diff_url': None, - 'preview_url': None, + 'watch_mime_type': None, 'watch_tag': None, 'watch_title': None, - 'markup_text_links_to_html_links': False, # If automatic conversion of plaintext to HTML should happen + 'watch_url': 'https://WATCH-PLACE-HOLDER/', }) # Apply any initial data passed in @@ -92,24 +97,13 @@ class NotificationService: if n_object.get('notification_format') == default_notification_format_for_watch: n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format') - # HTML needs linebreak, but MarkDown and Text can use a linefeed - if n_object.get('notification_format') == 'HTML': - line_feed_sep = "
" - # Snapshot will be plaintext on the disk, convert to some kind of HTML - snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) - elif n_object.get('notification_format') == 'HTML Color': - line_feed_sep = "
" - # Snapshot will be plaintext on the disk, convert to some kind of HTML - snapshot_contents = snapshot_contents.replace('\n', line_feed_sep) - else: - line_feed_sep = "\n" triggered_text = '' if len(trigger_text): from . import html_tools triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text) if triggered_text: - triggered_text = line_feed_sep.join(triggered_text) + triggered_text = CUSTOM_LINEBREAK_PLACEHOLDER.join(triggered_text) # Could be called as a 'test notification' with only 1 snapshot available prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n" @@ -121,11 +115,11 @@ class NotificationService: n_object.update({ 'current_snapshot': snapshot_contents, - 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep), - 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep), - 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep), - 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True), - 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep), + 'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), + 'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), + 'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), + 'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER, patch_format=True), + 'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER), 'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None, 'triggered_text': triggered_text, 'uuid': watch.get('uuid') if watch else None, diff --git a/changedetectionio/tests/smtp/test_notification_smtp.py b/changedetectionio/tests/smtp/test_notification_smtp.py index 0cd7a7cb..b4619489 100644 --- a/changedetectionio/tests/smtp/test_notification_smtp.py +++ b/changedetectionio/tests/smtp/test_notification_smtp.py @@ -4,6 +4,7 @@ from email import message_from_string from email.policy import default as email_policy from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE +from changedetectionio.notification_service import NotificationContextData from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \ wait_for_all_checks, \ set_longer_modified_response, delete_all_watches @@ -14,6 +15,8 @@ import logging # NOTE - RELIES ON mailserver as hostname running, see github build recipes smtp_test_server = 'mailserver' +ALL_MARKUP_TOKENS = ''.join(f"TOKEN: '{t}'\n{{{{{t}}}}}\n" for t in NotificationContextData().keys()) + from changedetectionio.notification import ( default_notification_body, default_notification_format, @@ -170,7 +173,7 @@ def test_check_notification_html_color_format(client, live_server, measure_memor url_for("settings.settings_page"), data={"application-notification_urls": notification_url, "application-notification_title": "fallback-title " + default_notification_title, - "application-notification_body": "some text\n" + default_notification_body, #some text\n should get
+ "application-notification_body": f"some text\n{default_notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_format": 'HTML Color', "requests-time_between_check-minutes": 180, 'application-fetch_backend': "html_requests"}, @@ -225,6 +228,7 @@ def test_check_notification_html_color_format(client, live_server, measure_memor html_content = html_part.get_content() assert HTML_CHANGED_STYLE or HTML_REMOVED_STYLE in html_content assert HTML_ADDED_STYLE in html_content + assert '<' not in html_content assert 'some text
' in html_content delete_all_watches(client) @@ -432,7 +436,7 @@ def test_check_plaintext_document_plaintext_notification_smtp(client, live_serve url_for("settings.settings_page"), data={"application-notification_urls": notification_url, "application-notification_title": "fallback-title " + default_notification_title, - "application-notification_body": notification_body, + "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_format": 'Plain Text', "requests-time_between_check-minutes": 180, 'application-fetch_backend': "html_requests"}, @@ -465,6 +469,7 @@ def test_check_plaintext_document_plaintext_notification_smtp(client, live_serve assert 'talk about tags' in body assert '(added)' in body assert '<br' not in body + assert '<' not in body delete_all_watches(client) @@ -483,7 +488,7 @@ def test_check_plaintext_document_html_notifications(client, live_server, measur url_for("settings.settings_page"), data={"application-notification_urls": notification_url, "application-notification_title": "fallback-title " + default_notification_title, - "application-notification_body": notification_body, + "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_format": 'HTML', "requests-time_between_check-minutes": 180, 'application-fetch_backend': "html_requests"}, @@ -502,7 +507,7 @@ def test_check_plaintext_document_html_notifications(client, live_server, measur f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n") - time.sleep(1) + time.sleep(2) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) wait_for_all_checks(client) @@ -522,15 +527,94 @@ def test_check_plaintext_document_html_notifications(client, live_server, measur text_part = parts[0] assert text_part.get_content_type() == 'text/plain' text_content = text_part.get_content() - - - assert 'And let\'s talk about <title> tags\r\n' in text_content - - # Second part should be text/html html_part = parts[1] assert html_part.get_content_type() == 'text/html' html_content = html_part.get_content() + + + assert 'And let\'s talk about <title> tags\r\n' in text_content + assert '<br' not in text_content + assert '<span' not in text_content + + assert 'talk about <title>' not in html_content # the html part, should have got marked up to < etc + assert 'talk about <title>' in html_content + # Should be the HTML, but not HTML Color + assert 'background-color' not in html_content assert '<br>\r\n(added) And let's talk about <title> tags<br>' in html_content + assert '<br' not in html_content + + delete_all_watches(client) + + +def test_check_plaintext_document_html_color_notifications(client, live_server, measure_memory_usage): + """When following a plaintext document, notification in Plain Text format is sent correctly""" + + with open("test-datastore/endpoint-content.txt", "w") as f: + f.write("Some nice plain text\nwhich we add some extra data\nover here\n") + + notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com' + notification_body = f"""{default_notification_body}""" + + ##################### + # Set this up for when we remove the notification from the watch, it should fallback with these details + res = client.post( + url_for("settings.settings_page"), + data={"application-notification_urls": notification_url, + "application-notification_title": "fallback-title " + default_notification_title, + "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", + "application-notification_format": 'HTML Color', + "requests-time_between_check-minutes": 180, + 'application-fetch_backend': "html_requests"}, + follow_redirects=True + ) + + assert b"Settings updated." in res.data + + # Add our URL to the import page + test_url = url_for('test_endpoint', content_type="text/plain", _external=True) + uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) + client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) + wait_for_all_checks(client) + + # Change the content + with open("test-datastore/endpoint-content.txt", "w") as f: + f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n") + + time.sleep(1) + client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) + wait_for_all_checks(client) + + # Parse the email properly using Python's email library + msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy) + + # The email should have two bodies (multipart/alternative) + assert msg.is_multipart() + assert msg.get_content_type() == 'multipart/alternative' + + # Get the parts + parts = list(msg.iter_parts()) + assert len(parts) == 2 + + # First part should be text/plain + text_part = parts[0] + assert text_part.get_content_type() == 'text/plain' + text_content = text_part.get_content() + html_part = parts[1] + assert html_part.get_content_type() == 'text/html' + html_content = html_part.get_content() + + + assert 'And let\'s talk about <title> tags\r\n' in text_content + assert '<br' not in text_content + assert '<span' not in text_content + + assert 'talk about <title>' not in html_content # the html part, should have got marked up to < etc + assert 'talk about <title>' in html_content + # Should be the HTML, but not HTML Color + assert 'background-color' in html_content + assert '(added) And let' not in html_content + assert '<br' not in html_content + assert '<br>' in html_content delete_all_watches(client)