mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-07 01:56:53 +00:00
Compare commits
17 Commits
0.50.26
...
regex-filt
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0e9f01639a | ||
|
|
b236c8b24b | ||
|
|
27cedc9fa4 | ||
|
|
a51614f83d | ||
|
|
07f98d6bd3 | ||
|
|
f71550da4d | ||
|
|
8c3d0d7e31 | ||
|
|
46658a85d6 | ||
|
|
d699652955 | ||
|
|
9e88db5d9b | ||
|
|
5d9c102aff | ||
|
|
cb1c36d97d | ||
|
|
cc29ba5ea9 | ||
|
|
6f371b1bc6 | ||
|
|
785dabd071 | ||
|
|
09914d54a0 | ||
|
|
58b5586674 |
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.26'
|
||||
__version__ = '0.50.29'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from changedetectionio.notification.handler import apply_service_tweaks
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from feedgen.feed import FeedGenerator
|
||||
from flask import Blueprint, make_response, request, url_for, redirect
|
||||
@@ -120,9 +121,13 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
||||
include_equal=False,
|
||||
line_feed_sep="<br>",
|
||||
html_colour=html_colour_enable
|
||||
line_feed_sep="<br>"
|
||||
)
|
||||
|
||||
|
||||
requested_output_format = 'htmlcolor' if html_colour_enable else 'html'
|
||||
html_diff = apply_service_tweaks(url='', n_body=html_diff, n_title=None, requested_output_format=requested_output_format)
|
||||
|
||||
except FileNotFoundError as e:
|
||||
html_diff = f"History snapshot file for watch {watch.get('uuid')}@{watch.last_changed} - '{watch.get('title')} not found."
|
||||
|
||||
|
||||
@@ -1,8 +1,21 @@
|
||||
import difflib
|
||||
from typing import List, Iterator, Union
|
||||
|
||||
REMOVED_STYLE = "background-color: #fadad7; color: #b30000;"
|
||||
ADDED_STYLE = "background-color: #eaf2c2; color: #406619;"
|
||||
HTML_REMOVED_STYLE = "background-color: #fadad7; color: #b30000;"
|
||||
HTML_ADDED_STYLE = "background-color: #eaf2c2; color: #406619;"
|
||||
|
||||
# These get set to html or telegram type or discord compatible or whatever in handler.py
|
||||
REMOVED_PLACEMARKER_OPEN = '<<<removed_PLACEMARKER_OPEN'
|
||||
REMOVED_PLACEMARKER_CLOSED = '<<<removed_PLACEMARKER_CLOSED'
|
||||
|
||||
ADDED_PLACEMARKER_OPEN = '<<<added_PLACEMARKER_OPEN'
|
||||
ADDED_PLACEMARKER_CLOSED = '<<<added_PLACEMARKER_CLOSED'
|
||||
|
||||
CHANGED_PLACEMARKER_OPEN = '<<<changed_PLACEMARKER_OPEN'
|
||||
CHANGED_PLACEMARKER_CLOSED = '<<<changed_PLACEMARKER_CLOSED'
|
||||
|
||||
CHANGED_INTO_PLACEMARKER_OPEN = '<<<changed_into_PLACEMARKER_OPEN'
|
||||
CHANGED_INTO_PLACEMARKER_CLOSED = '<<<changed_into_PLACEMARKER_CLOSED'
|
||||
|
||||
def same_slicer(lst: List[str], start: int, end: int) -> List[str]:
|
||||
"""Return a slice of the list, or a single element if start == end."""
|
||||
@@ -15,8 +28,7 @@ def customSequenceMatcher(
|
||||
include_removed: bool = True,
|
||||
include_added: bool = True,
|
||||
include_replaced: bool = True,
|
||||
include_change_type_prefix: bool = True,
|
||||
html_colour: bool = False
|
||||
include_change_type_prefix: bool = True
|
||||
) -> Iterator[List[str]]:
|
||||
"""
|
||||
Compare two sequences and yield differences based on specified parameters.
|
||||
@@ -29,8 +41,6 @@ def customSequenceMatcher(
|
||||
include_added (bool): Include added parts
|
||||
include_replaced (bool): Include replaced parts
|
||||
include_change_type_prefix (bool): Add prefixes to indicate change types
|
||||
html_colour (bool): Use HTML background colors for differences
|
||||
|
||||
Yields:
|
||||
List[str]: Differences between sequences
|
||||
"""
|
||||
@@ -42,22 +52,22 @@ def customSequenceMatcher(
|
||||
if include_equal and tag == 'equal':
|
||||
yield before[alo:ahi]
|
||||
elif include_removed and tag == 'delete':
|
||||
if html_colour:
|
||||
yield [f'<span style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)]
|
||||
if include_change_type_prefix:
|
||||
yield [f'{REMOVED_PLACEMARKER_OPEN}{line}{REMOVED_PLACEMARKER_CLOSED}' for line in same_slicer(before, alo, ahi)]
|
||||
else:
|
||||
yield [f"(removed) {line}" for line in same_slicer(before, alo, ahi)] if include_change_type_prefix else same_slicer(before, alo, ahi)
|
||||
yield same_slicer(before, alo, ahi)
|
||||
elif include_replaced and tag == 'replace':
|
||||
if html_colour:
|
||||
yield [f'<span style="{REMOVED_STYLE}">{line}</span>' for line in same_slicer(before, alo, ahi)] + \
|
||||
[f'<span style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
|
||||
if include_change_type_prefix:
|
||||
yield [f'{CHANGED_PLACEMARKER_OPEN}{line}{CHANGED_PLACEMARKER_CLOSED}' for line in same_slicer(before, alo, ahi)] + \
|
||||
[f'{CHANGED_INTO_PLACEMARKER_OPEN}{line}{CHANGED_INTO_PLACEMARKER_CLOSED}' for line in same_slicer(after, blo, bhi)]
|
||||
else:
|
||||
yield [f"(changed) {line}" for line in same_slicer(before, alo, ahi)] + \
|
||||
[f"(into) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(before, alo, ahi) + same_slicer(after, blo, bhi)
|
||||
yield same_slicer(before, alo, ahi) + same_slicer(after, blo, bhi)
|
||||
elif include_added and tag == 'insert':
|
||||
if html_colour:
|
||||
yield [f'<span style="{ADDED_STYLE}">{line}</span>' for line in same_slicer(after, blo, bhi)]
|
||||
if include_change_type_prefix:
|
||||
yield [f'{ADDED_PLACEMARKER_OPEN}{line}{ADDED_PLACEMARKER_CLOSED}' for line in same_slicer(after, blo, bhi)]
|
||||
else:
|
||||
yield [f"(added) {line}" for line in same_slicer(after, blo, bhi)] if include_change_type_prefix else same_slicer(after, blo, bhi)
|
||||
yield same_slicer(after, blo, bhi)
|
||||
|
||||
|
||||
def render_diff(
|
||||
previous_version_file_contents: str,
|
||||
@@ -68,8 +78,7 @@ def render_diff(
|
||||
include_replaced: bool = True,
|
||||
line_feed_sep: str = "\n",
|
||||
include_change_type_prefix: bool = True,
|
||||
patch_format: bool = False,
|
||||
html_colour: bool = False
|
||||
patch_format: bool = False
|
||||
) -> str:
|
||||
"""
|
||||
Render the difference between two file contents.
|
||||
@@ -84,8 +93,6 @@ def render_diff(
|
||||
line_feed_sep (str): Separator for lines in output
|
||||
include_change_type_prefix (bool): Add prefixes to indicate change types
|
||||
patch_format (bool): Use patch format for output
|
||||
html_colour (bool): Use HTML background colors for differences
|
||||
|
||||
Returns:
|
||||
str: Rendered difference
|
||||
"""
|
||||
@@ -103,8 +110,7 @@ def render_diff(
|
||||
include_removed=include_removed,
|
||||
include_added=include_added,
|
||||
include_replaced=include_replaced,
|
||||
include_change_type_prefix=include_change_type_prefix,
|
||||
html_colour=html_colour
|
||||
include_change_type_prefix=include_change_type_prefix
|
||||
)
|
||||
|
||||
def flatten(lst: List[Union[str, List[str]]]) -> str:
|
||||
|
||||
@@ -761,6 +761,14 @@ class commonSettingsForm(Form):
|
||||
scheduler_timezone_default = StringField("Default timezone for watch check scheduler", render_kw={"list": "timezones"}, validators=[validateTimeZoneName()])
|
||||
webdriver_delay = IntegerField('Wait seconds before extracting text', validators=[validators.Optional(), validators.NumberRange(min=1, message="Should contain one or more seconds")])
|
||||
|
||||
# Not true anymore but keep the validate_ hook for future use, we convert color tags
|
||||
# def validate_notification_urls(self, field):
|
||||
# """Validate that HTML Color format is not used with Telegram"""
|
||||
# if self.notification_format.data == 'HTML Color' and field.data:
|
||||
# for url in field.data:
|
||||
# if url and ('tgram://' in url or 'discord://' in url or 'discord.com/api/webhooks' in url):
|
||||
# raise ValidationError('HTML Color format is not supported by Telegram and Discord. Please choose another Notification Format (Plain Text, HTML, or Markdown to HTML).')
|
||||
|
||||
|
||||
class importForm(Form):
|
||||
from . import processors
|
||||
|
||||
@@ -185,8 +185,21 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False
|
||||
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
|
||||
html_block = ""
|
||||
|
||||
r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser)
|
||||
#@note: //title/text() wont work where <title>CDATA..
|
||||
# Build namespace map for XPath queries
|
||||
namespaces = {'re': 'http://exslt.org/regular-expressions'}
|
||||
|
||||
# Handle default namespace in documents (common in RSS/Atom feeds, but can occur in any XML)
|
||||
# XPath spec: unprefixed element names have no namespace, not the default namespace
|
||||
# Solution: Register the default namespace with empty string prefix in elementpath
|
||||
# This is primarily for RSS/Atom feeds but works for any XML with default namespace
|
||||
if hasattr(tree, 'nsmap') and tree.nsmap and None in tree.nsmap:
|
||||
# Register the default namespace with empty string prefix for elementpath
|
||||
# This allows //title to match elements in the default namespace
|
||||
namespaces[''] = tree.nsmap[None]
|
||||
|
||||
r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
|
||||
#@note: //title/text() now works with default namespaces (fixed by registering '' prefix)
|
||||
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
|
||||
|
||||
if type(r) != list:
|
||||
r = [r]
|
||||
@@ -221,8 +234,19 @@ def xpath1_filter(xpath_filter, html_content, append_pretty_line_formatting=Fals
|
||||
tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
|
||||
html_block = ""
|
||||
|
||||
r = tree.xpath(xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'})
|
||||
#@note: //title/text() wont work where <title>CDATA..
|
||||
# Build namespace map for XPath queries
|
||||
namespaces = {'re': 'http://exslt.org/regular-expressions'}
|
||||
|
||||
# NOTE: lxml's native xpath() does NOT support empty string prefix for default namespace
|
||||
# For documents with default namespace (RSS/Atom feeds), users must use:
|
||||
# - local-name(): //*[local-name()='title']/text()
|
||||
# - Or use xpath_filter (not xpath1_filter) which supports default namespaces
|
||||
# XPath spec: unprefixed element names have no namespace, not the default namespace
|
||||
|
||||
r = tree.xpath(xpath_filter.strip(), namespaces=namespaces)
|
||||
#@note: xpath1 (lxml) does NOT automatically handle default namespaces
|
||||
#@note: Use //*[local-name()='element'] or switch to xpath_filter for default namespace support
|
||||
#@note: //title/text() wont work where <title>CDATA.. (use cdata_in_document_to_text first)
|
||||
|
||||
for element in r:
|
||||
# When there's more than 1 match, then add the suffix to separate each line
|
||||
@@ -408,6 +432,9 @@ def strip_ignore_text(content, wordlist, mode="content"):
|
||||
ignored_lines = []
|
||||
|
||||
for k in wordlist:
|
||||
# Skip empty strings to avoid matching everything
|
||||
if not k or not k.strip():
|
||||
continue
|
||||
# Is it a regex?
|
||||
res = re.search(PERL_STYLE_REGEX, k, re.IGNORECASE)
|
||||
if res:
|
||||
|
||||
@@ -9,6 +9,7 @@ from .safe_jinja import (
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE,
|
||||
DEFAULT_JINJA2_EXTENSIONS,
|
||||
)
|
||||
from .plugins.regex import regex_replace
|
||||
|
||||
__all__ = [
|
||||
'TimeExtension',
|
||||
@@ -17,4 +18,5 @@ __all__ = [
|
||||
'create_jinja_env',
|
||||
'JINJA2_MAX_RETURN_PAYLOAD_SIZE',
|
||||
'DEFAULT_JINJA2_EXTENSIONS',
|
||||
'regex_replace',
|
||||
]
|
||||
|
||||
6
changedetectionio/jinja2_custom/plugins/__init__.py
Normal file
6
changedetectionio/jinja2_custom/plugins/__init__.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
Jinja2 custom filter plugins for changedetection.io
|
||||
"""
|
||||
from .regex import regex_replace
|
||||
|
||||
__all__ = ['regex_replace']
|
||||
98
changedetectionio/jinja2_custom/plugins/regex.py
Normal file
98
changedetectionio/jinja2_custom/plugins/regex.py
Normal file
@@ -0,0 +1,98 @@
|
||||
"""
|
||||
Regex filter plugin for Jinja2 templates.
|
||||
|
||||
Provides regex_replace filter for pattern-based string replacements in templates.
|
||||
"""
|
||||
import re
|
||||
import signal
|
||||
from loguru import logger
|
||||
|
||||
|
||||
def regex_replace(value: str, pattern: str, replacement: str = '', count: int = 0) -> str:
|
||||
"""
|
||||
Replace occurrences of a regex pattern in a string.
|
||||
|
||||
Security: Protected against ReDoS (Regular Expression Denial of Service) attacks:
|
||||
- Limits input value size to prevent excessive processing
|
||||
- Uses timeout mechanism to prevent runaway regex operations
|
||||
- Validates pattern complexity to prevent catastrophic backtracking
|
||||
|
||||
Args:
|
||||
value: The input string to perform replacements on
|
||||
pattern: The regex pattern to search for
|
||||
replacement: The replacement string (default: '')
|
||||
count: Maximum number of replacements (0 = replace all, default: 0)
|
||||
|
||||
Returns:
|
||||
String with replacements applied, or original value on error
|
||||
|
||||
Example:
|
||||
{{ "hello world" | regex_replace("world", "universe") }}
|
||||
{{ diff | regex_replace("<td>([^<]+)</td><td>([^<]+)</td>", "Label1: \\1\\nLabel2: \\2") }}
|
||||
|
||||
Security limits:
|
||||
- Maximum input size: 10MB
|
||||
- Maximum pattern length: 500 characters
|
||||
- Operation timeout: 10 seconds
|
||||
- Dangerous nested quantifier patterns are rejected
|
||||
"""
|
||||
# Security limits
|
||||
MAX_INPUT_SIZE = 1024 * 1024 * 10 # 10MB max input size
|
||||
MAX_PATTERN_LENGTH = 500 # Maximum regex pattern length
|
||||
REGEX_TIMEOUT_SECONDS = 10 # Maximum time for regex operation
|
||||
|
||||
# Validate input sizes
|
||||
value_str = str(value)
|
||||
if len(value_str) > MAX_INPUT_SIZE:
|
||||
logger.warning(f"regex_replace: Input too large ({len(value_str)} bytes), truncating")
|
||||
value_str = value_str[:MAX_INPUT_SIZE]
|
||||
|
||||
if len(pattern) > MAX_PATTERN_LENGTH:
|
||||
logger.warning(f"regex_replace: Pattern too long ({len(pattern)} chars), rejecting")
|
||||
return value_str
|
||||
|
||||
# Check for potentially dangerous patterns (basic checks)
|
||||
# Nested quantifiers like (a+)+ can cause catastrophic backtracking
|
||||
dangerous_patterns = [
|
||||
r'\([^)]*\+[^)]*\)\+', # (x+)+
|
||||
r'\([^)]*\*[^)]*\)\+', # (x*)+
|
||||
r'\([^)]*\+[^)]*\)\*', # (x+)*
|
||||
r'\([^)]*\*[^)]*\)\*', # (x*)*
|
||||
]
|
||||
|
||||
for dangerous in dangerous_patterns:
|
||||
if re.search(dangerous, pattern):
|
||||
logger.warning(f"regex_replace: Potentially dangerous pattern detected: {pattern}")
|
||||
return value_str
|
||||
|
||||
def timeout_handler(signum, frame):
|
||||
raise TimeoutError("Regex operation timed out")
|
||||
|
||||
try:
|
||||
# Set up timeout for regex operation (Unix-like systems only)
|
||||
# This prevents ReDoS attacks
|
||||
old_handler = None
|
||||
if hasattr(signal, 'SIGALRM'):
|
||||
old_handler = signal.signal(signal.SIGALRM, timeout_handler)
|
||||
signal.alarm(REGEX_TIMEOUT_SECONDS)
|
||||
|
||||
try:
|
||||
result = re.sub(pattern, replacement, value_str, count=count)
|
||||
finally:
|
||||
# Cancel the alarm
|
||||
if hasattr(signal, 'SIGALRM'):
|
||||
signal.alarm(0)
|
||||
if old_handler is not None:
|
||||
signal.signal(signal.SIGALRM, old_handler)
|
||||
|
||||
return result
|
||||
|
||||
except TimeoutError:
|
||||
logger.error(f"regex_replace: Regex operation timed out - possible ReDoS attack. Pattern: {pattern}")
|
||||
return value_str
|
||||
except re.error as e:
|
||||
logger.warning(f"regex_replace: Invalid regex pattern: {e}")
|
||||
return value_str
|
||||
except Exception as e:
|
||||
logger.error(f"regex_replace: Unexpected error: {e}")
|
||||
return value_str
|
||||
@@ -8,13 +8,13 @@ import jinja2.sandbox
|
||||
import typing as t
|
||||
import os
|
||||
from .extensions.TimeExtension import TimeExtension
|
||||
from .plugins import regex_replace
|
||||
|
||||
JINJA2_MAX_RETURN_PAYLOAD_SIZE = 1024 * int(os.getenv("JINJA2_MAX_RETURN_PAYLOAD_SIZE_KB", 1024 * 10))
|
||||
|
||||
# Default extensions - can be overridden in create_jinja_env()
|
||||
DEFAULT_JINJA2_EXTENSIONS = [TimeExtension]
|
||||
|
||||
|
||||
def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandboxedEnvironment:
|
||||
"""
|
||||
Create a sandboxed Jinja2 environment with our custom extensions and default timezone.
|
||||
@@ -38,6 +38,9 @@ def create_jinja_env(extensions=None, **kwargs) -> jinja2.sandbox.ImmutableSandb
|
||||
default_timezone = os.getenv('TZ', 'UTC').strip()
|
||||
jinja2_env.default_timezone = default_timezone
|
||||
|
||||
# Register custom filters
|
||||
jinja2_env.filters['regex_replace'] = regex_replace
|
||||
|
||||
return jinja2_env
|
||||
|
||||
|
||||
|
||||
@@ -9,6 +9,7 @@ from pathlib import Path
|
||||
from loguru import logger
|
||||
|
||||
from .. import jinja2_custom as safe_jinja
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
# Allowable protocols, protects against javascript: etc
|
||||
@@ -89,9 +90,8 @@ class model(watch_base):
|
||||
ready_url = jinja_render(template_str=url)
|
||||
except Exception as e:
|
||||
logger.critical(f"Invalid URL template for: '{url}' - {str(e)}")
|
||||
from flask import (
|
||||
flash, Markup, url_for
|
||||
)
|
||||
from flask import flash, url_for
|
||||
from markupsafe import Markup
|
||||
message = Markup('<a href="{}#general">The URL {} is invalid and cannot be used, click to edit</a>'.format(
|
||||
url_for('ui.ui_edit.edit_page', uuid=self.get('uuid')), self.get('url', '')))
|
||||
flash(message, 'error')
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from changedetectionio.model import default_notification_format_for_watch
|
||||
|
||||
ult_notification_format_for_watch = 'System default'
|
||||
default_notification_format = 'HTML Color'
|
||||
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
|
||||
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
|
||||
@@ -8,10 +7,10 @@ default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
|
||||
# The values (markdown etc) are from apprise NotifyFormat,
|
||||
# But to avoid importing the whole heavy module just use the same strings here.
|
||||
valid_notification_formats = {
|
||||
'Text': 'text',
|
||||
'Markdown': 'markdown',
|
||||
'Plain Text': 'text',
|
||||
'HTML': 'html',
|
||||
'HTML Color': 'htmlcolor',
|
||||
'Markdown to HTML': 'markdown',
|
||||
# Used only for editing a watch (not for global)
|
||||
default_notification_format_for_watch: default_notification_format_for_watch
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ def apprise_http_custom_handler(
|
||||
title: str,
|
||||
notify_type: str,
|
||||
meta: dict,
|
||||
body_format: str = None,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> bool:
|
||||
|
||||
286
changedetectionio/notification/apprise_plugin/discord.py
Normal file
286
changedetectionio/notification/apprise_plugin/discord.py
Normal file
@@ -0,0 +1,286 @@
|
||||
"""
|
||||
Custom Discord plugin for changedetection.io
|
||||
Extends Apprise's Discord plugin to support custom colored embeds for removed/added content
|
||||
"""
|
||||
from apprise.plugins.discord import NotifyDiscord
|
||||
from apprise.decorators import notify
|
||||
from apprise.common import NotifyFormat
|
||||
from loguru import logger
|
||||
|
||||
# Import placeholders from changedetection's diff module
|
||||
from ...diff import (
|
||||
REMOVED_PLACEMARKER_OPEN,
|
||||
REMOVED_PLACEMARKER_CLOSED,
|
||||
ADDED_PLACEMARKER_OPEN,
|
||||
ADDED_PLACEMARKER_CLOSED,
|
||||
CHANGED_PLACEMARKER_OPEN,
|
||||
CHANGED_PLACEMARKER_CLOSED,
|
||||
CHANGED_INTO_PLACEMARKER_OPEN,
|
||||
CHANGED_INTO_PLACEMARKER_CLOSED,
|
||||
)
|
||||
|
||||
# Discord embed sidebar colors for different change types
|
||||
DISCORD_COLOR_UNCHANGED = 8421504 # Gray (#808080)
|
||||
DISCORD_COLOR_REMOVED = 16711680 # Red (#FF0000)
|
||||
DISCORD_COLOR_ADDED = 65280 # Green (#00FF00)
|
||||
DISCORD_COLOR_CHANGED = 16753920 # Orange (#FFA500)
|
||||
DISCORD_COLOR_CHANGED_INTO = 3447003 # Blue (#5865F2 - Discord blue)
|
||||
DISCORD_COLOR_WARNING = 16776960 # Yellow (#FFFF00)
|
||||
|
||||
|
||||
class NotifyDiscordCustom(NotifyDiscord):
|
||||
"""
|
||||
Custom Discord notification handler that supports multiple colored embeds
|
||||
for showing removed (red) and added (green) content separately.
|
||||
"""
|
||||
|
||||
def send(self, body, title="", notify_type=None, attach=None, **kwargs):
|
||||
"""
|
||||
Override send method to create custom embeds with red/green colors
|
||||
for removed/added content when placeholders are present.
|
||||
"""
|
||||
|
||||
# Check if body contains our diff placeholders
|
||||
has_removed = REMOVED_PLACEMARKER_OPEN in body
|
||||
has_added = ADDED_PLACEMARKER_OPEN in body
|
||||
has_changed = CHANGED_PLACEMARKER_OPEN in body
|
||||
has_changed_into = CHANGED_INTO_PLACEMARKER_OPEN in body
|
||||
|
||||
# If we have diff placeholders and we're in markdown/html format, create custom embeds
|
||||
if (has_removed or has_added or has_changed or has_changed_into) and self.notify_format in (NotifyFormat.MARKDOWN, NotifyFormat.HTML):
|
||||
return self._send_with_colored_embeds(body, title, notify_type, attach, **kwargs)
|
||||
|
||||
# Otherwise, use the parent class's default behavior
|
||||
return super().send(body, title, notify_type, attach, **kwargs)
|
||||
|
||||
def _send_with_colored_embeds(self, body, title, notify_type, attach, **kwargs):
|
||||
"""
|
||||
Send Discord message with embeds in the original diff order.
|
||||
Preserves the sequence: unchanged -> removed -> added -> unchanged, etc.
|
||||
"""
|
||||
from datetime import datetime, timezone
|
||||
|
||||
payload = {
|
||||
"tts": self.tts,
|
||||
"wait": self.tts is False,
|
||||
}
|
||||
|
||||
if self.flags:
|
||||
payload["flags"] = self.flags
|
||||
|
||||
# Acquire image_url
|
||||
image_url = self.image_url(notify_type)
|
||||
|
||||
if self.avatar and (image_url or self.avatar_url):
|
||||
payload["avatar_url"] = self.avatar_url if self.avatar_url else image_url
|
||||
|
||||
if self.user:
|
||||
payload["username"] = self.user
|
||||
|
||||
# Associate our thread_id with our message
|
||||
params = {"thread_id": self.thread_id} if self.thread_id else None
|
||||
|
||||
# Build embeds array preserving order
|
||||
embeds = []
|
||||
|
||||
# Add title as plain bold text in message content (not an embed)
|
||||
if title:
|
||||
payload["content"] = f"**{title}**"
|
||||
|
||||
# Parse the body into ordered chunks
|
||||
chunks = self._parse_body_into_chunks(body)
|
||||
|
||||
# Discord limits:
|
||||
# - Max 10 embeds per message
|
||||
# - Max 6000 characters total across all embeds
|
||||
# - Max 4096 characters per embed description
|
||||
max_embeds = 10
|
||||
max_total_chars = 6000
|
||||
max_embed_description = 4096
|
||||
|
||||
# All 10 embed slots are available for content
|
||||
max_content_embeds = max_embeds
|
||||
|
||||
# Start character count
|
||||
total_chars = 0
|
||||
|
||||
# Create embeds from chunks in order (no titles, just color coding)
|
||||
for chunk_type, content in chunks:
|
||||
if not content.strip():
|
||||
continue
|
||||
|
||||
# Truncate individual embed description if needed
|
||||
if len(content) > max_embed_description:
|
||||
content = content[:max_embed_description - 3] + "..."
|
||||
|
||||
# Check if we're approaching the embed count limit
|
||||
# We need room for the warning embed, so stop at max_content_embeds - 1
|
||||
current_content_embeds = len(embeds)
|
||||
if current_content_embeds >= max_content_embeds - 1:
|
||||
# Add a truncation notice (this will be the 10th embed)
|
||||
embeds.append({
|
||||
"description": "⚠️ Content truncated (Discord 10 embed limit reached) - Tip: Select 'Plain Text' or 'HTML' format for longer diffs",
|
||||
"color": DISCORD_COLOR_WARNING,
|
||||
})
|
||||
break
|
||||
|
||||
# Check if adding this embed would exceed total character limit
|
||||
if total_chars + len(content) > max_total_chars:
|
||||
# Add a truncation notice
|
||||
remaining_chars = max_total_chars - total_chars
|
||||
if remaining_chars > 100:
|
||||
# Add partial content if we have room
|
||||
truncated_content = content[:remaining_chars - 100] + "..."
|
||||
embeds.append({
|
||||
"description": truncated_content,
|
||||
"color": (DISCORD_COLOR_UNCHANGED if chunk_type == "unchanged"
|
||||
else DISCORD_COLOR_REMOVED if chunk_type == "removed"
|
||||
else DISCORD_COLOR_ADDED),
|
||||
})
|
||||
embeds.append({
|
||||
"description": "⚠️ Content truncated (Discord 6000 char limit reached)\nTip: Select 'Plain Text' or 'HTML' format for longer diffs",
|
||||
"color": DISCORD_COLOR_WARNING,
|
||||
})
|
||||
break
|
||||
|
||||
if chunk_type == "unchanged":
|
||||
embeds.append({
|
||||
"description": content,
|
||||
"color": DISCORD_COLOR_UNCHANGED,
|
||||
})
|
||||
elif chunk_type == "removed":
|
||||
embeds.append({
|
||||
"description": content,
|
||||
"color": DISCORD_COLOR_REMOVED,
|
||||
})
|
||||
elif chunk_type == "added":
|
||||
embeds.append({
|
||||
"description": content,
|
||||
"color": DISCORD_COLOR_ADDED,
|
||||
})
|
||||
elif chunk_type == "changed":
|
||||
# Changed (old value) - use orange to distinguish from pure removal
|
||||
embeds.append({
|
||||
"description": content,
|
||||
"color": DISCORD_COLOR_CHANGED,
|
||||
})
|
||||
elif chunk_type == "changed_into":
|
||||
# Changed into (new value) - use blue to distinguish from pure addition
|
||||
embeds.append({
|
||||
"description": content,
|
||||
"color": DISCORD_COLOR_CHANGED_INTO,
|
||||
})
|
||||
|
||||
total_chars += len(content)
|
||||
|
||||
if embeds:
|
||||
payload["embeds"] = embeds
|
||||
|
||||
# Send the payload using parent's _send method
|
||||
if not self._send(payload, params=params):
|
||||
return False
|
||||
|
||||
# Handle attachments if present
|
||||
if attach and self.attachment_support:
|
||||
payload.update({
|
||||
"tts": False,
|
||||
"wait": True,
|
||||
})
|
||||
payload.pop("embeds", None)
|
||||
payload.pop("content", None)
|
||||
payload.pop("allow_mentions", None)
|
||||
|
||||
for attachment in attach:
|
||||
self.logger.info(f"Posting Discord Attachment {attachment.name}")
|
||||
if not self._send(payload, params=params, attach=attachment):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _parse_body_into_chunks(self, body):
|
||||
"""
|
||||
Parse the body into ordered chunks of (type, content) tuples.
|
||||
Types: "unchanged", "removed", "added", "changed", "changed_into"
|
||||
Preserves the original order of the diff.
|
||||
"""
|
||||
chunks = []
|
||||
position = 0
|
||||
|
||||
while position < len(body):
|
||||
# Find the next marker
|
||||
next_removed = body.find(REMOVED_PLACEMARKER_OPEN, position)
|
||||
next_added = body.find(ADDED_PLACEMARKER_OPEN, position)
|
||||
next_changed = body.find(CHANGED_PLACEMARKER_OPEN, position)
|
||||
next_changed_into = body.find(CHANGED_INTO_PLACEMARKER_OPEN, position)
|
||||
|
||||
# Determine which marker comes first
|
||||
if next_removed == -1 and next_added == -1 and next_changed == -1 and next_changed_into == -1:
|
||||
# No more markers, rest is unchanged
|
||||
if position < len(body):
|
||||
chunks.append(("unchanged", body[position:]))
|
||||
break
|
||||
|
||||
# Find the earliest marker
|
||||
next_marker_pos = None
|
||||
next_marker_type = None
|
||||
|
||||
# Compare all marker positions to find the earliest
|
||||
markers = []
|
||||
if next_removed != -1:
|
||||
markers.append((next_removed, "removed"))
|
||||
if next_added != -1:
|
||||
markers.append((next_added, "added"))
|
||||
if next_changed != -1:
|
||||
markers.append((next_changed, "changed"))
|
||||
if next_changed_into != -1:
|
||||
markers.append((next_changed_into, "changed_into"))
|
||||
|
||||
if markers:
|
||||
next_marker_pos, next_marker_type = min(markers, key=lambda x: x[0])
|
||||
|
||||
# Add unchanged content before the marker
|
||||
if next_marker_pos > position:
|
||||
chunks.append(("unchanged", body[position:next_marker_pos]))
|
||||
|
||||
# Find the closing marker
|
||||
if next_marker_type == "removed":
|
||||
open_marker = REMOVED_PLACEMARKER_OPEN
|
||||
close_marker = REMOVED_PLACEMARKER_CLOSED
|
||||
elif next_marker_type == "added":
|
||||
open_marker = ADDED_PLACEMARKER_OPEN
|
||||
close_marker = ADDED_PLACEMARKER_CLOSED
|
||||
elif next_marker_type == "changed":
|
||||
open_marker = CHANGED_PLACEMARKER_OPEN
|
||||
close_marker = CHANGED_PLACEMARKER_CLOSED
|
||||
else: # changed_into
|
||||
open_marker = CHANGED_INTO_PLACEMARKER_OPEN
|
||||
close_marker = CHANGED_INTO_PLACEMARKER_CLOSED
|
||||
|
||||
close_pos = body.find(close_marker, next_marker_pos)
|
||||
|
||||
if close_pos == -1:
|
||||
# No closing marker, take rest as this type
|
||||
content = body[next_marker_pos + len(open_marker):]
|
||||
chunks.append((next_marker_type, content))
|
||||
break
|
||||
else:
|
||||
# Extract content between markers
|
||||
content = body[next_marker_pos + len(open_marker):close_pos]
|
||||
chunks.append((next_marker_type, content))
|
||||
position = close_pos + len(close_marker)
|
||||
|
||||
return chunks
|
||||
|
||||
|
||||
# Register the custom Discord handler with Apprise
|
||||
# This will override the built-in discord:// handler
|
||||
@notify(on="discord")
|
||||
def discord_custom_wrapper(body, title, notify_type, meta, body_format=None, *args, **kwargs):
|
||||
"""
|
||||
Wrapper function to make the custom Discord handler work with Apprise's decorator system.
|
||||
Note: This decorator approach may not work for overriding built-in plugins.
|
||||
The class-based approach above is the proper way to extend NotifyDiscord.
|
||||
"""
|
||||
logger.info("Custom Discord handler called")
|
||||
# This is here for potential future use with decorator-based registration
|
||||
return True
|
||||
@@ -3,7 +3,12 @@ import time
|
||||
import apprise
|
||||
from apprise import NotifyFormat
|
||||
from loguru import logger
|
||||
from urllib.parse import urlparse
|
||||
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
||||
from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS
|
||||
from ..diff import HTML_REMOVED_STYLE, REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED, ADDED_PLACEMARKER_OPEN, HTML_ADDED_STYLE, \
|
||||
ADDED_PLACEMARKER_CLOSED, CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED, CHANGED_PLACEMARKER_OPEN, \
|
||||
CHANGED_PLACEMARKER_CLOSED
|
||||
from ..notification_service import NotificationContextData
|
||||
|
||||
|
||||
@@ -51,30 +56,156 @@ def notification_format_align_with_apprise(n_format : str):
|
||||
"""
|
||||
Correctly align changedetection's formats with apprise's formats
|
||||
Probably these are the same - but good to be sure.
|
||||
These set the expected OUTPUT format type
|
||||
:param n_format:
|
||||
:return:
|
||||
"""
|
||||
|
||||
if n_format.lower().startswith('html'):
|
||||
# Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here
|
||||
n_format = NotifyFormat.HTML
|
||||
n_format = NotifyFormat.HTML.value
|
||||
elif n_format.lower().startswith('markdown'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.MARKDOWN
|
||||
n_format = NotifyFormat.MARKDOWN.value
|
||||
elif n_format.lower().startswith('text'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.TEXT
|
||||
n_format = NotifyFormat.TEXT.value
|
||||
else:
|
||||
n_format = NotifyFormat.TEXT
|
||||
n_format = NotifyFormat.TEXT.value
|
||||
|
||||
return n_format
|
||||
|
||||
|
||||
def apply_service_tweaks(url, n_body, n_title, requested_output_format):
|
||||
|
||||
# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
|
||||
# Because different notifications may require different pre-processing, run each sequentially :(
|
||||
# 2000 bytes minus -
|
||||
# 200 bytes for the overhead of the _entire_ json payload, 200 bytes for {tts, wait, content} etc headers
|
||||
# Length of URL - Incase they specify a longer custom avatar_url
|
||||
|
||||
if not n_body or not n_body.strip():
|
||||
return url, n_body, n_title
|
||||
|
||||
# So if no avatar_url is specified, add one so it can be correctly calculated into the total payload
|
||||
parsed = urlparse(url)
|
||||
k = '?' if not parsed.query else '&'
|
||||
if url and not 'avatar_url' in url \
|
||||
and not url.startswith('mail') \
|
||||
and not url.startswith('post') \
|
||||
and not url.startswith('get') \
|
||||
and not url.startswith('delete') \
|
||||
and not url.startswith('put'):
|
||||
url += k + f"avatar_url={APPRISE_AVATAR_URL}"
|
||||
|
||||
if url.startswith('tgram://'):
|
||||
# Telegram only supports a limit subset of HTML, remove the '<br>' we place in.
|
||||
# re https://github.com/dgtlmoon/changedetection.io/issues/555
|
||||
# @todo re-use an existing library we have already imported to strip all non-allowed tags
|
||||
n_body = n_body.replace('<br>', '\n')
|
||||
n_body = n_body.replace('</br>', '\n')
|
||||
|
||||
# Use strikethrough for removed content, bold for added content
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '<s>')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '</s>')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '<b>')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '</b>')
|
||||
# Handle changed/replaced lines (old → new)
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, '<s>')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, '</s>')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, '<b>')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, '</b>')
|
||||
|
||||
# real limit is 4096, but minus some for extra metadata
|
||||
payload_max_size = 3600
|
||||
body_limit = max(0, payload_max_size - len(n_title))
|
||||
n_title = n_title[0:payload_max_size]
|
||||
n_body = n_body[0:body_limit]
|
||||
|
||||
elif (url.startswith('discord://') or url.startswith('https://discordapp.com/api/webhooks')
|
||||
or url.startswith('https://discord.com/api'))\
|
||||
and 'html' in requested_output_format:
|
||||
# Discord doesn't support HTML, replace <br> with newlines
|
||||
n_body = n_body.strip().replace('<br>', '\n')
|
||||
n_body = n_body.replace('</br>', '\n')
|
||||
|
||||
# Don't replace placeholders or truncate here - let the custom Discord plugin handle it
|
||||
# The plugin will use embeds (6000 char limit across all embeds) if placeholders are present,
|
||||
# or plain content (2000 char limit) otherwise
|
||||
|
||||
# Only do placeholder replacement if NOT using htmlcolor (which triggers embeds in custom plugin)
|
||||
if requested_output_format == 'html':
|
||||
# No diff placeholders, use Discord markdown for any other formatting
|
||||
# Use Discord markdown: strikethrough for removed, bold for added
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '~~')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '~~')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '**')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '**')
|
||||
# Handle changed/replaced lines (old → new)
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, '~~')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, '~~')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, '**')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, '**')
|
||||
|
||||
# Apply 2000 char limit for plain content
|
||||
payload_max_size = 1700
|
||||
body_limit = max(0, payload_max_size - len(n_title))
|
||||
n_title = n_title[0:payload_max_size]
|
||||
n_body = n_body[0:body_limit]
|
||||
# else: our custom Discord plugin will convert any placeholders left over into embeds with color bars
|
||||
|
||||
# Is not discord/tgram and they want htmlcolor
|
||||
elif requested_output_format == 'htmlcolor':
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, f'</span>')
|
||||
# Handle changed/replaced lines (old → new)
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'<span style="{HTML_REMOVED_STYLE}">')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'<span style="{HTML_ADDED_STYLE}">')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'</span>')
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
elif requested_output_format == 'html':
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '(added) ')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'(changed) ')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'')
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
|
||||
else: #plaintext etc default
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_OPEN, '(removed) ')
|
||||
n_body = n_body.replace(REMOVED_PLACEMARKER_CLOSED, '')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_OPEN, '(added) ')
|
||||
n_body = n_body.replace(ADDED_PLACEMARKER_CLOSED, '')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_OPEN, f'(changed) ')
|
||||
n_body = n_body.replace(CHANGED_PLACEMARKER_CLOSED, f'')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_OPEN, f'(into) ')
|
||||
n_body = n_body.replace(CHANGED_INTO_PLACEMARKER_CLOSED, f'')
|
||||
|
||||
return url, n_body, n_title
|
||||
|
||||
# Must be str for apprise notify body_format
|
||||
return str(n_format)
|
||||
|
||||
def process_notification(n_object: NotificationContextData, datastore):
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
||||
# be sure its registered
|
||||
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||
# Register custom Discord plugin
|
||||
from .apprise_plugin.discord import NotifyDiscordCustom
|
||||
|
||||
# Create list of custom handler protocols (both http and https versions)
|
||||
custom_handler_protocols = [f"{method}://" for method in SUPPORTED_HTTP_METHODS]
|
||||
custom_handler_protocols += [f"{method}s://" for method in SUPPORTED_HTTP_METHODS]
|
||||
|
||||
has_custom_handler = any(
|
||||
url.startswith(tuple(custom_handler_protocols))
|
||||
for url in n_object['notification_urls']
|
||||
)
|
||||
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
@@ -86,20 +217,27 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
# Insert variables into the notification content
|
||||
notification_parameters = create_notification_parameters(n_object, datastore)
|
||||
|
||||
n_format = valid_notification_formats.get(
|
||||
requested_output_format = valid_notification_formats.get(
|
||||
n_object.get('notification_format', default_notification_format),
|
||||
valid_notification_formats[default_notification_format],
|
||||
)
|
||||
|
||||
# If we arrived with 'System default' then look it up
|
||||
if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
|
||||
if requested_output_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
|
||||
# Initially text or whatever
|
||||
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower()
|
||||
requested_output_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower()
|
||||
|
||||
n_format = notification_format_align_with_apprise(n_format=n_format)
|
||||
requested_output_format_original = requested_output_format
|
||||
|
||||
requested_output_format = notification_format_align_with_apprise(n_format=requested_output_format)
|
||||
|
||||
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
|
||||
|
||||
# If we have custom handlers, use invalid format to prevent conversion
|
||||
# Otherwise use the proper format
|
||||
if has_custom_handler:
|
||||
input_format = 'raw-no-convert'
|
||||
|
||||
# https://github.com/caronc/apprise/wiki/Development_LogCapture
|
||||
# Anything higher than or equal to WARNING (which covers things like Connection errors)
|
||||
# raise it as an exception
|
||||
@@ -111,11 +249,19 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
|
||||
apobj = apprise.Apprise(debug=True, asset=apprise_asset)
|
||||
|
||||
# Override Apprise's built-in Discord plugin with our custom one
|
||||
# This allows us to use colored embeds for diff content
|
||||
# First remove the built-in discord plugin, then add our custom one
|
||||
apprise.plugins.N_MGR.remove('discord')
|
||||
apprise.plugins.N_MGR.add(NotifyDiscordCustom, schemas='discord')
|
||||
|
||||
if not n_object.get('notification_urls'):
|
||||
return None
|
||||
|
||||
with apprise.LogCapture(level=apprise.logging.DEBUG) as logs:
|
||||
for url in n_object['notification_urls']:
|
||||
parsed_url = urlparse(url)
|
||||
prefix_add_to_url = '?' if not parsed_url.query else '&'
|
||||
|
||||
# Get the notification body from datastore
|
||||
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
|
||||
@@ -123,8 +269,31 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
if n_object.get('markup_text_to_html'):
|
||||
n_body = markup_text_links_to_html(body=n_body)
|
||||
|
||||
if n_format == str(NotifyFormat.HTML):
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
# This actually means we request "Markdown to HTML"
|
||||
if requested_output_format == NotifyFormat.MARKDOWN.value:
|
||||
output_format = NotifyFormat.HTML.value
|
||||
input_format = NotifyFormat.MARKDOWN.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
# Deviation from apprise.
|
||||
# No conversion, its like they want to send raw HTML but we add linebreaks
|
||||
elif requested_output_format == NotifyFormat.HTML.value:
|
||||
# same in and out means apprise wont try to convert
|
||||
input_format = output_format = NotifyFormat.HTML.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
else:
|
||||
# Nothing to be done, leave it as plaintext
|
||||
# `body_format` Tell apprise what format the INPUT is in
|
||||
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
|
||||
input_format = output_format = NotifyFormat.TEXT.value
|
||||
if not 'format=' in url.lower():
|
||||
url = f"{url}{prefix_add_to_url}format={output_format}"
|
||||
|
||||
if has_custom_handler:
|
||||
input_format='raw-no-convert'
|
||||
|
||||
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
|
||||
|
||||
@@ -140,74 +309,28 @@ def process_notification(n_object: NotificationContextData, datastore):
|
||||
logger.info(f">> Process Notification: AppRise notifying {url}")
|
||||
url = jinja_render(template_str=url, **notification_parameters)
|
||||
|
||||
# Re 323 - Limit discord length to their 2000 char limit total or it wont send.
|
||||
# Because different notifications may require different pre-processing, run each sequentially :(
|
||||
# 2000 bytes minus -
|
||||
# 200 bytes for the overhead of the _entire_ json payload, 200 bytes for {tts, wait, content} etc headers
|
||||
# Length of URL - Incase they specify a longer custom avatar_url
|
||||
|
||||
# So if no avatar_url is specified, add one so it can be correctly calculated into the total payload
|
||||
k = '?' if not '?' in url else '&'
|
||||
if not 'avatar_url' in url \
|
||||
and not url.startswith('mail') \
|
||||
and not url.startswith('post') \
|
||||
and not url.startswith('get') \
|
||||
and not url.startswith('delete') \
|
||||
and not url.startswith('put'):
|
||||
url += k + f"avatar_url={APPRISE_AVATAR_URL}"
|
||||
|
||||
if url.startswith('tgram://'):
|
||||
# Telegram only supports a limit subset of HTML, remove the '<br>' we place in.
|
||||
# re https://github.com/dgtlmoon/changedetection.io/issues/555
|
||||
# @todo re-use an existing library we have already imported to strip all non-allowed tags
|
||||
n_body = n_body.replace('<br>', '\n')
|
||||
n_body = n_body.replace('</br>', '\n')
|
||||
# real limit is 4096, but minus some for extra metadata
|
||||
payload_max_size = 3600
|
||||
body_limit = max(0, payload_max_size - len(n_title))
|
||||
n_title = n_title[0:payload_max_size]
|
||||
n_body = n_body[0:body_limit]
|
||||
|
||||
elif url.startswith('discord://') or url.startswith('https://discordapp.com/api/webhooks') or url.startswith(
|
||||
'https://discord.com/api'):
|
||||
# real limit is 2000, but minus some for extra metadata
|
||||
payload_max_size = 1700
|
||||
body_limit = max(0, payload_max_size - len(n_title))
|
||||
n_title = n_title[0:payload_max_size]
|
||||
n_body = n_body[0:body_limit]
|
||||
|
||||
elif url.startswith('mailto'):
|
||||
# Apprise will default to HTML, so we need to override it
|
||||
# So that whats' generated in n_body is in line with what is going to be sent.
|
||||
# https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
|
||||
if not 'format=' in url and (n_format == 'Text' or n_format == 'Markdown'):
|
||||
prefix = '?' if not '?' in url else '&'
|
||||
# Apprise format is lowercase text https://github.com/caronc/apprise/issues/633
|
||||
n_format = n_format.lower()
|
||||
url = f"{url}{prefix}format={n_format}"
|
||||
# If n_format == HTML, then apprise email should default to text/html and we should be sending HTML only
|
||||
(url, n_body, n_title) = apply_service_tweaks(url=url, n_body=n_body, n_title=n_title, requested_output_format=requested_output_format_original)
|
||||
|
||||
apobj.add(url)
|
||||
|
||||
sent_objs.append({'title': n_title,
|
||||
'body': n_body,
|
||||
'url': url,
|
||||
'body_format': n_format})
|
||||
'url': url})
|
||||
|
||||
# Blast off the notifications tht are set in .add()
|
||||
apobj.notify(
|
||||
title=n_title,
|
||||
body=n_body,
|
||||
body_format=n_format,
|
||||
# `body_format` Tell apprise what format the INPUT is in
|
||||
# &format= in URL Tell apprise what format the OUTPUT should be in (it can convert between)
|
||||
body_format=input_format,
|
||||
# False is not an option for AppRise, must be type None
|
||||
attach=n_object.get('screenshot', None)
|
||||
)
|
||||
|
||||
|
||||
# Returns empty string if nothing found, multi-line string otherwise
|
||||
log_value = logs.getvalue()
|
||||
|
||||
if log_value and 'WARNING' in log_value or 'ERROR' in log_value:
|
||||
if log_value and ('WARNING' in log_value or 'ERROR' in log_value):
|
||||
logger.critical(log_value)
|
||||
raise Exception(log_value)
|
||||
|
||||
|
||||
@@ -92,7 +92,6 @@ class NotificationService:
|
||||
if n_object.get('notification_format') == default_notification_format_for_watch:
|
||||
n_object['notification_format'] = self.datastore.data['settings']['application'].get('notification_format')
|
||||
|
||||
html_colour_enable = False
|
||||
# HTML needs linebreak, but MarkDown and Text can use a linefeed
|
||||
if n_object.get('notification_format') == 'HTML':
|
||||
line_feed_sep = "<br>"
|
||||
@@ -102,7 +101,6 @@ class NotificationService:
|
||||
line_feed_sep = "<br>"
|
||||
# Snapshot will be plaintext on the disk, convert to some kind of HTML
|
||||
snapshot_contents = snapshot_contents.replace('\n', line_feed_sep)
|
||||
html_colour_enable = True
|
||||
else:
|
||||
line_feed_sep = "\n"
|
||||
|
||||
@@ -123,11 +121,11 @@ class NotificationService:
|
||||
|
||||
n_object.update({
|
||||
'current_snapshot': snapshot_contents,
|
||||
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
|
||||
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep),
|
||||
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
|
||||
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
|
||||
'triggered_text': triggered_text,
|
||||
'uuid': watch.get('uuid') if watch else None,
|
||||
|
||||
@@ -7,6 +7,7 @@ import re
|
||||
import urllib3
|
||||
|
||||
from changedetectionio.conditions import execute_ruleset_against_all_plugins
|
||||
from changedetectionio.diff import ADDED_PLACEMARKER_OPEN
|
||||
from changedetectionio.processors import difference_detection_processor
|
||||
from changedetectionio.html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text, TRANSLATE_WHITESPACE_TABLE
|
||||
from changedetectionio import html_tools, content_fetchers
|
||||
@@ -324,13 +325,13 @@ class ContentProcessor:
|
||||
append_pretty_line_formatting=not self.watch.is_source_type_url
|
||||
)
|
||||
|
||||
# Raise error if filter returned nothing
|
||||
if not filtered_content.strip():
|
||||
raise FilterNotFoundInResponse(
|
||||
msg=self.filter_config.include_filters,
|
||||
screenshot=self.fetcher.screenshot,
|
||||
xpath_data=self.fetcher.xpath_data
|
||||
)
|
||||
# Raise error if filter returned nothing
|
||||
if not filtered_content.strip():
|
||||
raise FilterNotFoundInResponse(
|
||||
msg=self.filter_config.include_filters,
|
||||
screenshot=self.fetcher.screenshot,
|
||||
xpath_data=self.fetcher.xpath_data
|
||||
)
|
||||
|
||||
return filtered_content
|
||||
|
||||
|
||||
@@ -2,6 +2,13 @@
|
||||
|
||||
$(document).ready(function () {
|
||||
|
||||
function reapplyTableStripes() {
|
||||
$('.watch-table tbody tr').each(function(index) {
|
||||
$(this).removeClass('pure-table-odd pure-table-even');
|
||||
$(this).addClass(index % 2 === 0 ? 'pure-table-odd' : 'pure-table-even');
|
||||
});
|
||||
}
|
||||
|
||||
function bindSocketHandlerButtonsEvents(socket) {
|
||||
$('.ajax-op').on('click.socketHandlerNamespace', function (e) {
|
||||
e.preventDefault();
|
||||
@@ -101,6 +108,7 @@ $(document).ready(function () {
|
||||
socket.on('watch_deleted', function (data) {
|
||||
$('tr[data-watch-uuid="' + data.uuid + '"] td').fadeOut(500, function () {
|
||||
$(this).closest('tr').remove();
|
||||
reapplyTableStripes();
|
||||
});
|
||||
});
|
||||
|
||||
|
||||
@@ -134,6 +134,12 @@
|
||||
<p>
|
||||
URL encoding, use <strong>|urlencode</strong>, for example - <code>gets://hook-website.com/test.php?title={{ '{{ watch_title|urlencode }}' }}</code>
|
||||
</p>
|
||||
<p>
|
||||
Regular-expression replace, use <strong>|regex_replace</strong>, for example - <code>{{ "{{ \"hello world 123\" | regex_replace('[0-9]+', 'no-more-numbers') }}" }}</code>
|
||||
</p>
|
||||
<p>
|
||||
For a complete reference of all Jinja2 built-in filters, users can refer to the <a href="https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters">https://jinja.palletsprojects.com/en/3.1.x/templates/#builtin-filters</a>
|
||||
</p>
|
||||
</div>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
|
||||
@@ -1,14 +1,15 @@
|
||||
import json
|
||||
import os
|
||||
import time
|
||||
import re
|
||||
from flask import url_for
|
||||
from email import message_from_string
|
||||
from email.policy import default as email_policy
|
||||
|
||||
from changedetectionio.diff import HTML_REMOVED_STYLE, HTML_ADDED_STYLE
|
||||
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
|
||||
wait_for_all_checks, \
|
||||
set_longer_modified_response, delete_all_watches
|
||||
from changedetectionio.tests.util import extract_UUID_from_client
|
||||
|
||||
import logging
|
||||
import base64
|
||||
|
||||
|
||||
# NOTE - RELIES ON mailserver as hostname running, see github build recipes
|
||||
smtp_test_server = 'mailserver'
|
||||
@@ -50,7 +51,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "fallback-body<br> " + default_notification_body,
|
||||
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
|
||||
"application-notification_format": 'HTML',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
@@ -77,19 +78,229 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# The email should have two bodies, and the text/html part should be <br>
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '(added) So let\'s see what happens.<br>' in msg # the html part
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert 'fallback-body\r\n' in text_content # The plaintext part
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert 'some text<br>' in html_content # We converted \n from the notification body
|
||||
assert 'fallback-body<br>' in html_content # kept the original <br>
|
||||
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_check_notification_plaintext_format(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'Plain Text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
time.sleep(2)
|
||||
|
||||
set_longer_modified_response()
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should be plain text only (not multipart)
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
|
||||
# Get the plain text content
|
||||
text_content = msg.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
# Should NOT contain HTML
|
||||
assert '<br>' not in text_content # We should not have HTML in plain text
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
|
||||
def test_check_notification_html_color_format(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'HTML Color',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert 'So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert '(added)' not in text_content # Because apprise only dumb converts the html to text
|
||||
|
||||
# Second part should be text/html with color styling
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert HTML_REMOVED_STYLE in html_content
|
||||
assert HTML_ADDED_STYLE in html_content
|
||||
|
||||
assert 'some text<br>' in html_content
|
||||
delete_all_watches(client)
|
||||
|
||||
def test_check_notification_markdown_format(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "*header*\n\nsome text\n" + default_notification_body,
|
||||
"application-notification_format": 'Markdown to HTML',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
|
||||
# Second part should be text/html and roughly converted from markdown to HTML
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert '<p><em>header</em></p>' in html_content
|
||||
assert '(added) So let\'s see what happens.<br' in html_content
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage):
|
||||
## live_server_setup(live_server) # Setup on conftest per function
|
||||
|
||||
# HTML problems? see this
|
||||
# https://github.com/caronc/apprise/issues/633
|
||||
@@ -115,7 +326,7 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": notification_body,
|
||||
"application-notification_format": 'Text',
|
||||
"application-notification_format": 'Plain Text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
@@ -139,15 +350,21 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
# with open('/tmp/m.txt', 'w') as f:
|
||||
# f.write(msg)
|
||||
# f.write(msg_raw)
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should not have two bodies, should be TEXT only
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
|
||||
# Get the plain text content
|
||||
text_content = msg.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
set_original_response()
|
||||
# Now override as HTML format
|
||||
@@ -164,18 +381,34 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# The email should have two bodies, and the text/html part should be <br>
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(removed) So let\'s see what happens.\r\n' in msg # The plaintext part with \n
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '(removed) So let\'s see what happens.<br>' in msg # the html part
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(removed) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert '(removed) So let\'s see what happens.<br>' in html_content # the html part
|
||||
|
||||
# https://github.com/dgtlmoon/changedetection.io/issues/2103
|
||||
assert '<h1>Test</h1>' in msg
|
||||
assert '<' not in msg
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '<h1>Test</h1>' in html_content
|
||||
assert '<' not in html_content
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -6,6 +6,9 @@ from flask import url_for
|
||||
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, delete_all_watches
|
||||
import time
|
||||
|
||||
from ..diff import ADDED_PLACEMARKER_OPEN
|
||||
|
||||
|
||||
def set_original(excluding=None, add_line=None):
|
||||
test_return_data = """<html>
|
||||
<body>
|
||||
@@ -121,6 +124,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
|
||||
"application-notification_body": 'triggered text was -{{triggered_text}}- ### 网站监测 内容更新了 ####',
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
|
||||
"application-notification_urls": test_notification_url,
|
||||
"application-notification_format": 'Plain Text',
|
||||
"application-minutes_between_check": 180,
|
||||
"application-fetch_backend": "html_requests"
|
||||
},
|
||||
@@ -174,6 +178,7 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
|
||||
assert os.path.isfile("test-datastore/notification.txt"), "Notification fired because I can see the output file"
|
||||
with open("test-datastore/notification.txt", 'rb') as f:
|
||||
response = f.read()
|
||||
assert ADDED_PLACEMARKER_OPEN.encode('utf-8') not in response # _apply_diff_filtering shouldnt add something here
|
||||
assert b'-Oh yes please' in response
|
||||
assert '网站监测 内容更新了'.encode('utf-8') in response
|
||||
|
||||
|
||||
@@ -86,14 +86,16 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
||||
"Diff Full: {{diff_full}}\n"
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_format": "Text"}
|
||||
"notification_format": 'Plain Text'}
|
||||
|
||||
notification_form_data.update({
|
||||
"url": test_url,
|
||||
"tags": "my tag",
|
||||
"title": "my title",
|
||||
"headers": "",
|
||||
"include_filters": '.ticket-available',
|
||||
# preprended with extra filter that intentionally doesn't match any entry,
|
||||
# notification should still be sent even if first filter does not match (PR#3516)
|
||||
"include_filters": ".non-matching-selector\n.ticket-available",
|
||||
"fetch_backend": "html_requests",
|
||||
"time_between_check_use_default": "y"})
|
||||
|
||||
|
||||
@@ -63,7 +63,7 @@ def run_filter_test(client, live_server, content_filter, app_notification_format
|
||||
"Diff Full: {{diff_full}}\n"
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_format": "Text",
|
||||
"notification_format": 'Plain Text',
|
||||
"fetch_backend": "html_requests",
|
||||
"filter_failure_notification_send": 'y',
|
||||
"time_between_check_use_default": "y",
|
||||
@@ -177,7 +177,7 @@ def test_check_include_filters_failure_notification(client, live_server, measure
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('HTML Color'))
|
||||
# Check markup send conversion didnt affect plaintext preference
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Text'))
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Plain Text'))
|
||||
|
||||
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage):
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
|
||||
@@ -195,7 +195,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage):
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_screenshot": True,
|
||||
"notification_format": "Text",
|
||||
"notification_format": 'Plain Text',
|
||||
"title": "test-tag"}
|
||||
|
||||
res = client.post(
|
||||
|
||||
@@ -169,4 +169,161 @@ def test_default_timezone_subtraction(environment):
|
||||
|
||||
finalRender = render("{% now '' - 'minutes=11' %}")
|
||||
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||
assert finalRender == "Wed, 09 Dec 2015 23:22:01"
|
||||
|
||||
def test_regex_replace_basic():
|
||||
"""Test basic regex_replace functionality."""
|
||||
|
||||
# Simple word replacement
|
||||
finalRender = render("{{ 'hello world' | regex_replace('world', 'universe') }}")
|
||||
assert finalRender == "hello universe"
|
||||
|
||||
def test_regex_replace_with_groups():
|
||||
"""Test regex_replace with capture groups (issue #3501 use case)."""
|
||||
|
||||
# Transform HTML table data as described in the issue
|
||||
template = "{{ '<td>thing</td><td>other</td>' | regex_replace('<td>([^<]+)</td><td>([^<]+)</td>', 'ThingLabel: \\\\1\\nOtherLabel: \\\\2') }}"
|
||||
finalRender = render(template)
|
||||
assert "ThingLabel: thing" in finalRender
|
||||
assert "OtherLabel: other" in finalRender
|
||||
|
||||
def test_regex_replace_multiple_matches():
|
||||
"""Test regex_replace replacing multiple occurrences."""
|
||||
|
||||
finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux') }}")
|
||||
assert finalRender == "qux bar qux baz"
|
||||
|
||||
def test_regex_replace_count_parameter():
|
||||
"""Test regex_replace with count parameter to limit replacements."""
|
||||
|
||||
finalRender = render("{{ 'foo bar foo baz' | regex_replace('foo', 'qux', 1) }}")
|
||||
assert finalRender == "qux bar foo baz"
|
||||
|
||||
def test_regex_replace_empty_replacement():
|
||||
"""Test regex_replace with empty replacement (removal)."""
|
||||
|
||||
finalRender = render("{{ 'hello world 123' | regex_replace('[0-9]+', '') }}")
|
||||
assert finalRender == "hello world "
|
||||
|
||||
def test_regex_replace_no_match():
|
||||
"""Test regex_replace when pattern doesn't match."""
|
||||
|
||||
finalRender = render("{{ 'hello world' | regex_replace('xyz', 'abc') }}")
|
||||
assert finalRender == "hello world"
|
||||
|
||||
def test_regex_replace_invalid_regex():
|
||||
"""Test regex_replace with invalid regex pattern returns original value."""
|
||||
|
||||
# Invalid regex (unmatched parenthesis)
|
||||
finalRender = render("{{ 'hello world' | regex_replace('(invalid', 'replacement') }}")
|
||||
assert finalRender == "hello world"
|
||||
|
||||
def test_regex_replace_special_characters():
|
||||
"""Test regex_replace with special regex characters."""
|
||||
|
||||
finalRender = render("{{ 'Price: $50.00' | regex_replace('\\\\$([0-9.]+)', 'USD \\\\1') }}")
|
||||
assert finalRender == "Price: USD 50.00"
|
||||
|
||||
def test_regex_replace_multiline():
|
||||
"""Test regex_replace on multiline text."""
|
||||
|
||||
template = "{{ 'line1\\nline2\\nline3' | regex_replace('^line', 'row') }}"
|
||||
finalRender = render(template)
|
||||
# By default re.sub doesn't use MULTILINE flag, so only first line matches with ^
|
||||
assert finalRender == "row1\nline2\nline3"
|
||||
|
||||
def test_regex_replace_with_notification_context():
|
||||
"""Test regex_replace with notification diff variable."""
|
||||
|
||||
# Simulate how it would be used in notifications with diff variable
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
|
||||
context = NotificationContextData()
|
||||
context['diff'] = '<td>value1</td><td>value2</td>'
|
||||
|
||||
template = "{{ diff | regex_replace('<td>([^<]+)</td>', '\\\\1 ') }}"
|
||||
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals.update(context)
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
assert "value1 value2 " in finalRender
|
||||
|
||||
def test_regex_replace_security_large_input():
|
||||
"""Test regex_replace handles large input safely."""
|
||||
|
||||
# Create a large input string (over 10MB)
|
||||
large_input = "x" * (1024 * 1024 * 10 + 1000)
|
||||
template = "{{ large_input | regex_replace('x', 'y') }}"
|
||||
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals['large_input'] = large_input
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
# Should be truncated to 10MB
|
||||
assert len(finalRender) == 1024 * 1024 * 10
|
||||
|
||||
def test_regex_replace_security_long_pattern():
|
||||
"""Test regex_replace rejects very long patterns."""
|
||||
|
||||
# Pattern longer than 500 chars should be rejected
|
||||
long_pattern = "a" * 501
|
||||
finalRender = render("{{ 'test' | regex_replace('" + long_pattern + "', 'replacement') }}")
|
||||
|
||||
# Should return original value when pattern is too long
|
||||
assert finalRender == "test"
|
||||
|
||||
def test_regex_replace_security_dangerous_pattern():
|
||||
"""Test regex_replace detects and rejects dangerous nested quantifiers."""
|
||||
|
||||
# Patterns that could cause catastrophic backtracking
|
||||
dangerous_patterns = [
|
||||
"(a+)+",
|
||||
"(a*)+",
|
||||
"(a+)*",
|
||||
"(a*)*",
|
||||
]
|
||||
|
||||
for dangerous in dangerous_patterns:
|
||||
# Create a template with the dangerous pattern
|
||||
# Using single quotes to avoid escaping issues
|
||||
from changedetectionio.jinja2_custom import create_jinja_env
|
||||
from jinja2 import BaseLoader
|
||||
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
jinja2_env.globals['pattern'] = dangerous
|
||||
template = "{{ 'aaaaaaaaaa' | regex_replace(pattern, 'x') }}"
|
||||
finalRender = jinja2_env.from_string(template).render()
|
||||
|
||||
# Should return original value when dangerous pattern is detected
|
||||
assert finalRender == "aaaaaaaaaa"
|
||||
|
||||
def test_regex_replace_security_timeout_protection():
|
||||
"""Test that regex_replace has timeout protection (if SIGALRM available)."""
|
||||
import signal
|
||||
|
||||
# Only test on systems that support SIGALRM
|
||||
if not hasattr(signal, 'SIGALRM'):
|
||||
# Skip test on Windows and other systems without SIGALRM
|
||||
return
|
||||
|
||||
# This pattern is known to cause exponential backtracking on certain inputs
|
||||
# but should be caught by our dangerous pattern detector
|
||||
# We're mainly testing that the timeout mechanism works
|
||||
|
||||
from changedetectionio.jinja2_custom import regex_replace
|
||||
|
||||
# Create input that could trigger slow regex
|
||||
test_input = "a" * 50 + "b"
|
||||
|
||||
# This shouldn't take long due to our protections
|
||||
result = regex_replace(test_input, "a+b", "x")
|
||||
|
||||
# Should complete and return a result
|
||||
assert result is not None
|
||||
@@ -2,7 +2,8 @@
|
||||
# coding=utf-8
|
||||
|
||||
import time
|
||||
from flask import url_for, escape
|
||||
from flask import url_for
|
||||
from markupsafe import escape
|
||||
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
|
||||
import pytest
|
||||
jq_support = True
|
||||
|
||||
@@ -101,7 +101,7 @@ def test_check_notification(client, live_server, measure_memory_usage):
|
||||
"Diff as Patch: {{diff_patch}}\n"
|
||||
":-)",
|
||||
"notification_screenshot": True,
|
||||
"notification_format": "Text"}
|
||||
"notification_format": 'Plain Text'}
|
||||
|
||||
notification_form_data.update({
|
||||
"url": test_url,
|
||||
@@ -267,7 +267,7 @@ def test_notification_validation(client, live_server, measure_memory_usage):
|
||||
# data={"notification_urls": 'json://localhost/foobar',
|
||||
# "notification_title": "",
|
||||
# "notification_body": "",
|
||||
# "notification_format": "Text",
|
||||
# "notification_format": 'Plain Text',
|
||||
# "url": test_url,
|
||||
# "tag": "my tag",
|
||||
# "title": "my title",
|
||||
@@ -383,7 +383,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
|
||||
assert 'second: hello world "space"' in notification_headers.lower()
|
||||
|
||||
|
||||
# Should always be automatically detected as JSON content type even when we set it as 'Text' (default)
|
||||
# Should always be automatically detected as JSON content type even when we set it as 'Plain Text' (default)
|
||||
assert os.path.isfile("test-datastore/notification-content-type.txt")
|
||||
with open("test-datastore/notification-content-type.txt", 'r') as f:
|
||||
assert 'application/json' in f.read()
|
||||
@@ -485,7 +485,7 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
|
||||
|
||||
def _test_color_notifications(client, notification_body_token):
|
||||
|
||||
from changedetectionio.diff import ADDED_STYLE, REMOVED_STYLE
|
||||
from changedetectionio.diff import HTML_ADDED_STYLE, HTML_REMOVED_STYLE
|
||||
|
||||
set_original_response()
|
||||
|
||||
@@ -533,7 +533,7 @@ def _test_color_notifications(client, notification_body_token):
|
||||
|
||||
with open("test-datastore/notification.txt", 'r') as f:
|
||||
x = f.read()
|
||||
assert f'<span style="{REMOVED_STYLE}">Which is across multiple lines' in x
|
||||
assert f'<span style="{HTML_REMOVED_STYLE}">Which is across multiple lines' in x
|
||||
|
||||
|
||||
client.get(
|
||||
@@ -541,9 +541,7 @@ def _test_color_notifications(client, notification_body_token):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Just checks the format of the colour notifications was correct
|
||||
def test_html_color_notifications(client, live_server, measure_memory_usage):
|
||||
|
||||
|
||||
_test_color_notifications(client, '{{diff}}')
|
||||
_test_color_notifications(client, '{{diff_full}}')
|
||||
|
||||
@@ -30,7 +30,7 @@ def test_check_notification_error_handling(client, live_server, measure_memory_u
|
||||
data={"notification_urls": f"{broken_notification_url}\r\n{working_notification_url}",
|
||||
"notification_title": "xxx",
|
||||
"notification_body": "xxxxx",
|
||||
"notification_format": "Text",
|
||||
"notification_format": 'Plain Text',
|
||||
"url": test_url,
|
||||
"tags": "",
|
||||
"title": "",
|
||||
|
||||
153
changedetectionio/tests/test_xpath_default_namespace.py
Normal file
153
changedetectionio/tests/test_xpath_default_namespace.py
Normal file
@@ -0,0 +1,153 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Unit tests for XPath default namespace handling in RSS/Atom feeds.
|
||||
Tests the fix for issue where //title/text() returns empty on feeds with default namespaces.
|
||||
|
||||
Real-world test data from https://github.com/microsoft/PowerToys/releases.atom
|
||||
"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
|
||||
import html_tools
|
||||
|
||||
|
||||
# Real-world Atom feed with default namespace from GitHub PowerToys releases
|
||||
# This is the actual format that was failing before the fix
|
||||
atom_feed_with_default_ns = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<feed xmlns="http://www.w3.org/2005/Atom" xmlns:media="http://search.yahoo.com/mrss/" xml:lang="en-US">
|
||||
<id>tag:github.com,2008:https://github.com/microsoft/PowerToys/releases</id>
|
||||
<link type="text/html" rel="alternate" href="https://github.com/microsoft/PowerToys/releases"/>
|
||||
<link type="application/atom+xml" rel="self" href="https://github.com/microsoft/PowerToys/releases.atom"/>
|
||||
<title>Release notes from PowerToys</title>
|
||||
<updated>2025-10-23T08:53:12Z</updated>
|
||||
<entry>
|
||||
<id>tag:github.com,2008:Repository/184456251/v0.95.1</id>
|
||||
<updated>2025-10-24T14:20:14Z</updated>
|
||||
<link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.1"/>
|
||||
<title>Release 0.95.1</title>
|
||||
<content type="html"><p>This patch release fixes several important stability issues.</p></content>
|
||||
<author>
|
||||
<name>Jaylyn-Barbee</name>
|
||||
</author>
|
||||
</entry>
|
||||
<entry>
|
||||
<id>tag:github.com,2008:Repository/184456251/v0.95.0</id>
|
||||
<updated>2025-10-17T12:51:21Z</updated>
|
||||
<link rel="alternate" type="text/html" href="https://github.com/microsoft/PowerToys/releases/tag/v0.95.0"/>
|
||||
<title>Release v0.95.0</title>
|
||||
<content type="html"><p>New features, stability, optimization improvements.</p></content>
|
||||
<author>
|
||||
<name>Jaylyn-Barbee</name>
|
||||
</author>
|
||||
</entry>
|
||||
</feed>"""
|
||||
|
||||
# RSS feed without default namespace
|
||||
rss_feed_no_default_ns = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss version="2.0">
|
||||
<channel>
|
||||
<title>Channel Title</title>
|
||||
<description>Channel Description</description>
|
||||
<item>
|
||||
<title>Item 1 Title</title>
|
||||
<description>Item 1 Description</description>
|
||||
</item>
|
||||
<item>
|
||||
<title>Item 2 Title</title>
|
||||
<description>Item 2 Description</description>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>"""
|
||||
|
||||
# RSS 2.0 feed with namespace prefix (not default)
|
||||
rss_feed_with_ns_prefix = """<?xml version="1.0" encoding="UTF-8"?>
|
||||
<rss xmlns:dc="http://purl.org/dc/elements/1.1/"
|
||||
xmlns:content="http://purl.org/rss/1.0/modules/content/"
|
||||
xmlns:atom="http://www.w3.org/2005/Atom"
|
||||
version="2.0">
|
||||
<channel>
|
||||
<title>Channel Title</title>
|
||||
<atom:link href="http://example.com/feed" rel="self" type="application/rss+xml"/>
|
||||
<item>
|
||||
<title>Item Title</title>
|
||||
<dc:creator>Author Name</dc:creator>
|
||||
</item>
|
||||
</channel>
|
||||
</rss>"""
|
||||
|
||||
|
||||
class TestXPathDefaultNamespace:
|
||||
"""Test XPath queries on feeds with and without default namespaces."""
|
||||
|
||||
def test_atom_feed_simple_xpath_with_xpath_filter(self):
|
||||
"""Test that //title/text() works on Atom feed with default namespace using xpath_filter."""
|
||||
result = html_tools.xpath_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
assert 'Release v0.95.0' in result
|
||||
|
||||
def test_atom_feed_nested_xpath_with_xpath_filter(self):
|
||||
"""Test nested XPath like //entry/title/text() on Atom feed."""
|
||||
result = html_tools.xpath_filter('//entry/title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release 0.95.1' in result
|
||||
assert 'Release v0.95.0' in result
|
||||
# Should NOT include the feed title
|
||||
assert 'Release notes from PowerToys' not in result
|
||||
|
||||
def test_atom_feed_other_elements_with_xpath_filter(self):
|
||||
"""Test that other elements like //updated/text() work on Atom feed."""
|
||||
result = html_tools.xpath_filter('//updated/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert '2025-10-23T08:53:12Z' in result
|
||||
assert '2025-10-24T14:20:14Z' in result
|
||||
|
||||
def test_rss_feed_without_namespace(self):
|
||||
"""Test that //title/text() works on RSS feed without default namespace."""
|
||||
result = html_tools.xpath_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item 1 Title' in result
|
||||
assert 'Item 2 Title' in result
|
||||
|
||||
def test_rss_feed_nested_xpath(self):
|
||||
"""Test nested XPath on RSS feed without default namespace."""
|
||||
result = html_tools.xpath_filter('//item/title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Item 1 Title' in result
|
||||
assert 'Item 2 Title' in result
|
||||
# Should NOT include channel title
|
||||
assert 'Channel Title' not in result
|
||||
|
||||
def test_rss_feed_with_prefixed_namespaces(self):
|
||||
"""Test that feeds with namespace prefixes (not default) still work."""
|
||||
result = html_tools.xpath_filter('//title/text()', rss_feed_with_ns_prefix, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item Title' in result
|
||||
|
||||
def test_local_name_workaround_still_works(self):
|
||||
"""Test that local-name() workaround still works for Atom feeds."""
|
||||
result = html_tools.xpath_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
|
||||
def test_xpath1_filter_without_default_namespace(self):
|
||||
"""Test xpath1_filter works on RSS without default namespace."""
|
||||
result = html_tools.xpath1_filter('//title/text()', rss_feed_no_default_ns, is_rss=True)
|
||||
assert 'Channel Title' in result
|
||||
assert 'Item 1 Title' in result
|
||||
|
||||
def test_xpath1_filter_with_default_namespace_returns_empty(self):
|
||||
"""Test that xpath1_filter returns empty on Atom with default namespace (known limitation)."""
|
||||
result = html_tools.xpath1_filter('//title/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
# xpath1_filter (lxml) doesn't support default namespaces, so this returns empty
|
||||
assert result == ''
|
||||
|
||||
def test_xpath1_filter_local_name_workaround(self):
|
||||
"""Test that xpath1_filter works with local-name() workaround on Atom feeds."""
|
||||
result = html_tools.xpath1_filter('//*[local-name()="title"]/text()', atom_feed_with_default_ns, is_rss=True)
|
||||
assert 'Release notes from PowerToys' in result
|
||||
assert 'Release 0.95.1' in result
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
pytest.main([__file__, '-v'])
|
||||
@@ -24,18 +24,18 @@ class TestDiffBuilder(unittest.TestCase):
|
||||
|
||||
output = output.split("\n")
|
||||
|
||||
|
||||
self.assertIn('(changed) ok', output)
|
||||
self.assertIn('(into) xok', output)
|
||||
self.assertIn('(into) next-x-ok', output)
|
||||
self.assertIn('(added) and something new', output)
|
||||
# Check that placeholders are present (they get replaced in apply_service_tweaks)
|
||||
self.assertTrue(any(diff.CHANGED_PLACEMARKER_OPEN in line and 'ok' in line for line in output))
|
||||
self.assertTrue(any(diff.CHANGED_INTO_PLACEMARKER_OPEN in line and 'xok' in line for line in output))
|
||||
self.assertTrue(any(diff.CHANGED_INTO_PLACEMARKER_OPEN in line and 'next-x-ok' in line for line in output))
|
||||
self.assertTrue(any(diff.ADDED_PLACEMARKER_OPEN in line and 'and something new' in line for line in output))
|
||||
|
||||
with open(base_dir + "/test-content/after-2.txt", 'r') as f:
|
||||
newest_version_file_contents = f.read()
|
||||
output = diff.render_diff(previous_version_file_contents, newest_version_file_contents)
|
||||
output = output.split("\n")
|
||||
self.assertIn('(removed) for having learned computerese,', output)
|
||||
self.assertIn('(removed) I continue to examine bits, bytes and words', output)
|
||||
self.assertTrue(any(diff.REMOVED_PLACEMARKER_OPEN in line and 'for having learned computerese,' in line for line in output))
|
||||
self.assertTrue(any(diff.REMOVED_PLACEMARKER_OPEN in line and 'I continue to examine bits, bytes and words' in line for line in output))
|
||||
|
||||
#diff_removed
|
||||
with open(base_dir + "/test-content/before.txt", 'r') as f:
|
||||
@@ -45,18 +45,18 @@ class TestDiffBuilder(unittest.TestCase):
|
||||
newest_version_file_contents = f.read()
|
||||
output = diff.render_diff(previous_version_file_contents, newest_version_file_contents, include_equal=False, include_removed=True, include_added=False)
|
||||
output = output.split("\n")
|
||||
self.assertIn('(changed) ok', output)
|
||||
self.assertIn('(into) xok', output)
|
||||
self.assertIn('(into) next-x-ok', output)
|
||||
self.assertNotIn('(added) and something new', output)
|
||||
self.assertTrue(any(diff.CHANGED_PLACEMARKER_OPEN in line and 'ok' in line for line in output))
|
||||
self.assertTrue(any(diff.CHANGED_INTO_PLACEMARKER_OPEN in line and 'xok' in line for line in output))
|
||||
self.assertTrue(any(diff.CHANGED_INTO_PLACEMARKER_OPEN in line and 'next-x-ok' in line for line in output))
|
||||
self.assertFalse(any(diff.ADDED_PLACEMARKER_OPEN in line and 'and something new' in line for line in output))
|
||||
|
||||
#diff_removed
|
||||
with open(base_dir + "/test-content/after-2.txt", 'r') as f:
|
||||
newest_version_file_contents = f.read()
|
||||
output = diff.render_diff(previous_version_file_contents, newest_version_file_contents, include_equal=False, include_removed=True, include_added=False)
|
||||
output = output.split("\n")
|
||||
self.assertIn('(removed) for having learned computerese,', output)
|
||||
self.assertIn('(removed) I continue to examine bits, bytes and words', output)
|
||||
self.assertTrue(any(diff.REMOVED_PLACEMARKER_OPEN in line and 'for having learned computerese,' in line for line in output))
|
||||
self.assertTrue(any(diff.REMOVED_PLACEMARKER_OPEN in line and 'I continue to examine bits, bytes and words' in line for line in output))
|
||||
|
||||
def test_expected_diff_patch_output(self):
|
||||
base_dir = os.path.dirname(__file__)
|
||||
|
||||
@@ -10,14 +10,14 @@ flask_restful
|
||||
flask_cors # For the Chrome extension to operate
|
||||
janus # Thread-safe async/sync queue bridge
|
||||
flask_wtf~=1.2
|
||||
flask~=2.3
|
||||
flask~=3.1
|
||||
flask-socketio~=5.5.1
|
||||
python-socketio~=5.13.0
|
||||
python-engineio~=4.12.3
|
||||
inscriptis~=2.2
|
||||
pytz
|
||||
timeago~=1.0
|
||||
validators~=0.21
|
||||
validators~=0.35
|
||||
|
||||
|
||||
# Set these versions together to avoid a RequestsDependencyWarning
|
||||
@@ -56,7 +56,7 @@ cryptography==44.0.1
|
||||
paho-mqtt!=2.0.*
|
||||
|
||||
# Used for CSS filtering, JSON extraction from HTML
|
||||
beautifulsoup4>=4.0.0,<=4.13.5
|
||||
beautifulsoup4>=4.0.0,<=4.14.2
|
||||
|
||||
# XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
|
||||
# #2328 - 5.2.0 and 5.2.1 had extra CPU flag CFLAGS set which was not compatible on older hardware
|
||||
@@ -66,14 +66,10 @@ lxml >=4.8.0,<6,!=5.2.0,!=5.2.1
|
||||
|
||||
# XPath 2.0-3.1 support - 4.2.0 had issues, 4.1.5 stable
|
||||
# Consider updating to latest stable version periodically
|
||||
elementpath==4.1.5
|
||||
elementpath==5.0.4
|
||||
|
||||
selenium~=4.31.0
|
||||
|
||||
# https://github.com/pallets/werkzeug/issues/2985
|
||||
# Maybe related to pytest?
|
||||
werkzeug==3.0.6
|
||||
|
||||
# Templating, so far just in the URLs but in the future can be for the notifications also
|
||||
jinja2~=3.1
|
||||
arrow
|
||||
|
||||
Reference in New Issue
Block a user