From 7382eb985f38bbfde42470714612f41d57110ff0 Mon Sep 17 00:00:00 2001 From: dgtlmoon Date: Thu, 28 Aug 2025 23:00:58 +0200 Subject: [PATCH] tweaks --- .../notification/BrowserNotifications.py | 216 ++++++++++++++++++ .../browser_notification_helpers.py | 52 ++--- 2 files changed, 242 insertions(+), 26 deletions(-) create mode 100644 changedetectionio/notification/BrowserNotifications.py diff --git a/changedetectionio/notification/BrowserNotifications.py b/changedetectionio/notification/BrowserNotifications.py new file mode 100644 index 00000000..31df9ea5 --- /dev/null +++ b/changedetectionio/notification/BrowserNotifications.py @@ -0,0 +1,216 @@ +import json +from flask import request, current_app +from flask_restful import Resource, marshal_with, fields +from loguru import logger + + +browser_notifications_fields = { + 'success': fields.Boolean, + 'message': fields.String, +} + +vapid_public_key_fields = { + 'publicKey': fields.String, +} + +test_notification_fields = { + 'success': fields.Boolean, + 'message': fields.String, + 'sent_count': fields.Integer, +} + + +class BrowserNotificationsVapidPublicKey(Resource): + """Get VAPID public key for browser push notifications""" + + @marshal_with(vapid_public_key_fields) + def get(self): + try: + from changedetectionio.notification.apprise_plugin.browser_notification_helpers import ( + get_vapid_config_from_datastore, convert_pem_public_key_for_browser + ) + + datastore = current_app.config.get('DATASTORE') + if not datastore: + return {'publicKey': None}, 500 + + private_key, public_key_pem, contact_email = get_vapid_config_from_datastore(datastore) + + if not public_key_pem: + return {'publicKey': None}, 404 + + # Convert PEM format to URL-safe base64 format for browser + public_key_b64 = convert_pem_public_key_for_browser(public_key_pem) + + if public_key_b64: + return {'publicKey': public_key_b64} + else: + return {'publicKey': None}, 500 + + except Exception as e: + logger.error(f"Failed to get VAPID public key: {e}") + return {'publicKey': None}, 500 + + +class BrowserNotificationsSubscribe(Resource): + """Subscribe to browser notifications""" + + @marshal_with(browser_notifications_fields) + def post(self): + try: + data = request.get_json() + if not data: + return {'success': False, 'message': 'No data provided'}, 400 + + subscription = data.get('subscription') + + if not subscription: + return {'success': False, 'message': 'Subscription is required'}, 400 + + # Validate subscription format + required_fields = ['endpoint', 'keys'] + for field in required_fields: + if field not in subscription: + return {'success': False, 'message': f'Missing subscription field: {field}'}, 400 + + if 'p256dh' not in subscription['keys'] or 'auth' not in subscription['keys']: + return {'success': False, 'message': 'Missing subscription keys'}, 400 + + # Get datastore + datastore = current_app.config.get('DATASTORE') + if not datastore: + return {'success': False, 'message': 'Datastore not available'}, 500 + + # Initialize browser_subscriptions if it doesn't exist + if 'browser_subscriptions' not in datastore.data['settings']['application']: + datastore.data['settings']['application']['browser_subscriptions'] = [] + + # Check if subscription already exists + existing_subscriptions = datastore.data['settings']['application']['browser_subscriptions'] + for existing_sub in existing_subscriptions: + if existing_sub.get('endpoint') == subscription.get('endpoint'): + return {'success': True, 'message': 'Already subscribed to browser notifications'} + + # Add new subscription + datastore.data['settings']['application']['browser_subscriptions'].append(subscription) + datastore.needs_write = True + + logger.info(f"New browser notification subscription: {subscription.get('endpoint')}") + + return {'success': True, 'message': 'Successfully subscribed to browser notifications'} + + except Exception as e: + logger.error(f"Failed to subscribe to browser notifications: {e}") + return {'success': False, 'message': f'Subscription failed: {str(e)}'}, 500 + + +class BrowserNotificationsUnsubscribe(Resource): + """Unsubscribe from browser notifications""" + + @marshal_with(browser_notifications_fields) + def post(self): + try: + data = request.get_json() + if not data: + return {'success': False, 'message': 'No data provided'}, 400 + + subscription = data.get('subscription') + + if not subscription or not subscription.get('endpoint'): + return {'success': False, 'message': 'Valid subscription is required'}, 400 + + # Get datastore + datastore = current_app.config.get('DATASTORE') + if not datastore: + return {'success': False, 'message': 'Datastore not available'}, 500 + + # Check if subscriptions exist + browser_subscriptions = datastore.data.get('settings', {}).get('application', {}).get('browser_subscriptions', []) + if not browser_subscriptions: + return {'success': True, 'message': 'No subscriptions found'} + + # Remove subscription with matching endpoint + endpoint = subscription.get('endpoint') + original_count = len(browser_subscriptions) + + datastore.data['settings']['application']['browser_subscriptions'] = [ + sub for sub in browser_subscriptions + if sub.get('endpoint') != endpoint + ] + + removed_count = original_count - len(datastore.data['settings']['application']['browser_subscriptions']) + + if removed_count > 0: + datastore.needs_write = True + logger.info(f"Removed {removed_count} browser notification subscription(s)") + return {'success': True, 'message': 'Successfully unsubscribed from browser notifications'} + else: + return {'success': True, 'message': 'No matching subscription found'} + + except Exception as e: + logger.error(f"Failed to unsubscribe from browser notifications: {e}") + return {'success': False, 'message': f'Unsubscribe failed: {str(e)}'}, 500 + + +class BrowserNotificationsTest(Resource): + """Send a test browser notification""" + + @marshal_with(test_notification_fields) + def post(self): + try: + data = request.get_json() + if not data: + return {'success': False, 'message': 'No data provided', 'sent_count': 0}, 400 + + title = data.get('title', 'Test Notification') + body = data.get('body', 'This is a test notification from changedetection.io') + + # Get datastore to check if subscriptions exist + datastore = current_app.config.get('DATASTORE') + if not datastore: + return {'success': False, 'message': 'Datastore not available', 'sent_count': 0}, 500 + + # Check if there are subscriptions before attempting to send + browser_subscriptions = datastore.data.get('settings', {}).get('application', {}).get('browser_subscriptions', []) + if not browser_subscriptions: + return {'success': False, 'message': 'No subscriptions found', 'sent_count': 0}, 404 + + # Use the apprise handler directly + try: + from changedetectionio.notification.apprise_plugin.custom_handlers import apprise_browser_notification_handler + + # Call the apprise handler with test data + success = apprise_browser_notification_handler( + body=body, + title=title, + notify_type='info', + meta={'url': 'browser://test'} + ) + + # Count how many subscriptions we have after sending (some may have been removed if invalid) + final_subscriptions = datastore.data.get('settings', {}).get('application', {}).get('browser_subscriptions', []) + sent_count = len(browser_subscriptions) # Original count + + if success: + return { + 'success': True, + 'message': f'Test notification sent successfully to {sent_count} subscriber(s)', + 'sent_count': sent_count + } + else: + return { + 'success': False, + 'message': 'Failed to send test notification', + 'sent_count': 0 + }, 500 + + except ImportError: + return {'success': False, 'message': 'Browser notification handler not available', 'sent_count': 0}, 500 + + except Exception as e: + logger.error(f"Failed to send test browser notification: {e}") + return {'success': False, 'message': f'Test failed: {str(e)}', 'sent_count': 0}, 500 + + + + diff --git a/changedetectionio/notification/apprise_plugin/browser_notification_helpers.py b/changedetectionio/notification/apprise_plugin/browser_notification_helpers.py index 6ad2c32b..85a97af5 100644 --- a/changedetectionio/notification/apprise_plugin/browser_notification_helpers.py +++ b/changedetectionio/notification/apprise_plugin/browser_notification_helpers.py @@ -17,38 +17,38 @@ def convert_pem_private_key_for_pywebpush(private_key): private_key: PEM private key string or already converted key Returns: - Private key in the format pywebpush expects (PEM string for pywebpush) + Vapid instance for pywebpush (avoids PEM parsing compatibility issues) """ - # pywebpush expects the PEM string directly - if not isinstance(private_key, str): - return private_key - - # If it doesn't look like PEM, return as-is - if not private_key.startswith('-----BEGIN'): - return private_key - try: - from cryptography.hazmat.primitives import serialization - from cryptography.hazmat.primitives.asymmetric import ec + from py_vapid import Vapid + import tempfile + import os - # Validate the key by loading it - private_key_bytes = private_key.encode('utf-8') - private_key_obj = serialization.load_pem_private_key(private_key_bytes, password=None) - - # Verify it's an ECDSA key (required for VAPID) - if not isinstance(private_key_obj, ec.EllipticCurvePrivateKey): - logger.error("Private key is not an ECDSA key - VAPID requires ECDSA") + # If we get a string, assume it's PEM and create a Vapid instance from it + if isinstance(private_key, str) and private_key.startswith('-----BEGIN'): + # Write PEM to temporary file and load with Vapid.from_file + with tempfile.NamedTemporaryFile(mode='w', suffix='.pem', delete=False) as tmp_file: + tmp_file.write(private_key) + tmp_file.flush() + temp_path = tmp_file.name + + try: + # Load using Vapid.from_file - this is more compatible with pywebpush + vapid_instance = Vapid.from_file(temp_path) + os.unlink(temp_path) # Clean up + logger.debug("Successfully created Vapid instance from PEM") + return vapid_instance + except Exception as e: + os.unlink(temp_path) # Clean up even on error + logger.error(f"Failed to create Vapid instance from PEM: {e}") + # Fall back to returning the original PEM string + return private_key + else: + # Return as-is if not a PEM string return private_key - # Ensure the key has the right curve (P-256 for VAPID) - if private_key_obj.curve.name != 'secp256r1': - logger.warning(f"Private key uses curve {private_key_obj.curve.name}, VAPID recommends secp256r1 (P-256)") - - # Return the original PEM - pywebpush handles PEM format correctly - return private_key - except Exception as e: - logger.warning(f"Failed to validate private key format: {e}") + logger.error(f"Failed to convert private key: {e}") return private_key