mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-01 07:08:47 +00:00
Compare commits
18 Commits
flask-upda
...
email-noti
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6c54b4344f | ||
|
|
c30da09861 | ||
|
|
c0d51402ab | ||
|
|
7ab68380aa | ||
|
|
74792685c9 | ||
|
|
6647d0bb09 | ||
|
|
785dabd071 | ||
|
|
09914d54a0 | ||
|
|
58b5586674 | ||
|
|
cb02ccc8b4 | ||
|
|
ec692ed727 | ||
|
|
2fb2ea573e | ||
|
|
ada2dc6112 | ||
|
|
ad9024a4f0 | ||
|
|
047c10e23c | ||
|
|
4f83164544 | ||
|
|
6f926ed595 | ||
|
|
249dc55212 |
27
.github/workflows/pypi-release.yml
vendored
27
.github/workflows/pypi-release.yml
vendored
@@ -28,7 +28,7 @@ jobs:
|
||||
|
||||
|
||||
test-pypi-package:
|
||||
name: Test the built 📦 package works basically.
|
||||
name: Test the built package works basically.
|
||||
runs-on: ubuntu-latest
|
||||
needs:
|
||||
- build
|
||||
@@ -42,18 +42,39 @@ jobs:
|
||||
uses: actions/setup-python@v6
|
||||
with:
|
||||
python-version: '3.11'
|
||||
|
||||
- name: Test that the basic pip built package runs without error
|
||||
run: |
|
||||
set -ex
|
||||
ls -alR
|
||||
|
||||
# Find and install the first .whl file
|
||||
find dist -type f -name "*.whl" -exec pip3 install {} \; -quit
|
||||
# Install the first wheel found in dist/
|
||||
WHEEL=$(find dist -type f -name "*.whl" -print -quit)
|
||||
echo Installing $WHEEL
|
||||
python3 -m pip install --upgrade pip
|
||||
python3 -m pip install "$WHEEL"
|
||||
changedetection.io -d /tmp -p 10000 &
|
||||
|
||||
sleep 3
|
||||
curl --retry-connrefused --retry 6 http://127.0.0.1:10000/static/styles/pure-min.css >/dev/null
|
||||
curl --retry-connrefused --retry 6 http://127.0.0.1:10000/ >/dev/null
|
||||
|
||||
# --- API test ---
|
||||
# This also means that the docs/api-spec.yml was shipped and could be read
|
||||
test -f /tmp/url-watches.json
|
||||
API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/url-watches.json)
|
||||
echo Test API KEY is $API_KEY
|
||||
curl -X POST "http://127.0.0.1:10000/api/v1/watch" \
|
||||
-H "x-api-key: ${API_KEY}" \
|
||||
-H "Content-Type: application/json" \
|
||||
--show-error --fail \
|
||||
--retry 6 --retry-delay 1 --retry-connrefused \
|
||||
-d '{
|
||||
"url": "https://example.com",
|
||||
"title": "Example Site Monitor",
|
||||
"time_between_check": { "hours": 1 }
|
||||
}'
|
||||
|
||||
killall changedetection.io
|
||||
|
||||
|
||||
|
||||
@@ -54,7 +54,10 @@ jobs:
|
||||
|
||||
- name: Spin up ancillary SMTP+Echo message test server
|
||||
run: |
|
||||
# Debug SMTP server/echo message back server
|
||||
# Debug SMTP server/echo message back server, telnet 11080 to it should immediately bounce back the most recent message that tried to send (then you can see if cdio tried to send, the format, etc)
|
||||
# 11025 is the SMTP port for testing
|
||||
# apprise example would be 'mailto://changedetection@localhost:11025/?to=fff@home.com (it will also echo to STDOUT)
|
||||
# telnet localhost 11080
|
||||
docker run --network changedet-network -d -p 11025:11025 -p 11080:11080 --hostname mailserver test-changedetectionio bash -c 'pip3 install aiosmtpd && python changedetectionio/tests/smtp/smtp-test-server.py'
|
||||
docker ps
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
recursive-include changedetectionio/api *
|
||||
include docs/api-spec.yaml
|
||||
recursive-include changedetectionio/blueprint *
|
||||
recursive-include changedetectionio/conditions *
|
||||
recursive-include changedetectionio/content_fetchers *
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
|
||||
__version__ = '0.50.23'
|
||||
__version__ = '0.50.27'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
|
||||
@@ -37,6 +37,10 @@ def get_openapi_spec():
|
||||
from openapi_core import OpenAPI # Lazy import - saves ~10.7 MB on startup
|
||||
|
||||
spec_path = os.path.join(os.path.dirname(__file__), '../../docs/api-spec.yaml')
|
||||
if not os.path.exists(spec_path):
|
||||
# Possibly for pip3 packages
|
||||
spec_path = os.path.join(os.path.dirname(__file__), '../docs/api-spec.yaml')
|
||||
|
||||
with open(spec_path, 'r') as f:
|
||||
spec_dict = yaml.safe_load(f)
|
||||
_openapi_spec = OpenAPI.from_dict(spec_dict)
|
||||
|
||||
@@ -2,6 +2,7 @@ from flask import Blueprint, request, make_response
|
||||
import random
|
||||
from loguru import logger
|
||||
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
from changedetectionio.store import ChangeDetectionStore
|
||||
from changedetectionio.auth_decorator import login_optionally_required
|
||||
|
||||
@@ -19,6 +20,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
import apprise
|
||||
from changedetectionio.notification.handler import process_notification
|
||||
from changedetectionio.notification.apprise_plugin.assets import apprise_asset
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
|
||||
from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||
|
||||
@@ -61,16 +63,20 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
return 'Error: No Notification URLs set/found'
|
||||
|
||||
for n_url in notification_urls:
|
||||
# We are ONLY validating the apprise:// part here, convert all tags to something so as not to break apprise URLs
|
||||
generic_notification_context_data = NotificationContextData()
|
||||
generic_notification_context_data.set_random_for_validation()
|
||||
n_url = jinja_render(template_str=n_url, **generic_notification_context_data).strip()
|
||||
if len(n_url.strip()):
|
||||
if not apobj.add(n_url):
|
||||
return f'Error: {n_url} is not a valid AppRise URL.'
|
||||
|
||||
try:
|
||||
# use the same as when it is triggered, but then override it with the form test values
|
||||
n_object = {
|
||||
n_object = NotificationContextData({
|
||||
'watch_url': request.form.get('window_url', "https://changedetection.io"),
|
||||
'notification_urls': notification_urls
|
||||
}
|
||||
})
|
||||
|
||||
# Only use if present, if not set in n_object it should use the default system value
|
||||
if 'notification_format' in request.form and request.form['notification_format'].strip():
|
||||
|
||||
@@ -5,6 +5,7 @@ from wtforms.widgets.core import TimeInput
|
||||
|
||||
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES
|
||||
from changedetectionio.conditions.form import ConditionFormRow
|
||||
from changedetectionio.notification_service import NotificationContextData
|
||||
from changedetectionio.strtobool import strtobool
|
||||
|
||||
from wtforms import (
|
||||
@@ -469,11 +470,16 @@ class ValidateAppRiseServers(object):
|
||||
import apprise
|
||||
from .notification.apprise_plugin.assets import apprise_asset
|
||||
from .notification.apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
|
||||
apobj = apprise.Apprise(asset=apprise_asset)
|
||||
|
||||
for server_url in field.data:
|
||||
url = server_url.strip()
|
||||
generic_notification_context_data = NotificationContextData()
|
||||
# Make sure something is atleast in all those regular token fields
|
||||
generic_notification_context_data.set_random_for_validation()
|
||||
|
||||
url = jinja_render(template_str=server_url.strip(), **generic_notification_context_data).strip()
|
||||
if url.startswith("#"):
|
||||
continue
|
||||
|
||||
@@ -500,7 +506,7 @@ class ValidateJinja2Template(object):
|
||||
jinja2_env = create_jinja_env(loader=BaseLoader)
|
||||
|
||||
# Add notification tokens for validation
|
||||
jinja2_env.globals.update(notification.valid_tokens)
|
||||
jinja2_env.globals.update(NotificationContextData())
|
||||
if hasattr(field, 'extra_notification_tokens'):
|
||||
jinja2_env.globals.update(field.extra_notification_tokens)
|
||||
|
||||
|
||||
@@ -408,6 +408,9 @@ def strip_ignore_text(content, wordlist, mode="content"):
|
||||
ignored_lines = []
|
||||
|
||||
for k in wordlist:
|
||||
# Skip empty strings to avoid matching everything
|
||||
if not k or not k.strip():
|
||||
continue
|
||||
# Is it a regex?
|
||||
res = re.search(PERL_STYLE_REGEX, k, re.IGNORECASE)
|
||||
if res:
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
from changedetectionio.model import default_notification_format_for_watch
|
||||
|
||||
ult_notification_format_for_watch = 'System default'
|
||||
default_notification_format = 'HTML Color'
|
||||
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
|
||||
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
|
||||
@@ -16,20 +15,3 @@ valid_notification_formats = {
|
||||
default_notification_format_for_watch: default_notification_format_for_watch
|
||||
}
|
||||
|
||||
|
||||
valid_tokens = {
|
||||
'base_url': '',
|
||||
'current_snapshot': '',
|
||||
'diff': '',
|
||||
'diff_added': '',
|
||||
'diff_full': '',
|
||||
'diff_patch': '',
|
||||
'diff_removed': '',
|
||||
'diff_url': '',
|
||||
'preview_url': '',
|
||||
'triggered_text': '',
|
||||
'watch_tag': '',
|
||||
'watch_title': '',
|
||||
'watch_url': '',
|
||||
'watch_uuid': '',
|
||||
}
|
||||
|
||||
@@ -70,6 +70,7 @@ def apprise_http_custom_handler(
|
||||
title: str,
|
||||
notify_type: str,
|
||||
meta: dict,
|
||||
body_format: str = None,
|
||||
*args,
|
||||
**kwargs,
|
||||
) -> bool:
|
||||
|
||||
@@ -1,18 +1,89 @@
|
||||
|
||||
import time
|
||||
import apprise
|
||||
from apprise import NotifyFormat
|
||||
from loguru import logger
|
||||
from urllib.parse import urlparse
|
||||
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
|
||||
from .apprise_plugin.custom_handlers import SUPPORTED_HTTP_METHODS
|
||||
from ..notification_service import NotificationContextData
|
||||
|
||||
def process_notification(n_object, datastore):
|
||||
|
||||
def markup_text_links_to_html(body):
|
||||
"""
|
||||
Convert plaintext to HTML with clickable links.
|
||||
Uses Jinja2's escape and Markup for XSS safety.
|
||||
"""
|
||||
from linkify_it import LinkifyIt
|
||||
from markupsafe import Markup, escape
|
||||
|
||||
linkify = LinkifyIt()
|
||||
|
||||
# Match URLs in the ORIGINAL text (before escaping)
|
||||
matches = linkify.match(body)
|
||||
|
||||
if not matches:
|
||||
# No URLs, just escape everything
|
||||
return Markup(escape(body))
|
||||
|
||||
result = []
|
||||
last_index = 0
|
||||
|
||||
# Process each URL match
|
||||
for match in matches:
|
||||
# Add escaped text before the URL
|
||||
if match.index > last_index:
|
||||
text_part = body[last_index:match.index]
|
||||
result.append(escape(text_part))
|
||||
|
||||
# Add the link with escaped URL (both in href and display)
|
||||
url = match.url
|
||||
result.append(Markup(f'<a href="{escape(url)}">{escape(url)}</a>'))
|
||||
|
||||
last_index = match.last_index
|
||||
|
||||
# Add remaining escaped text
|
||||
if last_index < len(body):
|
||||
result.append(escape(body[last_index:]))
|
||||
|
||||
# Join all parts
|
||||
return str(Markup(''.join(str(part) for part in result)))
|
||||
|
||||
def notification_format_align_with_apprise(n_format : str):
|
||||
"""
|
||||
Correctly align changedetection's formats with apprise's formats
|
||||
Probably these are the same - but good to be sure.
|
||||
:param n_format:
|
||||
:return:
|
||||
"""
|
||||
|
||||
if n_format.lower().startswith('html'):
|
||||
# Apprise only knows 'html' not 'htmlcolor' etc, which shouldnt matter here
|
||||
n_format = NotifyFormat.HTML.value
|
||||
elif n_format.lower().startswith('markdown'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.MARKDOWN.value
|
||||
elif n_format.lower().startswith('text'):
|
||||
# probably the same but just to be safe
|
||||
n_format = NotifyFormat.TEXT.value
|
||||
else:
|
||||
n_format = NotifyFormat.TEXT.value
|
||||
|
||||
return n_format
|
||||
|
||||
def process_notification(n_object: NotificationContextData, datastore):
|
||||
from changedetectionio.jinja2_custom import render as jinja_render
|
||||
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
|
||||
# be sure its registered
|
||||
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
|
||||
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
|
||||
now = time.time()
|
||||
if n_object.get('notification_timestamp'):
|
||||
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
|
||||
|
||||
# Insert variables into the notification content
|
||||
notification_parameters = create_notification_parameters(n_object, datastore)
|
||||
|
||||
@@ -24,7 +95,9 @@ def process_notification(n_object, datastore):
|
||||
# If we arrived with 'System default' then look it up
|
||||
if n_format == default_notification_format_for_watch and datastore.data['settings']['application'].get('notification_format') != default_notification_format_for_watch:
|
||||
# Initially text or whatever
|
||||
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
|
||||
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format]).lower()
|
||||
|
||||
n_format = notification_format_align_with_apprise(n_format=n_format)
|
||||
|
||||
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
|
||||
|
||||
@@ -47,7 +120,11 @@ def process_notification(n_object, datastore):
|
||||
|
||||
# Get the notification body from datastore
|
||||
n_body = jinja_render(template_str=n_object.get('notification_body', ''), **notification_parameters)
|
||||
if n_object.get('notification_format', '').startswith('HTML'):
|
||||
|
||||
if n_object.get('markup_text_to_html'):
|
||||
n_body = markup_text_links_to_html(body=n_body)
|
||||
|
||||
if n_format == NotifyFormat.HTML.value:
|
||||
n_body = n_body.replace("\n", '<br>')
|
||||
|
||||
n_title = jinja_render(template_str=n_object.get('notification_title', ''), **notification_parameters)
|
||||
@@ -71,7 +148,8 @@ def process_notification(n_object, datastore):
|
||||
# Length of URL - Incase they specify a longer custom avatar_url
|
||||
|
||||
# So if no avatar_url is specified, add one so it can be correctly calculated into the total payload
|
||||
k = '?' if not '?' in url else '&'
|
||||
parsed = urlparse(url)
|
||||
k = '?' if not parsed.query else '&'
|
||||
if not 'avatar_url' in url \
|
||||
and not url.startswith('mail') \
|
||||
and not url.startswith('post') \
|
||||
@@ -100,16 +178,15 @@ def process_notification(n_object, datastore):
|
||||
n_title = n_title[0:payload_max_size]
|
||||
n_body = n_body[0:body_limit]
|
||||
|
||||
elif url.startswith('mailto'):
|
||||
# Apprise will default to HTML, so we need to override it
|
||||
# So that whats' generated in n_body is in line with what is going to be sent.
|
||||
# https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
|
||||
if not 'format=' in url and (n_format == 'Text' or n_format == 'Markdown'):
|
||||
prefix = '?' if not '?' in url else '&'
|
||||
# Apprise format is lowercase text https://github.com/caronc/apprise/issues/633
|
||||
n_format = n_format.lower()
|
||||
url = f"{url}{prefix}format={n_format}"
|
||||
# If n_format == HTML, then apprise email should default to text/html and we should be sending HTML only
|
||||
# Add format parameter to mailto URLs to ensure proper text/html handling
|
||||
# https://github.com/caronc/apprise/issues/633#issuecomment-1191449321
|
||||
# Note: Custom handlers (post://, get://, etc.) don't need this as we handle them
|
||||
# differently by passing an invalid body_format to prevent HTML conversion
|
||||
if not 'format=' in url and url.startswith(('mailto', 'mailtos')):
|
||||
parsed = urlparse(url)
|
||||
prefix = '?' if not parsed.query else '&'
|
||||
# Apprise format is already lowercase from notification_format_align_with_apprise()
|
||||
url = f"{url}{prefix}format={n_format}"
|
||||
|
||||
apobj.add(url)
|
||||
|
||||
@@ -119,10 +196,28 @@ def process_notification(n_object, datastore):
|
||||
'body_format': n_format})
|
||||
|
||||
# Blast off the notifications tht are set in .add()
|
||||
# Check if we have any custom HTTP handlers (post://, get://, etc.)
|
||||
# These handlers created with @notify decorator don't handle format conversion properly
|
||||
# and will strip HTML if we pass a valid format. So we pass an invalid format string
|
||||
# to prevent Apprise from converting HTML->TEXT
|
||||
|
||||
# Create list of custom handler protocols (both http and https versions)
|
||||
custom_handler_protocols = [f"{method}://" for method in SUPPORTED_HTTP_METHODS]
|
||||
custom_handler_protocols += [f"{method}s://" for method in SUPPORTED_HTTP_METHODS]
|
||||
|
||||
has_custom_handler = any(
|
||||
url.startswith(tuple(custom_handler_protocols))
|
||||
for url in n_object['notification_urls']
|
||||
)
|
||||
|
||||
# If we have custom handlers, use invalid format to prevent conversion
|
||||
# Otherwise use the proper format
|
||||
notify_format = 'raw-no-convert' if has_custom_handler else n_format
|
||||
|
||||
apobj.notify(
|
||||
title=n_title,
|
||||
body=n_body,
|
||||
body_format=n_format,
|
||||
body_format=notify_format,
|
||||
# False is not an option for AppRise, must be type None
|
||||
attach=n_object.get('screenshot', None)
|
||||
)
|
||||
@@ -131,7 +226,7 @@ def process_notification(n_object, datastore):
|
||||
# Returns empty string if nothing found, multi-line string otherwise
|
||||
log_value = logs.getvalue()
|
||||
|
||||
if log_value and 'WARNING' in log_value or 'ERROR' in log_value:
|
||||
if log_value and ('WARNING' in log_value or 'ERROR' in log_value):
|
||||
logger.critical(log_value)
|
||||
raise Exception(log_value)
|
||||
|
||||
@@ -141,17 +236,15 @@ def process_notification(n_object, datastore):
|
||||
|
||||
# Notification title + body content parameters get created here.
|
||||
# ( Where we prepare the tokens in the notification to be replaced with actual values )
|
||||
def create_notification_parameters(n_object, datastore):
|
||||
from copy import deepcopy
|
||||
from . import valid_tokens
|
||||
def create_notification_parameters(n_object: NotificationContextData, datastore):
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
|
||||
# in the case we send a test notification from the main settings, there is no UUID.
|
||||
uuid = n_object['uuid'] if 'uuid' in n_object else ''
|
||||
|
||||
if uuid:
|
||||
watch_title = datastore.data['watching'][uuid].label
|
||||
watch = datastore.data['watching'].get(n_object['uuid'])
|
||||
if watch:
|
||||
watch_title = datastore.data['watching'][n_object['uuid']].label
|
||||
tag_list = []
|
||||
tags = datastore.get_all_tags_for_watch(uuid)
|
||||
tags = datastore.get_all_tags_for_watch(n_object['uuid'])
|
||||
if tags:
|
||||
for tag_uuid, tag in tags.items():
|
||||
tag_list.append(tag.get('title'))
|
||||
@@ -166,14 +259,10 @@ def create_notification_parameters(n_object, datastore):
|
||||
|
||||
watch_url = n_object['watch_url']
|
||||
|
||||
diff_url = "{}/diff/{}".format(base_url, uuid)
|
||||
preview_url = "{}/preview/{}".format(base_url, uuid)
|
||||
diff_url = "{}/diff/{}".format(base_url, n_object['uuid'])
|
||||
preview_url = "{}/preview/{}".format(base_url, n_object['uuid'])
|
||||
|
||||
# Not sure deepcopy is needed here, but why not
|
||||
tokens = deepcopy(valid_tokens)
|
||||
|
||||
# Valid_tokens also used as a field validator
|
||||
tokens.update(
|
||||
n_object.update(
|
||||
{
|
||||
'base_url': base_url,
|
||||
'diff_url': diff_url,
|
||||
@@ -181,13 +270,10 @@ def create_notification_parameters(n_object, datastore):
|
||||
'watch_tag': watch_tag if watch_tag is not None else '',
|
||||
'watch_title': watch_title if watch_title is not None else '',
|
||||
'watch_url': watch_url,
|
||||
'watch_uuid': uuid,
|
||||
'watch_uuid': n_object['uuid'],
|
||||
})
|
||||
|
||||
# n_object will contain diff, diff_added etc etc
|
||||
tokens.update(n_object)
|
||||
if watch:
|
||||
n_object.update(datastore.data['watching'].get(n_object['uuid']).extra_notification_token_values())
|
||||
|
||||
if uuid:
|
||||
tokens.update(datastore.data['watching'].get(uuid).extra_notification_token_values())
|
||||
|
||||
return tokens
|
||||
return n_object
|
||||
|
||||
@@ -6,9 +6,51 @@ Extracted from update_worker.py to provide standalone notification functionality
|
||||
for both sync and async workers
|
||||
"""
|
||||
|
||||
import time
|
||||
from loguru import logger
|
||||
import time
|
||||
|
||||
from changedetectionio.notification import default_notification_format
|
||||
|
||||
# What is passed around as notification context, also used as the complete list of valid {{ tokens }}
|
||||
class NotificationContextData(dict):
|
||||
def __init__(self, initial_data=None, **kwargs):
|
||||
super().__init__({
|
||||
'current_snapshot': None,
|
||||
'diff': None,
|
||||
'diff_added': None,
|
||||
'diff_full': None,
|
||||
'diff_patch': None,
|
||||
'diff_removed': None,
|
||||
'notification_timestamp': time.time(),
|
||||
'screenshot': None,
|
||||
'triggered_text': None,
|
||||
'uuid': 'XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX', # Converted to 'watch_uuid' in create_notification_parameters
|
||||
'watch_url': 'https://WATCH-PLACE-HOLDER/',
|
||||
'base_url': None,
|
||||
'diff_url': None,
|
||||
'preview_url': None,
|
||||
'watch_tag': None,
|
||||
'watch_title': None,
|
||||
'markup_text_to_html': False, # If automatic conversion of plaintext to HTML should happen
|
||||
})
|
||||
|
||||
# Apply any initial data passed in
|
||||
self.update({'watch_uuid': self.get('uuid')})
|
||||
if initial_data:
|
||||
self.update(initial_data)
|
||||
|
||||
# Apply any keyword arguments
|
||||
if kwargs:
|
||||
self.update(kwargs)
|
||||
|
||||
def set_random_for_validation(self):
|
||||
import random, string
|
||||
"""Randomly fills all dict keys with random strings (for validation/testing)."""
|
||||
for key in self.keys():
|
||||
if key in ['uuid', 'time', 'watch_uuid']:
|
||||
continue
|
||||
rand_str = 'RANDOM-PLACEHOLDER-'+''.join(random.choices(string.ascii_letters + string.digits, k=12))
|
||||
self[key] = rand_str
|
||||
|
||||
class NotificationService:
|
||||
"""
|
||||
@@ -20,13 +62,16 @@ class NotificationService:
|
||||
self.datastore = datastore
|
||||
self.notification_q = notification_q
|
||||
|
||||
def queue_notification_for_watch(self, n_object, watch):
|
||||
def queue_notification_for_watch(self, n_object: NotificationContextData, watch):
|
||||
"""
|
||||
Queue a notification for a watch with full diff rendering and template variables
|
||||
"""
|
||||
from changedetectionio import diff
|
||||
from changedetectionio.notification import default_notification_format_for_watch
|
||||
|
||||
if not isinstance(n_object, NotificationContextData):
|
||||
raise TypeError(f"Expected NotificationContextData, got {type(n_object)}")
|
||||
|
||||
dates = []
|
||||
trigger_text = ''
|
||||
|
||||
@@ -79,15 +124,15 @@ class NotificationService:
|
||||
n_object.update({
|
||||
'current_snapshot': snapshot_contents,
|
||||
'diff': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep),
|
||||
'diff_added': diff.render_diff(prev_snapshot, current_snapshot, include_removed=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_full': diff.render_diff(prev_snapshot, current_snapshot, include_equal=True, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'diff_patch': diff.render_diff(prev_snapshot, current_snapshot, line_feed_sep=line_feed_sep, patch_format=True),
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep),
|
||||
'notification_timestamp': now,
|
||||
'diff_removed': diff.render_diff(prev_snapshot, current_snapshot, include_added=False, line_feed_sep=line_feed_sep, html_colour=html_colour_enable),
|
||||
'screenshot': watch.get_screenshot() if watch and watch.get('notification_screenshot') else None,
|
||||
'triggered_text': triggered_text,
|
||||
'uuid': watch.get('uuid') if watch else None,
|
||||
'watch_url': watch.get('url') if watch else None,
|
||||
'watch_uuid': watch.get('uuid') if watch else None,
|
||||
})
|
||||
|
||||
if watch:
|
||||
@@ -140,7 +185,7 @@ class NotificationService:
|
||||
"""
|
||||
Send notification when content changes are detected
|
||||
"""
|
||||
n_object = {}
|
||||
n_object = NotificationContextData()
|
||||
watch = self.datastore.data['watching'].get(watch_uuid)
|
||||
if not watch:
|
||||
return
|
||||
@@ -183,11 +228,26 @@ class NotificationService:
|
||||
if not watch:
|
||||
return
|
||||
|
||||
n_object = {'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
|
||||
'notification_body': "Your configured CSS/xPath filters of '{}' for {{{{watch_url}}}} did not appear on the page after {} attempts, did the page change layout?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\nThanks - Your omniscient changedetection.io installation :)\n".format(
|
||||
", ".join(watch['include_filters']),
|
||||
threshold),
|
||||
'notification_format': 'text'}
|
||||
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format)
|
||||
filter_list = ", ".join(watch['include_filters'])
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed
|
||||
body = f"""Hello,
|
||||
|
||||
Your configured CSS/xPath filters of '{filter_list}' for {{{{watch_url}}}} did not appear on the page after {threshold} attempts.
|
||||
|
||||
It's possible the page changed layout and the filter needs updating ( Try the 'Visual Selector' tab )
|
||||
|
||||
Edit link: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}
|
||||
|
||||
Thanks - Your omniscient changedetection.io installation.
|
||||
"""
|
||||
|
||||
n_object = NotificationContextData({
|
||||
'notification_title': 'Changedetection.io - Alert - CSS/xPath filter was not present in the page',
|
||||
'notification_body': body,
|
||||
'notification_format': n_format,
|
||||
'markup_text_to_html': n_format.lower().startswith('html')
|
||||
})
|
||||
|
||||
if len(watch['notification_urls']):
|
||||
n_object['notification_urls'] = watch['notification_urls']
|
||||
@@ -215,12 +275,28 @@ class NotificationService:
|
||||
if not watch:
|
||||
return
|
||||
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts')
|
||||
n_object = {'notification_title': "Changedetection.io - Alert - Browser step at position {} could not be run".format(step_n+1),
|
||||
'notification_body': "Your configured browser step at position {} for {{{{watch_url}}}} "
|
||||
"did not appear on the page after {} attempts, did the page change layout? "
|
||||
"Does it need a delay added?\n\nLink: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}\n\n"
|
||||
"Thanks - Your omniscient changedetection.io installation :)\n".format(step_n+1, threshold),
|
||||
'notification_format': 'text'}
|
||||
n_format = self.datastore.data['settings']['application'].get('notification_format', default_notification_format).lower()
|
||||
step = step_n + 1
|
||||
# @todo - This could be a markdown template on the disk, apprise will convert the markdown to HTML+Plaintext parts in the email, and then 'markup_text_to_html' is not needed
|
||||
|
||||
# {{{{ }}}} because this will be Jinja2 {{ }} tokens
|
||||
body = f"""Hello,
|
||||
|
||||
Your configured browser step at position {step} for the web page watch {{{{watch_url}}}} did not appear on the page after {threshold} attempts, did the page change layout?
|
||||
|
||||
The element may have moved and needs editing, or does it need a delay added?
|
||||
|
||||
Edit link: {{{{base_url}}}}/edit/{{{{watch_uuid}}}}
|
||||
|
||||
Thanks - Your omniscient changedetection.io installation.
|
||||
"""
|
||||
|
||||
n_object = NotificationContextData({
|
||||
'notification_title': f"Changedetection.io - Alert - Browser step at position {step} could not be run",
|
||||
'notification_body': body,
|
||||
'notification_format': n_format,
|
||||
'markup_text_to_html': n_format.lower().startswith('html')
|
||||
})
|
||||
|
||||
if len(watch['notification_urls']):
|
||||
n_object['notification_urls'] = watch['notification_urls']
|
||||
|
||||
@@ -324,13 +324,13 @@ class ContentProcessor:
|
||||
append_pretty_line_formatting=not self.watch.is_source_type_url
|
||||
)
|
||||
|
||||
# Raise error if filter returned nothing
|
||||
if not filtered_content.strip():
|
||||
raise FilterNotFoundInResponse(
|
||||
msg=self.filter_config.include_filters,
|
||||
screenshot=self.fetcher.screenshot,
|
||||
xpath_data=self.fetcher.xpath_data
|
||||
)
|
||||
# Raise error if filter returned nothing
|
||||
if not filtered_content.strip():
|
||||
raise FilterNotFoundInResponse(
|
||||
msg=self.filter_config.include_filters,
|
||||
screenshot=self.fetcher.screenshot,
|
||||
xpath_data=self.fetcher.xpath_data
|
||||
)
|
||||
|
||||
return filtered_content
|
||||
|
||||
|
||||
@@ -15,7 +15,7 @@ find tests/test_*py -type f|while read test_name
|
||||
do
|
||||
echo "TEST RUNNING $test_name"
|
||||
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest $test_name
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --tb=long $test_name
|
||||
done
|
||||
|
||||
echo "RUNNING WITH BASE_URL SET"
|
||||
@@ -23,20 +23,20 @@ echo "RUNNING WITH BASE_URL SET"
|
||||
# Now re-run some tests with BASE_URL enabled
|
||||
# Re #65 - Ability to include a link back to the installation, in the notification.
|
||||
export BASE_URL="https://really-unique-domain.io"
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest tests/test_notification.py
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notification.py
|
||||
|
||||
|
||||
# Re-run with HIDE_REFERER set - could affect login
|
||||
export HIDE_REFERER=True
|
||||
pytest tests/test_access_control.py
|
||||
pytest -vv -s --maxfail=1 tests/test_access_control.py
|
||||
|
||||
# Re-run a few tests that will trigger brotli based storage
|
||||
export SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5
|
||||
pytest tests/test_access_control.py
|
||||
pytest -vv -s --maxfail=1 tests/test_access_control.py
|
||||
REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest tests/test_notification.py
|
||||
pytest tests/test_backend.py
|
||||
pytest tests/test_rss.py
|
||||
pytest tests/test_unique_lines.py
|
||||
pytest -vv -s --maxfail=1 tests/test_backend.py
|
||||
pytest -vv -s --maxfail=1 tests/test_rss.py
|
||||
pytest -vv -s --maxfail=1 tests/test_unique_lines.py
|
||||
|
||||
# Try high concurrency
|
||||
FETCH_WORKERS=130 pytest tests/test_history_consistency.py -v -l
|
||||
|
||||
@@ -228,26 +228,36 @@ class ChangeDetectionStore:
|
||||
d['settings']['application']['active_base_url'] = active_base_url.strip('" ')
|
||||
return d
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
def delete_path(self, path: Path):
|
||||
import shutil
|
||||
"""Delete a file or directory tree, including the path itself."""
|
||||
if not path.exists():
|
||||
return
|
||||
if path.is_file() or path.is_symlink():
|
||||
path.unlink(missing_ok=True) # deletes a file or symlink
|
||||
else:
|
||||
shutil.rmtree(path, ignore_errors=True) # deletes dir *and* its contents
|
||||
|
||||
# Delete a single watch by UUID
|
||||
def delete(self, uuid):
|
||||
import pathlib
|
||||
import shutil
|
||||
|
||||
with self.lock:
|
||||
if uuid == 'all':
|
||||
self.__data['watching'] = {}
|
||||
time.sleep(1) # Mainly used for testing to allow all items to flush before running next test
|
||||
|
||||
# GitHub #30 also delete history records
|
||||
for uuid in self.data['watching']:
|
||||
path = pathlib.Path(os.path.join(self.datastore_path, uuid))
|
||||
if os.path.exists(path):
|
||||
shutil.rmtree(path)
|
||||
self.delete(uuid)
|
||||
|
||||
else:
|
||||
path = pathlib.Path(os.path.join(self.datastore_path, uuid))
|
||||
if os.path.exists(path):
|
||||
shutil.rmtree(path)
|
||||
self.delete_path(path)
|
||||
|
||||
del self.data['watching'][uuid]
|
||||
|
||||
self.needs_write_urgent = True
|
||||
|
||||
@@ -3,6 +3,10 @@ import os
|
||||
import time
|
||||
import re
|
||||
from flask import url_for
|
||||
from email import message_from_string
|
||||
from email.policy import default as email_policy
|
||||
|
||||
from changedetectionio.diff import REMOVED_STYLE, ADDED_STYLE
|
||||
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
|
||||
wait_for_all_checks, \
|
||||
set_longer_modified_response, delete_all_watches
|
||||
@@ -50,7 +54,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "fallback-body<br> " + default_notification_body,
|
||||
"application-notification_body": "some text\nfallback-body<br> " + default_notification_body,
|
||||
"application-notification_format": 'HTML',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
@@ -77,14 +81,164 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# The email should have two bodies, and the text/html part should be <br>
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '(added) So let\'s see what happens.<br>' in msg # the html part
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert 'fallback-body\r\n' in text_content # The plaintext part
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert 'some text<br>' in html_content # We converted \n from the notification body
|
||||
assert 'fallback-body<br>' in html_content # kept the original <br>
|
||||
assert '(added) So let\'s see what happens.<br>' in html_content # the html part
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_check_notification_plaintext_format(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'Text',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should be plain text only (not multipart)
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
|
||||
# Get the plain text content
|
||||
text_content = msg.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
# Should NOT contain HTML
|
||||
assert '<br>' not in text_content # We should not have HTML in plain text
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
|
||||
def test_check_notification_html_color_format(client, live_server, measure_memory_usage):
|
||||
set_original_response()
|
||||
|
||||
notification_url = f'mailto://changedetection@{smtp_test_server}:11025/?to=fff@home.com'
|
||||
|
||||
#####################
|
||||
# Set this up for when we remove the notification from the watch, it should fallback with these details
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-notification_urls": notification_url,
|
||||
"application-notification_title": "fallback-title " + default_notification_title,
|
||||
"application-notification_body": "some text\n" + default_notification_body,
|
||||
"application-notification_format": 'HTML Color',
|
||||
"requests-time_between_check-minutes": 180,
|
||||
'application-fetch_backend': "html_requests"},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Settings updated." in res.data
|
||||
|
||||
# Add a watch and trigger a HTTP POST
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
res = client.post(
|
||||
url_for("ui.ui_views.form_quick_watch_add"),
|
||||
data={"url": test_url, "tags": 'nice one'},
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_longer_modified_response()
|
||||
time.sleep(2)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative with text/plain and text/html)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain (the auto-generated plaintext version)
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert 'So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
assert '(added)' not in text_content # Because apprise only dumb converts the html to text
|
||||
|
||||
# Second part should be text/html with color styling
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert REMOVED_STYLE in html_content
|
||||
assert ADDED_STYLE in html_content
|
||||
|
||||
assert 'some text<br>' in html_content
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
@@ -139,15 +293,21 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
# with open('/tmp/m.txt', 'w') as f:
|
||||
# f.write(msg)
|
||||
# f.write(msg_raw)
|
||||
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should not have two bodies, should be TEXT only
|
||||
assert not msg.is_multipart()
|
||||
assert msg.get_content_type() == 'text/plain'
|
||||
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
|
||||
# Get the plain text content
|
||||
text_content = msg.get_content()
|
||||
assert '(added) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
set_original_response()
|
||||
# Now override as HTML format
|
||||
@@ -164,18 +324,34 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
|
||||
wait_for_all_checks(client)
|
||||
|
||||
time.sleep(3)
|
||||
msg = get_last_message_from_smtp_server()
|
||||
assert len(msg) >= 1
|
||||
msg_raw = get_last_message_from_smtp_server()
|
||||
assert len(msg_raw) >= 1
|
||||
|
||||
# The email should have two bodies, and the text/html part should be <br>
|
||||
assert 'Content-Type: text/plain' in msg
|
||||
assert '(removed) So let\'s see what happens.\r\n' in msg # The plaintext part with \n
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '(removed) So let\'s see what happens.<br>' in msg # the html part
|
||||
# Parse the email properly using Python's email library
|
||||
msg = message_from_string(msg_raw, policy=email_policy)
|
||||
|
||||
# The email should have two bodies (multipart/alternative)
|
||||
assert msg.is_multipart()
|
||||
assert msg.get_content_type() == 'multipart/alternative'
|
||||
|
||||
# Get the parts
|
||||
parts = list(msg.iter_parts())
|
||||
assert len(parts) == 2
|
||||
|
||||
# First part should be text/plain
|
||||
text_part = parts[0]
|
||||
assert text_part.get_content_type() == 'text/plain'
|
||||
text_content = text_part.get_content()
|
||||
assert '(removed) So let\'s see what happens.\r\n' in text_content # The plaintext part
|
||||
|
||||
# Second part should be text/html
|
||||
html_part = parts[1]
|
||||
assert html_part.get_content_type() == 'text/html'
|
||||
html_content = html_part.get_content()
|
||||
assert '(removed) So let\'s see what happens.<br>' in html_content # the html part
|
||||
|
||||
# https://github.com/dgtlmoon/changedetection.io/issues/2103
|
||||
assert '<h1>Test</h1>' in msg
|
||||
assert '<' not in msg
|
||||
assert 'Content-Type: text/html' in msg
|
||||
assert '<h1>Test</h1>' in html_content
|
||||
assert '<' not in html_content
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
@@ -93,7 +93,9 @@ def test_filter_doesnt_exist_then_exists_should_get_notification(client, live_se
|
||||
"tags": "my tag",
|
||||
"title": "my title",
|
||||
"headers": "",
|
||||
"include_filters": '.ticket-available',
|
||||
# preprended with extra filter that intentionally doesn't match any entry,
|
||||
# notification should still be sent even if first filter does not match (PR#3516)
|
||||
"include_filters": ".non-matching-selector\n.ticket-available",
|
||||
"fetch_backend": "html_requests",
|
||||
"time_between_check_use_default": "y"})
|
||||
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
import os
|
||||
import time
|
||||
from loguru import logger
|
||||
from flask import url_for
|
||||
from .util import set_original_response, live_server_setup, extract_UUID_from_client, wait_for_all_checks, \
|
||||
wait_for_notification_endpoint_output
|
||||
from changedetectionio.model import App
|
||||
from .util import set_original_response, wait_for_all_checks, wait_for_notification_endpoint_output
|
||||
from ..notification import valid_notification_formats
|
||||
|
||||
|
||||
def set_response_with_filter():
|
||||
@@ -23,13 +21,14 @@ def set_response_with_filter():
|
||||
f.write(test_return_data)
|
||||
return None
|
||||
|
||||
def run_filter_test(client, live_server, content_filter):
|
||||
def run_filter_test(client, live_server, content_filter, app_notification_format):
|
||||
|
||||
# Response WITHOUT the filter ID element
|
||||
set_original_response()
|
||||
live_server.app.config['DATASTORE'].data['settings']['application']['notification_format'] = app_notification_format
|
||||
|
||||
# Goto the edit page, add our ignore text
|
||||
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'json')
|
||||
notification_url = url_for('test_notification_endpoint', _external=True).replace('http', 'post')
|
||||
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
@@ -127,8 +126,23 @@ def run_filter_test(client, live_server, content_filter):
|
||||
with open("test-datastore/notification.txt", 'r') as f:
|
||||
notification = f.read()
|
||||
|
||||
assert 'CSS/xPath filter was not present in the page' in notification
|
||||
assert content_filter.replace('"', '\\"') in notification
|
||||
assert 'Your configured CSS/xPath filters' in notification
|
||||
|
||||
|
||||
# Text (or HTML conversion) markup to make the notifications a little nicer should have worked
|
||||
if app_notification_format.startswith('html'):
|
||||
# apprise should have used sax-escape (' instead of ", " etc), lets check it worked
|
||||
|
||||
from apprise.conversion import convert_between
|
||||
from apprise.common import NotifyFormat
|
||||
escaped_filter = convert_between(NotifyFormat.TEXT, NotifyFormat.HTML, content_filter)
|
||||
|
||||
assert escaped_filter in notification or escaped_filter.replace('"', '"') in notification
|
||||
assert 'a href="' in notification # Quotes should still be there so the link works
|
||||
|
||||
else:
|
||||
assert 'a href' not in notification
|
||||
assert content_filter in notification
|
||||
|
||||
# Remove it and prove that it doesn't trigger when not expected
|
||||
# It should register a change, but no 'filter not found'
|
||||
@@ -159,14 +173,20 @@ def run_filter_test(client, live_server, content_filter):
|
||||
os.unlink("test-datastore/notification.txt")
|
||||
|
||||
|
||||
|
||||
|
||||
def test_check_include_filters_failure_notification(client, live_server, measure_memory_usage):
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client, live_server,'#nope-doesnt-exist')
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('HTML Color'))
|
||||
# Check markup send conversion didnt affect plaintext preference
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='#nope-doesnt-exist', app_notification_format=valid_notification_formats.get('Text'))
|
||||
|
||||
def test_check_xpath_filter_failure_notification(client, live_server, measure_memory_usage):
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client, live_server, '//*[@id="nope-doesnt-exist"]')
|
||||
# # live_server_setup(live_server) # Setup on conftest per function
|
||||
run_filter_test(client=client, live_server=live_server, content_filter='//*[@id="nope-doesnt-exist"]', app_notification_format=valid_notification_formats.get('HTML Color'))
|
||||
|
||||
# Test that notification is never sent
|
||||
|
||||
def test_basic_markup_from_text(client, live_server, measure_memory_usage):
|
||||
# Test the notification error templates convert to HTML if needed (link activate)
|
||||
from ..notification.handler import markup_text_links_to_html
|
||||
x = markup_text_links_to_html("hello https://google.com")
|
||||
assert 'a href' in x
|
||||
|
||||
@@ -284,6 +284,27 @@ def test_notification_validation(client, live_server, measure_memory_usage):
|
||||
)
|
||||
|
||||
|
||||
def test_notification_urls_jinja2_apprise_integration(client, live_server, measure_memory_usage):
|
||||
|
||||
#
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation
|
||||
test_notification_url = "hassio://127.0.0.1/longaccesstoken?verify=no&nid={{watch_uuid}}"
|
||||
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={
|
||||
"application-fetch_backend": "html_requests",
|
||||
"application-minutes_between_check": 180,
|
||||
"application-notification_body": '{ "url" : "{{ watch_url }}", "secret": 444, "somebug": "网站监测 内容更新了" }',
|
||||
"application-notification_format": default_notification_format,
|
||||
"application-notification_urls": test_notification_url,
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#get-parameter-manipulation
|
||||
"application-notification_title": "New ChangeDetection.io Notification - {{ watch_url }} ",
|
||||
},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b'Settings updated' in res.data
|
||||
|
||||
|
||||
def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_memory_usage):
|
||||
|
||||
@@ -294,7 +315,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
|
||||
# CUSTOM JSON BODY CHECK for POST://
|
||||
set_original_response()
|
||||
# https://github.com/caronc/apprise/wiki/Notify_Custom_JSON#header-manipulation
|
||||
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&xxx={{ watch_url }}&+custom-header=123&+second=hello+world%20%22space%22"
|
||||
test_notification_url = url_for('test_notification_endpoint', _external=True).replace('http://', 'post://')+"?status_code=204&watch_uuid={{ watch_uuid }}&xxx={{ watch_url }}&now={% now 'Europe/London', '%Y-%m-%d' %}&+custom-header=123&+second=hello+world%20%22space%22"
|
||||
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
@@ -320,6 +341,7 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
|
||||
)
|
||||
|
||||
assert b"Watch added" in res.data
|
||||
watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
|
||||
|
||||
wait_for_all_checks(client)
|
||||
set_modified_response()
|
||||
@@ -349,6 +371,11 @@ def test_notification_custom_endpoint_and_jinja2(client, live_server, measure_me
|
||||
assert 'xxx=http' in notification_url
|
||||
# apprise style headers should be stripped
|
||||
assert 'custom-header' not in notification_url
|
||||
# Check jinja2 custom arrow/jinja2-time replace worked
|
||||
assert 'now=2' in notification_url
|
||||
# Check our watch_uuid appeared
|
||||
assert f'watch_uuid={watch_uuid}' in notification_url
|
||||
|
||||
|
||||
with open("test-datastore/notification-headers.txt", 'r') as f:
|
||||
notification_headers = f.read()
|
||||
@@ -416,7 +443,6 @@ def test_global_send_test_notification(client, live_server, measure_memory_usage
|
||||
assert res.status_code != 400
|
||||
assert res.status_code != 500
|
||||
|
||||
|
||||
with open("test-datastore/notification.txt", 'r') as f:
|
||||
x = f.read()
|
||||
assert test_body in x
|
||||
@@ -515,9 +541,7 @@ def _test_color_notifications(client, notification_body_token):
|
||||
follow_redirects=True
|
||||
)
|
||||
|
||||
# Just checks the format of the colour notifications was correct
|
||||
def test_html_color_notifications(client, live_server, measure_memory_usage):
|
||||
|
||||
|
||||
_test_color_notifications(client, '{{diff}}')
|
||||
_test_color_notifications(client, '{{diff_full}}')
|
||||
|
||||
@@ -42,6 +42,9 @@ jsonpath-ng~=1.5.3
|
||||
# Notification library
|
||||
apprise==1.9.5
|
||||
|
||||
# Lightweight URL linkifier for notifications
|
||||
linkify-it-py
|
||||
|
||||
# - Needed for apprise/spush, and maybe others? hopefully doesnt trigger a rust compile.
|
||||
# - Requires extra wheel for rPi, adds build time for arm/v8 which is not in piwheels
|
||||
# Pinned to 43.0.1 for ARM compatibility (45.x may not have pre-built ARM wheels)
|
||||
|
||||
19
setup.py
19
setup.py
@@ -5,6 +5,8 @@ import re
|
||||
import sys
|
||||
|
||||
from setuptools import setup, find_packages
|
||||
from setuptools.command.build_py import build_py
|
||||
import shutil
|
||||
|
||||
here = os.path.abspath(os.path.dirname(__file__))
|
||||
|
||||
@@ -22,6 +24,20 @@ def find_version(*file_paths):
|
||||
raise RuntimeError("Unable to find version string.")
|
||||
|
||||
|
||||
class BuildPyCommand(build_py):
|
||||
"""Custom build command to copy api-spec.yaml to the package."""
|
||||
def run(self):
|
||||
build_py.run(self)
|
||||
# Ensure the docs directory exists in the build output
|
||||
docs_dir = os.path.join(self.build_lib, 'changedetectionio', 'docs')
|
||||
os.makedirs(docs_dir, exist_ok=True)
|
||||
# Copy api-spec.yaml to the package
|
||||
shutil.copy(
|
||||
os.path.join(here, 'docs', 'api-spec.yaml'),
|
||||
os.path.join(docs_dir, 'api-spec.yaml')
|
||||
)
|
||||
|
||||
|
||||
install_requires = open('requirements.txt').readlines()
|
||||
|
||||
setup(
|
||||
@@ -37,9 +53,10 @@ setup(
|
||||
scripts=["changedetection.py"],
|
||||
author='dgtlmoon',
|
||||
url='https://changedetection.io',
|
||||
packages=['changedetectionio'],
|
||||
packages=find_packages(include=['changedetectionio', 'changedetectionio.*']),
|
||||
include_package_data=True,
|
||||
install_requires=install_requires,
|
||||
cmdclass={'build_py': BuildPyCommand},
|
||||
license="Apache License 2.0",
|
||||
python_requires=">= 3.10",
|
||||
classifiers=['Intended Audience :: Customer Service',
|
||||
|
||||
Reference in New Issue
Block a user