mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-01 23:28:06 +00:00
Compare commits
29 Commits
0.50.29
...
small-func
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
5fc37dfff4 | ||
|
|
2116b2cb93 | ||
|
|
acee89c0a9 | ||
|
|
552e98519b | ||
|
|
8f580ac96b | ||
|
|
a8cadc3d16 | ||
|
|
c9290d73e0 | ||
|
|
2db5e906e9 | ||
|
|
0751bd371a | ||
|
|
3ffa0805e9 | ||
|
|
3335270692 | ||
|
|
a7573b10ec | ||
|
|
df945ad743 | ||
|
|
4536e95205 | ||
|
|
1479d7bd46 | ||
|
|
9ba2094f75 | ||
|
|
8aa012ba8e | ||
|
|
8bc6b10db1 | ||
|
|
76d799c95b | ||
|
|
7c8bdfcc9f | ||
|
|
01a938d7ce | ||
|
|
e44853c439 | ||
|
|
3830bec891 | ||
|
|
88ab663330 | ||
|
|
68335b95c3 | ||
|
|
7bbfa0ef32 | ||
|
|
e233d52931 | ||
|
|
181d32e82a | ||
|
|
a51614f83d |
6
.github/workflows/pypi-release.yml
vendored
6
.github/workflows/pypi-release.yml
vendored
@@ -21,7 +21,7 @@ jobs:
|
||||
- name: Build a binary wheel and a source tarball
|
||||
run: python3 -m build
|
||||
- name: Store the distribution packages
|
||||
uses: actions/upload-artifact@v4
|
||||
uses: actions/upload-artifact@v5
|
||||
with:
|
||||
name: python-package-distributions
|
||||
path: dist/
|
||||
@@ -34,7 +34,7 @@ jobs:
|
||||
- build
|
||||
steps:
|
||||
- name: Download all the dists
|
||||
uses: actions/download-artifact@v5
|
||||
uses: actions/download-artifact@v6
|
||||
with:
|
||||
name: python-package-distributions
|
||||
path: dist/
|
||||
@@ -93,7 +93,7 @@ jobs:
|
||||
|
||||
steps:
|
||||
- name: Download all the dists
|
||||
uses: actions/download-artifact@v5
|
||||
uses: actions/download-artifact@v6
|
||||
with:
|
||||
name: python-package-distributions
|
||||
path: dist/
|
||||
|
||||
@@ -282,7 +282,7 @@ jobs:
|
||||
|
||||
- name: Store everything including test-datastore
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v4
|
||||
uses: actions/upload-artifact@v5
|
||||
with:
|
||||
name: test-cdio-basic-tests-output-py${{ env.PYTHON_VERSION }}
|
||||
path: .
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.29'
|
||||
__version__ = '0.50.33'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -3,15 +3,30 @@ from changedetectionio.strtobool import strtobool
|
||||
from flask_restful import abort, Resource
|
||||
from flask import request
|
||||
import validators
|
||||
from functools import wraps
|
||||
from . import auth, validate_openapi_request
|
||||
|
||||
|
||||
def default_content_type(content_type='text/plain'):
|
||||
"""Decorator to set a default Content-Type header if none is provided."""
|
||||
def decorator(f):
|
||||
@wraps(f)
|
||||
def wrapper(*args, **kwargs):
|
||||
if not request.content_type:
|
||||
# Set default content type in the request environment
|
||||
request.environ['CONTENT_TYPE'] = content_type
|
||||
return f(*args, **kwargs)
|
||||
return wrapper
|
||||
return decorator
|
||||
|
||||
|
||||
class Import(Resource):
|
||||
def __init__(self, **kwargs):
|
||||
# datastore is a black box dependency
|
||||
self.datastore = kwargs['datastore']
|
||||
|
||||
@auth.check_token
|
||||
@default_content_type('text/plain') #3547 #3542
|
||||
@validate_openapi_request('importWatches')
|
||||
def post(self):
|
||||
"""Import a list of watched URLs."""
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
import os
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
|
||||
from flask_expects_json import expects_json
|
||||
from changedetectionio import queuedWatchMetaData
|
||||
@@ -121,6 +123,10 @@ class Watch(Resource):
|
||||
if validation_error:
|
||||
return validation_error, 400
|
||||
|
||||
# XSS etc protection
|
||||
if request.json.get('url') and not is_safe_url(request.json.get('url')):
|
||||
return "Invalid URL", 400
|
||||
|
||||
watch.update(request.json)
|
||||
|
||||
return "OK", 200
|
||||
|
||||
@@ -240,9 +240,7 @@ nav
|
||||
<p>
|
||||
{{ render_field(form.application.form.scheduler_timezone_default) }}
|
||||
<datalist id="timezones" style="display: none;">
|
||||
{% for tz_name in available_timezones %}
|
||||
<option value="{{ tz_name }}">{{ tz_name }}</option>
|
||||
{% endfor %}
|
||||
{%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%}
|
||||
</datalist>
|
||||
</p>
|
||||
</div>
|
||||
|
||||
@@ -76,14 +76,14 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat
|
||||
|
||||
elif (op == 'notification-default'):
|
||||
from changedetectionio.notification import (
|
||||
default_notification_format_for_watch
|
||||
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
)
|
||||
for uuid in uuids:
|
||||
if datastore.data['watching'].get(uuid):
|
||||
datastore.data['watching'][uuid]['notification_title'] = None
|
||||
datastore.data['watching'][uuid]['notification_body'] = None
|
||||
datastore.data['watching'][uuid]['notification_urls'] = []
|
||||
datastore.data['watching'][uuid]['notification_format'] = default_notification_format_for_watch
|
||||
datastore.data['watching'][uuid]['notification_format'] = USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
if emit_flash:
|
||||
flash(f"{len(uuids)} watches set to use default notification settings")
|
||||
|
||||
|
||||
@@ -75,7 +75,6 @@ class Fetcher():
|
||||
self.screenshot = None
|
||||
self.xpath_data = None
|
||||
# Keep headers and status_code as they're small
|
||||
logger.trace("Fetcher content cleared from memory")
|
||||
|
||||
@abstractmethod
|
||||
def get_error(self):
|
||||
|
||||
@@ -1,21 +1,32 @@
|
||||
import difflib
|
||||
from typing import List, Iterator, Union
|
||||
|
||||
HTML_REMOVED_STYLE = "background-color: #fadad7; color: #b30000;"
|
||||
HTML_ADDED_STYLE = "background-color: #eaf2c2; color: #406619;"
|
||||
# https://github.com/dgtlmoon/changedetection.io/issues/821#issuecomment-1241837050
|
||||
#HTML_ADDED_STYLE = "background-color: #d2f7c2; color: #255d00;"
|
||||
#HTML_CHANGED_INTO_STYLE = "background-color: #dafbe1; color: #116329;"
|
||||
#HTML_CHANGED_STYLE = "background-color: #ffd6cc; color: #7a2000;"
|
||||
#HTML_REMOVED_STYLE = "background-color: #ffebe9; color: #82071e;"
|
||||
|
||||
# @todo - In the future we can make this configurable
|
||||
HTML_ADDED_STYLE = "background-color: #eaf2c2; color: #406619"
|
||||
HTML_REMOVED_STYLE = "background-color: #fadad7; color: #b30000"
|
||||
HTML_CHANGED_STYLE = HTML_REMOVED_STYLE
|
||||
HTML_CHANGED_INTO_STYLE = HTML_ADDED_STYLE
|
||||
|
||||
|
||||
# These get set to html or telegram type or discord compatible or whatever in handler.py
|
||||
REMOVED_PLACEMARKER_OPEN = '<<<removed_PLACEMARKER_OPEN'
|
||||
REMOVED_PLACEMARKER_CLOSED = '<<<removed_PLACEMARKER_CLOSED'
|
||||
# Something that cant get escaped to HTML by accident
|
||||
REMOVED_PLACEMARKER_OPEN = '@removed_PLACEMARKER_OPEN'
|
||||
REMOVED_PLACEMARKER_CLOSED = '@removed_PLACEMARKER_CLOSED'
|
||||
|
||||
ADDED_PLACEMARKER_OPEN = '<<<added_PLACEMARKER_OPEN'
|
||||
ADDED_PLACEMARKER_CLOSED = '<<<added_PLACEMARKER_CLOSED'
|
||||
ADDED_PLACEMARKER_OPEN = '@added_PLACEMARKER_OPEN'
|
||||
ADDED_PLACEMARKER_CLOSED = '@added_PLACEMARKER_CLOSED'
|
||||
|
||||
CHANGED_PLACEMARKER_OPEN = '<<<changed_PLACEMARKER_OPEN'
|
||||
CHANGED_PLACEMARKER_CLOSED = '<<<changed_PLACEMARKER_CLOSED'
|
||||
CHANGED_PLACEMARKER_OPEN = '@changed_PLACEMARKER_OPEN'
|
||||
CHANGED_PLACEMARKER_CLOSED = '@changed_PLACEMARKER_CLOSED'
|
||||
|
||||
CHANGED_INTO_PLACEMARKER_OPEN = '<<<changed_into_PLACEMARKER_OPEN'
|
||||
CHANGED_INTO_PLACEMARKER_CLOSED = '<<<changed_into_PLACEMARKER_CLOSED'
|
||||
CHANGED_INTO_PLACEMARKER_OPEN = '@changed_into_PLACEMARKER_OPEN'
|
||||
CHANGED_INTO_PLACEMARKER_CLOSED = '@changed_into_PLACEMARKER_CLOSED'
|
||||
|
||||
def same_slicer(lst: List[str], start: int, end: int) -> List[str]:
|
||||
"""Return a slice of the list, or a single element if start == end."""
|
||||
|
||||
@@ -133,6 +133,11 @@ def get_socketio_path():
|
||||
# Socket.IO will be available at {prefix}/socket.io/
|
||||
return prefix
|
||||
|
||||
@app.template_global('is_safe_url')
|
||||
def _is_safe_url(test_url):
|
||||
from .html_tools import is_safe_url
|
||||
return is_safe_url(test_url)
|
||||
|
||||
|
||||
@app.template_filter('format_number_locale')
|
||||
def _jinja2_filter_format_number_locale(value: float) -> str:
|
||||
|
||||
@@ -550,7 +550,7 @@ def validate_url(test_url):
|
||||
# This should be wtforms.validators.
|
||||
raise ValidationError(message)
|
||||
|
||||
from .model.Watch import is_safe_url
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
if not is_safe_url(test_url):
|
||||
# This should be wtforms.validators.
|
||||
raise ValidationError('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX or incorrect URL format')
|
||||
@@ -741,7 +741,6 @@ class quickWatchForm(Form):
|
||||
edit_and_watch_submit_button = SubmitField('Edit > Watch', render_kw={"class": "pure-button pure-button-primary"})
|
||||
|
||||
|
||||
|
||||
# Common to a single watch and the global settings
|
||||
class commonSettingsForm(Form):
|
||||
from . import processors
|
||||
@@ -754,7 +753,7 @@ class commonSettingsForm(Form):
|
||||
|
||||
fetch_backend = RadioField(u'Fetch Method', choices=content_fetchers.available_fetchers(), validators=[ValidateContentFetcherIsReady()])
|
||||
notification_body = TextAreaField('Notification Body', default='{{ watch_url }} had a change.', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||
notification_format = SelectField('Notification format', choices=valid_notification_formats.keys())
|
||||
notification_format = SelectField('Notification format', choices=list(valid_notification_formats.items()))
|
||||
notification_title = StringField('Notification Title', default='ChangeDetection.io Notification - {{ watch_url }}', validators=[validators.Optional(), ValidateJinja2Template()])
|
||||
notification_urls = StringListField('Notification URL List', validators=[validators.Optional(), ValidateAppRiseServers(), ValidateJinja2Template()])
|
||||
processor = RadioField( label=u"Processor - What do you want to achieve?", choices=processors.available_processors(), default="text_json_diff")
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from functools import lru_cache
|
||||
|
||||
from loguru import logger
|
||||
from typing import List
|
||||
import html
|
||||
@@ -13,6 +15,7 @@ TITLE_RE = re.compile(r"<title[^>]*>(.*?)</title>", re.I | re.S)
|
||||
META_CS = re.compile(r'<meta[^>]+charset=["\']?\s*([a-z0-9_\-:+.]+)', re.I)
|
||||
META_CT = re.compile(r'<meta[^>]+http-equiv=["\']?content-type["\']?[^>]*content=["\'][^>]*charset=([a-z0-9_\-:+.]+)', re.I)
|
||||
|
||||
SAFE_PROTOCOL_REGEX='^(http|https|ftp|file):'
|
||||
|
||||
# 'price' , 'lowPrice', 'highPrice' are usually under here
|
||||
# All of those may or may not appear on different websites - I didnt find a way todo case-insensitive searching here
|
||||
@@ -22,9 +25,25 @@ class JSONNotFound(ValueError):
|
||||
def __init__(self, msg):
|
||||
ValueError.__init__(self, msg)
|
||||
|
||||
def is_safe_url(test_url):
|
||||
import os
|
||||
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
|
||||
|
||||
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
|
||||
# 'source:' is a valid way to tell us to return the source
|
||||
|
||||
r = re.compile(re.escape('source:'), re.IGNORECASE)
|
||||
test_url = r.sub('', test_url)
|
||||
|
||||
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
|
||||
if not pattern.match(test_url.strip()):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
# Doesn't look like python supports forward slash auto enclosure in re.findall
|
||||
# So convert it to inline flag "(?i)foobar" type configuration
|
||||
@lru_cache(maxsize=100)
|
||||
def perl_style_slash_enclosed_regex_to_options(regex):
|
||||
|
||||
res = re.search(PERL_STYLE_REGEX, regex, re.IGNORECASE)
|
||||
@@ -185,8 +204,21 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False
|
||||
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
|
||||
html_block = ""
|
||||
|
||||
r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser)
|
||||
#@note: //title/text() wont work where <title>CDATA..
|
||||
# Build namespace map for XPath queries
|
||||
namespaces = {'re': 'http://exslt.org/regular-expressions'}
|
||||
|
||||
# Handle default namespace in documents (common in RSS/Atom feeds, but can occur in any XML)
|
||||
# XPath spec: unprefixed element names have no namespace, not the default namespace
|
||||
# Solution: Register the default namespace with empty string prefix in elementpath
|
||||
# This is primarily for RSS/Atom feeds but works for any XML with default namespace
|
||||
if hasattr(tree, 'nsmap') and tree.nsmap and None in tree.nsmap:
|
||||
# Register the default namespace with empty string prefix for elementpath
|
||||
# This allows //title to match elements in the default namespace
|
||||
namespaces[''] = tree.nsmap[None]
|
||||
|
||||
r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
|
||||
#@note: //title/text() now works with default namespaces (fixed by registering '' prefix)
|
||||
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
|
||||
|
||||
if type(r) != list:
|
||||
r = [r]
|
||||
@@ -221,8 +253,19 @@ def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=Fals
|
||||
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
|
||||
html_block = ""
|
||||
|
||||
r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'})
|
||||
#@note: //title/text() wont work where <title>CDATA..
|
||||
# Build namespace map for XPath queries
|
||||
namespaces = {'re': 'http://exslt.org/regular-expressions'}
|
||||
|
||||
# NOTE: lxml's native xpath() does NOT support empty string prefix for default namespace
|
||||
# For documents with default namespace (RSS/Atom feeds), users must use:
|
||||
# - local-name(): //*[local-name()='title']/text()
|
||||
# - Or use xpath_filter (not xpath1_filter) which supports default namespaces
|
||||
# XPath spec: unprefixed element names have no namespace, not the default namespace
|
||||
|
||||
r = tree.xpath(xpath_filter.strip(), namespaces=namespaces)
|
||||
#@note: xpath1 (lxml) does NOT automatically handle default namespaces
|
||||
#@note: Use //*[local-name()='element'] or switch to xpath_filter for default namespace support
|
||||
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
|
||||
|
||||
for element in r:
|
||||
# When there's more than 1 match, then add the suffix to separate each line
|
||||
|
||||
@@ -9,6 +9,7 @@ from .safe_jinja import (
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE,
|
||||
DEFAULT_JINJA2_EXTENSIONS,
|
||||
)
|
||||
from .plugins.regex import regex_replace
|
||||
|
||||
__all__ = [
|
||||
'TimeExtension',
|
||||
@@ -17,4 +18,5 @@ __all__ = [
|
||||
'create_jinja_env',
|
||||
'JINJA2_MAX_RETURN_PAYLOAD_SIZE',
|
||||
'DEFAULT_JINJA2_EXTENSIONS',
|
||||
'regex_replace',
|
||||
]
|
||||
|
||||
6
changedetectionio/jinja2_custom/plugins/__init__.py
Normal file
6
changedetectionio/jinja2_custom/plugins/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
Jinja2 custom filter plugins for changedetection.io
|
||||
"""
|
||||
from .regex import regex_replace
|
||||
|
||||
__all__ = ['regex_replace']
|
||||
98
changedetectionio/jinja2_custom/plugins/regex.py
Normal file
98
changedetectionio/jinja2_custom/plugins/regex.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Regex filter plugin for Jinja2 templates.
|
||||
|
||||
Provides regex_replace filter for pattern-based string replacements in templates.
|
||||
"""
|
||||
import re
|
||||
import signal
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def regex_replace(value: str, pattern: str, replacement: str = '', count: int = 0) -> str:
|
||||
"""
|
||||
Replace occurrences of a regex pattern in a string.
|
||||
|
||||
Security: Protected against ReDoS (Regular Expression Denial of Service) attacks:
|
||||
- Limits input value size to prevent excessive processing
|
||||
- Uses timeout mechanism to prevent runaway regex operations
|
||||
- Validates pattern complexity to prevent catastrophic backtracking
|
||||
|
||||
Args:
|
||||
value: The input string to perform replacements on
|
||||
pattern: The regex pattern to search for
|
||||
replacement: The replacement string (default: '')
|
||||
count: Maximum number of replacements (0 = replace all, default: 0)
|
||||
|
||||
Returns:
|
||||
String with replacements applied, or original value on error
|
||||
|
||||
Example:
|
||||
{{ "hello world" | regex_replace("world", "universe") }}
|
||||
{{ diff | regex_replace("<td>([^<]+)</td><td>([^<]+)</td>", "Label1: \\1\\nLabel2: \\2") }}
|
||||
|
||||
Security limits:
|
||||
- Maximum input size: 10MB
|
||||
- Maximum pattern length: 500 characters
|
||||
- Operation timeout: 10 seconds
|
||||
- Dangerous nested quantifier patterns are rejected
|
||||
"""
|
||||
# Security limits
|
||||
MAX_INPUT_SIZE = 1024 * 1024 * 10 # 10MB max input size
|
||||
MAX_PATTERN_LENGTH = 500 # Maximum regex pattern length
|
||||
REGEX_TIMEOUT_SECONDS = 10 # Maximum time for regex operation
|
||||
|
||||
# Validate input sizes
|
||||
value_str = str(value)
|
||||
if len(value_str) > MAX_INPUT_SIZE:
|
||||
logger.warning(f"regex_replace: Input too large ({len(value_str)} bytes), truncating")
|
||||
value_str = value_str[:MAX_INPUT_SIZE]
|
||||
|
||||
if len(pattern) > MAX_PATTERN_LENGTH:
|
||||
logger.warning(f"regex_replace: Pattern too long ({len(pattern)} chars), rejecting")
|
||||
return value_str
|
||||
|
||||
# Check for potentially dangerous patterns (basic checks)
|
||||
# Nested quantifiers like (a+)+ can cause catastrophic backtracking
|
||||
dangerous_patterns = [
|
||||
r'\([^)]*\+[^)]*\)\+', # (x+)+
|
||||
r'\([^)]*\*[^)]*\)\+', # (x*)+
|
||||
r'\([^)]*\+[^)]*\)\*', # (x+)*
|
||||
r'\([^)]*\*[^)]*\)\*', # (x*)*
|
||||
]
|
||||
|
||||
for dangerous in dangerous_patterns:
|
||||
if re.search(dangerous, pattern):
|
||||
logger.warning(f"regex_replace: Potentially dangerous pattern detected: {pattern}")
|
||||
return value_str
|
||||
|
||||
def timeout_handler(signum, frame):
|
||||
raise TimeoutError("Regex operation timed out")
|
||||
|
||||
try:
|
||||
# Set up timeout for regex operation (Unix-like systems only)
|
||||
# This prevents ReDoS attacks
|
||||
old_handler = None
|
||||
if hasattr(signal, 'SIGALRM'):
|
||||
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
|
||||
signal.alarm(REGEX_TIMEOUT_SECONDS)
|
||||
|
||||
try:
|
||||
result = re.sub(pattern, replacement, value_str, count=count)
|
||||
finally:
|
||||
# Cancel the alarm
|
||||
if hasattr(signal, 'SIGALRM'):
|
||||
signal.alarm(0)
|
||||
if old_handler is not None:
|
||||
signal.signal(signal.SIGALRM, old_handler)
|
||||
|
||||
return result
|
||||
|
||||
except TimeoutError:
|
||||
logger.error(f"regex_replace: Regex operation timed out - possible ReDoS attack. Pattern: {pattern}")
|
||||
return value_str
|
||||
except re.error as e:
|
||||
logger.warning(f"regex_replace: Invalid regex pattern: {e}")
|
||||
return value_str
|
||||
except Exception as e:
|
||||
logger.error(f"regex_replace: Unexpected error: {e}")
|
||||
return value_str
|
||||
@@ -8,13 +8,13 @@ import jinja2.sandbox
|
||||
import typing as t
|
||||
import os
|
||||
from .extensions.TimeExtension import TimeExtension
|
||||
from .plugins import regex_replace
|
||||
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
||||
|
||||
# Default extensions - can be overridden in create_jinja_env()
|
||||
DEFAULT_JINJA2_EXTENSIONS = [TimeExtension]
|
||||
|
||||
|
||||
def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment:
|
||||
"""
|
||||
Create a sandboxed Jinja2 environment with our custom extensions and default timezone.
|
||||
@@ -38,6 +38,9 @@ def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandb
|
||||
default_timezone = os.getenv('TZ', 'UTC').strip()
|
||||
jinja2_env.default_timezone = default_timezone
|
||||
|
||||
# Register custom filters
|
||||
jinja2_env.filters['regex_replace'] = regex_replace
|
||||
|
||||
return jinja2_env
|
||||
|
||||
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
from blinker import signal
|
||||
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import watch_base
|
||||
@@ -21,23 +21,6 @@ FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
|
||||
|
||||
def is_safe_url(test_url):
|
||||
# See https://github.com/dgtlmoon/changedetection.io/issues/1358
|
||||
|
||||
# Remove 'source:' prefix so we dont get 'source:javascript:' etc
|
||||
# 'source:' is a valid way to tell us to return the source
|
||||
|
||||
r = re.compile(re.escape('source:'), re.IGNORECASE)
|
||||
test_url = r.sub('', test_url)
|
||||
|
||||
pattern = re.compile(os.getenv('SAFE_PROTOCOL_REGEX', SAFE_PROTOCOL_REGEX), re.IGNORECASE)
|
||||
if not pattern.match(test_url.strip()):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
|
||||
class model(watch_base):
|
||||
__newest_history_key = None
|
||||
__history_n = 0
|
||||
|
||||
@@ -2,7 +2,7 @@ import os
|
||||
import uuid
|
||||
|
||||
from changedetectionio import strtobool
|
||||
default_notification_format_for_watch = 'System default'
|
||||
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH = 'System default'
|
||||
CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'
|
||||
|
||||
class watch_base(dict):
|
||||
@@ -44,7 +44,7 @@ class watch_base(dict):
|
||||
'method': 'GET',
|
||||
'notification_alert_count': 0,
|
||||
'notification_body': None,
|
||||
'notification_format': default_notification_format_for_watch,
|
||||
'notification_format': USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH,
|
||||
'notification_muted': False,
|
||||
'notification_screenshot': False, # Include the latest screenshot if available and supported by the apprise URL
|
||||
'notification_title': None,
|
||||
|
||||
@@ -1,17 +1,16 @@
|
||||
from changedetectionio.model import default_notification_format_for_watch
|
||||
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
|
||||
default_notification_format = 'HTML Color'
|
||||
default_notification_format = 'htmlcolor'
|
||||
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
|
||||
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
|
||||
|
||||
# The values (markdown etc) are from apprise NotifyFormat,
|
||||
# But to avoid importing the whole heavy module just use the same strings here.
|
||||
valid_notification_formats = {
|
||||
'Plain Text': 'text',
|
||||
'HTML': 'html',
|
||||
'HTML Color': 'htmlcolor',
|
||||
'Markdown to HTML': 'markdown',
|
||||
'text': 'Plain Text',
|
||||
'html': 'HTML',
|
||||
'htmlcolor': 'HTML Color',
|
||||
'markdown': 'Markdown to HTML',
|
||||
# Used only for editing a watch (not for global)
|
||||
default_notification_format_for_watch: default_notification_format_for_watch
|
||||
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
}
|
||||
|
||||
|
||||
@@ -1,10 +1,61 @@
|
||||
"""
|
||||
Custom Apprise HTTP Handlers with format= Parameter Support
|
||||
|
||||
IMPORTANT: This module works around a limitation in Apprise's @notify decorator.
|
||||
|
||||
THE PROBLEM:
|
||||
-------------
|
||||
When using Apprise's @notify decorator to create custom notification handlers, the
|
||||
decorator creates a CustomNotifyPlugin that uses parse_url(..., simple=True) to parse
|
||||
URLs. This simple parsing mode does NOT extract the format= query parameter from the URL
|
||||
and set it as a top-level parameter that NotifyBase.__init__ can use to set notify_format.
|
||||
|
||||
As a result:
|
||||
1. URL: post://example.com/webhook?format=html
|
||||
2. Apprise parses this and sees format=html in qsd (query string dictionary)
|
||||
3. But it does NOT extract it and pass it to NotifyBase.__init__
|
||||
4. NotifyBase defaults to notify_format=TEXT
|
||||
5. When you call apobj.notify(body="<html>...", body_format="html"):
|
||||
- Apprise sees: input format = html, output format (notify_format) = text
|
||||
- Apprise calls convert_between("html", "text", body)
|
||||
- This strips all HTML tags, leaving only plain text
|
||||
6. Your custom handler receives stripped plain text instead of HTML
|
||||
|
||||
THE SOLUTION:
|
||||
-------------
|
||||
Instead of using the @notify decorator directly, we:
|
||||
1. Manually register custom plugins using plugins.N_MGR.add()
|
||||
2. Create a CustomHTTPHandler class that extends CustomNotifyPlugin
|
||||
3. Override __init__ to extract format= from qsd and set it as kwargs['format']
|
||||
4. Call NotifyBase.__init__ which properly sets notify_format from kwargs['format']
|
||||
5. Set up _default_args like CustomNotifyPlugin does for compatibility
|
||||
|
||||
This ensures that when format=html is in the URL:
|
||||
- notify_format is set to HTML
|
||||
- Apprise sees: input format = html, output format = html
|
||||
- No conversion happens (convert_between returns content unchanged)
|
||||
- Your custom handler receives the original HTML intact
|
||||
|
||||
TESTING:
|
||||
--------
|
||||
To verify this works:
|
||||
>>> apobj = apprise.Apprise()
|
||||
>>> apobj.add('post://localhost:5005/test?format=html')
|
||||
>>> for server in apobj:
|
||||
... print(server.notify_format) # Should print: html (not text)
|
||||
>>> apobj.notify(body='<span>Test</span>', body_format='html')
|
||||
# Your handler should receive '<span>Test</span>' not 'Test'
|
||||
"""
|
||||
|
||||
import json
|
||||
import re
|
||||
from urllib.parse import unquote_plus
|
||||
|
||||
import requests
|
||||
from apprise.decorators import notify
|
||||
from apprise.utils.parse import parse_url as apprise_parse_url
|
||||
from apprise import plugins
|
||||
from apprise.decorators.base import CustomNotifyPlugin
|
||||
from apprise.utils.parse import parse_url as apprise_parse_url, url_assembly
|
||||
from apprise.utils.logic import dict_full_update
|
||||
from loguru import logger
|
||||
from requests.structures import CaseInsensitiveDict
|
||||
|
||||
@@ -12,13 +63,66 @@ SUPPORTED_HTTP_METHODS = {"get", "post", "put", "delete", "patch", "head"}
|
||||
|
||||
|
||||
def notify_supported_methods(func):
|
||||
"""Register custom HTTP method handlers that properly support format= parameter."""
|
||||
for method in SUPPORTED_HTTP_METHODS:
|
||||
func = notify(on=method)(func)
|
||||
# Add support for https, for each supported http method
|
||||
func = notify(on=f"{method}s")(func)
|
||||
_register_http_handler(method, func)
|
||||
_register_http_handler(f"{method}s", func)
|
||||
return func
|
||||
|
||||
|
||||
def _register_http_handler(schema, send_func):
|
||||
"""Register a custom HTTP handler that extracts format= from URL query parameters."""
|
||||
|
||||
# Parse base URL
|
||||
base_url = f"{schema}://"
|
||||
base_args = apprise_parse_url(base_url, default_schema=schema, verify_host=False, simple=True)
|
||||
|
||||
class CustomHTTPHandler(CustomNotifyPlugin):
|
||||
secure_protocol = schema
|
||||
service_name = f"Custom HTTP - {schema.upper()}"
|
||||
_base_args = base_args
|
||||
|
||||
def __init__(self, **kwargs):
|
||||
# Extract format from qsd and set it as a top-level kwarg
|
||||
# This allows NotifyBase.__init__ to properly set notify_format
|
||||
if 'qsd' in kwargs and 'format' in kwargs['qsd']:
|
||||
kwargs['format'] = kwargs['qsd']['format']
|
||||
|
||||
# Call NotifyBase.__init__ (skip CustomNotifyPlugin.__init__)
|
||||
super(CustomNotifyPlugin, self).__init__(**kwargs)
|
||||
|
||||
# Set up _default_args like CustomNotifyPlugin does
|
||||
self._default_args = {}
|
||||
kwargs.pop("secure", None)
|
||||
dict_full_update(self._default_args, self._base_args)
|
||||
dict_full_update(self._default_args, kwargs)
|
||||
self._default_args["url"] = url_assembly(**self._default_args)
|
||||
|
||||
__send = staticmethod(send_func)
|
||||
|
||||
def send(self, body, title="", notify_type="info", *args, **kwargs):
|
||||
"""Call the custom send function."""
|
||||
try:
|
||||
result = self.__send(
|
||||
body, title, notify_type,
|
||||
*args,
|
||||
meta=self._default_args,
|
||||
**kwargs
|
||||
)
|
||||
return True if result is None else bool(result)
|
||||
except Exception as e:
|
||||
self.logger.warning(f"Exception in custom HTTP handler: {e}")
|
||||
return False
|
||||
|
||||
# Register the plugin
|
||||
plugins.N_MGR.add(
|
||||
plugin=CustomHTTPHandler,
|
||||
schemas=schema,
|
||||
send_func=send_func,
|
||||
url=base_url,
|
||||
)
|
||||
|
||||
|
||||
def _get_auth(parsed_url: dict) -> str | tuple[str, str]:
|
||||
user: str | None = parsed_url.get("user")
|
||||
password: str | None = parsed_url.get("password")
|
||||
@@ -74,6 +178,8 @@ def apprise_http_custom_handler(
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> bool:
|
||||
|
||||
|
||||
url: str = meta.get("url")
|
||||
schema: str = meta.get("schema")
|
||||
method: str = re.sub(r"s$", "", schema).upper()
|
||||
@@ -89,7 +195,6 @@ def apprise_http_custom_handler(
|
||||
|
||||
url = re.sub(rf"^{schema}", "https" if schema.endswith("s") else "http", parsed_url.get("url"))
|
||||
|
||||
try:
|
||||
response = requests.request(
|
||||
method=method,
|
||||
url=url,
|
||||
@@ -103,11 +208,3 @@ def apprise_http_custom_handler(
|
||||
|
||||
logger.info(f"Successfully sent custom notification to {url}")
|
||||
return True
|
||||
|
||||
except requests.RequestException as e:
|
||||
logger.error(f"Remote host error while sending custom notification to {url}: {e}")
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Unexpected error occurred while sending custom notification to {url}: {e}")
|
||||
return False
|
||||
|
||||
42
changedetectionio/notification/email_helpers.py
Normal file
42
changedetectionio/notification/email_helpers.py
Normal file
@@ -0,0 +1,42 @@
|
||||
def as_monospaced_html_email(content: str, title: str) -> str:
|
||||
"""
|
||||
Wraps `content` in a minimal, email-safe HTML template
|
||||
that forces monospace rendering across Gmail, Hotmail, Apple Mail, etc.
|
||||
|
||||
Args:
|
||||
content: The body text (plain text or HTML-like).
|
||||
title: The title plaintext
|
||||
Returns:
|
||||
A complete HTML document string suitable for sending as an email body.
|
||||
"""
|
||||
|
||||
# All line feed types should be removed and then this function should only be fed <br>'s
|
||||
# Then it works with our <pre> styling without double linefeeds
|
||||
content = content.translate(str.maketrans('', '', '\r\n'))
|
||||
|
||||
if title:
|
||||
import html
|
||||
title = html.escape(title)
|
||||
else:
|
||||
title = ''
|
||||
# 2. Full email-safe HTML
|
||||
html_email = f"""<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="x-apple-disable-message-reformatting">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<!--[if mso]>
|
||||
<style>
|
||||
body, div, pre, td {{ font-family: "Courier New", Courier, monospace !important; }}
|
||||
</style>
|
||||
<![endif]-->
|
||||
<title>{title}</title>
|
||||
</head>
|
||||
<body style="-webkit-text-size-adjust:100%;-ms-text-size-adjust:100%;">
|
||||
<pre role="article" aria-roledescription="email" lang="en"
|
||||
style="font-family: monospace, 'Courier New', Courier; font-size: 0.8em;
|
||||
white-space: pre-wrap; word-break: break-word;">{content}</pre>
|
||||
</body>
|
||||
</html>"""
|
||||
return html_email
|
||||
@@ -6,10 +6,12 @@ from loguru import logger
|
||||
from urllib.parse import urlparse
|
||||
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
||||
from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS
|
||||
from .email_helpers import as_monospaced_html_email
|
||||
from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \
|
||||
ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \
|
||||
CHANGED_PLACEMARKER_CLOSED
|
||||
from ..notification_service import NotificationContextData
|
||||
CHANGED_PLACEMARKER_CLOSED, HTML_CHANGED_STYLE, HTML_CHANGED_INTO_STYLE
|
||||
from ..notification_service import NotificationContextData, CUSTOM_LINEBREAK_PLACEHOLDER
|
||||
|
||||
|
||||
|
||||
def markup_text_links_to_html(body):
|
||||
@@ -61,13 +63,13 @@ def notification_format_align_with_apprise(n_format : str):
|
||||
:return:
|
||||
"""
|
||||
|
||||
if n_format.lower().startswith('html'):
|
||||
if n_format.startswith('html'):
|
||||
# Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here
|
||||
n_format = NotifyFormat.HTML.value
|
||||
elif n_format.lower().startswith('markdown'):
|
||||
elif n_format.startswith('markdown'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.MARKDOWN.value
|
||||
elif n_format.lower().startswith('text'):
|
||||
elif n_format.startswith('text'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.TEXT.value
|
||||
else:
|
||||
@@ -75,6 +77,55 @@ def notification_format_align_with_apprise(n_format : str):
|
||||
|
||||
return n_format
|
||||
|
||||
def apply_discord_markdown_to_body(n_body):
|
||||
"""
|
||||
Discord does not support <del> but it supports non-standard ~~strikethrough~~
|
||||
:param n_body:
|
||||
:return:
|
||||
"""
|
||||
import re
|
||||
# Define the mapping between your placeholders and markdown markers
|
||||
replacements = [
|
||||
(REMOVED_PLACEMARKER_OPEN, '~~', REMOVED_PLACEMARKER_CLOSED, '~~'),
|
||||
(ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'),
|
||||
(CHANGED_PLACEMARKER_OPEN, '~~', CHANGED_PLACEMARKER_CLOSED, '~~'),
|
||||
(CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'),
|
||||
]
|
||||
# So that the markdown gets added without any whitespace following it which would break it
|
||||
for open_tag, open_md, close_tag, close_md in replacements:
|
||||
# Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag
|
||||
pattern = re.compile(
|
||||
re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag),
|
||||
flags=re.DOTALL
|
||||
)
|
||||
n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body)
|
||||
return n_body
|
||||
|
||||
def apply_standard_markdown_to_body(n_body):
|
||||
"""
|
||||
Apprise does not support ~~strikethrough~~ but it will convert <del> to HTML strikethrough.
|
||||
:param n_body:
|
||||
:return:
|
||||
"""
|
||||
import re
|
||||
# Define the mapping between your placeholders and markdown markers
|
||||
replacements = [
|
||||
(REMOVED_PLACEMARKER_OPEN, '<del>', REMOVED_PLACEMARKER_CLOSED, '</del>'),
|
||||
(ADDED_PLACEMARKER_OPEN, '**', ADDED_PLACEMARKER_CLOSED, '**'),
|
||||
(CHANGED_PLACEMARKER_OPEN, '<del>', CHANGED_PLACEMARKER_CLOSED, '</del>'),
|
||||
(CHANGED_INTO_PLACEMARKER_OPEN, '**', CHANGED_INTO_PLACEMARKER_CLOSED, '**'),
|
||||
]
|
||||
|
||||
# So that the markdown gets added without any whitespace following it which would break it
|
||||
for open_tag, open_md, close_tag, close_md in replacements:
|
||||
# Regex: match opening tag, optional whitespace, capture the content, optional whitespace, then closing tag
|
||||
pattern = re.compile(
|
||||
re.escape(open_tag) + r'(\s*)(.*?)?(\s*)' + re.escape(close_tag),
|
||||
flags=re.DOTALL
|
||||
)
|
||||
n_body = pattern.sub(lambda m: f"{m.group(1)}{open_md}{m.group(2)}{close_md}{m.group(3)}", n_body)
|
||||
return n_body
|
||||
|
||||
|
||||
def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
|
||||
@@ -104,6 +155,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
# @todo re-use an existing library we have already imported to strip all non-allowed tags
|
||||
n_body = n_body.replace('<br>', '\n')
|
||||
n_body = n_body.replace('</br>', '\n')
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n')
|
||||
|
||||
# Use strikethrough for removed content, bold for added content
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '<s>')
|
||||
@@ -128,6 +180,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
# Discord doesn't support HTML, replace <br> with newlines
|
||||
n_body = n_body.strip().replace('<br>', '\n')
|
||||
n_body = n_body.replace('</br>', '\n')
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\n')
|
||||
|
||||
# Don't replace placeholders or truncate here - let the custom Discord plugin handle it
|
||||
# The plugin will use embeds (6000 char limit across all embeds) if placeholders are present,
|
||||
@@ -137,15 +190,7 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
if requested_output_format == 'html':
|
||||
# No diff placeholders, use Discord markdown for any other formatting
|
||||
# Use Discord markdown: strikethrough for removed, bold for added
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '~~')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '~~')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '**')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '**')
|
||||
# Handle changed/replaced lines (old → new)
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, '~~')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, '~~')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, '**')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, '**')
|
||||
n_body = apply_discord_markdown_to_body(n_body=n_body)
|
||||
|
||||
# Apply 2000 char limit for plain content
|
||||
payload_max_size = 1700
|
||||
@@ -156,16 +201,17 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
|
||||
# Is not discord/tgram and they want htmlcolor
|
||||
elif requested_output_format == 'htmlcolor':
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">')
|
||||
# https://github.com/dgtlmoon/changedetection.io/issues/821#issuecomment-1241837050
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}" role="deletion" aria-label="Removed text" title="Removed text">')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}" role="insertion" aria-label="Added text" title="Added text">')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, f'</span>')
|
||||
# Handle changed/replaced lines (old → new)
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'<span style="{HTML_CHANGED_STYLE}" role="note" aria-label="Changed text" title="Changed text">')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'<span style="{HTML_CHANGED_INTO_STYLE}" role="note" aria-label="Changed into" title="Changed into">')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n')
|
||||
elif requested_output_format == 'html':
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '')
|
||||
@@ -175,7 +221,10 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'')
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
n_body = n_body.replace('\n', f'{CUSTOM_LINEBREAK_PLACEHOLDER}\n')
|
||||
elif requested_output_format == 'markdown':
|
||||
# Markdown to HTML - Apprise will convert this to HTML
|
||||
n_body = apply_standard_markdown_to_body(n_body=n_body)
|
||||
|
||||
else: #plaintext etc default
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ')
|
||||
@@ -192,21 +241,12 @@ def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
|
||||
def process_notification(n_object: NotificationContextData, datastore):
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
||||
from . import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH, default_notification_format, valid_notification_formats
|
||||
# be sure its registered
|
||||
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||
# Register custom Discord plugin
|
||||
from .apprise_plugin.discord import NotifyDiscordCustom
|
||||
|
||||
# Create list of custom handler protocols (both http and https versions)
|
||||
custom_handler_protocols = [f"{method}://" for method in SUPPORTED_HTTP_METHODS]
|
||||
custom_handler_protocols += [f"{method}s://" for method in SUPPORTED_HTTP_METHODS]
|
||||
|
||||
has_custom_handler = any(
|
||||
url.startswith(tuple(custom_handler_protocols))
|
||||
for url in n_object['notification_urls']
|
||||
)
|
||||
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
|
||||
@@ -217,27 +257,21 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
# Insert variables into the notification content
|
||||
notification_parameters = create_notification_parameters(n_object, datastore)
|
||||
|
||||
requested_output_format = valid_notification_formats.get(
|
||||
n_object.get('notification_format', default_notification_format),
|
||||
valid_notification_formats[default_notification_format],
|
||||
)
|
||||
requested_output_format = n_object.get('notification_format', default_notification_format)
|
||||
logger.debug(f"Requested notification output format: '{requested_output_format}'")
|
||||
|
||||
# If we arrived with 'System default' then look it up
|
||||
if requested_output_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
|
||||
if requested_output_format == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
|
||||
# Initially text or whatever
|
||||
requested_output_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower()
|
||||
requested_output_format = datastore.data['settings']['application'].get('notification_format', default_notification_format)
|
||||
|
||||
requested_output_format_original = requested_output_format
|
||||
|
||||
# Now clean it up so it fits perfectly with apprise
|
||||
requested_output_format = notification_format_align_with_apprise(n_format=requested_output_format)
|
||||
|
||||
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
|
||||
|
||||
# If we have custom handlers, use invalid format to prevent conversion
|
||||
# Otherwise use the proper format
|
||||
if has_custom_handler:
|
||||
input_format = 'raw-no-convert'
|
||||
|
||||
# https://github.com/caronc/apprise/wiki/Development_LogCapture
|
||||
# Anything higher than or equal to WARNING (which covers things like Connection errors)
|
||||
# raise it as an exception
|
||||
@@ -258,43 +292,15 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
if not n_object.get('notification_urls'):
|
||||
return None
|
||||
|
||||
with apprise.LogCapture(level=apprise.logging.DEBUG) as logs:
|
||||
with (apprise.LogCapture(level=apprise.logging.DEBUG) as logs):
|
||||
for url in n_object['notification_urls']:
|
||||
parsed_url = urlparse(url)
|
||||
prefix_add_to_url = '?' if not parsed_url.query else '&'
|
||||
|
||||
# Get the notification body from datastore
|
||||
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
|
||||
|
||||
if n_object.get('markup_text_to_html'):
|
||||
if n_object.get('markup_text_links_to_html_links'):
|
||||
n_body = markup_text_links_to_html(body=n_body)
|
||||
|
||||
# This actually means we request "Markdown to HTML"
|
||||
if requested_output_format == NotifyFormat.MARKDOWN.value:
|
||||
output_format = NotifyFormat.HTML.value
|
||||
input_format = NotifyFormat.MARKDOWN.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
# Deviation from apprise.
|
||||
# No conversion, its like they want to send raw HTML but we add linebreaks
|
||||
elif requested_output_format == NotifyFormat.HTML.value:
|
||||
# same in and out means apprise wont try to convert
|
||||
input_format = output_format = NotifyFormat.HTML.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
else:
|
||||
# Nothing to be done, leave it as plaintext
|
||||
# `body_format` Tell apprise what format the INPUT is in
|
||||
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
|
||||
input_format = output_format = NotifyFormat.TEXT.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
if has_custom_handler:
|
||||
input_format='raw-no-convert'
|
||||
|
||||
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
|
||||
|
||||
url = url.strip()
|
||||
@@ -309,20 +315,68 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
logger.info(f">> Process Notification: AppRise notifying {url}")
|
||||
url = jinja_render(template_str=url, **notification_parameters)
|
||||
|
||||
# If it's a plaintext document, and they want HTML type email/alerts, so it needs to be escaped
|
||||
watch_mime_type = n_object.get('watch_mime_type')
|
||||
if watch_mime_type and 'text/' in watch_mime_type.lower() and not 'html' in watch_mime_type.lower():
|
||||
if 'html' in requested_output_format:
|
||||
from markupsafe import escape
|
||||
n_body = str(escape(n_body))
|
||||
|
||||
if 'html' in requested_output_format:
|
||||
# Since the n_body is always some kind of text from the 'diff' engine, attempt to preserve whitespaces that get sent to the HTML output
|
||||
# But only where its more than 1 consecutive whitespace, otherwise "and this" becomes "and this" etc which is too much.
|
||||
n_body = n_body.replace(' ', ' ')
|
||||
|
||||
(url, n_body, n_title) = apply_service_tweaks(url=url, n_body=n_body, n_title=n_title, requested_output_format=requested_output_format_original)
|
||||
|
||||
apobj.add(url)
|
||||
apprise_input_format = "NO-THANKS-WE-WILL-MANAGE-ALL-OF-THIS"
|
||||
|
||||
if not 'format=' in url:
|
||||
parsed_url = urlparse(url)
|
||||
prefix_add_to_url = '?' if not parsed_url.query else '&'
|
||||
|
||||
# THIS IS THE TRICK HOW TO DISABLE APPRISE DOING WEIRD AUTO-CONVERSION WITH BREAKING BR TAGS ETC
|
||||
if 'html' in requested_output_format:
|
||||
url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}"
|
||||
apprise_input_format = NotifyFormat.HTML.value
|
||||
elif 'text' in requested_output_format:
|
||||
url = f"{url}{prefix_add_to_url}format={NotifyFormat.TEXT.value}"
|
||||
apprise_input_format = NotifyFormat.TEXT.value
|
||||
|
||||
elif requested_output_format == NotifyFormat.MARKDOWN.value:
|
||||
# Convert markdown to HTML ourselves since not all plugins do this
|
||||
from apprise.conversion import markdown_to_html
|
||||
# Make sure there are paragraph breaks around horizontal rules
|
||||
n_body = n_body.replace('---', '\n\n---\n\n')
|
||||
n_body = markdown_to_html(n_body)
|
||||
url = f"{url}{prefix_add_to_url}format={NotifyFormat.HTML.value}"
|
||||
requested_output_format = NotifyFormat.HTML.value
|
||||
apprise_input_format = NotifyFormat.HTML.value # Changed from MARKDOWN to HTML
|
||||
|
||||
# Could have arrived at any stage, so we dont end up running .escape on it
|
||||
if 'html' in requested_output_format:
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '<br>\r\n')
|
||||
else:
|
||||
# texty types
|
||||
n_body = n_body.replace(CUSTOM_LINEBREAK_PLACEHOLDER, '\r\n')
|
||||
|
||||
sent_objs.append({'title': n_title,
|
||||
'body': n_body,
|
||||
'url': url})
|
||||
apobj.add(url)
|
||||
|
||||
# Since the output is always based on the plaintext of the 'diff' engine, wrap it nicely.
|
||||
# It should always be similar to the 'history' part of the UI.
|
||||
if url.startswith('mail') and 'html' in requested_output_format:
|
||||
if not '<pre' in n_body and not '<body' in n_body: # No custom HTML-ish body was setup already
|
||||
n_body = as_monospaced_html_email(content=n_body, title=n_title)
|
||||
|
||||
apobj.notify(
|
||||
title=n_title,
|
||||
body=n_body,
|
||||
# `body_format` Tell apprise what format the INPUT is in
|
||||
# `body_format` Tell apprise what format the INPUT is in, specify a wrong/bad type and it will force skip conversion in apprise
|
||||
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
|
||||
body_format=input_format,
|
||||
body_format=apprise_input_format,
|
||||
# False is not an option for AppRise, must be type None
|
||||
attach=n_object.get('screenshot', None)
|
||||
)
|
||||
|
||||
@@ -9,29 +9,35 @@ for both sync and async workers
|
||||
from loguru import logger
|
||||
import time
|
||||
|
||||
from changedetectionio.notification import default_notification_format
|
||||
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from changedetectionio.notification import default_notification_format, valid_notification_formats
|
||||
|
||||
# This gets modified on notification time (handler.py) depending on the required notification output
|
||||
CUSTOM_LINEBREAK_PLACEHOLDER='@BR@'
|
||||
|
||||
|
||||
# What is passed around as notification context, also used as the complete list of valid {{ tokens }}
|
||||
class NotificationContextData(dict):
|
||||
def __init__(self, initial_data=None, **kwargs):
|
||||
super().__init__({
|
||||
'base_url': None,
|
||||
'current_snapshot': None,
|
||||
'diff': None,
|
||||
'diff_added': None,
|
||||
'diff_full': None,
|
||||
'diff_patch': None,
|
||||
'diff_removed': None,
|
||||
'diff_url': None,
|
||||
'markup_text_links_to_html_links': False, # If automatic conversion of plaintext to HTML should happen
|
||||
'notification_timestamp': time.time(),
|
||||
'preview_url': None,
|
||||
'screenshot': None,
|
||||
'triggered_text': None,
|
||||
'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', # Converted to 'watch_uuid' in create_notification_parameters
|
||||
'watch_url': 'https://WATCH-PLACE-HOLDER/',
|
||||
'base_url': None,
|
||||
'diff_url': None,
|
||||
'preview_url': None,
|
||||
'watch_mime_type': None,
|
||||
'watch_tag': None,
|
||||
'watch_title': None,
|
||||
'markup_text_to_html': False, # If automatic conversion of plaintext to HTML should happen
|
||||
'watch_url': 'https://WATCH-PLACE-HOLDER/',
|
||||
})
|
||||
|
||||
# Apply any initial data passed in
|
||||
@@ -43,15 +49,28 @@ class NotificationContextData(dict):
|
||||
if kwargs:
|
||||
self.update(kwargs)
|
||||
|
||||
n_format = self.get('notification_format')
|
||||
if n_format and not valid_notification_formats.get(n_format):
|
||||
raise ValueError(f'Invalid notification format: "{n_format}"')
|
||||
|
||||
def set_random_for_validation(self):
|
||||
import random, string
|
||||
"""Randomly fills all dict keys with random strings (for validation/testing)."""
|
||||
"""Randomly fills all dict keys with random strings (for validation/testing).
|
||||
So we can test the output in the notification body
|
||||
"""
|
||||
for key in self.keys():
|
||||
if key in ['uuid', 'time', 'watch_uuid']:
|
||||
continue
|
||||
rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12))
|
||||
self[key] = rand_str
|
||||
|
||||
def __setitem__(self, key, value):
|
||||
if key == 'notification_format' and isinstance(value, str) and not value.startswith('RANDOM-PLACEHOLDER-'):
|
||||
if not valid_notification_formats.get(value):
|
||||
raise ValueError(f'Invalid notification format: "{value}"')
|
||||
|
||||
super().__setitem__(key, value)
|
||||
|
||||
class NotificationService:
|
||||
"""
|
||||
Standalone notification service that handles all notification functionality
|
||||
@@ -67,7 +86,7 @@ class NotificationService:
|
||||
Queue a notification for a watch with full diff rendering and template variables
|
||||
"""
|
||||
from changedetectionio import diff
|
||||
from changedetectionio.notification import default_notification_format_for_watch
|
||||
from changedetectionio.notification import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
@@ -89,27 +108,16 @@ class NotificationService:
|
||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||
|
||||
# If we ended up here with "System default"
|
||||
if n_object.get('notification_format') == default_notification_format_for_watch:
|
||||
if n_object.get('notification_format') == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
|
||||
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
|
||||
|
||||
# HTML needs linebreak, but MarkDown and Text can use a linefeed
|
||||
if n_object.get('notification_format') == 'HTML':
|
||||
line_feed_sep = "<br>"
|
||||
# Snapshot will be plaintext on the disk, convert to some kind of HTML
|
||||
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
|
||||
elif n_object.get('notification_format') == 'HTML Color':
|
||||
line_feed_sep = "<br>"
|
||||
# Snapshot will be plaintext on the disk, convert to some kind of HTML
|
||||
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
|
||||
else:
|
||||
line_feed_sep = "\n"
|
||||
|
||||
triggered_text = ''
|
||||
if len(trigger_text):
|
||||
from . import html_tools
|
||||
triggered_text = html_tools.get_triggered_text(content=snapshot_contents, trigger_text=trigger_text)
|
||||
if triggered_text:
|
||||
triggered_text = line_feed_sep.join(triggered_text)
|
||||
triggered_text = CUSTOM_LINEBREAK_PLACEHOLDER.join(triggered_text)
|
||||
|
||||
# Could be called as a 'test notification' with only 1 snapshot available
|
||||
prev_snapshot = "Example text: example test\nExample text: change detection is cool\nExample text: some more examples\n"
|
||||
@@ -121,16 +129,17 @@ class NotificationService:
|
||||
|
||||
n_object.update({
|
||||
'current_snapshot': snapshot_contents,
|
||||
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
|
||||
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep),
|
||||
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
|
||||
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER),
|
||||
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER),
|
||||
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER, patch_format=True),
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=CUSTOM_LINEBREAK_PLACEHOLDER),
|
||||
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
|
||||
'triggered_text': triggered_text,
|
||||
'uuid': watch.get('uuid') if watch else None,
|
||||
'watch_url': watch.get('url') if watch else None,
|
||||
'watch_uuid': watch.get('uuid') if watch else None,
|
||||
'watch_mime_type': watch.get('content-type')
|
||||
})
|
||||
|
||||
if watch:
|
||||
@@ -146,7 +155,7 @@ class NotificationService:
|
||||
Individual watch settings > Tag settings > Global settings
|
||||
"""
|
||||
from changedetectionio.notification import (
|
||||
default_notification_format_for_watch,
|
||||
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH,
|
||||
default_notification_body,
|
||||
default_notification_title
|
||||
)
|
||||
@@ -154,7 +163,7 @@ class NotificationService:
|
||||
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
|
||||
v = watch.get(var_name)
|
||||
if v and not watch.get('notification_muted'):
|
||||
if var_name == 'notification_format' and v == default_notification_format_for_watch:
|
||||
if var_name == 'notification_format' and v == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
|
||||
return self.datastore.data['settings']['application'].get('notification_format')
|
||||
|
||||
return v
|
||||
@@ -171,7 +180,7 @@ class NotificationService:
|
||||
|
||||
# Otherwise could be defaults
|
||||
if var_name == 'notification_format':
|
||||
return default_notification_format_for_watch
|
||||
return USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
if var_name == 'notification_body':
|
||||
return default_notification_body
|
||||
if var_name == 'notification_title':
|
||||
@@ -226,9 +235,8 @@ class NotificationService:
|
||||
if not watch:
|
||||
return
|
||||
|
||||
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format)
|
||||
filter_list = ", ".join(watch['include_filters'])
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed
|
||||
body = f"""Hello,
|
||||
|
||||
Your configured CSS/xPath filters of '{filter_list}' for {{{{watch_url}}}} did not appear on the page after {threshold} attempts.
|
||||
@@ -243,9 +251,9 @@ Thanks - Your omniscient changedetection.io installation.
|
||||
n_object = NotificationContextData({
|
||||
'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
|
||||
'notification_body': body,
|
||||
'notification_format': n_format,
|
||||
'markup_text_to_html': n_format.lower().startswith('html')
|
||||
'notification_format': self._check_cascading_vars('notification_format', watch),
|
||||
})
|
||||
n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html')
|
||||
|
||||
if len(watch['notification_urls']):
|
||||
n_object['notification_urls'] = watch['notification_urls']
|
||||
@@ -273,9 +281,9 @@ Thanks - Your omniscient changedetection.io installation.
|
||||
if not watch:
|
||||
return
|
||||
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
|
||||
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format).lower()
|
||||
|
||||
step = step_n + 1
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_links_to_html_links' is not needed
|
||||
|
||||
# {{{{ }}}} because this will be Jinja2 {{ }} tokens
|
||||
body = f"""Hello,
|
||||
@@ -292,9 +300,9 @@ Thanks - Your omniscient changedetection.io installation.
|
||||
n_object = NotificationContextData({
|
||||
'notification_title': f"Changedetection.io - Alert - Browser step at position {step} could not be run",
|
||||
'notification_body': body,
|
||||
'notification_format': n_format,
|
||||
'markup_text_to_html': n_format.lower().startswith('html')
|
||||
'notification_format': self._check_cascading_vars('notification_format', watch),
|
||||
})
|
||||
n_object['markup_text_links_to_html_links'] = n_object.get('notification_format').startswith('html')
|
||||
|
||||
if len(watch['notification_urls']):
|
||||
n_object['notification_urls'] = watch['notification_urls']
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from changedetectionio.html_tools import is_safe_url
|
||||
|
||||
from flask import (
|
||||
flash
|
||||
)
|
||||
|
||||
from .html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
from . model import App, Watch
|
||||
from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from copy import deepcopy, copy
|
||||
from os import path, unlink
|
||||
from threading import Lock
|
||||
@@ -340,7 +341,6 @@ class ChangeDetectionStore:
|
||||
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
|
||||
flash("Error fetching metadata for {}".format(url), 'error')
|
||||
return False
|
||||
from .model.Watch import is_safe_url
|
||||
if not is_safe_url(url):
|
||||
flash('Watch protocol is not permitted by SAFE_PROTOCOL_REGEX', 'error')
|
||||
return None
|
||||
@@ -987,10 +987,35 @@ class ChangeDetectionStore:
|
||||
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
|
||||
|
||||
def update_21(self):
|
||||
if self.data['settings']['application'].get('timezone'):
|
||||
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
|
||||
del self.data['settings']['application']['timezone']
|
||||
|
||||
|
||||
# Some notification formats got the wrong name type
|
||||
def update_22(self):
|
||||
from .notification import valid_notification_formats
|
||||
|
||||
sys_n_format = self.data['settings']['application'].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == sys_n_format), None)
|
||||
if key_exists_as_value: # key of "Plain text"
|
||||
logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['settings']['application']['notification_format'] = key_exists_as_value
|
||||
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
n_format = self.data['watching'][uuid].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None)
|
||||
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
|
||||
logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
|
||||
|
||||
for uuid, tag in self.data['settings']['application']['tags'].items():
|
||||
n_format = self.data['settings']['application']['tags'][uuid].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in valid_notification_formats.items() if v == n_format), None)
|
||||
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
|
||||
logger.success(f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['settings']['application']['tags'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
|
||||
|
||||
def add_notification_url(self, notification_url):
|
||||
|
||||
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
|
||||
|
||||
@@ -134,6 +134,12 @@
|
||||
<p>
|
||||
URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code>
|
||||
</p>
|
||||
<p>
|
||||
Regular-expression replace, use <strong>|regex_replace</strong>, for example - <code>{{ "{{ \"hello world 123\" | regex_replace('[0-9]+', 'no-more-numbers') }}" }}</code>
|
||||
</p>
|
||||
<p>
|
||||
For a complete reference of all Jinja2 built-in filters, users can refer to the <a href="https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters">https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters</a>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
|
||||
@@ -266,9 +266,7 @@
|
||||
<li id="timezone-info">
|
||||
{{ render_field(form.time_schedule_limit.timezone, placeholder=timezone_default_config) }} <span id="local-time-in-tz"></span>
|
||||
<datalist id="timezones" style="display: none;">
|
||||
{% for timezone in available_timezones %}
|
||||
<option value="{{ timezone }}">{{ timezone }}</option>
|
||||
{% endfor %}
|
||||
{%- for timezone in available_timezones -%}<option value="{{ timezone }}">{{ timezone }}</option>{%- endfor -%}
|
||||
</datalist>
|
||||
</li>
|
||||
</ul>
|
||||
|
||||
@@ -53,7 +53,7 @@
|
||||
<a class="pure-menu-heading" href="{{url_for('watchlist.index')}}">
|
||||
<strong>Change</strong>Detection.io</a>
|
||||
{% endif %}
|
||||
{% if current_diff_url %}
|
||||
{% if current_diff_url and is_safe_url(current_diff_url) %}
|
||||
<a class="current-diff-url" href="{{ current_diff_url }}">
|
||||
<span style="max-width: 30%; overflow: hidden">{{ current_diff_url }}</span></a>
|
||||
{% else %}
|
||||
|
||||
@@ -1,51 +1,110 @@
|
||||
#!/usr/bin/env python3
|
||||
import asyncio
|
||||
import threading
|
||||
import time
|
||||
from aiosmtpd.controller import Controller
|
||||
from aiosmtpd.smtp import SMTP
|
||||
from flask import Flask, Response
|
||||
from email import message_from_bytes
|
||||
from email.policy import default
|
||||
|
||||
# Accept a SMTP message and offer a way to retrieve the last message via TCP Socket
|
||||
# Accept a SMTP message and offer a way to retrieve the last message via HTTP
|
||||
|
||||
last_received_message = b"Nothing"
|
||||
last_received_message = b"Nothing received yet."
|
||||
active_smtp_connections = 0
|
||||
smtp_lock = threading.Lock()
|
||||
|
||||
|
||||
class CustomSMTPHandler:
|
||||
async def handle_DATA(self, server, session, envelope):
|
||||
global last_received_message
|
||||
global last_received_message, active_smtp_connections
|
||||
|
||||
with smtp_lock:
|
||||
active_smtp_connections += 1
|
||||
|
||||
try:
|
||||
last_received_message = envelope.content
|
||||
print('Receiving message from:', session.peer)
|
||||
print('Message addressed from:', envelope.mail_from)
|
||||
print('Message addressed to :', envelope.rcpt_tos)
|
||||
print('Message length :', len(envelope.content))
|
||||
print('*******************************')
|
||||
print(envelope.content.decode('utf8'))
|
||||
print('*******************************')
|
||||
|
||||
# Parse the email message
|
||||
msg = message_from_bytes(envelope.content, policy=default)
|
||||
with open('/tmp/last.eml', 'wb') as f:
|
||||
f.write(envelope.content)
|
||||
|
||||
# Write parts to files based on content type
|
||||
if msg.is_multipart():
|
||||
for part in msg.walk():
|
||||
content_type = part.get_content_type()
|
||||
payload = part.get_payload(decode=True)
|
||||
|
||||
if payload:
|
||||
if content_type == 'text/plain':
|
||||
with open('/tmp/last.txt', 'wb') as f:
|
||||
f.write(payload)
|
||||
print(f'Written text/plain part to /tmp/last.txt')
|
||||
elif content_type == 'text/html':
|
||||
with open('/tmp/last.html', 'wb') as f:
|
||||
f.write(payload)
|
||||
print(f'Written text/html part to /tmp/last.html')
|
||||
else:
|
||||
# Single part message
|
||||
content_type = msg.get_content_type()
|
||||
payload = msg.get_payload(decode=True)
|
||||
|
||||
if payload:
|
||||
if content_type == 'text/plain' or content_type.startswith('text/'):
|
||||
with open('/tmp/last.txt', 'wb') as f:
|
||||
f.write(payload)
|
||||
print(f'Written single part message to /tmp/last.txt')
|
||||
|
||||
return '250 Message accepted for delivery'
|
||||
finally:
|
||||
with smtp_lock:
|
||||
active_smtp_connections -= 1
|
||||
|
||||
|
||||
class EchoServerProtocol(asyncio.Protocol):
|
||||
def connection_made(self, transport):
|
||||
global last_received_message
|
||||
self.transport = transport
|
||||
peername = transport.get_extra_info('peername')
|
||||
print('Incoming connection from {}'.format(peername))
|
||||
self.transport.write(last_received_message)
|
||||
|
||||
last_received_message = b''
|
||||
self.transport.close()
|
||||
# Simple Flask HTTP server to echo back the last SMTP message
|
||||
app = Flask(__name__)
|
||||
|
||||
|
||||
async def main():
|
||||
@app.route('/')
|
||||
def echo_last_message():
|
||||
global last_received_message, active_smtp_connections
|
||||
|
||||
# Wait for any in-progress SMTP connections to complete
|
||||
max_wait = 5 # Maximum 5 seconds
|
||||
wait_interval = 0.05 # Check every 50ms
|
||||
elapsed = 0
|
||||
|
||||
while elapsed < max_wait:
|
||||
with smtp_lock:
|
||||
if active_smtp_connections == 0:
|
||||
break
|
||||
time.sleep(wait_interval)
|
||||
elapsed += wait_interval
|
||||
|
||||
return Response(last_received_message, mimetype='text/plain')
|
||||
|
||||
|
||||
def run_flask():
|
||||
app.run(host='0.0.0.0', port=11080, debug=False, use_reloader=False)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
# Start the SMTP server
|
||||
controller = Controller(CustomSMTPHandler(), hostname='0.0.0.0', port=11025)
|
||||
controller.start()
|
||||
|
||||
# Start the TCP Echo server
|
||||
loop = asyncio.get_running_loop()
|
||||
server = await loop.create_server(
|
||||
lambda: EchoServerProtocol(),
|
||||
'0.0.0.0', 11080
|
||||
)
|
||||
async with server:
|
||||
await server.serve_forever()
|
||||
# Start the HTTP server in a separate thread
|
||||
flask_thread = threading.Thread(target=run_flask, daemon=True)
|
||||
flask_thread.start()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
asyncio.run(main())
|
||||
# Keep the main thread alive
|
||||
try:
|
||||
flask_thread.join()
|
||||
except KeyboardInterrupt:
|
||||
print("Shutting down...")
|
||||
|
||||
@@ -3,7 +3,8 @@ from flask import url_for
|
||||
from email import message_from_string
|
||||
from email.policy import default as email_policy
|
||||
|
||||
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE
|
||||
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE, HTML_CHANGED_STYLE
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
|
||||
wait_for_all_checks, \
|
||||
set_longer_modified_response, delete_all_watches
|
||||
@@ -14,6 +15,8 @@ import logging
|
||||
# NOTE - RELIES ON mailserver as hostname running, see github build recipes
|
||||
smtp_test_server = 'mailserver'
|
||||
|
||||
ALL_MARKUP_TOKENS = ''.join(f"TOKEN: '{t}'\n{{{{{t}}}}}\n" for t in NotificationContextData().keys())
|
||||
|
||||
from changedetectionio.notification import (
|
||||
default_notification_body,
|
||||
default_notification_format,
|
||||
@@ -24,16 +27,14 @@ from changedetectionio.notification import (
|
||||
|
||||
|
||||
def get_last_message_from_smtp_server():
|
||||
import socket
|
||||
port = 11080 # socket server port number
|
||||
|
||||
client_socket = socket.socket() # instantiate
|
||||
client_socket.connect((smtp_test_server, port)) # connect to the server
|
||||
|
||||
data = client_socket.recv(50024).decode() # receive response
|
||||
import requests
|
||||
time.sleep(1) # wait for any smtp connects to die off
|
||||
port = 11080 # HTTP server port number
|
||||
# Make HTTP GET request to Flask server
|
||||
response = requests.get(f'http://{smtp_test_server}:{port}/')
|
||||
data = response.text
|
||||
logging.info("get_last_message_from_smtp_server..")
|
||||
logging.info(data)
|
||||
client_socket.close() # close the connection
|
||||
return data
|
||||
|
||||
|
||||
@@ -52,7 +53,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
|
||||
"application-notification_format": 'HTML',
|
||||
"application-notification_format": 'html',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -121,7 +122,7 @@ def test_check_notification_plaintext_format(client, live_server, measure_memory
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'Plain Text',
|
||||
"application-notification_format": 'text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -172,8 +173,8 @@ def test_check_notification_html_color_format(client, live_server, measure_memor
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'HTML Color',
|
||||
"application-notification_body": f"some text\n{default_notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
|
||||
"application-notification_format": 'htmlcolor',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -225,8 +226,9 @@ def test_check_notification_html_color_format(client, live_server, measure_memor
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert HTML_REMOVED_STYLE in html_content
|
||||
assert HTML_CHANGED_STYLE or HTML_REMOVED_STYLE in html_content
|
||||
assert HTML_ADDED_STYLE in html_content
|
||||
assert '<' not in html_content
|
||||
|
||||
assert 'some text<br>' in html_content
|
||||
delete_all_watches(client)
|
||||
@@ -243,7 +245,7 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "*header*\n\nsome text\n" + default_notification_body,
|
||||
"application-notification_format": 'Markdown to HTML',
|
||||
"application-notification_format": 'markdown',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -288,7 +290,8 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
# We wont see anything in the "FALLBACK" text but that's OK (no added/strikethrough etc)
|
||||
assert 'So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
|
||||
# Second part should be text/html and roughly converted from markdown to HTML
|
||||
@@ -296,10 +299,10 @@ def test_check_notification_markdown_format(client, live_server, measure_memory_
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert '<p><em>header</em></p>' in html_content
|
||||
assert '(added) So let\'s see what happens.<br' in html_content
|
||||
assert '<strong>So let\'s see what happens.</strong><br>' in html_content # Additions are <strong> in markdown
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
# Custom notification body with HTML, that is either sent as HTML or rendered to plaintext and sent
|
||||
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage):
|
||||
|
||||
# HTML problems? see this
|
||||
@@ -326,7 +329,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": notification_body,
|
||||
"application-notification_format": 'Plain Text',
|
||||
"application-notification_format": 'text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -334,7 +337,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
test_url = url_for('test_endpoint',content_type="text/html", _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
@@ -343,6 +346,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
#################################### FIRST SITUATION, PLAIN TEXT NOTIFICATION IS WANTED BUT WE HAVE HTML IN OUR TEMPLATE AND CONTENT ##########
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
@@ -365,14 +369,17 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
# Get the plain text content
|
||||
text_content = msg.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert '<!DOCTYPE html>' in text_content # even tho they added html, they selected plaintext so it should have not got converted
|
||||
|
||||
|
||||
#################################### SECOND SITUATION, HTML IS CORRECTLY PASSED THROUGH TO THE EMAIL ####################
|
||||
set_original_response()
|
||||
# Now override as HTML format
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
data={
|
||||
"url": test_url,
|
||||
"notification_format": 'HTML',
|
||||
"notification_format": 'html',
|
||||
'fetch_backend': "html_requests",
|
||||
"time_between_check_use_default": "y"},
|
||||
follow_redirects=True
|
||||
@@ -405,10 +412,271 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert '(removed) So let\'s see what happens.<br>' in html_content # the html part
|
||||
assert '(removed) So let\'s see what happens.' in html_content # the html part
|
||||
assert '<!DOCTYPE html' not in html_content
|
||||
assert '<!DOCTYPE html' in html_content # Our original template is working correctly
|
||||
|
||||
# https://github.com/dgtlmoon/changedetection.io/issues/2103
|
||||
assert '<h1>Test</h1>' in html_content
|
||||
assert '<' not in html_content
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_check_plaintext_document_plaintext_notification_smtp(client, live_server, measure_memory_usage):
|
||||
"""When following a plaintext document, notification in Plain Text format is sent correctly"""
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("Some nice plain text\nwhich we add some extra data\nover here\n")
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
notification_body = f"""{default_notification_body}"""
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
|
||||
"application-notification_format": 'text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Change the content
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n")
|
||||
|
||||
|
||||
time.sleep(1)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy)
|
||||
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
body = msg.get_content()
|
||||
# nothing is escaped, raw html stuff in text/plain
|
||||
assert 'talk about <title> tags' in body
|
||||
assert '(added)' in body
|
||||
assert '<br' not in body
|
||||
assert '<' not in body
|
||||
assert '<pre' not in body
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_check_plaintext_document_html_notifications(client, live_server, measure_memory_usage):
|
||||
"""When following a plaintext document, notification in Plain Text format is sent correctly"""
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write(" Some nice plain text\nwhich we add some extra data\nover here\n")
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
notification_body = f"""{default_notification_body}"""
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
|
||||
"application-notification_format": 'html',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Change the content
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write(" Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n")
|
||||
|
||||
|
||||
time.sleep(2)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy)
|
||||
|
||||
|
||||
# The email should have two bodies (multipart/alternative)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
|
||||
|
||||
assert 'And let\'s talk about <title> tags\r\n' in text_content
|
||||
assert '<br' not in text_content
|
||||
assert '<span' not in text_content
|
||||
|
||||
|
||||
assert 'talk about <title>' not in html_content # the html part, should have got marked up to < etc
|
||||
assert 'talk about <title>' in html_content
|
||||
# Should be the HTML, but not HTML Color
|
||||
assert 'background-color' not in html_content
|
||||
assert '<br>(added) And let's talk about <title> tags<br>' in html_content
|
||||
assert '<br' not in html_content
|
||||
assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py
|
||||
|
||||
# And now for the whitespace retention
|
||||
assert ' Some nice plain text' in html_content
|
||||
assert '(added) And let' in html_content # just to show a single whitespace didnt get touched
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_check_plaintext_document_html_color_notifications(client, live_server, measure_memory_usage):
|
||||
"""When following a plaintext document, notification in Plain Text format is sent correctly"""
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("Some nice plain text\nwhich we add some extra data\nover here\n")
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
notification_body = f"""{default_notification_body}"""
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
|
||||
"application-notification_format": 'htmlcolor',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Change the content
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("Some nice plain text\nwhich we add some extra data\nAnd let's talk about <title> tags\nover here\n")
|
||||
|
||||
time.sleep(1)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
|
||||
|
||||
assert 'And let\'s talk about <title> tags\r\n' in text_content
|
||||
assert '<br' not in text_content
|
||||
assert '<span' not in text_content
|
||||
|
||||
assert 'talk about <title>' not in html_content # the html part, should have got marked up to < etc
|
||||
assert 'talk about <title>' in html_content
|
||||
# Should be the HTML, but not HTML Color
|
||||
assert 'background-color' in html_content
|
||||
assert '(added) And let' not in html_content
|
||||
assert '<br' not in html_content
|
||||
assert '<br>' in html_content
|
||||
assert '<pre role="article"' in html_content # Should have got wrapped nicely in email_helpers.py
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_check_html_document_plaintext_notification(client, live_server, measure_memory_usage):
|
||||
"""When following a HTML document, notification in Plain Text format is sent correctly"""
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("<html><body>some stuff<br>and more stuff<br>and even more stuff<br></body></html>")
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
notification_body = f"""{default_notification_body}"""
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": f"{notification_body}\nMore output test\n{ALL_MARKUP_TOKENS}",
|
||||
"application-notification_format": 'text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', content_type="text/html", _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||
f.write("<html><body>sxome stuff<br>and more stuff<br>lets slip this in<br>and this in<br>and even more stuff<br><tag></body></html>")
|
||||
|
||||
time.sleep(0.1)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(get_last_message_from_smtp_server(), policy=email_policy)
|
||||
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
body = msg.get_content()
|
||||
|
||||
assert '<tag>' in body # Should have got converted from original HTML to plaintext
|
||||
assert '(changed) some stuff\r\n' in body
|
||||
assert '(into) sxome stuff\r\n' in body
|
||||
assert '(added) lets slip this in\r\n' in body
|
||||
assert '(added) and this in\r\n' in body
|
||||
assert ' ' not in body
|
||||
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
|
||||
@@ -124,7 +124,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
|
||||
"application-notification_body": 'triggered text was -{{triggered_text}}- ### 网站监测 内容更新了 ####',
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
|
||||
"application-notification_urls": test_notification_url,
|
||||
"application-notification_format": 'Plain Text',
|
||||
"application-notification_format": 'text',
|
||||
"application-minutes_between_check": 180,
|
||||
"application-fetch_backend": "html_requests"
|
||||
},
|
||||
|
||||
@@ -370,7 +370,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
|
||||
|
||||
######################################################
|
||||
|
||||
# HTTP PUT try a field that doenst exist
|
||||
# HTTP PUT try a field that doesn't exist
|
||||
|
||||
# HTTP PUT an update
|
||||
res = client.put(
|
||||
@@ -383,6 +383,17 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
|
||||
# Message will come from `flask_expects_json`
|
||||
assert b'Additional properties are not allowed' in res.data
|
||||
|
||||
|
||||
# Try a XSS URL
|
||||
res = client.put(
|
||||
url_for("watch", uuid=watch_uuid),
|
||||
headers={'x-api-key': api_key, 'content-type': 'application/json'},
|
||||
data=json.dumps({
|
||||
'url': 'javascript:alert(document.domain)'
|
||||
}),
|
||||
)
|
||||
assert res.status_code == 400
|
||||
|
||||
# Cleanup everything
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -394,7 +405,8 @@ def test_api_import(client, live_server, measure_memory_usage):
|
||||
res = client.post(
|
||||
url_for("import") + "?tag=import-test",
|
||||
data='https://website1.com\r\nhttps://website2.com',
|
||||
headers={'x-api-key': api_key, 'content-type': 'text/plain'},
|
||||
# We removed 'content-type': 'text/plain', the Import API should assume this if none is set #3547 #3542
|
||||
headers={'x-api-key': api_key},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
|
||||
@@ -86,7 +86,7 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
||||
"Diff Full: {{diff_full}}\n"
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_format": 'Plain Text'}
|
||||
"notification_format": 'text'}
|
||||
|
||||
notification_form_data.update({
|
||||
"url": test_url,
|
||||
|
||||
@@ -63,7 +63,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
|
||||
"Diff Full: {{diff_full}}\n"
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_format": 'Plain Text',
|
||||
"notification_format": 'text',
|
||||
"fetch_backend": "html_requests",
|
||||
"filter_failure_notification_send": 'y',
|
||||
"time_between_check_use_default": "y",
|
||||
@@ -175,13 +175,13 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
|
||||
|
||||
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage):
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('HTML Color'))
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('htmlcolor'))
|
||||
# Check markup send conversion didnt affect plaintext preference
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Plain Text'))
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('text'))
|
||||
|
||||
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage):
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('HTML Color'))
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('htmlcolor'))
|
||||
|
||||
# Test that notification is never sent
|
||||
|
||||
|
||||
@@ -195,7 +195,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage):
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_screenshot": True,
|
||||
"notification_format": 'Plain Text',
|
||||
"notification_format": 'text',
|
||||
"title": "test-tag"}
|
||||
|
||||
res = client.post(
|
||||
|
||||
@@ -170,3 +170,160 @@ def test_default_timezone_subtraction(environment):
|
||||
finalRender = render("{% now '' - 'minutes=11' %}")
|
||||
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||
|
||||
def test_regex_replace_basic():
|
||||
"""Test basic regex_replace functionality."""
|
||||
|
||||
# Simple word replacement
|
||||
finalRender = render("{{ 'hello world' | regex_replace('world', 'universe') }}")
|
||||
assert finalRender == "hello universe"
|
||||
|
||||
def test_regex_replace_with_groups():
|
||||
"""Test regex_replace with capture groups (issue #3501 use case)."""
|
||||
|
||||
# Transform HTML table data as described in the issue
|
||||
template = "{{ '<td>thing</td><td>other</td>' | regex_replace('<td>([^<]+)</td><td>([^<]+)</td>', 'ThingLabel: \\\\1\\nOtherLabel: \\\\2') }}"
|
||||
finalRender = render(template)
|
||||
assert "ThingLabel: thing" in finalRender
|
||||
assert "OtherLabel: other" in finalRender
|
||||
|
||||
def test_regex_replace_multiple_matches():
|
||||
"""Test regex_replace replacing multiple occurrences."""
|
||||
|
||||
finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux') }}")
|
||||
assert finalRender == "qux bar qux baz"
|
||||
|
||||
def test_regex_replace_count_parameter():
|
||||
"""Test regex_replace with count parameter to limit replacements."""
|
||||
|
||||
finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux', 1) }}")
|
||||
assert finalRender == "qux bar foo baz"
|
||||
|
||||
def test_regex_replace_empty_replacement():
|
||||
"""Test regex_replace with empty replacement (removal)."""
|
||||
|
||||
finalRender = render("{{ 'hello world 123' | regex_replace('[0-9]+', '') }}")
|
||||
assert finalRender == "hello world "
|
||||
|
||||
def test_regex_replace_no_match():
|
||||
"""Test regex_replace when pattern doesn't match."""
|
||||
|
||||
finalRender = render("{{ 'hello world' | regex_replace('xyz', 'abc') }}")
|
||||
assert finalRender == "hello world"
|
||||
|
||||
def test_regex_replace_invalid_regex():
|
||||
"""Test regex_replace with invalid regex pattern returns original value."""
|
||||
|
||||
# Invalid regex (unmatched parenthesis)
|
||||
finalRender = render("{{ 'hello world' | regex_replace('(invalid', 'replacement') }}")
|
||||
assert finalRender == "hello world"
|
||||
|
||||
def test_regex_replace_special_characters():
|
||||
"""Test regex_replace with special regex characters."""
|
||||
|
||||
finalRender = render("{{ 'Price: $50.00' | regex_replace('\\\\$([0-9.]+)', 'USD \\\\1') }}")
|
||||
assert finalRender == "Price: USD 50.00"
|
||||
|
||||
def test_regex_replace_multiline():
|
||||
"""Test regex_replace on multiline text."""
|
||||
|
||||
template = "{{ 'line1\\nline2\\nline3' | regex_replace('^line', 'row') }}"
|
||||
finalRender = render(template)
|
||||
# By default re.sub doesn't use MULTILINE flag, so only first line matches with ^
|
||||
assert finalRender == "row1\nline2\nline3"
|
||||
|
||||
def test_regex_replace_with_notification_context():
|
||||
"""Test regex_replace with notification diff variable."""
|
||||
|
||||
# Simulate how it would be used in notifications with diff variable
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
|
||||
context = NotificationContextData()
|
||||
context['diff'] = '<td>value1</td><td>value2</td>'
|
||||
|
||||
template = "{{ diff | regex_replace('<td>([^<]+)</td>', '\\\\1 ') }}"
|
||||
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals.update(context)
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
assert "value1 value2 " in finalRender
|
||||
|
||||
def test_regex_replace_security_large_input():
|
||||
"""Test regex_replace handles large input safely."""
|
||||
|
||||
# Create a large input string (over 10MB)
|
||||
large_input = "x" * (1024 * 1024 * 10 + 1000)
|
||||
template = "{{ large_input | regex_replace('x', 'y') }}"
|
||||
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals['large_input'] = large_input
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
# Should be truncated to 10MB
|
||||
assert len(finalRender) == 1024 * 1024 * 10
|
||||
|
||||
def test_regex_replace_security_long_pattern():
|
||||
"""Test regex_replace rejects very long patterns."""
|
||||
|
||||
# Pattern longer than 500 chars should be rejected
|
||||
long_pattern = "a" * 501
|
||||
finalRender = render("{{ 'test' | regex_replace('" + long_pattern + "', 'replacement') }}")
|
||||
|
||||
# Should return original value when pattern is too long
|
||||
assert finalRender == "test"
|
||||
|
||||
def test_regex_replace_security_dangerous_pattern():
|
||||
"""Test regex_replace detects and rejects dangerous nested quantifiers."""
|
||||
|
||||
# Patterns that could cause catastrophic backtracking
|
||||
dangerous_patterns = [
|
||||
"(a+)+",
|
||||
"(a*)+",
|
||||
"(a+)*",
|
||||
"(a*)*",
|
||||
]
|
||||
|
||||
for dangerous in dangerous_patterns:
|
||||
# Create a template with the dangerous pattern
|
||||
# Using single quotes to avoid escaping issues
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals['pattern'] = dangerous
|
||||
template = "{{ 'aaaaaaaaaa' | regex_replace(pattern, 'x') }}"
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
# Should return original value when dangerous pattern is detected
|
||||
assert finalRender == "aaaaaaaaaa"
|
||||
|
||||
def test_regex_replace_security_timeout_protection():
|
||||
"""Test that regex_replace has timeout protection (if SIGALRM available)."""
|
||||
import signal
|
||||
|
||||
# Only test on systems that support SIGALRM
|
||||
if not hasattr(signal, 'SIGALRM'):
|
||||
# Skip test on Windows and other systems without SIGALRM
|
||||
return
|
||||
|
||||
# This pattern is known to cause exponential backtracking on certain inputs
|
||||
# but should be caught by our dangerous pattern detector
|
||||
# We're mainly testing that the timeout mechanism works
|
||||
|
||||
from changedetectionio.jinja2_custom import regex_replace
|
||||
|
||||
# Create input that could trigger slow regex
|
||||
test_input = "a" * 50 + "b"
|
||||
|
||||
# This shouldn't take long due to our protections
|
||||
result = regex_replace(test_input, "a+b", "x")
|
||||
|
||||
# Should complete and return a result
|
||||
assert result is not None
|
||||
@@ -13,10 +13,10 @@ import base64
|
||||
from changedetectionio.notification import (
|
||||
default_notification_body,
|
||||
default_notification_format,
|
||||
default_notification_title,
|
||||
valid_notification_formats,
|
||||
default_notification_title, valid_notification_formats
|
||||
)
|
||||
|
||||
from ..diff import HTML_CHANGED_STYLE
|
||||
from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
|
||||
|
||||
# Hard to just add more live server URLs when one test is already running (I think)
|
||||
@@ -47,6 +47,14 @@ def test_check_notification(client, live_server, measure_memory_usage):
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
res = client.get(url_for("settings.settings_page"))
|
||||
for k,v in valid_notification_formats.items():
|
||||
if k == USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH:
|
||||
continue
|
||||
assert f'value="{k}"'.encode() in res.data # Should be by key NOT value
|
||||
assert f'value="{v}"'.encode() not in res.data # Should be by key NOT value
|
||||
|
||||
|
||||
# When test mode is in BASE_URL env mode, we should see this already configured
|
||||
env_base_url = os.getenv('BASE_URL', '').strip()
|
||||
if len(env_base_url):
|
||||
@@ -101,7 +109,7 @@ def test_check_notification(client, live_server, measure_memory_usage):
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_screenshot": True,
|
||||
"notification_format": 'Plain Text'}
|
||||
"notification_format": 'text'}
|
||||
|
||||
notification_form_data.update({
|
||||
"url": test_url,
|
||||
@@ -267,7 +275,7 @@ def test_notification_validation(client, live_server, measure_memory_usage):
|
||||
# data={"notification_urls": 'json://localhost/foobar',
|
||||
# "notification_title": "",
|
||||
# "notification_body": "",
|
||||
# "notification_format": 'Plain Text',
|
||||
# "notification_format": 'text',
|
||||
# "url": test_url,
|
||||
# "tag": "my tag",
|
||||
# "title": "my title",
|
||||
@@ -467,6 +475,25 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
|
||||
# Should come from notification.py default handler when there is no notification body to pull from
|
||||
assert 'change detection is cool 网站监测 内容更新了' in x
|
||||
|
||||
## Check that 'test' catches errors
|
||||
test_notification_url = 'post://akjsdfkjasdkfjasdkfjasdkjfas232323/should-error'
|
||||
|
||||
######### Test global/system settings
|
||||
res = client.post(
|
||||
url_for("ui.ui_notification.ajax_callback_send_notification_test")+"?mode=global-settings",
|
||||
data={"notification_urls": test_notification_url},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert res.status_code == 400
|
||||
assert (
|
||||
b"No address found" in res.data or
|
||||
b"Name or service not known" in res.data or
|
||||
b"nodename nor servname provided" in res.data or
|
||||
b"Temporary failure in name resolution" in res.data or
|
||||
b"Failed to establish a new connection" in res.data or
|
||||
b"Connection error occurred" in res.data
|
||||
)
|
||||
|
||||
client.get(
|
||||
url_for("ui.form_delete", uuid="all"),
|
||||
follow_redirects=True
|
||||
@@ -483,9 +510,8 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
|
||||
assert b"Error: You must have atleast one watch configured for 'test notification' to work" in res.data
|
||||
|
||||
|
||||
def _test_color_notifications(client, notification_body_token):
|
||||
|
||||
from changedetectionio.diff import HTML_ADDED_STYLE, HTML_REMOVED_STYLE
|
||||
def _test_color_notifications(client, notification_body_token):
|
||||
|
||||
set_original_response()
|
||||
|
||||
@@ -503,7 +529,7 @@ def _test_color_notifications(client, notification_body_token):
|
||||
"application-fetch_backend": "html_requests",
|
||||
"application-minutes_between_check": 180,
|
||||
"application-notification_body": notification_body_token,
|
||||
"application-notification_format": "HTML Color",
|
||||
"application-notification_format": "htmlcolor",
|
||||
"application-notification_urls": test_notification_url,
|
||||
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }}",
|
||||
},
|
||||
@@ -533,7 +559,8 @@ def _test_color_notifications(client, notification_body_token):
|
||||
|
||||
with open("test-datastore/notification.txt", 'r') as f:
|
||||
x = f.read()
|
||||
assert f'<span style="{HTML_REMOVED_STYLE}">Which is across multiple lines' in x
|
||||
s = f'<span style="{HTML_CHANGED_STYLE}" role="note" aria-label="Changed text" title="Changed text">Which is across multiple lines'
|
||||
assert s in x
|
||||
|
||||
|
||||
client.get(
|
||||
|
||||
@@ -30,7 +30,7 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
|
||||
data={"notification_urls": f"{broken_notification_url}\r\n{working_notification_url}",
|
||||
"notification_title": "xxx",
|
||||
"notification_body": "xxxxx",
|
||||
"notification_format": 'Plain Text',
|
||||
"notification_format": 'text',
|
||||
"url": test_url,
|
||||
"tags": "",
|
||||
"title": "",
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
import os
|
||||
|
||||
from flask import url_for
|
||||
|
||||
from changedetectionio.tests.util import set_modified_response
|
||||
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
from .. import strtobool
|
||||
|
||||
@@ -132,6 +134,26 @@ def test_xss(client, live_server, measure_memory_usage):
|
||||
assert b"<img src=x onerror=alert(" not in res.data
|
||||
assert b"<img" in res.data
|
||||
|
||||
# Check that even forcing an update directly still doesnt get to the frontend
|
||||
set_original_response()
|
||||
XSS_HACK = 'javascript:alert(document.domain)'
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
set_modified_response()
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
live_server.app.config['DATASTORE'].data['watching'][uuid]['url']=XSS_HACK
|
||||
|
||||
|
||||
res = client.get(url_for("ui.ui_views.preview_page", uuid=uuid))
|
||||
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
|
||||
client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
|
||||
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
|
||||
res = client.get(url_for("watchlist.index"))
|
||||
assert XSS_HACK.encode('utf-8') not in res.data and res.status_code == 200
|
||||
|
||||
|
||||
def test_xss_watch_last_error(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
153
changedetectionio/tests/test_xpath_default_namespace.py
Normal file
153
changedetectionio/tests/test_xpath_default_namespace.py
Normal file
@@ -0,0 +1,153 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for XPath default namespace handling in RSS/Atom feeds.
|
||||
Tests the fix for issue where //title/text() returns empty on feeds with default namespaces.
|
||||
|
||||
Real-world test data from https://github.com/microsoft/PowerToys/releases.atom
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import html_tools
|
||||
|
||||
|
||||
# Real-world Atom feed with default namespace from GitHub PowerToys releases
|
||||
# This is the actual format that was failing before the fix
|
||||
atom_feed_with_default_ns = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<feed xmlns="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" xml:lang="en-US">
|
||||
<id>tag:github.com,2008:https://github.com/microsoft/PowerToys/releases</id>
|
||||
<link type="text/html" rel="alternate" href="https://github.com/microsoft/PowerToys/releases"/>
|
||||
<link type="application/atom+xml" rel="self" href="https://github.com/microsoft/PowerToys/releases.atom"/>
|
||||
<title>Release notes from PowerToys</title>
|
||||
<updated>2025-10-23T08:53:12Z</updated>
|
||||
<entry>
|
||||
<id>tag:github.com,2008:Repository/184456251/v0.95.1</id>
|
||||
<updated>2025-10-24T14:20:14Z</updated>
|
||||
<link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.1"/>
|
||||
<title>Release 0.95.1</title>
|
||||
<content type="html"><p>This patch release fixes several important stability issues.</p></content>
|
||||
<author>
|
||||
<name>Jaylyn-Barbee</name>
|
||||
</author>
|
||||
</entry>
|
||||
<entry>
|
||||
<id>tag:github.com,2008:Repository/184456251/v0.95.0</id>
|
||||
<updated>2025-10-17T12:51:21Z</updated>
|
||||
<link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.0"/>
|
||||
<title>Release v0.95.0</title>
|
||||
<content type="html"><p>New features, stability, optimization improvements.</p></content>
|
||||
<author>
|
||||
<name>Jaylyn-Barbee</name>
|
||||
</author>
|
||||
</entry>
|
||||
</feed>"""
|
||||
|
||||
# RSS feed without default namespace
|
||||
rss_feed_no_default_ns = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Channel Title</title>
|
||||
<description>Channel Description</description>
|
||||
<item>
|
||||
<title>Item 1 Title</title>
|
||||
<description>Item 1 Description</description>
|
||||
</item>
|
||||
<item>
|
||||
<title>Item 2 Title</title>
|
||||
<description>Item 2 Description</description>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>"""
|
||||
|
||||
# RSS 2.0 feed with namespace prefix (not default)
|
||||
rss_feed_with_ns_prefix = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:content="http://purl.org/rss/1.0/modules/content/"
|
||||
xmlns:atom="http://www.w3.org/2005/Atom"
|
||||
version="2.0">
|
||||
<channel>
|
||||
<title>Channel Title</title>
|
||||
<atom:link href="http://example.com/feed" rel="self" type="application/rss+xml"/>
|
||||
<item>
|
||||
<title>Item Title</title>
|
||||
<dc:creator>Author Name</dc:creator>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>"""
|
||||
|
||||
|
||||
class TestXPathDefaultNamespace:
|
||||
"""Test XPath queries on feeds with and without default namespaces."""
|
||||
|
||||
def test_atom_feed_simple_xpath_with_xpath_filter(self):
|
||||
"""Test that //title/text() works on Atom feed with default namespace using xpath_filter."""
|
||||
result = html_tools.xpath_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
assert 'Release v0.95.0' in result
|
||||
|
||||
def test_atom_feed_nested_xpath_with_xpath_filter(self):
|
||||
"""Test nested XPath like //entry/title/text() on Atom feed."""
|
||||
result = html_tools.xpath_filter('//entry/title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release 0.95.1' in result
|
||||
assert 'Release v0.95.0' in result
|
||||
# Should NOT include the feed title
|
||||
assert 'Release notes from PowerToys' not in result
|
||||
|
||||
def test_atom_feed_other_elements_with_xpath_filter(self):
|
||||
"""Test that other elements like //updated/text() work on Atom feed."""
|
||||
result = html_tools.xpath_filter('//updated/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert '2025-10-23T08:53:12Z' in result
|
||||
assert '2025-10-24T14:20:14Z' in result
|
||||
|
||||
def test_rss_feed_without_namespace(self):
|
||||
"""Test that //title/text() works on RSS feed without default namespace."""
|
||||
result = html_tools.xpath_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item 1 Title' in result
|
||||
assert 'Item 2 Title' in result
|
||||
|
||||
def test_rss_feed_nested_xpath(self):
|
||||
"""Test nested XPath on RSS feed without default namespace."""
|
||||
result = html_tools.xpath_filter('//item/title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Item 1 Title' in result
|
||||
assert 'Item 2 Title' in result
|
||||
# Should NOT include channel title
|
||||
assert 'Channel Title' not in result
|
||||
|
||||
def test_rss_feed_with_prefixed_namespaces(self):
|
||||
"""Test that feeds with namespace prefixes (not default) still work."""
|
||||
result = html_tools.xpath_filter('//title/text()', rss_feed_with_ns_prefix, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item Title' in result
|
||||
|
||||
def test_local_name_workaround_still_works(self):
|
||||
"""Test that local-name() workaround still works for Atom feeds."""
|
||||
result = html_tools.xpath_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
|
||||
def test_xpath1_filter_without_default_namespace(self):
|
||||
"""Test xpath1_filter works on RSS without default namespace."""
|
||||
result = html_tools.xpath1_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item 1 Title' in result
|
||||
|
||||
def test_xpath1_filter_with_default_namespace_returns_empty(self):
|
||||
"""Test that xpath1_filter returns empty on Atom with default namespace (known limitation)."""
|
||||
result = html_tools.xpath1_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
# xpath1_filter (lxml) doesn't support default namespaces, so this returns empty
|
||||
assert result == ''
|
||||
|
||||
def test_xpath1_filter_local_name_workaround(self):
|
||||
"""Test that xpath1_filter works with local-name() workaround on Atom feeds."""
|
||||
result = html_tools.xpath1_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
@@ -244,7 +244,7 @@ def new_live_server_setup(live_server):
|
||||
return request.method
|
||||
|
||||
# Where we POST to as a notification, also use a space here to test URL escaping is OK across all tests that use this. ( #2868 )
|
||||
@live_server.app.route('/test_notification endpoint', methods=['POST', 'GET'])
|
||||
@live_server.app.route('/test_notification_endpoint', methods=['POST', 'GET'])
|
||||
def test_notification_endpoint():
|
||||
|
||||
with open("test-datastore/notification.txt", "wb") as f:
|
||||
|
||||
@@ -1,3 +1,5 @@
|
||||
from functools import lru_cache
|
||||
|
||||
import arrow
|
||||
from enum import IntEnum
|
||||
|
||||
@@ -12,7 +14,7 @@ class Weekday(IntEnum):
|
||||
Saturday = 5
|
||||
Sunday = 6
|
||||
|
||||
|
||||
@lru_cache(maxsize=100)
|
||||
def am_i_inside_time(
|
||||
day_of_week: str,
|
||||
time_str: str,
|
||||
|
||||
@@ -2,6 +2,9 @@
|
||||
|
||||
import sys
|
||||
import os
|
||||
|
||||
from changedetectionio.model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
|
||||
sys.path.insert(0, os.path.join(os.path.dirname(__file__), '../..'))
|
||||
|
||||
from changedetectionio.widgets import TernaryNoneBooleanField
|
||||
@@ -93,7 +96,7 @@ def test_custom_text():
|
||||
print(f"Does NOT contain 'System default': {'System default' not in boolean_html}")
|
||||
print(f"Does NOT contain 'Default': {'Default' not in boolean_html}")
|
||||
assert 'Enabled' in boolean_html and 'Disabled' in boolean_html
|
||||
assert 'System default' not in boolean_html and 'Default' not in boolean_html
|
||||
assert USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH not in boolean_html and 'Default' not in boolean_html
|
||||
|
||||
# Test FontAwesome field
|
||||
print("\n--- FontAwesome Icons Field ---")
|
||||
|
||||
@@ -28,7 +28,7 @@ info:
|
||||
|
||||
For example: `x-api-key: YOUR_API_KEY`
|
||||
|
||||
version: 0.1.1
|
||||
version: 0.1.2
|
||||
contact:
|
||||
name: ChangeDetection.io
|
||||
url: https://github.com/dgtlmoon/changedetection.io
|
||||
@@ -143,7 +143,7 @@ components:
|
||||
paused:
|
||||
type: boolean
|
||||
description: Whether the web page change monitor (watch) is paused
|
||||
muted:
|
||||
notification_muted:
|
||||
type: boolean
|
||||
description: Whether notifications are muted
|
||||
method:
|
||||
@@ -207,7 +207,7 @@ components:
|
||||
maxLength: 5000
|
||||
notification_format:
|
||||
type: string
|
||||
enum: [Text, HTML, Markdown]
|
||||
enum: ['text', 'html', 'htmlcolor', 'markdown', 'System default']
|
||||
description: Format for notifications
|
||||
track_ldjson_price_data:
|
||||
type: boolean
|
||||
@@ -406,7 +406,7 @@ paths:
|
||||
page_title: "The HTML <title> from the page"
|
||||
tags: ["550e8400-e29b-41d4-a716-446655440000"]
|
||||
paused: false
|
||||
muted: false
|
||||
notification_muted: false
|
||||
method: "GET"
|
||||
fetch_backend: "html_requests"
|
||||
last_checked: 1640995200
|
||||
@@ -419,7 +419,7 @@ paths:
|
||||
page_title: "The HTML <title> from the page"
|
||||
tags: ["330e8400-e29b-41d4-a716-446655440001"]
|
||||
paused: false
|
||||
muted: true
|
||||
notification_muted: true
|
||||
method: "GET"
|
||||
fetch_backend: "html_webdriver"
|
||||
last_checked: 1640998800
|
||||
@@ -1224,7 +1224,7 @@ paths:
|
||||
title: "Example Website Monitor"
|
||||
tags: ["550e8400-e29b-41d4-a716-446655440000"]
|
||||
paused: false
|
||||
muted: false
|
||||
notification_muted: false
|
||||
|
||||
/import:
|
||||
post:
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
|
||||
feedgen~=0.9
|
||||
feedgen~=1.0
|
||||
feedparser~=6.0 # For parsing RSS/Atom feeds
|
||||
flask-compress
|
||||
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
|
||||
@@ -12,7 +12,7 @@ janus # Thread-safe async/sync queue bridge
|
||||
flask_wtf~=1.2
|
||||
flask~=3.1
|
||||
flask-socketio~=5.5.1
|
||||
python-socketio~=5.13.0
|
||||
python-socketio~=5.14.2
|
||||
python-engineio~=4.12.3
|
||||
inscriptis~=2.2
|
||||
pytz
|
||||
@@ -22,7 +22,7 @@ validators~=0.35
|
||||
|
||||
# Set these versions together to avoid a RequestsDependencyWarning
|
||||
# >= 2.26 also adds Brotli support if brotli is installed
|
||||
brotli~=1.0
|
||||
brotli~=1.1
|
||||
requests[socks]
|
||||
requests-file
|
||||
|
||||
@@ -30,7 +30,7 @@ requests-file
|
||||
# If specific version needed for security, use urllib3>=1.26.19,<3.0
|
||||
chardet>2.3.0
|
||||
|
||||
wtforms~=3.0
|
||||
wtforms~=3.2
|
||||
jsonpath-ng~=1.5.3
|
||||
|
||||
# dnspython - Used by paho-mqtt for MQTT broker resolution
|
||||
|
||||
Reference in New Issue
Block a user