Compare commits

..

1 Commits

Author SHA1 Message Date
dgtlmoon
401c4cd092 Preserve whitespace's in HTML style notifications 2025-10-25 17:00:38 +02:00
35 changed files with 183 additions and 542 deletions

View File

@@ -21,7 +21,7 @@ jobs:
- name: Build a binary wheel and a source tarball - name: Build a binary wheel and a source tarball
run: python3 -m build run: python3 -m build
- name: Store the distribution packages - name: Store the distribution packages
uses: actions/upload-artifact@v5 uses: actions/upload-artifact@v4
with: with:
name: python-package-distributions name: python-package-distributions
path: dist/ path: dist/
@@ -34,7 +34,7 @@ jobs:
- build - build
steps: steps:
- name: Download all the dists - name: Download all the dists
uses: actions/download-artifact@v6 uses: actions/download-artifact@v5
with: with:
name: python-package-distributions name: python-package-distributions
path: dist/ path: dist/
@@ -93,7 +93,7 @@ jobs:
steps: steps:
- name: Download all the dists - name: Download all the dists
uses: actions/download-artifact@v6 uses: actions/download-artifact@v5
with: with:
name: python-package-distributions name: python-package-distributions
path: dist/ path: dist/

View File

@@ -282,7 +282,7 @@ jobs:
- name: Store everything including test-datastore - name: Store everything including test-datastore
if: always() if: always()
uses: actions/upload-artifact@v5 uses: actions/upload-artifact@v4
with: with:
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }} name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
path: . path: .

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki # Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.33' __version__ = '0.50.31'
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError from json.decoder import JSONDecodeError

View File

@@ -1,22 +1,9 @@
import os
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from flask_restful import abort, Resource from flask_restful import abort, Resource
from flask import request from flask import request
from functools import wraps import validators
from . import auth, validate_openapi_request from . import auth, validate_openapi_request
from ..validate_url import is_safe_valid_url
def default_content_type(content_type='text/plain'):
"""Decorator to set a default Content-Type header if none is provided."""
def decorator(f):
@wraps(f)
def wrapper(*args, **kwargs):
if not request.content_type:
# Set default content type in the request environment
request.environ['CONTENT_TYPE'] = content_type
return f(*args, **kwargs)
return wrapper
return decorator
class Import(Resource): class Import(Resource):
@@ -25,7 +12,6 @@ class Import(Resource):
self.datastore = kwargs['datastore'] self.datastore = kwargs['datastore']
@auth.check_token @auth.check_token
@default_content_type('text/plain') #3547 #3542
@validate_openapi_request('importWatches') @validate_openapi_request('importWatches')
def post(self): def post(self):
"""Import a list of watched URLs.""" """Import a list of watched URLs."""
@@ -49,13 +35,14 @@ class Import(Resource):
urls = request.get_data().decode('utf8').splitlines() urls = request.get_data().decode('utf8').splitlines()
added = [] added = []
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
for url in urls: for url in urls:
url = url.strip() url = url.strip()
if not len(url): if not len(url):
continue continue
# If hosts that only contain alphanumerics are allowed ("localhost" for example) # If hosts that only contain alphanumerics are allowed ("localhost" for example)
if not is_safe_valid_url(url): if not validators.url(url, simple_host=allow_simplehost):
return f"Invalid or unsupported URL - {url}", 400 return f"Invalid or unsupported URL - {url}", 400
if dedupe and self.datastore.url_exists(url): if dedupe and self.datastore.url_exists(url):

View File

@@ -1,12 +1,12 @@
import os import os
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_safe_valid_url
from flask_expects_json import expects_json from flask_expects_json import expects_json
from changedetectionio import queuedWatchMetaData from changedetectionio import queuedWatchMetaData
from changedetectionio import worker_handler from changedetectionio import worker_handler
from flask_restful import abort, Resource from flask_restful import abort, Resource
from flask import request, make_response, send_from_directory from flask import request, make_response, send_from_directory
import validators
from . import auth from . import auth
import copy import copy
@@ -121,10 +121,6 @@ class Watch(Resource):
if validation_error: if validation_error:
return validation_error, 400 return validation_error, 400
# XSS etc protection
if request.json.get('url') and not is_safe_valid_url(request.json.get('url')):
return "Invalid URL", 400
watch.update(request.json) watch.update(request.json)
return "OK", 200 return "OK", 200
@@ -230,7 +226,9 @@ class CreateWatch(Resource):
json_data = request.get_json() json_data = request.get_json()
url = json_data['url'].strip() url = json_data['url'].strip()
if not is_safe_valid_url(url): # If hosts that only contain alphanumerics are allowed ("localhost" for example)
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
if not validators.url(url, simple_host=allow_simplehost):
return "Invalid or unsupported URL", 400 return "Invalid or unsupported URL", 400
if json_data.get('proxy'): if json_data.get('proxy'):

View File

@@ -240,7 +240,9 @@ nav
<p> <p>
{{ render_field(form.application.form.scheduler_timezone_default) }} {{ render_field(form.application.form.scheduler_timezone_default) }}
<datalist id="timezones" style="display: none;"> <datalist id="timezones" style="display: none;">
{%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%} {% for tz_name in available_timezones %}
<option value="{{ tz_name }}">{{ tz_name }}</option>
{% endfor %}
</datalist> </datalist>
</p> </p>
</div> </div>

View File

@@ -76,14 +76,14 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
elif (op == 'notification-default'): elif (op == 'notification-default'):
from changedetectionio.notification import ( from changedetectionio.notification import (
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH default_notification_format_for_watch
) )
for uuid in uuids: for uuid in uuids:
if datastore.data['watching'].get(uuid): if datastore.data['watching'].get(uuid):
datastore.data['watching'][uuid]['notification_title'] = None datastore.data['watching'][uuid]['notification_title'] = None
datastore.data['watching'][uuid]['notification_body'] = None datastore.data['watching'][uuid]['notification_body'] = None
datastore.data['watching'][uuid]['notification_urls'] = [] datastore.data['watching'][uuid]['notification_urls'] = []
datastore.data['watching'][uuid]['notification_format'] = USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH datastore.data['watching'][uuid]['notification_format'] = default_notification_format_for_watch
if emit_flash: if emit_flash:
flash(f"{len(uuids)} watches set to use default notification settings") flash(f"{len(uuids)} watches set to use default notification settings")

View File

@@ -75,6 +75,7 @@ class Fetcher():
self.screenshot = None self.screenshot = None
self.xpath_data = None self.xpath_data = None
# Keep headers and status_code as they're small # Keep headers and status_code as they're small
logger.trace("Fetcher content cleared from memory")
@abstractmethod @abstractmethod
def get_error(self): def get_error(self):

View File

@@ -133,11 +133,6 @@ def get_socketio_path():
# Socket.IO will be available at {prefix}/socket.io/ # Socket.IO will be available at {prefix}/socket.io/
return prefix return prefix
@app.template_global('is_safe_valid_url')
def _is_safe_valid_url(test_url):
from .validate_url import is_safe_valid_url
return is_safe_valid_url(test_url)
@app.template_filter('format_number_locale') @app.template_filter('format_number_locale')
def _jinja2_filter_format_number_locale(value: float) -> str: def _jinja2_filter_format_number_locale(value: float) -> str:
@@ -387,7 +382,7 @@ def changedetection_app(config=None, datastore_o=None):
# We would sometimes get login loop errors on sites hosted in sub-paths # We would sometimes get login loop errors on sites hosted in sub-paths
# note for the future: # note for the future:
# if not is_safe_valid_url(next): # if not is_safe_url(next):
# return flask.abort(400) # return flask.abort(400)
return redirect(url_for('watchlist.index')) return redirect(url_for('watchlist.index'))

View File

@@ -28,8 +28,11 @@ from wtforms.utils import unset_value
from wtforms.validators import ValidationError from wtforms.validators import ValidationError
from validators.url import url as url_validator
from changedetectionio.widgets import TernaryNoneBooleanField from changedetectionio.widgets import TernaryNoneBooleanField
# default # default
# each select <option data-enabled="enabled-0-0" # each select <option data-enabled="enabled-0-0"
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
@@ -538,10 +541,19 @@ class validateURL(object):
def validate_url(test_url): def validate_url(test_url):
from changedetectionio.validate_url import is_safe_valid_url # If hosts that only contain alphanumerics are allowed ("localhost" for example)
if not is_safe_valid_url(test_url): try:
url_validator(test_url, simple_host=allow_simplehost)
except validators.ValidationError:
#@todo check for xss
message = f"'{test_url}' is not a valid URL."
# This should be wtforms.validators. # This should be wtforms.validators.
raise ValidationError('Watch protocol is not permitted or invalid URL format') raise ValidationError(message)
from .model.Watch import is_safe_url
if not is_safe_url(test_url):
# This should be wtforms.validators.
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
class ValidateSinglePythonRegexString(object): class ValidateSinglePythonRegexString(object):
@@ -729,6 +741,7 @@ class quickWatchForm(Form):
edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"}) edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"})
# Common to a single watch and the global settings # Common to a single watch and the global settings
class commonSettingsForm(Form): class commonSettingsForm(Form):
from . import processors from . import processors
@@ -741,7 +754,7 @@ class commonSettingsForm(Form):
fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()]) fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()]) notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
notification_format = SelectField('Notification format', choices=list(valid_notification_formats.items())) notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()]) notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()]) notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff") processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")

View File

@@ -1,5 +1,3 @@
from functools import lru_cache
from loguru import logger from loguru import logger
from typing import List from typing import List
import html import html
@@ -15,6 +13,7 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I) META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I) META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
# 'price' , 'lowPrice', 'highPrice' are usually under here # 'price' , 'lowPrice', 'highPrice' are usually under here
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here # All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"] LD_JSON_PRODUCT_OFFER_SELECTORS = ["json:$..offers", "json:$..Offers"]
@@ -23,9 +22,9 @@ class JSONNotFound(ValueError):
def __init__(self, msg): def __init__(self, msg):
ValueError.__init__(self, msg) ValueError.__init__(self, msg)
# Doesn't look like python supports forward slash auto enclosure in re.findall # Doesn't look like python supports forward slash auto enclosure in re.findall
# So convert it to inline flag "(?i)foobar" type configuration # So convert it to inline flag "(?i)foobar" type configuration
@lru_cache(maxsize=100)
def perl_style_slash_enclosed_regex_to_options(regex): def perl_style_slash_enclosed_regex_to_options(regex):
res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE) res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE)

View File

@@ -1,5 +1,4 @@
from blinker import signal from blinker import signal
from changedetectionio.validate_url import is_safe_valid_url
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from changedetectionio.jinja2_custom import render as jinja_render from changedetectionio.jinja2_custom import render as jinja_render
@@ -13,12 +12,32 @@ from .. import jinja2_custom as safe_jinja
from ..diff import ADDED_PLACEMARKER_OPEN from ..diff import ADDED_PLACEMARKER_OPEN
from ..html_tools import TRANSLATE_WHITESPACE_TABLE from ..html_tools import TRANSLATE_WHITESPACE_TABLE
# Allowable protocols, protects against javascript: etc
# file:// is further checked by ALLOW_FILE_URI
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
FAVICON_RESAVE_THRESHOLD_SECONDS=86400 FAVICON_RESAVE_THRESHOLD_SECONDS=86400
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7} mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
def is_safe_url(test_url):
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
# 'source:' is a valid way to tell us to return the source
r = re.compile(re.escape('source:'), re.IGNORECASE)
test_url = r.sub('', test_url)
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
if not pattern.match(test_url.strip()):
return False
return True
class model(watch_base): class model(watch_base):
__newest_history_key = None __newest_history_key = None
__history_n = 0 __history_n = 0
@@ -61,7 +80,7 @@ class model(watch_base):
def link(self): def link(self):
url = self.get('url', '') url = self.get('url', '')
if not is_safe_valid_url(url): if not is_safe_url(url):
return 'DISABLED' return 'DISABLED'
ready_url = url ready_url = url
@@ -82,7 +101,7 @@ class model(watch_base):
ready_url=ready_url.replace('source:', '') ready_url=ready_url.replace('source:', '')
# Also double check it after any Jinja2 formatting just incase # Also double check it after any Jinja2 formatting just incase
if not is_safe_valid_url(ready_url): if not is_safe_url(ready_url):
return 'DISABLED' return 'DISABLED'
return ready_url return ready_url

View File

@@ -2,7 +2,7 @@ import os
import uuid import uuid
from changedetectionio import strtobool from changedetectionio import strtobool
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH = 'System default' default_notification_format_for_watch = 'System default'
CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL' CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'
class watch_base(dict): class watch_base(dict):
@@ -44,7 +44,7 @@ class watch_base(dict):
'method': 'GET', 'method': 'GET',
'notification_alert_count': 0, 'notification_alert_count': 0,
'notification_body': None, 'notification_body': None,
'notification_format': USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, 'notification_format': default_notification_format_for_watch,
'notification_muted': False, 'notification_muted': False,
'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL 'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL
'notification_title': None, 'notification_title': None,

View File

@@ -1,16 +1,17 @@
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH from changedetectionio.model import default_notification_format_for_watch
default_notification_format = 'htmlcolor' default_notification_format = 'HTML Color'
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n' default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}' default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
# The values (markdown etc) are from apprise NotifyFormat, # The values (markdown etc) are from apprise NotifyFormat,
# But to avoid importing the whole heavy module just use the same strings here. # But to avoid importing the whole heavy module just use the same strings here.
valid_notification_formats = { valid_notification_formats = {
'text': 'Plain Text', 'Plain Text': 'text',
'html': 'HTML', 'HTML': 'html',
'htmlcolor': 'HTML Color', 'HTML Color': 'htmlcolor',
'markdown': 'Markdown to HTML', 'Markdown to HTML': 'markdown',
# Used only for editing a watch (not for global) # Used only for editing a watch (not for global)
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH default_notification_format_for_watch: default_notification_format_for_watch
} }

View File

@@ -1,42 +0,0 @@
def as_monospaced_html_email(content: str, title: str) -> str:
"""
Wraps `content` in a minimal, email-safe HTML template
that forces monospace rendering across Gmail, Hotmail, Apple Mail, etc.
Args:
content: The body text (plain text or HTML-like).
title: The title plaintext
Returns:
A complete HTML document string suitable for sending as an email body.
"""
# All line feed types should be removed and then this function should only be fed <br>'s
# Then it works with our <pre> styling without double linefeeds
content = content.translate(str.maketrans('', '', '\r\n'))
if title:
import html
title = html.escape(title)
else:
title = ''
# 2. Full email-safe HTML
html_email = f"""<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="x-apple-disable-message-reformatting">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<!--[if mso]>
<style>
body, div, pre, td {{ font-family: "Courier New", Courier, monospace !important; }}
</style>
<![endif]-->
<title>{title}</title>
</head>
<body style="-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%;">
<pre role="article" aria-roledescription="email" lang="en"
style="font-family: monospace, 'Courier New', Courier; font-size: 0.8em;
white-space: pre-wrap; word-break: break-word;">{content}</pre>
</body>
</html>"""
return html_email

View File

@@ -6,7 +6,6 @@ from loguru import logger
from urllib.parse import urlparse from urllib.parse import urlparse
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS
from .email_helpers import as_monospaced_html_email
from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \ from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \
ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \ ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \
CHANGED_PLACEMARKER_CLOSED, HTML_CHANGED_STYLE, HTML_CHANGED_INTO_STYLE CHANGED_PLACEMARKER_CLOSED, HTML_CHANGED_STYLE, HTML_CHANGED_INTO_STYLE
@@ -63,13 +62,13 @@ def notification_format_align_with_apprise(n_format : str):
:return: :return:
""" """
if n_format.startswith('html'): if n_format.lower().startswith('html'):
# Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here # Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here
n_format = NotifyFormat.HTML.value n_format = NotifyFormat.HTML.value
elif n_format.startswith('markdown'): elif n_format.lower().startswith('markdown'):
# probably the same but just to be safe # probably the same but just to be safe
n_format = NotifyFormat.MARKDOWN.value n_format = NotifyFormat.MARKDOWN.value
elif n_format.startswith('text'): elif n_format.lower().startswith('text'):
# probably the same but just to be safe # probably the same but just to be safe
n_format = NotifyFormat.TEXT.value n_format = NotifyFormat.TEXT.value
else: else:
@@ -77,55 +76,6 @@ def notification_format_align_with_apprise(n_format : str):
return n_format return n_format
def apply_discord_markdown_to_body(n_body):
"""
Discord does not support <del> but it supports non-standard ~~strikethrough~~
:param n_body:
:return:
"""
import re
# Define the mapping between your placeholders and markdown markers
replacements = [
(REMOVED_PLACEMARKER_OPEN, '~~', REMOVED_PLACEMARKER_CLOSED, '~~'),
(ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'),
(CHANGED_PLACEMARKER_OPEN, '~~', CHANGED_PLACEMARKER_CLOSED, '~~'),
(CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'),
]
# So that the markdown gets added without any whitespace following it which would break it
for open_tag, open_md, close_tag, close_md in replacements:
# Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag
pattern = re.compile(
re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag),
flags=re.DOTALL
)
n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body)
return n_body
def apply_standard_markdown_to_body(n_body):
"""
Apprise does not support ~~strikethrough~~ but it will convert <del> to HTML strikethrough.
:param n_body:
:return:
"""
import re
# Define the mapping between your placeholders and markdown markers
replacements = [
(REMOVED_PLACEMARKER_OPEN, '<del>', REMOVED_PLACEMARKER_CLOSED, '</del>'),
(ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'),
(CHANGED_PLACEMARKER_OPEN, '<del>', CHANGED_PLACEMARKER_CLOSED, '</del>'),
(CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'),
]
# So that the markdown gets added without any whitespace following it which would break it
for open_tag, open_md, close_tag, close_md in replacements:
# Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag
pattern = re.compile(
re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag),
flags=re.DOTALL
)
n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body)
return n_body
def apply_service_tweaks(url, n_body, n_title, requested_output_format): def apply_service_tweaks(url, n_body, n_title, requested_output_format):
@@ -156,7 +106,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
n_body = n_body.replace('<br>', '\n') n_body = n_body.replace('<br>', '\n')
n_body = n_body.replace('</br>', '\n') n_body = n_body.replace('</br>', '\n')
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n') n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n')
# Use strikethrough for removed content, bold for added content # Use strikethrough for removed content, bold for added content
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '<s>') n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '<s>')
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '</s>') n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '</s>')
@@ -190,7 +140,15 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
if requested_output_format == 'html': if requested_output_format == 'html':
# No diff placeholders, use Discord markdown for any other formatting # No diff placeholders, use Discord markdown for any other formatting
# Use Discord markdown: strikethrough for removed, bold for added # Use Discord markdown: strikethrough for removed, bold for added
n_body = apply_discord_markdown_to_body(n_body=n_body) n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '~~')
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '~~')
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '**')
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '**')
# Handle changed/replaced lines (old → new)
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, '~~')
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, '~~')
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, '**')
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, '**')
# Apply 2000 char limit for plain content # Apply 2000 char limit for plain content
payload_max_size = 1700 payload_max_size = 1700
@@ -222,9 +180,6 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ') n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ')
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'') n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'')
n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n') n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n')
elif requested_output_format == 'markdown':
# Markdown to HTML - Apprise will convert this to HTML
n_body = apply_standard_markdown_to_body(n_body=n_body)
else: #plaintext etc default else: #plaintext etc default
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ') n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ')
@@ -241,7 +196,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
def process_notification(n_object: NotificationContextData, datastore): def process_notification(n_object: NotificationContextData, datastore):
from changedetectionio.jinja2_custom import render as jinja_render from changedetectionio.jinja2_custom import render as jinja_render
from . import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, default_notification_format, valid_notification_formats from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
# be sure its registered # be sure its registered
from .apprise_plugin.custom_handlers import apprise_http_custom_handler from .apprise_plugin.custom_handlers import apprise_http_custom_handler
# Register custom Discord plugin # Register custom Discord plugin
@@ -257,17 +212,18 @@ def process_notification(n_object: NotificationContextData, datastore):
# Insert variables into the notification content # Insert variables into the notification content
notification_parameters = create_notification_parameters(n_object, datastore) notification_parameters = create_notification_parameters(n_object, datastore)
requested_output_format = n_object.get('notification_format', default_notification_format) requested_output_format = valid_notification_formats.get(
logger.debug(f"Requested notification output format: '{requested_output_format}'") n_object.get('notification_format', default_notification_format),
valid_notification_formats[default_notification_format],
)
# If we arrived with 'System default' then look it up # If we arrived with 'System default' then look it up
if requested_output_format == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: if requested_output_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
# Initially text or whatever # Initially text or whatever
requested_output_format = datastore.data['settings']['application'].get('notification_format', default_notification_format) requested_output_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower()
requested_output_format_original = requested_output_format requested_output_format_original = requested_output_format
# Now clean it up so it fits perfectly with apprise
requested_output_format = notification_format_align_with_apprise(n_format=requested_output_format) requested_output_format = notification_format_align_with_apprise(n_format=requested_output_format)
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s") logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
@@ -344,45 +300,24 @@ def process_notification(n_object: NotificationContextData, datastore):
apprise_input_format = NotifyFormat.TEXT.value apprise_input_format = NotifyFormat.TEXT.value
elif requested_output_format == NotifyFormat.MARKDOWN.value: elif requested_output_format == NotifyFormat.MARKDOWN.value:
# Convert markdown to HTML ourselves since not all plugins do this # This actually means we request "Markdown to HTML", we want HTML output
from apprise.conversion import markdown_to_html
# Make sure there are paragraph breaks around horizontal rules
n_body = n_body.replace('---', '\n\n---\n\n')
n_body = markdown_to_html(n_body)
url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}" url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}"
requested_output_format = NotifyFormat.HTML.value requested_output_format = NotifyFormat.HTML.value
apprise_input_format = NotifyFormat.HTML.value # Changed from MARKDOWN to HTML apprise_input_format = NotifyFormat.MARKDOWN.value
# Could have arrived at any stage, so we dont end up running .escape on it # Could have arrived at any stage, so we dont end up running .escape on it
if 'html' in requested_output_format: if 'html' in requested_output_format:
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n') n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\n')
else: else:
# texty types # Markup, text types etc
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n') n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
else:
# ?format was IN the apprise URL, they are kind of on their own here, we will try our best
if 'format=html' in url:
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n')
# This will also prevent apprise from doing conversion
apprise_input_format = NotifyFormat.HTML.value
requested_output_format = NotifyFormat.HTML.value
elif 'format=text' in url:
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
apprise_input_format = NotifyFormat.TEXT.value
requested_output_format = NotifyFormat.TEXT.value
sent_objs.append({'title': n_title, sent_objs.append({'title': n_title,
'body': n_body, 'body': n_body,
'url': url}) 'url': url})
apobj.add(url) apobj.add(url)
# Since the output is always based on the plaintext of the 'diff' engine, wrap it nicely.
# It should always be similar to the 'history' part of the UI.
if url.startswith('mail') and 'html' in requested_output_format:
if not '<pre' in n_body and not '<body' in n_body: # No custom HTML-ish body was setup already
n_body = as_monospaced_html_email(content=n_body, title=n_title)
apobj.notify( apobj.notify(
title=n_title, title=n_title,
body=n_body, body=n_body,

View File

@@ -9,8 +9,7 @@ for both sync and async workers
from loguru import logger from loguru import logger
import time import time
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH from changedetectionio.notification import default_notification_format
from changedetectionio.notification import default_notification_format, valid_notification_formats
# This gets modified on notification time (handler.py) depending on the required notification output # This gets modified on notification time (handler.py) depending on the required notification output
CUSTOM_LINEBREAK_PLACEHOLDER='@BR@' CUSTOM_LINEBREAK_PLACEHOLDER='@BR@'
@@ -49,28 +48,15 @@ class NotificationContextData(dict):
if kwargs: if kwargs:
self.update(kwargs) self.update(kwargs)
n_format = self.get('notification_format')
if n_format and not valid_notification_formats.get(n_format):
raise ValueError(f'Invalid notification format: "{n_format}"')
def set_random_for_validation(self): def set_random_for_validation(self):
import random, string import random, string
"""Randomly fills all dict keys with random strings (for validation/testing). """Randomly fills all dict keys with random strings (for validation/testing)."""
So we can test the output in the notification body
"""
for key in self.keys(): for key in self.keys():
if key in ['uuid', 'time', 'watch_uuid']: if key in ['uuid', 'time', 'watch_uuid']:
continue continue
rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12)) rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12))
self[key] = rand_str self[key] = rand_str
def __setitem__(self, key, value):
if key == 'notification_format' and isinstance(value, str) and not value.startswith('RANDOM-PLACEHOLDER-'):
if not valid_notification_formats.get(value):
raise ValueError(f'Invalid notification format: "{value}"')
super().__setitem__(key, value)
class NotificationService: class NotificationService:
""" """
Standalone notification service that handles all notification functionality Standalone notification service that handles all notification functionality
@@ -86,7 +72,7 @@ class NotificationService:
Queue a notification for a watch with full diff rendering and template variables Queue a notification for a watch with full diff rendering and template variables
""" """
from changedetectionio import diff from changedetectionio import diff
from changedetectionio.notification import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH from changedetectionio.notification import default_notification_format_for_watch
if not isinstance(n_object, NotificationContextData): if not isinstance(n_object, NotificationContextData):
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}") raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
@@ -108,7 +94,7 @@ class NotificationService:
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once." snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
# If we ended up here with "System default" # If we ended up here with "System default"
if n_object.get('notification_format') == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: if n_object.get('notification_format') == default_notification_format_for_watch:
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format') n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
@@ -155,7 +141,7 @@ class NotificationService:
Individual watch settings > Tag settings > Global settings Individual watch settings > Tag settings > Global settings
""" """
from changedetectionio.notification import ( from changedetectionio.notification import (
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, default_notification_format_for_watch,
default_notification_body, default_notification_body,
default_notification_title default_notification_title
) )
@@ -163,7 +149,7 @@ class NotificationService:
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc # Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name) v = watch.get(var_name)
if v and not watch.get('notification_muted'): if v and not watch.get('notification_muted'):
if var_name == 'notification_format' and v == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: if var_name == 'notification_format' and v == default_notification_format_for_watch:
return self.datastore.data['settings']['application'].get('notification_format') return self.datastore.data['settings']['application'].get('notification_format')
return v return v
@@ -180,7 +166,7 @@ class NotificationService:
# Otherwise could be defaults # Otherwise could be defaults
if var_name == 'notification_format': if var_name == 'notification_format':
return USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH return default_notification_format_for_watch
if var_name == 'notification_body': if var_name == 'notification_body':
return default_notification_body return default_notification_body
if var_name == 'notification_title': if var_name == 'notification_title':
@@ -235,6 +221,7 @@ class NotificationService:
if not watch: if not watch:
return return
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format)
filter_list = ", ".join(watch['include_filters']) filter_list = ", ".join(watch['include_filters'])
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed
body = f"""Hello, body = f"""Hello,
@@ -251,9 +238,9 @@ Thanks - Your omniscient changedetection.io installation.
n_object = NotificationContextData({ n_object = NotificationContextData({
'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page', 'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
'notification_body': body, 'notification_body': body,
'notification_format': self._check_cascading_vars('notification_format', watch), 'notification_format': n_format,
'markup_text_links_to_html_links': n_format.lower().startswith('html')
}) })
n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html')
if len(watch['notification_urls']): if len(watch['notification_urls']):
n_object['notification_urls'] = watch['notification_urls'] n_object['notification_urls'] = watch['notification_urls']
@@ -281,7 +268,7 @@ Thanks - Your omniscient changedetection.io installation.
if not watch: if not watch:
return return
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts') threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format).lower()
step = step_n + 1 step = step_n + 1
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed # @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed
@@ -300,9 +287,9 @@ Thanks - Your omniscient changedetection.io installation.
n_object = NotificationContextData({ n_object = NotificationContextData({
'notification_title': f"Changedetection.io - Alert - Browser step at position {step} could not be run", 'notification_title': f"Changedetection.io - Alert - Browser step at position {step} could not be run",
'notification_body': body, 'notification_body': body,
'notification_format': self._check_cascading_vars('notification_format', watch), 'notification_format': n_format,
'markup_text_links_to_html_links': n_format.lower().startswith('html')
}) })
n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html')
if len(watch['notification_urls']): if len(watch['notification_urls']):
n_object['notification_urls'] = watch['notification_urls'] n_object['notification_urls'] = watch['notification_urls']

View File

@@ -1,13 +1,11 @@
from changedetectionio.strtobool import strtobool from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_safe_valid_url
from flask import ( from flask import (
flash flash
) )
from .html_tools import TRANSLATE_WHITESPACE_TABLE from .html_tools import TRANSLATE_WHITESPACE_TABLE
from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH from . model import App, Watch
from copy import deepcopy, copy from copy import deepcopy, copy
from os import path, unlink from os import path, unlink
from threading import Lock from threading import Lock
@@ -342,10 +340,9 @@ class ChangeDetectionStore:
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}") logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
flash("Error fetching metadata for {}".format(url), 'error') flash("Error fetching metadata for {}".format(url), 'error')
return False return False
from .model.Watch import is_safe_url
if not is_safe_valid_url(url): if not is_safe_url(url):
flash('Watch protocol is not permitted or invalid URL format', 'error') flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
return None return None
if tag and type(tag) == str: if tag and type(tag) == str:
@@ -990,35 +987,10 @@ class ChangeDetectionStore:
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title') self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
def update_21(self): def update_21(self):
if self.data['settings']['application'].get('timezone'): self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone') del self.data['settings']['application']['timezone']
del self.data['settings']['application']['timezone']
# Some notification formats got the wrong name type
def update_22(self):
from .notification import valid_notification_formats
sys_n_format = self.data['settings']['application'].get('notification_format')
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == sys_n_format), None)
if key_exists_as_value: # key of "Plain text"
logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['notification_format'] = key_exists_as_value
for uuid, watch in self.data['watching'].items():
n_format = self.data['watching'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
for uuid, tag in self.data['settings']['application']['tags'].items():
n_format = self.data['settings']['application']['tags'][uuid].get('notification_format')
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None)
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
logger.success(f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
self.data['settings']['application']['tags'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
def add_notification_url(self, notification_url): def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'") logger.debug(f">>> Adding new notification_url - '{notification_url}'")

View File

@@ -266,7 +266,9 @@
<li id="timezone-info"> <li id="timezone-info">
{{ render_field(form.time_schedule_limit.timezone, placeholder=timezone_default_config) }} <span id="local-time-in-tz"></span> {{ render_field(form.time_schedule_limit.timezone, placeholder=timezone_default_config) }} <span id="local-time-in-tz"></span>
<datalist id="timezones" style="display: none;"> <datalist id="timezones" style="display: none;">
{%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%} {% for timezone in available_timezones %}
<option value="{{ timezone }}">{{ timezone }}</option>
{% endfor %}
</datalist> </datalist>
</li> </li>
</ul> </ul>

View File

@@ -53,7 +53,7 @@
<a class="pure-menu-heading" href="{{url_for('watchlist.index')}}"> <a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
<strong>Change</strong>Detection.io</a> <strong>Change</strong>Detection.io</a>
{% endif %} {% endif %}
{% if current_diff_url and is_safe_valid_url(current_diff_url) %} {% if current_diff_url %}
<a class="current-diff-url" href="{{ current_diff_url }}"> <a class="current-diff-url" href="{{ current_diff_url }}">
<span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a> <span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
{% else %} {% else %}

View File

@@ -4,7 +4,7 @@ from email import message_from_string
from email.policy import default as email_policy from email.policy import default as email_policy
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE
from changedetectionio.notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER from changedetectionio.notification_service import NotificationContextData
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \ from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \ wait_for_all_checks, \
set_longer_modified_response, delete_all_watches set_longer_modified_response, delete_all_watches
@@ -53,7 +53,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body, "application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
"application-notification_format": 'html', "application-notification_format": 'HTML',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -99,7 +99,6 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
text_content = text_part.get_content() text_content = text_part.get_content()
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
assert 'fallback-body\r\n' in text_content # The plaintext part assert 'fallback-body\r\n' in text_content # The plaintext part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in text_content
# Second part should be text/html # Second part should be text/html
html_part = parts[1] html_part = parts[1]
@@ -108,7 +107,6 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
assert 'some text<br>' in html_content # We converted \n from the notification body assert 'some text<br>' in html_content # We converted \n from the notification body
assert 'fallback-body<br>' in html_content # kept the original <br> assert 'fallback-body<br>' in html_content # kept the original <br>
assert '(added) So let\'s see what happens.<br>' in html_content # the html part assert '(added) So let\'s see what happens.<br>' in html_content # the html part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_content
delete_all_watches(client) delete_all_watches(client)
@@ -124,7 +122,7 @@ def test_check_notification_plaintext_format(client, live_server, measure_memory
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": "some text\n" + default_notification_body, "application-notification_body": "some text\n" + default_notification_body,
"application-notification_format": 'text', "application-notification_format": 'Plain Text',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -176,7 +174,7 @@ def test_check_notification_html_color_format(client, live_server, measure_memor
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": f"some text\n{default_notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_body": f"some text\n{default_notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
"application-notification_format": 'htmlcolor', "application-notification_format": 'HTML Color',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -247,7 +245,7 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": "*header*\n\nsome text\n" + default_notification_body, "application-notification_body": "*header*\n\nsome text\n" + default_notification_body,
"application-notification_format": 'markdown', "application-notification_format": 'Markdown to HTML',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -292,8 +290,7 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
text_part = parts[0] text_part = parts[0]
assert text_part.get_content_type() == 'text/plain' assert text_part.get_content_type() == 'text/plain'
text_content = text_part.get_content() text_content = text_part.get_content()
# We wont see anything in the "FALLBACK" text but that's OK (no added/strikethrough etc) assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
assert 'So let\'s see what happens.\r\n' in text_content # The plaintext part
# Second part should be text/html and roughly converted from markdown to HTML # Second part should be text/html and roughly converted from markdown to HTML
@@ -301,7 +298,7 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
assert html_part.get_content_type() == 'text/html' assert html_part.get_content_type() == 'text/html'
html_content = html_part.get_content() html_content = html_part.get_content()
assert '<p><em>header</em></p>' in html_content assert '<p><em>header</em></p>' in html_content
assert '<strong>So let\'s see what happens.</strong><br>' in html_content # Additions are <strong> in markdown assert '(added) So let\'s see what happens.<br' in html_content
delete_all_watches(client) delete_all_watches(client)
# Custom notification body with HTML, that is either sent as HTML or rendered to plaintext and sent # Custom notification body with HTML, that is either sent as HTML or rendered to plaintext and sent
@@ -331,7 +328,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": notification_body, "application-notification_body": notification_body,
"application-notification_format": 'text', "application-notification_format": 'Plain Text',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -381,7 +378,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
url_for("ui.ui_edit.edit_page", uuid="first"), url_for("ui.ui_edit.edit_page", uuid="first"),
data={ data={
"url": test_url, "url": test_url,
"notification_format": 'html', "notification_format": 'HTML',
'fetch_backend': "html_requests", 'fetch_backend': "html_requests",
"time_between_check_use_default": "y"}, "time_between_check_use_default": "y"},
follow_redirects=True follow_redirects=True
@@ -440,7 +437,7 @@ def test_check_plaintext_document_plaintext_notification_smtp(client, live_serve
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
"application-notification_format": 'text', "application-notification_format": 'Plain Text',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -473,7 +470,7 @@ def test_check_plaintext_document_plaintext_notification_smtp(client, live_serve
assert '(added)' in body assert '(added)' in body
assert '<br' not in body assert '<br' not in body
assert '&lt;' not in body assert '&lt;' not in body
assert '<pre' not in body
delete_all_watches(client) delete_all_watches(client)
def test_check_plaintext_document_html_notifications(client, live_server, measure_memory_usage): def test_check_plaintext_document_html_notifications(client, live_server, measure_memory_usage):
@@ -492,7 +489,7 @@ def test_check_plaintext_document_html_notifications(client, live_server, measur
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
"application-notification_format": 'html', "application-notification_format": 'HTML',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -544,13 +541,13 @@ def test_check_plaintext_document_html_notifications(client, live_server, measur
assert 'talk about &lt;title&gt;' in html_content assert 'talk about &lt;title&gt;' in html_content
# Should be the HTML, but not HTML Color # Should be the HTML, but not HTML Color
assert 'background-color' not in html_content assert 'background-color' not in html_content
assert '<br>(added) And let&#39;s talk about &lt;title&gt; tags<br>' in html_content assert '<br>\r\n(added) And let&#39;s talk about &lt;title&gt; tags<br>' in html_content
assert '&lt;br' not in html_content assert '&lt;br' not in html_content
assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py
# And now for the whitespace retention # And now for the whitespace retention
assert '&nbsp;&nbsp;&nbsp;&nbsp;Some nice plain text' in html_content assert '&nbsp;&nbsp;&nbsp;&nbsp;Some nice plain text' in html_content
assert '(added) And let' in html_content # just to show a single whitespace didnt get touched assert '(added) And let' in html_content # just to show a single whitespace didnt get touched
delete_all_watches(client) delete_all_watches(client)
@@ -570,7 +567,7 @@ def test_check_plaintext_document_html_color_notifications(client, live_server,
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
"application-notification_format": 'htmlcolor', "application-notification_format": 'HTML Color',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -623,7 +620,7 @@ def test_check_plaintext_document_html_color_notifications(client, live_server,
assert '(added) And let' not in html_content assert '(added) And let' not in html_content
assert '&lt;br' not in html_content assert '&lt;br' not in html_content
assert '<br>' in html_content assert '<br>' in html_content
assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py
delete_all_watches(client) delete_all_watches(client)
def test_check_html_document_plaintext_notification(client, live_server, measure_memory_usage): def test_check_html_document_plaintext_notification(client, live_server, measure_memory_usage):
@@ -642,7 +639,7 @@ def test_check_html_document_plaintext_notification(client, live_server, measure
data={"application-notification_urls": notification_url, data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title, "application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}", "application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
"application-notification_format": 'text', "application-notification_format": 'Plain Text',
"requests-time_between_check-minutes": 180, "requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"}, 'application-fetch_backend': "html_requests"},
follow_redirects=True follow_redirects=True
@@ -682,73 +679,3 @@ def test_check_html_document_plaintext_notification(client, live_server, measure
delete_all_watches(client) delete_all_watches(client)
def test_check_html_notification_with_apprise_format_is_html(client, live_server, measure_memory_usage):
## live_server_setup(live_server) # Setup on conftest per function
set_original_response()
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com&format=html'
#####################
# Set this up for when we remove the notification from the watch, it should fallback with these details
res = client.post(
url_for("settings.settings_page"),
data={"application-notification_urls": notification_url,
"application-notification_title": "fallback-title " + default_notification_title,
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
"application-notification_format": 'html',
"requests-time_between_check-minutes": 180,
'application-fetch_backend': "html_requests"},
follow_redirects=True
)
assert b"Settings updated." in res.data
# Add a watch and trigger a HTTP POST
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": 'nice one'},
follow_redirects=True
)
assert b"Watch added" in res.data
wait_for_all_checks(client)
set_longer_modified_response()
time.sleep(2)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(3)
msg_raw = get_last_message_from_smtp_server()
assert len(msg_raw) >= 1
# Parse the email properly using Python's email library
msg = message_from_string(msg_raw, policy=email_policy)
# The email should have two bodies (multipart/alternative with text/plain and text/html)
assert msg.is_multipart()
assert msg.get_content_type() == 'multipart/alternative'
# Get the parts
parts = list(msg.iter_parts())
assert len(parts) == 2
# First part should be text/plain (the auto-generated plaintext version)
text_part = parts[0]
assert text_part.get_content_type() == 'text/plain'
text_content = text_part.get_content()
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
assert 'fallback-body\r\n' in text_content # The plaintext part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in text_content
# Second part should be text/html
html_part = parts[1]
assert html_part.get_content_type() == 'text/html'
html_content = html_part.get_content()
assert 'some text<br>' in html_content # We converted \n from the notification body
assert 'fallback-body<br>' in html_content # kept the original <br>
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
assert CUSTOM_LINEBREAK_PLACEHOLDER not in html_content
delete_all_watches(client)

View File

@@ -124,7 +124,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
"application-notification_body": 'triggered text was -{{triggered_text}}- ### 网站监测 内容更新了 ####', "application-notification_body": 'triggered text was -{{triggered_text}}- ### 网站监测 内容更新了 ####',
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation # https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
"application-notification_urls": test_notification_url, "application-notification_urls": test_notification_url,
"application-notification_format": 'text', "application-notification_format": 'Plain Text',
"application-minutes_between_check": 180, "application-minutes_between_check": 180,
"application-fetch_backend": "html_requests" "application-fetch_backend": "html_requests"
}, },

View File

@@ -370,7 +370,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
###################################################### ######################################################
# HTTP PUT try a field that doesn't exist # HTTP PUT try a field that doenst exist
# HTTP PUT an update # HTTP PUT an update
res = client.put( res = client.put(
@@ -383,17 +383,6 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
# Message will come from `flask_expects_json` # Message will come from `flask_expects_json`
assert b'Additional properties are not allowed' in res.data assert b'Additional properties are not allowed' in res.data
# Try a XSS URL
res = client.put(
url_for("watch", uuid=watch_uuid),
headers={'x-api-key': api_key, 'content-type': 'application/json'},
data=json.dumps({
'url': 'javascript:alert(document.domain)'
}),
)
assert res.status_code == 400
# Cleanup everything # Cleanup everything
delete_all_watches(client) delete_all_watches(client)
@@ -405,8 +394,7 @@ def test_api_import(client, live_server, measure_memory_usage):
res = client.post( res = client.post(
url_for("import") + "?tag=import-test", url_for("import") + "?tag=import-test",
data='https://website1.com\r\nhttps://website2.com', data='https://website1.com\r\nhttps://website2.com',
# We removed 'content-type': 'text/plain', the Import API should assume this if none is set #3547 #3542 headers={'x-api-key': api_key, 'content-type': 'text/plain'},
headers={'x-api-key': api_key},
follow_redirects=True follow_redirects=True
) )

View File

@@ -86,7 +86,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
"Diff Full: {{diff_full}}\n" "Diff Full: {{diff_full}}\n"
"Diff as Patch: {{diff_patch}}\n" "Diff as Patch: {{diff_patch}}\n"
":-)", ":-)",
"notification_format": 'text'} "notification_format": 'Plain Text'}
notification_form_data.update({ notification_form_data.update({
"url": test_url, "url": test_url,

View File

@@ -63,7 +63,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
"Diff Full: {{diff_full}}\n" "Diff Full: {{diff_full}}\n"
"Diff as Patch: {{diff_patch}}\n" "Diff as Patch: {{diff_patch}}\n"
":-)", ":-)",
"notification_format": 'text', "notification_format": 'Plain Text',
"fetch_backend": "html_requests", "fetch_backend": "html_requests",
"filter_failure_notification_send": 'y', "filter_failure_notification_send": 'y',
"time_between_check_use_default": "y", "time_between_check_use_default": "y",
@@ -175,13 +175,13 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage): def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage):
# # live_server_setup(live_server) # Setup on conftest per function # # live_server_setup(live_server) # Setup on conftest per function
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor')) run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('HTML Color'))
# Check markup send conversion didnt affect plaintext preference # Check markup send conversion didnt affect plaintext preference
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text')) run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Plain Text'))
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage): def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage):
# # live_server_setup(live_server) # Setup on conftest per function # # live_server_setup(live_server) # Setup on conftest per function
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor')) run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('HTML Color'))
# Test that notification is never sent # Test that notification is never sent

View File

@@ -195,7 +195,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage):
"Diff as Patch: {{diff_patch}}\n" "Diff as Patch: {{diff_patch}}\n"
":-)", ":-)",
"notification_screenshot": True, "notification_screenshot": True,
"notification_format": 'text', "notification_format": 'Plain Text',
"title": "test-tag"} "title": "test-tag"}
res = client.post( res = client.post(

View File

@@ -64,21 +64,29 @@ def test_jinja2_time_offset_in_url_query(client, live_server, measure_memory_usa
# Should not have template error # Should not have template error
assert b'Invalid template' not in res.data assert b'Invalid template' not in res.data
# https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456 # https://techtonics.medium.com/secure-templating-with-jinja2-understanding-ssti-and-jinja2-sandbox-environment-b956edd60456
def test_jinja2_security_url_query(client, live_server, measure_memory_usage): def test_jinja2_security_url_query(client, live_server, measure_memory_usage):
# Add our URL to the import page # Add our URL to the import page
test_url = url_for('test_return_query', _external=True) test_url = url_for('test_return_query', _external=True)
full_url = test_url + "?date={{ ''.__class__.__mro__[1].__subclasses__()}}" # because url_for() will URL-encode the var, but we dont here
full_url = "{}?{}".format(test_url,
"date={{ ''.__class__.__mro__[1].__subclasses__()}}", )
res = client.post( res = client.post(
url_for("ui.ui_views.form_quick_watch_add"), url_for("ui.ui_views.form_quick_watch_add"),
data={"url": full_url, "tags": "test"}, data={"url": full_url, "tags": "test"},
follow_redirects=True follow_redirects=True
) )
assert b"Watch added" not in res.data assert b"Watch added" in res.data
wait_for_all_checks(client)
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))
assert b'is invalid and cannot be used' in res.data
# Some of the spewed output from the subclasses
assert b'dict_values' not in res.data
def test_timezone(mocker): def test_timezone(mocker):
"""Verify that timezone is parsed.""" """Verify that timezone is parsed."""

View File

@@ -13,10 +13,10 @@ import base64
from changedetectionio.notification import ( from changedetectionio.notification import (
default_notification_body, default_notification_body,
default_notification_format, default_notification_format,
default_notification_title, valid_notification_formats default_notification_title,
valid_notification_formats,
) )
from ..diff import HTML_CHANGED_STYLE from ..diff import HTML_CHANGED_STYLE
from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
# Hard to just add more live server URLs when one test is already running (I think) # Hard to just add more live server URLs when one test is already running (I think)
@@ -47,14 +47,6 @@ def test_check_notification(client, live_server, measure_memory_usage):
assert b"Settings updated." in res.data assert b"Settings updated." in res.data
res = client.get(url_for("settings.settings_page"))
for k,v in valid_notification_formats.items():
if k == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
continue
assert f'value="{k}"'.encode() in res.data # Should be by key NOT value
assert f'value="{v}"'.encode() not in res.data # Should be by key NOT value
# When test mode is in BASE_URL env mode, we should see this already configured # When test mode is in BASE_URL env mode, we should see this already configured
env_base_url = os.getenv('BASE_URL', '').strip() env_base_url = os.getenv('BASE_URL', '').strip()
if len(env_base_url): if len(env_base_url):
@@ -109,7 +101,7 @@ def test_check_notification(client, live_server, measure_memory_usage):
"Diff as Patch: {{diff_patch}}\n" "Diff as Patch: {{diff_patch}}\n"
":-)", ":-)",
"notification_screenshot": True, "notification_screenshot": True,
"notification_format": 'text'} "notification_format": 'Plain Text'}
notification_form_data.update({ notification_form_data.update({
"url": test_url, "url": test_url,
@@ -275,7 +267,7 @@ def test_notification_validation(client, live_server, measure_memory_usage):
# data={"notification_urls": 'json://localhost/foobar', # data={"notification_urls": 'json://localhost/foobar',
# "notification_title": "", # "notification_title": "",
# "notification_body": "", # "notification_body": "",
# "notification_format": 'text', # "notification_format": 'Plain Text',
# "url": test_url, # "url": test_url,
# "tag": "my tag", # "tag": "my tag",
# "title": "my title", # "title": "my title",
@@ -529,7 +521,7 @@ def _test_color_notifications(client, notification_body_token):
"application-fetch_backend": "html_requests", "application-fetch_backend": "html_requests",
"application-minutes_between_check": 180, "application-minutes_between_check": 180,
"application-notification_body": notification_body_token, "application-notification_body": notification_body_token,
"application-notification_format": "htmlcolor", "application-notification_format": "HTML Color",
"application-notification_urls": test_notification_url, "application-notification_urls": test_notification_url,
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}", "application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
}, },

View File

@@ -30,7 +30,7 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
data={"notification_urls": f"{broken_notification_url}\r\n{working_notification_url}", data={"notification_urls": f"{broken_notification_url}\r\n{working_notification_url}",
"notification_title": "xxx", "notification_title": "xxx",
"notification_body": "xxxxx", "notification_body": "xxxxx",
"notification_format": 'text', "notification_format": 'Plain Text',
"url": test_url, "url": test_url,
"tags": "", "tags": "",
"title": "", "title": "",

View File

@@ -1,8 +1,6 @@
import os import os
from flask import url_for from flask import url_for
from changedetectionio.tests.util import set_modified_response
from .util import live_server_setup, wait_for_all_checks, delete_all_watches from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .. import strtobool from .. import strtobool
@@ -25,7 +23,7 @@ def set_original_response():
return None return None
def test_bad_access(client, live_server, measure_memory_usage): def test_bad_access(client, live_server, measure_memory_usage):
res = client.post( res = client.post(
url_for("imports.import_page"), url_for("imports.import_page"),
data={"urls": 'https://localhost'}, data={"urls": 'https://localhost'},
@@ -48,7 +46,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True follow_redirects=True
) )
assert b'Watch protocol is not permitted or invalid URL format' in res.data assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post( res = client.post(
url_for("ui.ui_views.form_quick_watch_add"), url_for("ui.ui_views.form_quick_watch_add"),
@@ -56,7 +54,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True follow_redirects=True
) )
assert b'Watch protocol is not permitted or invalid URL format' in res.data assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post( res = client.post(
url_for("ui.ui_views.form_quick_watch_add"), url_for("ui.ui_views.form_quick_watch_add"),
@@ -64,7 +62,7 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True follow_redirects=True
) )
assert b'Watch protocol is not permitted or invalid URL format' in res.data assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post( res = client.post(
@@ -73,15 +71,8 @@ def test_bad_access(client, live_server, measure_memory_usage):
follow_redirects=True follow_redirects=True
) )
assert b'Watch protocol is not permitted or invalid URL format' in res.data assert b'Watch protocol is not permitted by SAFE_PROTOCOL_REGEX' in res.data
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": 'https://i-wanna-xss-you.com?hereis=<script>alert(1)</script>', "tags": ''},
follow_redirects=True
)
assert b'Watch protocol is not permitted or invalid URL format' in res.data
def _runner_test_various_file_slash(client, file_uri): def _runner_test_various_file_slash(client, file_uri):
@@ -118,8 +109,8 @@ def test_file_slash_access(client, live_server, measure_memory_usage):
test_file_path = os.path.abspath(__file__) test_file_path = os.path.abspath(__file__)
_runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}") _runner_test_various_file_slash(client, file_uri=f"file://{test_file_path}")
# _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}") _runner_test_various_file_slash(client, file_uri=f"file:/{test_file_path}")
# _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509 _runner_test_various_file_slash(client, file_uri=f"file:{test_file_path}") # CVE-2024-56509
def test_xss(client, live_server, measure_memory_usage): def test_xss(client, live_server, measure_memory_usage):
@@ -141,26 +132,6 @@ def test_xss(client, live_server, measure_memory_usage):
assert b"<img src=x onerror=alert(" not in res.data assert b"<img src=x onerror=alert(" not in res.data
assert b"&lt;img" in res.data assert b"&lt;img" in res.data
# Check that even forcing an update directly still doesnt get to the frontend
set_original_response()
XSS_HACK = 'javascript:alert(document.domain)'
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_modified_response()
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
live_server.app.config['DATASTORE'].data['watching'][uuid]['url']=XSS_HACK
res = client.get(url_for("ui.ui_views.preview_page", uuid=uuid))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
res = client.get(url_for("watchlist.index"))
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
def test_xss_watch_last_error(client, live_server, measure_memory_usage): def test_xss_watch_last_error(client, live_server, measure_memory_usage):
set_original_response() set_original_response()

View File

@@ -1,5 +1,3 @@
from functools import lru_cache
import arrow import arrow
from enum import IntEnum from enum import IntEnum
@@ -14,7 +12,7 @@ class Weekday(IntEnum):
Saturday = 5 Saturday = 5
Sunday = 6 Sunday = 6
@lru_cache(maxsize=100)
def am_i_inside_time( def am_i_inside_time(
day_of_week: str, day_of_week: str,
time_str: str, time_str: str,

View File

@@ -1,109 +0,0 @@
from functools import lru_cache
from loguru import logger
from urllib.parse import urlparse, urlunparse, parse_qsl, urlencode
def normalize_url_encoding(url):
"""
Safely encode a URL's query parameters, regardless of whether they're already encoded.
Why this is necessary:
URLs can arrive in various states - some with already encoded query parameters (%20 for spaces),
some with unencoded parameters (literal spaces), or a mix of both. The validators.url() function
requires proper encoding, but simply encoding an already-encoded URL would double-encode it
(e.g., %20 would become %2520).
This function solves the problem by:
1. Parsing the URL to extract query parameters
2. parse_qsl() automatically decodes parameters if they're encoded
3. urlencode() re-encodes them properly
4. Returns a consistently encoded URL that will pass validation
Example:
- Input: "http://example.com/test?time=2025-10-28 09:19" (space not encoded)
- Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
- Input: "http://example.com/test?time=2025-10-28%2009:19" (already encoded)
- Output: "http://example.com/test?time=2025-10-28+09%3A19" (properly encoded)
Returns a properly encoded URL string.
"""
try:
# Parse the URL into components (scheme, netloc, path, params, query, fragment)
parsed = urlparse(url)
# Parse query string - this automatically decodes it if encoded
# parse_qsl handles both encoded and unencoded query strings gracefully
query_params = parse_qsl(parsed.query, keep_blank_values=True)
# Re-encode the query string properly using standard URL encoding
encoded_query = urlencode(query_params, safe='')
# Reconstruct the URL with properly encoded query string
normalized = urlunparse((
parsed.scheme,
parsed.netloc,
parsed.path,
parsed.params,
encoded_query, # Use the re-encoded query
parsed.fragment
))
return normalized
except Exception as e:
# If parsing fails for any reason, return original URL
logger.debug(f"URL normalization failed for '{url}': {e}")
return url
@lru_cache(maxsize=10000)
def is_safe_valid_url(test_url):
from changedetectionio import strtobool
from changedetectionio.jinja2_custom import render as jinja_render
import os
import re
import validators
allow_file_access = strtobool(os.getenv('ALLOW_FILE_URI', 'false'))
safe_protocol_regex = '^(http|https|ftp|file):' if allow_file_access else '^(http|https|ftp):'
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
# 'source:' is a valid way to tell us to return the source
r = re.compile('^source:', re.IGNORECASE)
test_url = r.sub('', test_url)
# Check the actual rendered URL in case of any Jinja markup
try:
test_url = jinja_render(test_url)
except Exception as e:
logger.error(f'URL "{test_url}" is not correct Jinja2? {str(e)}')
return False
# Check query parameters and fragment
if re.search(r'[<>]', test_url):
logger.warning(f'URL "{test_url}" contains suspicious characters')
return False
# Normalize URL encoding - handle both encoded and unencoded query parameters
test_url = normalize_url_encoding(test_url)
# Be sure the protocol is safe (no file, etcetc)
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', safe_protocol_regex), re.IGNORECASE)
if not pattern.match(test_url.strip()):
logger.warning(f'URL "{test_url}" is not safe, aborting.')
return False
# If hosts that only contain alphanumerics are allowed ("localhost" for example)
allow_simplehost = not strtobool(os.getenv('BLOCK_SIMPLEHOSTS', 'False'))
try:
if not test_url.strip().lower().startswith('file:') and not validators.url(test_url, simple_host=allow_simplehost):
logger.warning(f'URL "{test_url}" failed validation, aborting.')
return False
except validators.ValidationError:
logger.warning(f'URL f"{test_url}" failed validation, aborting.')
return False
return True

View File

@@ -2,9 +2,6 @@
import sys import sys
import os import os
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..')) sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
from changedetectionio.widgets import TernaryNoneBooleanField from changedetectionio.widgets import TernaryNoneBooleanField
@@ -96,7 +93,7 @@ def test_custom_text():
print(f"Does NOT contain 'System default': {'System default' not in boolean_html}") print(f"Does NOT contain 'System default': {'System default' not in boolean_html}")
print(f"Does NOT contain 'Default': {'Default' not in boolean_html}") print(f"Does NOT contain 'Default': {'Default' not in boolean_html}")
assert 'Enabled' in boolean_html and 'Disabled' in boolean_html assert 'Enabled' in boolean_html and 'Disabled' in boolean_html
assert USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH not in boolean_html and 'Default' not in boolean_html assert 'System default' not in boolean_html and 'Default' not in boolean_html
# Test FontAwesome field # Test FontAwesome field
print("\n--- FontAwesome Icons Field ---") print("\n--- FontAwesome Icons Field ---")

View File

@@ -28,7 +28,7 @@ info:
For example: `x-api-key: YOUR_API_KEY` For example: `x-api-key: YOUR_API_KEY`
version: 0.1.2 version: 0.1.1
contact: contact:
name: ChangeDetection.io name: ChangeDetection.io
url: https://github.com/dgtlmoon/changedetection.io url: https://github.com/dgtlmoon/changedetection.io
@@ -143,7 +143,7 @@ components:
paused: paused:
type: boolean type: boolean
description: Whether the web page change monitor (watch) is paused description: Whether the web page change monitor (watch) is paused
notification_muted: muted:
type: boolean type: boolean
description: Whether notifications are muted description: Whether notifications are muted
method: method:
@@ -207,7 +207,7 @@ components:
maxLength: 5000 maxLength: 5000
notification_format: notification_format:
type: string type: string
enum: ['text', 'html', 'htmlcolor', 'markdown', 'System default'] enum: [Text, HTML, Markdown]
description: Format for notifications description: Format for notifications
track_ldjson_price_data: track_ldjson_price_data:
type: boolean type: boolean
@@ -406,7 +406,7 @@ paths:
page_title: "The HTML <title> from the page" page_title: "The HTML <title> from the page"
tags: ["550e8400-e29b-41d4-a716-446655440000"] tags: ["550e8400-e29b-41d4-a716-446655440000"]
paused: false paused: false
notification_muted: false muted: false
method: "GET" method: "GET"
fetch_backend: "html_requests" fetch_backend: "html_requests"
last_checked: 1640995200 last_checked: 1640995200
@@ -419,7 +419,7 @@ paths:
page_title: "The HTML <title> from the page" page_title: "The HTML <title> from the page"
tags: ["330e8400-e29b-41d4-a716-446655440001"] tags: ["330e8400-e29b-41d4-a716-446655440001"]
paused: false paused: false
notification_muted: true muted: true
method: "GET" method: "GET"
fetch_backend: "html_webdriver" fetch_backend: "html_webdriver"
last_checked: 1640998800 last_checked: 1640998800
@@ -1224,7 +1224,7 @@ paths:
title: "Example Website Monitor" title: "Example Website Monitor"
tags: ["550e8400-e29b-41d4-a716-446655440000"] tags: ["550e8400-e29b-41d4-a716-446655440000"]
paused: false paused: false
notification_muted: false muted: false
/import: /import:
post: post:

View File

@@ -1,5 +1,5 @@
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility # eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
feedgen~=1.0 feedgen~=0.9
feedparser~=6.0 # For parsing RSS/Atom feeds feedparser~=6.0 # For parsing RSS/Atom feeds
flask-compress flask-compress
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers) # 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
@@ -12,7 +12,7 @@ janus # Thread-safe async/sync queue bridge
flask_wtf~=1.2 flask_wtf~=1.2
flask~=3.1 flask~=3.1
flask-socketio~=5.5.1 flask-socketio~=5.5.1
python-socketio~=5.14.2 python-socketio~=5.13.0
python-engineio~=4.12.3 python-engineio~=4.12.3
inscriptis~=2.2 inscriptis~=2.2
pytz pytz
@@ -22,7 +22,7 @@ validators~=0.35
# Set these versions together to avoid a RequestsDependencyWarning # Set these versions together to avoid a RequestsDependencyWarning
# >= 2.26 also adds Brotli support if brotli is installed # >= 2.26 also adds Brotli support if brotli is installed
brotli~=1.1 brotli~=1.0
requests[socks] requests[socks]
requests-file requests-file
@@ -30,7 +30,7 @@ requests-file
# If specific version needed for security, use urllib3>=1.26.19,<3.0 # If specific version needed for security, use urllib3>=1.26.19,<3.0
chardet>2.3.0 chardet>2.3.0
wtforms~=3.2 wtforms~=3.0
jsonpath-ng~=1.5.3 jsonpath-ng~=1.5.3
# dnspython - Used by paho-mqtt for MQTT broker resolution # dnspython - Used by paho-mqtt for MQTT broker resolution