Compare commits

..

1 Commits

Author SHA1 Message Date
dgtlmoon
3a0b51053f Dont use .lower() because the html could be very big and this uses a lot of ram 2025-04-11 09:13:34 +02:00
21 changed files with 191 additions and 444 deletions

View File

@@ -1,9 +1,9 @@
recursive-include changedetectionio/api *
recursive-include changedetectionio/apprise_plugin *
recursive-include changedetectionio/blueprint *
recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/conditions *
recursive-include changedetectionio/model *
recursive-include changedetectionio/notification *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/static *
recursive-include changedetectionio/templates *

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.49.13'
__version__ = '0.49.12'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -1,145 +0,0 @@
from flask_expects_json import expects_json
from flask_restful import Resource
from . import auth
from flask_restful import abort, Resource
from flask import request
from . import auth
from . import schema_create_notification_urls, schema_delete_notification_urls
class Notifications(Resource):
def __init__(self, **kwargs):
# datastore is a black box dependency
self.datastore = kwargs['datastore']
@auth.check_token
def get(self):
"""
@api {get} /api/v1/notifications Return Notification URL List
@apiDescription Return the Notification URL List from the configuration
@apiExample {curl} Example usage:
curl http://localhost:5000/api/v1/notifications -H"x-api-key:813031b16330fe25e3780cf0325daa45"
HTTP/1.0 200
{
'notification_urls': ["notification-urls-list"]
}
@apiName Get
@apiGroup Notifications
"""
notification_urls = self.datastore.data.get('settings', {}).get('application', {}).get('notification_urls', [])
return {
'notification_urls': notification_urls,
}, 200
@auth.check_token
@expects_json(schema_create_notification_urls)
def post(self):
"""
@api {post} /api/v1/notifications Create Notification URLs
@apiDescription Add one or more notification URLs from the configuration
@apiExample {curl} Example usage:
curl http://localhost:5000/api/v1/notifications/batch -H"x-api-key:813031b16330fe25e3780cf0325daa45" -H "Content-Type: application/json" -d '{"notification_urls": ["url1", "url2"]}'
@apiName CreateBatch
@apiGroup Notifications
@apiSuccess (201) {Object[]} notification_urls List of added notification URLs
@apiError (400) {String} Invalid input
"""
json_data = request.get_json()
notification_urls = json_data.get("notification_urls", [])
from wtforms import ValidationError
try:
validate_notification_urls(notification_urls)
except ValidationError as e:
return str(e), 400
added_urls = []
for url in notification_urls:
clean_url = url.strip()
added_url = self.datastore.add_notification_url(clean_url)
if added_url:
added_urls.append(added_url)
if not added_urls:
return "No valid notification URLs were added", 400
return {'notification_urls': added_urls}, 201
@auth.check_token
@expects_json(schema_create_notification_urls)
def put(self):
"""
@api {put} /api/v1/notifications Replace Notification URLs
@apiDescription Replace all notification URLs with the provided list (can be empty)
@apiExample {curl} Example usage:
curl -X PUT http://localhost:5000/api/v1/notifications -H"x-api-key:813031b16330fe25e3780cf0325daa45" -H "Content-Type: application/json" -d '{"notification_urls": ["url1", "url2"]}'
@apiName Replace
@apiGroup Notifications
@apiSuccess (200) {Object[]} notification_urls List of current notification URLs
@apiError (400) {String} Invalid input
"""
json_data = request.get_json()
notification_urls = json_data.get("notification_urls", [])
from wtforms import ValidationError
try:
validate_notification_urls(notification_urls)
except ValidationError as e:
return str(e), 400
if not isinstance(notification_urls, list):
return "Invalid input format", 400
clean_urls = [url.strip() for url in notification_urls if isinstance(url, str)]
self.datastore.data['settings']['application']['notification_urls'] = clean_urls
self.datastore.needs_write = True
return {'notification_urls': clean_urls}, 200
@auth.check_token
@expects_json(schema_delete_notification_urls)
def delete(self):
"""
@api {delete} /api/v1/notifications Delete Notification URLs
@apiDescription Deletes one or more notification URLs from the configuration
@apiExample {curl} Example usage:
curl http://localhost:5000/api/v1/notifications -X DELETE -H"x-api-key:813031b16330fe25e3780cf0325daa45" -H "Content-Type: application/json" -d '{"notification_urls": ["url1", "url2"]}'
@apiParam {String[]} notification_urls The notification URLs to delete.
@apiName Delete
@apiGroup Notifications
@apiSuccess (204) {String} OK Deleted
@apiError (400) {String} No matching notification URLs found.
"""
json_data = request.get_json()
urls_to_delete = json_data.get("notification_urls", [])
if not isinstance(urls_to_delete, list):
abort(400, message="Expected a list of notification URLs.")
notification_urls = self.datastore.data['settings']['application'].get('notification_urls', [])
deleted = []
for url in urls_to_delete:
clean_url = url.strip()
if clean_url in notification_urls:
notification_urls.remove(clean_url)
deleted.append(clean_url)
if not deleted:
abort(400, message="No matching notification URLs found.")
self.datastore.data['settings']['application']['notification_urls'] = notification_urls
self.datastore.needs_write = True
return 'OK', 204
def validate_notification_urls(notification_urls):
from changedetectionio.forms import ValidateAppRiseServers
validator = ValidateAppRiseServers()
class DummyForm: pass
dummy_form = DummyForm()
field = type("Field", (object,), {"data": notification_urls, "gettext": lambda self, x: x})()
validator(dummy_form, field)

View File

@@ -19,15 +19,8 @@ schema_create_tag['required'] = ['title']
schema_update_tag = copy.deepcopy(schema_tag)
schema_update_tag['additionalProperties'] = False
schema_notification_urls = copy.deepcopy(schema)
schema_create_notification_urls = copy.deepcopy(schema_notification_urls)
schema_create_notification_urls['required'] = ['notification_urls']
schema_delete_notification_urls = copy.deepcopy(schema_notification_urls)
schema_delete_notification_urls['required'] = ['notification_urls']
# Import all API resources
from .Watch import Watch, WatchHistory, WatchSingleHistory, CreateWatch
from .Tags import Tags, Tag
from .Import import Import
from .SystemInfo import SystemInfo
from .Notifications import Notifications

View File

@@ -1,7 +1,5 @@
# Responsible for building the storage dict into a set of rules ("JSON Schema") acceptable via the API
# Probably other ways to solve this when the backend switches to some ORM
from changedetectionio.notification import valid_notification_formats
def build_time_between_check_json_schema():
# Setup time between check schema
@@ -100,6 +98,8 @@ def build_watch_json_schema(d):
}
}
from changedetectionio.notification import valid_notification_formats
schema['properties']['notification_format'] = {'type': 'string',
'enum': list(valid_notification_formats.keys())
}

View File

@@ -4,6 +4,7 @@ from loguru import logger
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
from changedetectionio.notification import process_notification
def construct_blueprint(datastore: ChangeDetectionStore):
notification_blueprint = Blueprint('ui_notification', __name__, template_folder="../ui/templates")
@@ -17,11 +18,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
# Watch_uuid could be unset in the case it`s used in tag editor, global settings
import apprise
from changedetectionio.notification.handler import process_notification
from changedetectionio.notification.apprise_plugin.assets import apprise_asset
from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_http_custom_handler
from ...apprise_plugin.assets import apprise_asset
from ...apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
apobj = apprise.Apprise(asset=apprise_asset)
is_global_settings_form = request.args.get('mode', '') == 'global-settings'

View File

@@ -7,13 +7,13 @@ import os
# Visual Selector scraper - 'Button' is there because some sites have <button>OUT OF STOCK</button>.
visualselector_xpath_selectors = 'div,span,form,table,tbody,tr,td,a,p,ul,li,h1,h2,h3,h4,header,footer,section,article,aside,details,main,nav,section,summary,button'
SCREENSHOT_MAX_HEIGHT_DEFAULT = 20000
SCREENSHOT_MAX_HEIGHT_DEFAULT = 16000
SCREENSHOT_DEFAULT_QUALITY = 40
# Maximum total height for the final image (When in stitch mode).
# We limit this to 16000px due to the huge amount of RAM that was being used
# Example: 16000 × 1400 × 3 = 67,200,000 bytes ≈ 64.1 MB (not including buffers in PIL etc)
SCREENSHOT_MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
MAX_TOTAL_HEIGHT = int(os.getenv("SCREENSHOT_MAX_HEIGHT", SCREENSHOT_MAX_HEIGHT_DEFAULT))
# The size at which we will switch to stitching method, when below this (and
# MAX_TOTAL_HEIGHT which can be set by a user) we will use the default

View File

@@ -5,10 +5,13 @@ from urllib.parse import urlparse
from loguru import logger
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT, visualselector_xpath_selectors, \
SCREENSHOT_SIZE_STITCH_THRESHOLD, SCREENSHOT_MAX_TOTAL_HEIGHT, XPATH_ELEMENT_JS, INSTOCK_DATA_JS
SCREENSHOT_SIZE_STITCH_THRESHOLD, MAX_TOTAL_HEIGHT, SCREENSHOT_DEFAULT_QUALITY, XPATH_ELEMENT_JS, INSTOCK_DATA_JS
from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker
from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable
def capture_full_page(page):
import os
import time
@@ -17,56 +20,84 @@ def capture_full_page(page):
start = time.time()
page_height = page.evaluate("document.documentElement.scrollHeight")
page_width = page.evaluate("document.documentElement.scrollWidth")
original_viewport = page.viewport_size
logger.debug(f"Playwright viewport size {page.viewport_size} page height {page_height} page width {page_width}")
logger.debug(f"Playwright viewport size {page.viewport_size}")
# Use an approach similar to puppeteer: set a larger viewport and take screenshots in chunks
step_size = SCREENSHOT_SIZE_STITCH_THRESHOLD # Size that won't cause GPU to overflow
screenshot_chunks = []
y = 0
# If page height is larger than current viewport, use a larger viewport for better capturing
if page_height > page.viewport_size['height']:
# Set viewport to a larger size to capture more content at once
page.set_viewport_size({'width': page.viewport_size['width'], 'height': step_size})
############################################################
#### SCREENSHOT FITS INTO ONE SNAPSHOT (SMALLER PAGES) #####
############################################################
# Capture screenshots in chunks up to the max total height
while y < min(page_height, SCREENSHOT_MAX_TOTAL_HEIGHT):
# Optimization to avoid unnecessary stitching if we can avoid it
# Use the default screenshot method for smaller pages to take advantage
# of GPU and native playwright screenshot optimizations
# - No PIL needed here, no danger of memory leaks, no sub process required
if (page_height < SCREENSHOT_SIZE_STITCH_THRESHOLD and page_height < MAX_TOTAL_HEIGHT ):
logger.debug("Using default screenshot method")
page.request_gc()
page.evaluate(f"window.scrollTo(0, {y})")
page.request_gc()
screenshot_chunks.append(page.screenshot(
screenshot = page.screenshot(
type="jpeg",
full_page=False,
quality=int(os.getenv("SCREENSHOT_QUALITY", 72))
))
y += step_size
quality=int(os.getenv("SCREENSHOT_QUALITY", SCREENSHOT_DEFAULT_QUALITY)),
full_page=True,
)
page.request_gc()
# Restore original viewport size
page.set_viewport_size({'width': original_viewport['width'], 'height': original_viewport['height']})
# If we have multiple chunks, stitch them together
if len(screenshot_chunks) > 1:
from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker
logger.debug(f"Screenshot stitching {len(screenshot_chunks)} chunks together")
parent_conn, child_conn = Pipe()
p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, SCREENSHOT_MAX_TOTAL_HEIGHT))
p.start()
screenshot = parent_conn.recv_bytes()
p.join()
logger.debug(
f"Screenshot (chunked/stitched) - Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
screenshot_chunks = None
logger.debug(f"Screenshot captured in {time.time() - start:.2f}s")
return screenshot
logger.debug(
f"Screenshot Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
return screenshot_chunks[0]
###################################################################################
#### CASE FOR LARGE SCREENSHOTS THAT NEED TO BE TRIMMED DUE TO MEMORY ISSUES #####
###################################################################################
# - PIL can easily allocate memory and not release it cleanly
# - Fetching screenshot from playwright seems OK
# Image.new is leaky even with .close()
# So lets prepare all the data chunks and farm it out to a subprocess for clean memory handling
logger.debug(
"Using stitching method for large screenshot because page height exceeds threshold"
)
# Limit the total capture height
capture_height = min(page_height, MAX_TOTAL_HEIGHT)
# Calculate number of chunks needed using ORIGINAL viewport height
num_chunks = (capture_height + page.viewport_size['height'] - 1) // page.viewport_size['height']
screenshot_chunks = []
# Track cumulative paste position
y_offset = 0
for _ in range(num_chunks):
page.request_gc()
page.evaluate(f"window.scrollTo(0, {y_offset})")
page.request_gc()
h = min(page.viewport_size['height'], capture_height - y_offset)
screenshot_chunks.append(page.screenshot(
type="jpeg",
clip={
"x": 0,
"y": 0,
"width": page.viewport_size['width'],
"height": h,
},
quality=int(os.getenv("SCREENSHOT_QUALITY", SCREENSHOT_DEFAULT_QUALITY)),
))
y_offset += h # maybe better to inspect the image here?
page.request_gc()
# PIL can leak memory in various situations, assign the work to a subprocess for totally clean handling
parent_conn, child_conn = Pipe()
p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, capture_height))
p.start()
result = parent_conn.recv_bytes()
p.join()
screenshot_chunks = None
logger.debug(f"Screenshot - Page height: {page_height} Capture height: {capture_height} - Stitched together in {time.time() - start:.2f}s")
return result
class fetcher(Fetcher):
@@ -261,7 +292,6 @@ class fetcher(Fetcher):
self.page.request_gc()
self.content = self.page.content()
self.page.request_gc()
logger.debug(f"Scrape xPath element data in browser done in {time.time() - now:.2f}s")
# Bug 3 in Playwright screenshot handling
@@ -287,11 +317,4 @@ class fetcher(Fetcher):
# Clean up resources properly
context.close()
context = None
self.page.close()
self.page = None
browser.close()
borwser = None

View File

@@ -7,11 +7,10 @@ from urllib.parse import urlparse
from loguru import logger
from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT, visualselector_xpath_selectors, \
SCREENSHOT_SIZE_STITCH_THRESHOLD, SCREENSHOT_DEFAULT_QUALITY, XPATH_ELEMENT_JS, INSTOCK_DATA_JS, \
SCREENSHOT_MAX_TOTAL_HEIGHT
SCREENSHOT_SIZE_STITCH_THRESHOLD, MAX_TOTAL_HEIGHT, SCREENSHOT_DEFAULT_QUALITY, XPATH_ELEMENT_JS, INSTOCK_DATA_JS
from changedetectionio.content_fetchers.base import Fetcher, manage_user_agent
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, \
BrowserConnectError
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError
from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker
# Bug 3 in Playwright screenshot handling
@@ -28,53 +27,71 @@ async def capture_full_page(page):
start = time.time()
page_height = await page.evaluate("document.documentElement.scrollHeight")
page_width = await page.evaluate("document.documentElement.scrollWidth")
original_viewport = page.viewport
logger.debug(f"Puppeteer viewport size {page.viewport} page height {page_height} page width {page_width}")
logger.debug(f"Puppeteer viewport size {page.viewport}")
# Bug 3 in Playwright screenshot handling
# Some bug where it gives the wrong screenshot size, but making a request with the clip set first seems to solve it
# JPEG is better here because the screenshots can be very very large
############################################################
#### SCREENSHOT FITS INTO ONE SNAPSHOT (SMALLER PAGES) #####
############################################################
# Screenshots also travel via the ws:// (websocket) meaning that the binary data is base64 encoded
# which will significantly increase the IO size between the server and client, it's recommended to use the lowest
# acceptable screenshot quality here
step_size = SCREENSHOT_SIZE_STITCH_THRESHOLD # Something that will not cause the GPU to overflow when taking the screenshot
screenshot_chunks = []
y = 0
if page_height > page.viewport['height']:
await page.setViewport({'width': page.viewport['width'], 'height': step_size})
while y < min(page_height, SCREENSHOT_MAX_TOTAL_HEIGHT):
await page.evaluate(f"window.scrollTo(0, {y})")
screenshot_chunks.append(await page.screenshot(type_='jpeg',
fullPage=False,
quality=int(os.getenv("SCREENSHOT_QUALITY", 72))))
y += step_size
await page.setViewport({'width': original_viewport['width'], 'height': original_viewport['height']})
if len(screenshot_chunks) > 1:
from changedetectionio.content_fetchers.screenshot_handler import stitch_images_worker
logger.debug(f"Screenshot stitching {len(screenshot_chunks)} chunks together")
parent_conn, child_conn = Pipe()
p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, SCREENSHOT_MAX_TOTAL_HEIGHT))
p.start()
screenshot = parent_conn.recv_bytes()
p.join()
logger.debug(
f"Screenshot (chunked/stitched) - Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
screenshot_chunks = None
# Optimization to avoid unnecessary stitching if we can avoid it
# Use the default screenshot method for smaller pages to take advantage
# of GPU and native playwright screenshot optimizations
# - No PIL needed here, no danger of memory leaks, no sub process required
if (page_height < SCREENSHOT_SIZE_STITCH_THRESHOLD and page_height < MAX_TOTAL_HEIGHT ):
logger.debug("Using default screenshot method")
await page.evaluate(f"window.scrollTo(0, 0)")
screenshot = await page.screenshot(
type_="jpeg",
quality=int(os.getenv("SCREENSHOT_QUALITY", SCREENSHOT_DEFAULT_QUALITY)),
fullPage=True,
)
logger.debug(f"Screenshot captured in {time.time() - start:.2f}s")
return screenshot
###################################################################################
#### CASE FOR LARGE SCREENSHOTS THAT NEED TO BE TRIMMED DUE TO MEMORY ISSUES #####
###################################################################################
# - PIL can easily allocate memory and not release it cleanly
# - Fetching screenshot from playwright seems OK
# Image.new is leaky even with .close()
# So lets prepare all the data chunks and farm it out to a subprocess for clean memory handling
logger.debug(
f"Screenshot Page height: {page_height} Capture height: {SCREENSHOT_MAX_TOTAL_HEIGHT} - Stitched together in {time.time() - start:.2f}s")
return screenshot_chunks[0]
"Using stitching method for large screenshot because page height exceeds threshold"
)
# Limit the total capture height
capture_height = min(page_height, MAX_TOTAL_HEIGHT)
# Calculate number of chunks needed using ORIGINAL viewport height
num_chunks = (capture_height + page.viewport['height'] - 1) // page.viewport['height']
screenshot_chunks = []
# Track cumulative paste position
y_offset = 0
for _ in range(num_chunks):
await page.evaluate(f"window.scrollTo(0, {y_offset})")
h = min(page.viewport['height'], capture_height - y_offset)
screenshot_chunks.append(await page.screenshot(
type_="jpeg",
quality=int(os.getenv("SCREENSHOT_QUALITY", SCREENSHOT_DEFAULT_QUALITY)),
))
y_offset += h # maybe better to inspect the image here?
# PIL can leak memory in various situations, assign the work to a subprocess for totally clean handling
parent_conn, child_conn = Pipe()
p = Process(target=stitch_images_worker, args=(child_conn, screenshot_chunks, page_height, capture_height))
p.start()
result = parent_conn.recv_bytes()
p.join()
screenshot_chunks = None
logger.debug(f"Screenshot - Page height: {page_height} Capture height: {capture_height} - Stitched together in {time.time() - start:.2f}s")
return result
class fetcher(Fetcher):

View File

@@ -33,7 +33,7 @@ from loguru import logger
from changedetectionio import __version__
from changedetectionio import queuedWatchMetaData
from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, CreateWatch, Import, SystemInfo, Tag, Tags, Notifications
from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, CreateWatch, Import, SystemInfo, Tag, Tags
from changedetectionio.api.Search import Search
from .time_handler import is_within_schedule
@@ -285,8 +285,7 @@ def changedetection_app(config=None, datastore_o=None):
watch_api.add_resource(Search, '/api/v1/search',
resource_class_kwargs={'datastore': datastore})
watch_api.add_resource(Notifications, '/api/v1/notifications',
resource_class_kwargs={'datastore': datastore})
@login_manager.user_loader
def user_loader(email):
@@ -515,8 +514,7 @@ def notification_runner():
sent_obj = None
try:
from changedetectionio.notification.handler import process_notification
from changedetectionio import notification
# Fallback to system config if not set
if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'):
n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body')
@@ -526,8 +524,8 @@ def notification_runner():
if not n_object.get('notification_format') and datastore.data['settings']['application'].get('notification_format'):
n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format')
if n_object.get('notification_urls', {}):
sent_obj = process_notification(n_object, datastore)
sent_obj = notification.process_notification(n_object, datastore)
except Exception as e:
logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}")

View File

@@ -306,8 +306,8 @@ class ValidateAppRiseServers(object):
def __call__(self, form, field):
import apprise
from .notification.apprise_plugin.assets import apprise_asset
from .notification.apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
from .apprise_plugin.assets import apprise_asset
from .apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
apobj = apprise.Apprise(asset=apprise_asset)

View File

@@ -2,7 +2,7 @@ import os
import uuid
from changedetectionio import strtobool
default_notification_format_for_watch = 'System default'
from changedetectionio.notification import default_notification_format_for_watch
class watch_base(dict):

View File

@@ -1,17 +1,47 @@
import time
from apprise import NotifyFormat
import apprise
from loguru import logger
from .apprise_plugin.assets import apprise_asset, APPRISE_AVATAR_URL
from .apprise_plugin.assets import APPRISE_AVATAR_URL
from .apprise_plugin.custom_handlers import apprise_http_custom_handler # noqa: F401
from .safe_jinja import render as jinja_render
valid_tokens = {
'base_url': '',
'current_snapshot': '',
'diff': '',
'diff_added': '',
'diff_full': '',
'diff_patch': '',
'diff_removed': '',
'diff_url': '',
'preview_url': '',
'triggered_text': '',
'watch_tag': '',
'watch_title': '',
'watch_url': '',
'watch_uuid': '',
}
default_notification_format_for_watch = 'System default'
default_notification_format = 'HTML Color'
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
valid_notification_formats = {
'Text': NotifyFormat.TEXT,
'Markdown': NotifyFormat.MARKDOWN,
'HTML': NotifyFormat.HTML,
'HTML Color': 'htmlcolor',
# Used only for editing a watch (not for global)
default_notification_format_for_watch: default_notification_format_for_watch
}
def process_notification(n_object, datastore):
from changedetectionio.safe_jinja import render as jinja_render
from . import default_notification_format_for_watch, default_notification_format, valid_notification_formats
# be sure its registered
from .apprise_plugin.custom_handlers import apprise_http_custom_handler
now = time.time()
if n_object.get('notification_timestamp'):
logger.trace(f"Time since queued {now-n_object['notification_timestamp']:.3f}s")
@@ -28,13 +58,14 @@ def process_notification(n_object, datastore):
# Initially text or whatever
n_format = datastore.data['settings']['application'].get('notification_format', valid_notification_formats[default_notification_format])
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.2f}s")
logger.trace(f"Complete notification body including Jinja and placeholders calculated in {time.time() - now:.3f}s")
# https://github.com/caronc/apprise/wiki/Development_LogCapture
# Anything higher than or equal to WARNING (which covers things like Connection errors)
# raise it as an exception
sent_objs = []
from .apprise_plugin.assets import apprise_asset
if 'as_async' in n_object:
apprise_asset.async_mode = n_object.get('as_async')
@@ -145,7 +176,6 @@ def process_notification(n_object, datastore):
# ( Where we prepare the tokens in the notification to be replaced with actual values )
def create_notification_parameters(n_object, datastore):
from copy import deepcopy
from . import valid_tokens
# in the case we send a test notification from the main settings, there is no UUID.
uuid = n_object['uuid'] if 'uuid' in n_object else ''

View File

@@ -1,35 +0,0 @@
from changedetectionio.model import default_notification_format_for_watch
ult_notification_format_for_watch = 'System default'
default_notification_format = 'HTML Color'
default_notification_body = '{{watch_url}} had a change.\n---\n{{diff}}\n---\n'
default_notification_title = 'ChangeDetection.io Notification - {{watch_url}}'
# The values (markdown etc) are from apprise NotifyFormat,
# But to avoid importing the whole heavy module just use the same strings here.
valid_notification_formats = {
'Text': 'text',
'Markdown': 'markdown',
'HTML': 'html',
'HTML Color': 'htmlcolor',
# Used only for editing a watch (not for global)
default_notification_format_for_watch: default_notification_format_for_watch
}
valid_tokens = {
'base_url': '',
'current_snapshot': '',
'diff': '',
'diff_added': '',
'diff_full': '',
'diff_patch': '',
'diff_removed': '',
'diff_url': '',
'preview_url': '',
'triggered_text': '',
'watch_tag': '',
'watch_title': '',
'watch_url': '',
'watch_uuid': '',
}

View File

@@ -964,25 +964,3 @@ class ChangeDetectionStore:
f_d.write(zlib.compress(f_j.read()))
os.unlink(json_path)
def add_notification_url(self, notification_url):
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
notification_urls = self.data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
with self.lock:
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
if notification_url in notification_urls:
return notification_url
# Append and update the datastore
notification_urls.append(notification_url)
self.__data['settings']['application']['notification_urls'] = notification_urls
self.needs_write = True
return notification_url

View File

@@ -1,108 +0,0 @@
#!/usr/bin/env python3
from flask import url_for
from .util import live_server_setup
import json
def test_api_notifications_crud(client, live_server):
live_server_setup(live_server)
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# Confirm notifications are initially empty
res = client.get(
url_for("notifications"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json == {"notification_urls": []}
# Add notification URLs
test_urls = ["posts://example.com/notify1", "posts://example.com/notify2"]
res = client.post(
url_for("notifications"),
data=json.dumps({"notification_urls": test_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 201
for url in test_urls:
assert url in res.json["notification_urls"]
# Confirm the notification URLs were added
res = client.get(
url_for("notifications"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
for url in test_urls:
assert url in res.json["notification_urls"]
# Delete one notification URL
res = client.delete(
url_for("notifications"),
data=json.dumps({"notification_urls": [test_urls[0]]}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 204
# Confirm it was removed and the other remains
res = client.get(
url_for("notifications"),
headers={'x-api-key': api_key}
)
assert res.status_code == 200
assert test_urls[0] not in res.json["notification_urls"]
assert test_urls[1] in res.json["notification_urls"]
# Try deleting a non-existent URL
res = client.delete(
url_for("notifications"),
data=json.dumps({"notification_urls": ["posts://nonexistent.com"]}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400
res = client.post(
url_for("notifications"),
data=json.dumps({"notification_urls": test_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 201
# Replace with a new list
replacement_urls = ["posts://new.example.com"]
res = client.put(
url_for("notifications"),
data=json.dumps({"notification_urls": replacement_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json["notification_urls"] == replacement_urls
# Replace with an empty list
res = client.put(
url_for("notifications"),
data=json.dumps({"notification_urls": []}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 200
assert res.json["notification_urls"] == []
# Provide an invalid AppRise URL to trigger validation error
invalid_urls = ["ftp://not-app-rise"]
res = client.post(
url_for("notifications"),
data=json.dumps({"notification_urls": invalid_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400
assert "is not a valid AppRise URL." in res.data.decode()
res = client.put(
url_for("notifications"),
data=json.dumps({"notification_urls": invalid_urls}),
headers={'content-type': 'application/json', 'x-api-key': api_key}
)
assert res.status_code == 400
assert "is not a valid AppRise URL." in res.data.decode()

View File

@@ -167,10 +167,7 @@ def test_check_notification(client, live_server, measure_memory_usage):
assert ':-)' in notification_submission
# Check the attachment was added, and that it is a JPEG from the original PNG
notification_submission_object = json.loads(notification_submission)
assert notification_submission_object
# We keep PNG screenshots for now
# IF THIS FAILS YOU SHOULD BE TESTING WITH ENV VAR REMOVE_REQUESTS_OLD_SCREENSHOTS=False
assert notification_submission_object['attachments'][0]['filename'] == 'last-screenshot.png'
assert len(notification_submission_object['attachments'][0]['base64'])
assert notification_submission_object['attachments'][0]['mimetype'] == 'image/png'

View File

@@ -109,6 +109,7 @@ class update_worker(threading.Thread):
default_notification_title
)
# Would be better if this was some kind of Object where Watch can reference the parent datastore etc
v = watch.get(var_name)
if v and not watch.get('notification_muted'):