From 7a08cbb6912f83bf917fa549d9e4f95fd2cee841 Mon Sep 17 00:00:00 2001
From: dgtlmoon
Date: Tue, 6 Jan 2026 18:21:09 +0100
Subject: [PATCH] WIP

---
 .../notification_dashboard/__init__.py       |  18 ++-
 .../templates/notification-dashboard.html    |  21 ++-
 .../notification/task_queue/__init__.py      | 145 +++++++++++++-----
 .../notification/task_queue/file_storage.py  | 123 +++++++++++++--
 4 files changed, 249 insertions(+), 58 deletions(-)

diff --git a/changedetectionio/blueprint/notification_dashboard/__init__.py b/changedetectionio/blueprint/notification_dashboard/__init__.py
index 0109129d..5a845729 100644
--- a/changedetectionio/blueprint/notification_dashboard/__init__.py
+++ b/changedetectionio/blueprint/notification_dashboard/__init__.py
@@ -91,15 +91,25 @@ def construct_blueprint():
     @login_optionally_required
     def retry_notification(task_id):
         """Retry a failed notification (from dead letter queue)"""
-        from changedetectionio.notification.task_queue import retry_failed_notification
+        from changedetectionio.notification.task_queue import retry_failed_notification, get_task_metadata
+
+        # Check the task_id first to provide a better error message
+        if not task_id or task_id == 'TASK_ID_PLACEHOLDER':
+            flash("Invalid task ID. Please refresh the page and try again.", 'error')
+            return redirect(url_for('notification_dashboard.dashboard'))
+
+        # Check if the task exists in metadata
+        task_metadata = get_task_metadata(task_id)
+        if not task_metadata:
+            flash(f"Task ID '{task_id}' not found. It may have already been retried or removed.", 'error')
+            return redirect(url_for('notification_dashboard.dashboard'))
 
         success = retry_failed_notification(task_id)
-        message = f"Notification queued for retry." if success else f"Failed to retry notification. Check logs for details."
 
         if success:
-            flash(message, 'notice')
+            flash("Notification queued for retry.", 'notice')
         else:
-            flash(message, 'error')
+            flash("Failed to retry notification. The task may be missing notification data. Check logs for details.", 'error')
 
         return redirect(url_for('notification_dashboard.dashboard'))
 
diff --git a/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html b/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
index 2eb6fc21..80bda3fa 100644
--- a/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
+++ b/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
@@ -193,15 +193,20 @@ $(function() {
                 html += '<div class="event-actions">';
                 html += `<a href="#" class="toggle-details" data-id="${event.id}">Event Details</a>`;
                 if (event.status === 'failed') {
-                    html += `
-                        <form method="POST" action="/notification-dashboard/retry/TASK_ID_PLACEHOLDER">
-                            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
-                            <input type="hidden" name="task_id" value="${event.id}">
-                            <button type="submit" class="pure-button button-small">
-                                Retry
-                            </button></form>`;
+                    const retryUrl = '/notification-dashboard/retry/' + event.id;
+                    html += `
+                        <form method="POST" action="${retryUrl}">
+                            <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
+                            <input type="submit" class="pure-button button-small" value="Retry">
+                        </form>`;
                 } else if (event.status === 'retrying') {
-                    html += `<a href="/notification-dashboard/send-now/${event.id}" class="pure-button button-small">Send Now</a>`;
+                    const sendNowUrl = '/notification-dashboard/send-now/' + event.id;
+                    html += `<a href="${sendNowUrl}" class="pure-button button-small">Send Now</a>`;
                 }
                 html += '</div>';
diff --git a/changedetectionio/notification/task_queue/__init__.py b/changedetectionio/notification/task_queue/__init__.py
index 760efe70..5214b92a 100644
--- a/changedetectionio/notification/task_queue/__init__.py
+++ b/changedetectionio/notification/task_queue/__init__.py
@@ -410,16 +410,17 @@ def get_pending_notifications(limit=50):
         if watch_uuid and huey is not None and hasattr(huey.storage, 'path'):
             try:
                 import os
-                import json
                 import glob
+                from .file_storage import _safe_json_load
                 attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
                 if os.path.exists(attempts_dir):
                     # Load retry attempt files to get notification_urls and payload
                     attempt_pattern = os.path.join(attempts_dir, f"{watch_uuid}.*.json")
                     for attempt_file in sorted(glob.glob(attempt_pattern)):
                         try:
-                            with open(attempt_file, 'r') as f:
-                                attempt_data = json.load(f)
+                            # Use safe JSON load with corruption handling
+                            attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
+                            if attempt_data:
                                 # Format timestamp for display
                                 attempt_time = attempt_data.get('timestamp')
                                 if attempt_time:
@@ -692,7 +693,6 @@ def get_delivered_notifications(limit=50):
         return []
 
     import os
-    import json
 
     try:
         success_dir = os.path.join(storage_path, 'success')
@@ -709,11 +709,14 @@ def get_delivered_notifications(limit=50):
         # Load up to 'limit' files
         notifications = []
         from changedetectionio.notification_service import timestamp_to_localtime
+        from .file_storage import _safe_json_load
+
        for filename in success_files[:limit]:
             try:
                 file_path = os.path.join(success_dir, filename)
-                with open(file_path, 'r') as f:
-                    notif = json.load(f)
+                # Use safe JSON load with corruption handling
+                notif = _safe_json_load(file_path, 'success', storage_path)
+                if notif:
                     if notif.get('timestamp'):
                         notif['timestamp_formatted'] = timestamp_to_localtime(notif['timestamp'])
                     notifications.append(notif)
@@ -839,16 +842,17 @@ def get_failed_notifications(limit=100, max_age_days=30):
             notification_watch_uuid = notification_data.get('uuid')
             if notification_watch_uuid and hasattr(huey.storage, 'path'):
                 import os
-                import json
                 import glob
+                from .file_storage import _safe_json_load
                 attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
                 if os.path.exists(attempts_dir):
                     attempt_pattern = os.path.join(attempts_dir, f"{notification_watch_uuid}.*.json")
                     for attempt_file in sorted(glob.glob(attempt_pattern)):
                         try:
-                            with open(attempt_file, 'r') as f:
-                                attempt_data = json.load(f)
+                            # Use safe JSON load with corruption handling
+                            attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
+                            if attempt_data:
                                 # Format timestamp for display
                                 attempt_time = attempt_data.get('timestamp')
                                 if attempt_time:
@@ -1287,9 +1291,8 @@ def _extract_notification_urls(n_object):
 
 
 def _store_successful_notification(n_object, apprise_logs, payload=None):
-    """Store successful notification record and cleanup retry attempts."""
+    """Store successful notification record and cleanup retry attempts (with atomic write)."""
     import os
-    import json
     import time
 
     storage_path = _get_storage_path()
@@ -1319,19 +1322,22 @@ def _store_successful_notification(n_object, apprise_logs, payload=None):
     }
 
     success_file = os.path.join(success_dir, f"success-{unique_id}.json")
-    with open(success_file, 'w') as f:
-        json.dump(success_data, f, indent=2)
-    logger.debug(f"Stored delivered notification: {unique_id}")
+    # Use atomic write to prevent corruption on crash
+    from .file_storage import _atomic_json_write
+    try:
+        _atomic_json_write(success_file, success_data)
+        logger.debug(f"Stored delivered notification: {unique_id}")
+    except Exception as e:
+        logger.error(f"Failed to store successful notification atomically: {e}")
 
     # Cleanup old success files
     _cleanup_old_success_notifications(success_dir, keep=50)
 
 
 def _store_retry_attempt(n_object, error, payload=None):
-    """Store retry attempt details after failure."""
+    """Store retry attempt details after failure (with atomic write)."""
     import os
-    import json
     import time
     import uuid
 
@@ -1375,10 +1381,13 @@ def _store_retry_attempt(n_object, error, payload=None):
         'payload': payload  # What was attempted to be sent to Apprise
     }
 
-    with open(attempt_file, 'w') as f:
-        json.dump(attempt_data, f, indent=2)
-
-    logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+    # Use atomic write to prevent corruption on crash
+    from .file_storage import _atomic_json_write
+    try:
+        _atomic_json_write(attempt_file, attempt_data)
+        logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+    except Exception as e:
+        logger.error(f"Failed to store retry attempt atomically: {e}")
 
 
 def _handle_notification_error(watch_uuid, error, notification_debug_log, app, datastore):
@@ -1596,6 +1605,19 @@ def _get_task_metadata(task_id):
     return task_manager.get_task_metadata(task_id)
 
 
+def get_task_metadata(task_id):
+    """
+    Public wrapper to retrieve notification metadata by task ID.
+
+    Args:
+        task_id: Huey task ID
+
+    Returns:
+        Dict with task metadata if found, None otherwise
+    """
+    return _get_task_metadata(task_id)
+
+
 def _delete_task_metadata(task_id):
     """Delete task metadata using task manager."""
     task_manager = _get_task_manager()
@@ -1681,44 +1703,87 @@ def clear_all_notifications():
         return {'error': str(e)}
 
 
-def cleanup_old_failed_notifications(max_age_days=30):
+def cleanup_old_failed_notifications(max_age_days=30, max_failed_count=None):
     """
-    Clean up failed notifications and retry attempts older than max_age_days.
+    Clean up failed notifications with both age and count limits (overflow protection).
+
+    This prevents unbounded growth of the dead letter queue, which could otherwise
+    cause disk space issues in long-running deployments.
 
     Called on startup to prevent indefinite accumulation of old failures.
 
     Args:
         max_age_days: Delete failed notifications older than this (default: 30 days)
+        max_failed_count: Maximum number of failed notifications to keep (default: None = no limit).
+            Set to 1000 on startup for overflow protection.
 
     Returns:
-        Number of old retry attempts deleted
+        int: Total number of items deleted (failed notifications + retry attempts)
     """
     if huey is None:
         return 0
 
     import time
 
+    deleted_counts = {
+        'failed_by_age': 0,
+        'failed_by_overflow': 0,
+        'retry_attempts': 0
+    }
+
     try:
         cutoff_time = time.time() - (max_age_days * 86400)
 
-        # Trigger cleanup of old failed notifications
-        # The get_failed_notifications function already handles cleanup
-        get_failed_notifications(limit=1000, max_age_days=max_age_days)
+        # Step 1: Get all failed notifications (age-based cleanup happens inside this call)
+        failed_before_age_cleanup = get_failed_notifications(limit=10000, max_age_days=max_age_days)
 
-        # Clean up old retry attempts using task manager
+        # Step 2: If we still have too many failed notifications, delete the oldest ones (overflow protection)
+        # Only applies if max_failed_count is explicitly set
+        if max_failed_count is not None and len(failed_before_age_cleanup) > max_failed_count:
+            # Sort by timestamp (oldest first)
+            failed_sorted = sorted(failed_before_age_cleanup, key=lambda x: x.get('timestamp', 0))
+
+            # Delete the excess (oldest notifications beyond the limit)
+            to_delete = failed_sorted[:len(failed_before_age_cleanup) - max_failed_count]
+
+            logger.warning(f"Dead letter queue overflow: {len(failed_before_age_cleanup)} failed notifications exceeds the limit of {max_failed_count}")
+            logger.warning(f"Deleting {len(to_delete)} oldest failed notifications for overflow protection")
+
+            for task in to_delete:
+                task_id = task.get('task_id')
+                try:
+                    # Delete result and metadata
+                    _delete_result(task_id)
+                    _delete_task_metadata(task_id)
+                    deleted_counts['failed_by_overflow'] += 1
+                    logger.debug(f"Deleted old failed notification {task_id[:20]}... (overflow protection)")
+                except Exception as e:
+                    logger.debug(f"Error deleting failed task {task_id}: {e}")
+
+            if deleted_counts['failed_by_overflow'] > 0:
+                logger.info(f"Overflow protection: deleted {deleted_counts['failed_by_overflow']} oldest failed notifications")
+
+        # Step 3: Clean up old retry attempts using the task manager
         task_manager = _get_task_manager()
         if task_manager:
-            deleted_count = task_manager.cleanup_old_retry_attempts(cutoff_time)
-            if deleted_count > 0:
-                logger.info(f"Cleaned up {deleted_count} old retry attempt files (older than {max_age_days} days)")
-        else:
-            deleted_count = 0
+            deleted_counts['retry_attempts'] = task_manager.cleanup_old_retry_attempts(cutoff_time)
+            if deleted_counts['retry_attempts'] > 0:
+                logger.info(f"Cleaned up {deleted_counts['retry_attempts']} old retry attempt files (older than {max_age_days} days)")
 
-        logger.info(f"Completed cleanup check for failed notifications older than {max_age_days} days")
-        return deleted_count
+        total_deleted = deleted_counts['failed_by_overflow'] + deleted_counts['retry_attempts']
+
+        if total_deleted > 0:
+            logger.info(f"Cleanup completed - "
+                        f"failed_by_overflow: {deleted_counts['failed_by_overflow']}, "
+                        f"retry_attempts: {deleted_counts['retry_attempts']}, "
+                        f"total: {total_deleted}")
+
+        # Return the total count for backward compatibility (tests expect an int)
+        # The detailed breakdown is logged above
+        return total_deleted
 
     except Exception as e:
-        logger.debug(f"Unable to cleanup old failed notifications: {e}")
+        logger.error(f"Error during cleanup_old_failed_notifications: {e}")
         return 0
@@ -1739,8 +1804,14 @@ def start_huey_consumer():
 
     logger.info(f"Starting Huey notification consumer (single-threaded)")
 
-    # Clean up old failed notifications on startup
-    cleanup_old_failed_notifications(max_age_days=30)
+    # Clean up old failed notifications on startup (with overflow protection)
+    # Age limit: 30 days, count limit: 1000 failed notifications max
+    logger.info("Running startup cleanup: removing old failed notifications and enforcing limits...")
+    cleanup_deleted = cleanup_old_failed_notifications(max_age_days=30, max_failed_count=1000)
+    if cleanup_deleted > 0:
+        logger.info(f"Startup cleanup completed: deleted {cleanup_deleted} items")
+    else:
+        logger.info("Startup cleanup completed: no old items to delete")
 
     try:
         from huey.consumer import Consumer
diff --git a/changedetectionio/notification/task_queue/file_storage.py b/changedetectionio/notification/task_queue/file_storage.py
index 8011210f..30e64fe3 100644
--- a/changedetectionio/notification/task_queue/file_storage.py
+++ b/changedetectionio/notification/task_queue/file_storage.py
@@ -2,6 +2,11 @@
 FileStorage backend task manager for Huey notifications.
 
 This is the default backend, optimized for NAS/CIFS compatibility.
+
+Reliability enhancements:
+- Atomic file writes (prevents corruption on crash/power failure)
+- JSON corruption detection and recovery
+- Lost-found directory for corrupted files
 """
 
 from loguru import logger
@@ -10,6 +15,107 @@
 from .base import HueyTaskManager
 
 import os
 
+
+def _atomic_json_write(filepath, data):
+    """
+    Atomic JSON write - never leaves partial/corrupted files.
+
+    How it stays safe:
+    - Writes to a temp file first
+    - Forces data to disk (fsync)
+    - Atomic rename (POSIX guarantees atomicity)
+
+    If the process crashes mid-write, you either get:
+    - the old complete file (rename didn't happen), or
+    - the new complete file (rename succeeded),
+    never a partial/corrupted file.
+
+    Args:
+        filepath: Target file path
+        data: Data to write (will be JSON encoded)
+
+    Returns:
+        True on success
+
+    Raises:
+        Exception on write failure
+    """
+    import tempfile
+    import json
+
+    directory = os.path.dirname(filepath)
+
+    # Write to a temp file in the same directory (same filesystem = atomic rename)
+    fd, temp_path = tempfile.mkstemp(dir=directory, prefix='.tmp_', suffix='.json')
+
+    try:
+        with os.fdopen(fd, 'w') as f:
+            json.dump(data, f, indent=2)
+            f.flush()
+            os.fsync(f.fileno())  # Force to disk before rename
+
+        # Atomic rename (POSIX guarantees atomicity)
+        # If a crash happens here, the worst case is an orphaned temp file
+        os.replace(temp_path, filepath)
+        return True
+
+    except Exception:
+        # Clean up the temp file on error
+        try:
+            os.unlink(temp_path)
+        except OSError:
+            pass
+        raise
+
+
+def _safe_json_load(filepath, schema_name, storage_path):
+    """
+    Load JSON with corruption detection and recovery.
+
+    Corrupted files are moved to lost-found for forensic analysis.
+
+    Args:
+        filepath: JSON file to load
+        schema_name: Schema type (used for the lost-found subdirectory)
+        storage_path: Root storage path
+
+    Returns:
+        Parsed JSON data, or None if the file was corrupted
+    """
+    import json
+    import shutil
+    import time
+
+    try:
+        with open(filepath, 'r') as f:
+            return json.load(f)
+
+    except (json.JSONDecodeError, EOFError):
+        # Corrupted or incomplete JSON file
+        file_size = os.path.getsize(filepath) if os.path.exists(filepath) else 0
+        filename = os.path.basename(filepath)
+        logger.warning(f"Corrupted {schema_name} file: {filename} ({file_size} bytes) - moving to lost-found")
+
+        try:
+            lost_found_dir = os.path.join(storage_path, 'lost-found', schema_name)
+            os.makedirs(lost_found_dir, exist_ok=True)
+
+            timestamp = int(time.time())
+            lost_found_path = os.path.join(lost_found_dir, f"{filename}.{timestamp}.corrupted")
+
+            shutil.move(filepath, lost_found_path)
+            logger.info(f"Moved corrupted {schema_name} file to {lost_found_path}")
+
+        except Exception as move_err:
+            logger.error(f"Unable to move corrupted {schema_name} file: {move_err}")
+
+        return None
+
+    except Exception as e:
+        logger.debug(f"Unable to load {schema_name} file {filepath}: {e}")
+        return None
+
+
 class FileStorageTaskManager(HueyTaskManager):
     """Task manager for FileStorage backend (default, NAS-safe)."""
 
@@ -193,8 +299,7 @@ class FileStorageTaskManager(HueyTaskManager):
         return cleared
 
     def store_task_metadata(self, task_id, metadata):
-        """Store task metadata as JSON file."""
-        import json
+        """Store task metadata as JSON file with atomic write."""
         import time
 
         if not self.storage_path:
@@ -211,17 +316,16 @@ class FileStorageTaskManager(HueyTaskManager):
                 **metadata
             }
 
-            with open(metadata_file, 'w') as f:
-                json.dump(metadata_with_id, f, indent=2)
+            # Use atomic write to prevent corruption
+            _atomic_json_write(metadata_file, metadata_with_id)
             return True
+
         except Exception as e:
             logger.debug(f"Unable to store task metadata: {e}")
             return False
 
     def get_task_metadata(self, task_id):
-        """Retrieve task metadata from JSON file."""
-        import json
-
+        """Retrieve task metadata from JSON file with corruption handling."""
        if not self.storage_path:
             return None
@@ -230,8 +334,9 @@ class FileStorageTaskManager(HueyTaskManager):
             metadata_file = os.path.join(metadata_dir, f"{task_id}.json")
 
             if os.path.exists(metadata_file):
-                with open(metadata_file, 'r') as f:
-                    return json.load(f)
+                # Use safe JSON load with corruption handling
+                return _safe_json_load(metadata_file, 'task_metadata', self.storage_path)
+
         except Exception as e:
             logger.debug(f"Unable to load task metadata for {task_id}: {e}")
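
Quick sanity check for the new storage helpers - a minimal sketch, not part of the patch. It assumes _atomic_json_write() and _safe_json_load() land in file_storage.py as above, and exercises the write / corrupt / recover round-trip against a throwaway storage directory:

    import os
    import tempfile

    from changedetectionio.notification.task_queue.file_storage import (
        _atomic_json_write,
        _safe_json_load,
    )

    storage_path = tempfile.mkdtemp(prefix='notif-storage-')
    metadata_dir = os.path.join(storage_path, 'task_metadata')
    os.makedirs(metadata_dir, exist_ok=True)

    # 1. Atomic write: either the whole file appears or nothing does
    metadata_file = os.path.join(metadata_dir, 'example-task.json')
    _atomic_json_write(metadata_file, {'task_id': 'example-task', 'status': 'failed'})
    assert _safe_json_load(metadata_file, 'task_metadata', storage_path)['status'] == 'failed'

    # 2. Corruption recovery: a truncated file is moved to lost-found/, not crashed on
    with open(metadata_file, 'w') as f:
        f.write('{"task_id": "example-task", "stat')  # simulate a partial write

    assert _safe_json_load(metadata_file, 'task_metadata', storage_path) is None
    assert not os.path.exists(metadata_file)  # moved aside for forensic analysis
    assert os.listdir(os.path.join(storage_path, 'lost-found', 'task_metadata'))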