diff --git a/changedetectionio/blueprint/notification_dashboard/__init__.py b/changedetectionio/blueprint/notification_dashboard/__init__.py
index 0109129d..5a845729 100644
--- a/changedetectionio/blueprint/notification_dashboard/__init__.py
+++ b/changedetectionio/blueprint/notification_dashboard/__init__.py
@@ -91,15 +91,25 @@ def construct_blueprint():
@login_optionally_required
def retry_notification(task_id):
"""Retry a failed notification (from dead letter queue)"""
- from changedetectionio.notification.task_queue import retry_failed_notification
+ from changedetectionio.notification.task_queue import retry_failed_notification, get_task_metadata
+
+ # Check if task_id exists first to provide better error message
+ if not task_id or task_id == 'TASK_ID_PLACEHOLDER':
+ flash("Invalid task ID. Please refresh the page and try again.", 'error')
+ return redirect(url_for('notification_dashboard.dashboard'))
+
+ # Check if task exists in metadata
+ task_metadata = get_task_metadata(task_id)
+ if not task_metadata:
+ flash(f"Task ID '{task_id}' not found. It may have been already retried or removed.", 'error')
+ return redirect(url_for('notification_dashboard.dashboard'))
success = retry_failed_notification(task_id)
- message = f"Notification queued for retry." if success else f"Failed to retry notification. Check logs for details."
if success:
- flash(message, 'notice')
+ flash("Notification queued for retry.", 'notice')
else:
- flash(message, 'error')
+ flash("Failed to retry notification. The task may be missing notification data. Check logs for details.", 'error')
return redirect(url_for('notification_dashboard.dashboard'))
diff --git a/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html b/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
index 2eb6fc21..80bda3fa 100644
--- a/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
+++ b/changedetectionio/blueprint/notification_dashboard/templates/notification-dashboard.html
@@ -193,15 +193,20 @@ $(function() {
html += '
';
diff --git a/changedetectionio/notification/task_queue/__init__.py b/changedetectionio/notification/task_queue/__init__.py
index 760efe70..5214b92a 100644
--- a/changedetectionio/notification/task_queue/__init__.py
+++ b/changedetectionio/notification/task_queue/__init__.py
@@ -410,16 +410,17 @@ def get_pending_notifications(limit=50):
if watch_uuid and huey is not None and hasattr(huey.storage, 'path'):
try:
import os
- import json
import glob
+ from .file_storage import _safe_json_load
attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
if os.path.exists(attempts_dir):
# Load retry attempt files to get notification_urls and payload
attempt_pattern = os.path.join(attempts_dir, f"{watch_uuid}.*.json")
for attempt_file in sorted(glob.glob(attempt_pattern)):
try:
- with open(attempt_file, 'r') as f:
- attempt_data = json.load(f)
+ # Use safe JSON load with corruption handling
+ attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
+ if attempt_data:
# Format timestamp for display
attempt_time = attempt_data.get('timestamp')
if attempt_time:
@@ -692,7 +693,6 @@ def get_delivered_notifications(limit=50):
return []
import os
- import json
try:
success_dir = os.path.join(storage_path, 'success')
@@ -709,11 +709,14 @@ def get_delivered_notifications(limit=50):
# Load up to 'limit' files
notifications = []
from changedetectionio.notification_service import timestamp_to_localtime
+ from .file_storage import _safe_json_load
+
for filename in success_files[:limit]:
try:
file_path = os.path.join(success_dir, filename)
- with open(file_path, 'r') as f:
- notif = json.load(f)
+ # Use safe JSON load with corruption handling
+ notif = _safe_json_load(file_path, 'success', storage_path)
+ if notif:
if notif.get('timestamp'):
notif['timestamp_formatted'] = timestamp_to_localtime(notif['timestamp'])
notifications.append(notif)
@@ -839,16 +842,17 @@ def get_failed_notifications(limit=100, max_age_days=30):
notification_watch_uuid = notification_data.get('uuid')
if notification_watch_uuid and hasattr(huey.storage, 'path'):
import os
- import json
import glob
+ from .file_storage import _safe_json_load
attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
if os.path.exists(attempts_dir):
attempt_pattern = os.path.join(attempts_dir, f"{notification_watch_uuid}.*.json")
for attempt_file in sorted(glob.glob(attempt_pattern)):
try:
- with open(attempt_file, 'r') as f:
- attempt_data = json.load(f)
+ # Use safe JSON load with corruption handling
+ attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
+ if attempt_data:
# Format timestamp for display
attempt_time = attempt_data.get('timestamp')
if attempt_time:
@@ -1287,9 +1291,8 @@ def _extract_notification_urls(n_object):
def _store_successful_notification(n_object, apprise_logs, payload=None):
- """Store successful notification record and cleanup retry attempts."""
+ """Store successful notification record and cleanup retry attempts (with atomic write)."""
import os
- import json
import time
storage_path = _get_storage_path()
@@ -1319,19 +1322,22 @@ def _store_successful_notification(n_object, apprise_logs, payload=None):
}
success_file = os.path.join(success_dir, f"success-{unique_id}.json")
- with open(success_file, 'w') as f:
- json.dump(success_data, f, indent=2)
- logger.debug(f"Stored delivered notification: {unique_id}")
+ # Use atomic write to prevent corruption on crash
+ from .file_storage import _atomic_json_write
+ try:
+ _atomic_json_write(success_file, success_data)
+ logger.debug(f"Stored delivered notification: {unique_id}")
+ except Exception as e:
+ logger.error(f"Failed to store successful notification atomically: {e}")
# Cleanup old success files
_cleanup_old_success_notifications(success_dir, keep=50)
def _store_retry_attempt(n_object, error, payload=None):
- """Store retry attempt details after failure."""
+ """Store retry attempt details after failure (with atomic write)."""
import os
- import json
import time
import uuid
@@ -1375,10 +1381,13 @@ def _store_retry_attempt(n_object, error, payload=None):
'payload': payload # What was attempted to be sent to Apprise
}
- with open(attempt_file, 'w') as f:
- json.dump(attempt_data, f, indent=2)
-
- logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+ # Use atomic write to prevent corruption on crash
+ from .file_storage import _atomic_json_write
+ try:
+ _atomic_json_write(attempt_file, attempt_data)
+ logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+ except Exception as e:
+ logger.error(f"Failed to store retry attempt atomically: {e}")
def _handle_notification_error(watch_uuid, error, notification_debug_log, app, datastore):
@@ -1596,6 +1605,19 @@ def _get_task_metadata(task_id):
return task_manager.get_task_metadata(task_id)
+def get_task_metadata(task_id):
+ """
+ Public wrapper to retrieve notification metadata by task ID.
+
+ Args:
+ task_id: Huey task ID
+
+ Returns:
+ Dict with task metadata if found, None otherwise
+ """
+ return _get_task_metadata(task_id)
+
+
def _delete_task_metadata(task_id):
"""Delete task metadata using task manager."""
task_manager = _get_task_manager()
@@ -1681,44 +1703,87 @@ def clear_all_notifications():
return {'error': str(e)}
-def cleanup_old_failed_notifications(max_age_days=30):
+def cleanup_old_failed_notifications(max_age_days=30, max_failed_count=None):
"""
- Clean up failed notifications and retry attempts older than max_age_days.
+ Clean up failed notifications with both age and count limits (INDUSTRIAL-GRADE: overflow protection).
+
+ This prevents unbounded growth of the dead letter queue which could cause
+ disk space issues in long-running aerospace/industrial systems.
Called on startup to prevent indefinite accumulation of old failures.
Args:
max_age_days: Delete failed notifications older than this (default: 30 days)
+ max_failed_count: Maximum number of failed notifications to keep (default: None = no limit)
+ Set to 1000 on startup for overflow protection.
Returns:
- Number of old retry attempts deleted
+ int: Total number of items deleted (failed notifications + retry attempts)
"""
if huey is None:
return 0
import time
+ deleted_counts = {
+ 'failed_by_age': 0,
+ 'failed_by_overflow': 0,
+ 'retry_attempts': 0
+ }
+
try:
cutoff_time = time.time() - (max_age_days * 86400)
- # Trigger cleanup of old failed notifications
- # The get_failed_notifications function already handles cleanup
- get_failed_notifications(limit=1000, max_age_days=max_age_days)
+ # Step 1: Get all failed notifications (age-based cleanup happens inside this call)
+ failed_before_age_cleanup = get_failed_notifications(limit=10000, max_age_days=max_age_days)
- # Clean up old retry attempts using task manager
+ # Step 2: If we still have too many failed notifications, delete oldest ones (overflow protection)
+ # Only applies if max_failed_count is explicitly set
+ if max_failed_count is not None and len(failed_before_age_cleanup) > max_failed_count:
+ # Sort by timestamp (oldest first)
+ failed_sorted = sorted(failed_before_age_cleanup, key=lambda x: x.get('timestamp', 0))
+
+ # Delete excess (oldest notifications beyond the limit)
+ to_delete = failed_sorted[:len(failed_before_age_cleanup) - max_failed_count]
+
+ logger.warning(f"Dead letter queue overflow: {len(failed_before_age_cleanup)} failed notifications exceeds limit of {max_failed_count}")
+ logger.warning(f"Deleting {len(to_delete)} oldest failed notifications for overflow protection")
+
+ for task in to_delete:
+ task_id = task.get('task_id')
+ try:
+ # Delete result and metadata
+ _delete_result(task_id)
+ _delete_task_metadata(task_id)
+ deleted_counts['failed_by_overflow'] += 1
+ logger.debug(f"Deleted old failed notification {task_id[:20]}... (overflow protection)")
+ except Exception as e:
+ logger.debug(f"Error deleting failed task {task_id}: {e}")
+
+ if deleted_counts['failed_by_overflow'] > 0:
+ logger.info(f"Overflow protection: deleted {deleted_counts['failed_by_overflow']} oldest failed notifications")
+
+ # Step 3: Clean up old retry attempts using task manager
task_manager = _get_task_manager()
if task_manager:
- deleted_count = task_manager.cleanup_old_retry_attempts(cutoff_time)
- if deleted_count > 0:
- logger.info(f"Cleaned up {deleted_count} old retry attempt files (older than {max_age_days} days)")
- else:
- deleted_count = 0
+ deleted_counts['retry_attempts'] = task_manager.cleanup_old_retry_attempts(cutoff_time)
+ if deleted_counts['retry_attempts'] > 0:
+ logger.info(f"Cleaned up {deleted_counts['retry_attempts']} old retry attempt files (older than {max_age_days} days)")
- logger.info(f"Completed cleanup check for failed notifications older than {max_age_days} days")
- return deleted_count
+ total_deleted = deleted_counts['failed_by_overflow'] + deleted_counts['retry_attempts']
+
+ if total_deleted > 0:
+ logger.info(f"Cleanup completed - "
+ f"failed_by_overflow: {deleted_counts['failed_by_overflow']}, "
+ f"retry_attempts: {deleted_counts['retry_attempts']}, "
+ f"total: {total_deleted}")
+
+ # Return total count for backward compatibility (tests expect int)
+ # Detailed breakdown is logged above
+ return total_deleted
except Exception as e:
- logger.debug(f"Unable to cleanup old failed notifications: {e}")
+ logger.error(f"Error during cleanup_old_failed_notifications: {e}")
return 0
@@ -1739,8 +1804,14 @@ def start_huey_consumer():
logger.info(f"Starting Huey notification consumer (single-threaded)")
- # Clean up old failed notifications on startup
- cleanup_old_failed_notifications(max_age_days=30)
+ # Clean up old failed notifications on startup (with overflow protection)
+ # Age limit: 30 days, Count limit: 1000 failed notifications max
+ logger.info("Running startup cleanup: removing old failed notifications and enforcing limits...")
+ cleanup_deleted = cleanup_old_failed_notifications(max_age_days=30, max_failed_count=1000)
+ if cleanup_deleted > 0:
+ logger.info(f"Startup cleanup completed: deleted {cleanup_deleted} items")
+ else:
+ logger.info("Startup cleanup completed: no old items to delete")
try:
from huey.consumer import Consumer
diff --git a/changedetectionio/notification/task_queue/file_storage.py b/changedetectionio/notification/task_queue/file_storage.py
index 8011210f..30e64fe3 100644
--- a/changedetectionio/notification/task_queue/file_storage.py
+++ b/changedetectionio/notification/task_queue/file_storage.py
@@ -2,6 +2,11 @@
FileStorage backend task manager for Huey notifications.
This is the default backend, optimized for NAS/CIFS compatibility.
+
+INDUSTRIAL-GRADE ENHANCEMENTS:
+- Atomic file writes (prevents corruption on crash/power failure)
+- JSON corruption detection and recovery
+- Lost-found directory for corrupted files
"""
from loguru import logger
@@ -10,6 +15,107 @@ from .base import HueyTaskManager
import os
+
+def _atomic_json_write(filepath, data):
+ """
+ Atomic JSON write - never leaves partial/corrupted files.
+
+ Critical for aerospace-grade reliability:
+ - Writes to temp file first
+ - Forces data to disk (fsync)
+ - Atomic rename (POSIX guarantees atomicity)
+
+ If process crashes mid-write, you either get:
+ - Old complete file (rename didn't happen)
+ - New complete file (rename succeeded)
+ Never a partial/corrupted file.
+
+ Args:
+ filepath: Target file path
+ data: Data to write (will be JSON encoded)
+
+ Returns:
+ True on success
+
+ Raises:
+ Exception on write failure
+ """
+ import tempfile
+ import json
+
+ directory = os.path.dirname(filepath)
+
+ # Write to temp file in same directory (same filesystem = atomic rename)
+ fd, temp_path = tempfile.mkstemp(dir=directory, prefix='.tmp_', suffix='.json')
+
+ try:
+ with os.fdopen(fd, 'w') as f:
+ json.dump(data, f, indent=2)
+ f.flush()
+ os.fsync(f.fileno()) # Force to disk before rename
+
+ # Atomic rename (POSIX guarantees atomicity)
+ # If crash happens here, worst case is orphaned temp file
+ os.replace(temp_path, filepath)
+ return True
+
+ except Exception as e:
+ # Clean up temp file on error
+ try:
+ os.unlink(temp_path)
+ except:
+ pass
+ raise
+
+
+def _safe_json_load(filepath, schema_name, storage_path):
+ """
+ Load JSON with corruption detection and recovery.
+
+ Corrupted files are moved to lost-found for forensic analysis.
+
+ Args:
+ filepath: JSON file to load
+ schema_name: Schema type (for lost-found directory)
+ storage_path: Root storage path
+
+ Returns:
+ Parsed JSON data or None if corrupted
+ """
+ import json
+ import shutil
+ import time
+
+ try:
+ with open(filepath, 'r') as f:
+ return json.load(f)
+
+ except (json.JSONDecodeError, EOFError) as e:
+ # Corrupted or incomplete JSON file
+ file_size = os.path.getsize(filepath) if os.path.exists(filepath) else 0
+ filename = os.path.basename(filepath)
+            logger.warning(f"Corrupted {schema_name} file: {filename} ({file_size} bytes) - moving to lost-found")
+
+ try:
+ lost_found_dir = os.path.join(storage_path, 'lost-found', schema_name)
+ os.makedirs(lost_found_dir, exist_ok=True)
+
+ timestamp = int(time.time())
+            lost_found_path = os.path.join(lost_found_dir, f"{filename}.{timestamp}.corrupted")
+
+ shutil.move(filepath, lost_found_path)
+ logger.info(f"Moved corrupted {schema_name} file to {lost_found_path}")
+
+ except Exception as move_err:
+ logger.error(f"Unable to move corrupted {schema_name} file: {move_err}")
+
+ return None
+
+ except Exception as e:
+ logger.debug(f"Unable to load {schema_name} file {filepath}: {e}")
+ return None
+
+
class FileStorageTaskManager(HueyTaskManager):
"""Task manager for FileStorage backend (default, NAS-safe)."""
@@ -193,8 +299,7 @@ class FileStorageTaskManager(HueyTaskManager):
return cleared
def store_task_metadata(self, task_id, metadata):
- """Store task metadata as JSON file."""
- import json
+ """Store task metadata as JSON file with atomic write."""
import time
if not self.storage_path:
@@ -211,17 +316,16 @@ class FileStorageTaskManager(HueyTaskManager):
**metadata
}
- with open(metadata_file, 'w') as f:
- json.dump(metadata_with_id, f, indent=2)
+ # Use atomic write to prevent corruption
+ _atomic_json_write(metadata_file, metadata_with_id)
return True
+
except Exception as e:
logger.debug(f"Unable to store task metadata: {e}")
return False
def get_task_metadata(self, task_id):
- """Retrieve task metadata from JSON file."""
- import json
-
+ """Retrieve task metadata from JSON file with corruption handling."""
if not self.storage_path:
return None
@@ -230,8 +334,9 @@ class FileStorageTaskManager(HueyTaskManager):
metadata_file = os.path.join(metadata_dir, f"{task_id}.json")
if os.path.exists(metadata_file):
- with open(metadata_file, 'r') as f:
- return json.load(f)
+ # Use safe JSON load with corruption handling
+ return _safe_json_load(metadata_file, 'task_metadata', self.storage_path)
+
except Exception as e:
logger.debug(f"Unable to load task metadata for {task_id}: {e}")