dgtlmoon
2026-01-06 18:21:09 +01:00
parent 9bd704ec9f
commit 7a08cbb691
4 changed files with 249 additions and 58 deletions
@@ -91,15 +91,25 @@ def construct_blueprint():
     @login_optionally_required
     def retry_notification(task_id):
         """Retry a failed notification (from dead letter queue)"""
-        from changedetectionio.notification.task_queue import retry_failed_notification
+        from changedetectionio.notification.task_queue import retry_failed_notification, get_task_metadata
+
+        # Check if task_id exists first to provide better error message
+        if not task_id or task_id == 'TASK_ID_PLACEHOLDER':
+            flash("Invalid task ID. Please refresh the page and try again.", 'error')
+            return redirect(url_for('notification_dashboard.dashboard'))
+
+        # Check if task exists in metadata
+        task_metadata = get_task_metadata(task_id)
+        if not task_metadata:
+            flash(f"Task ID '{task_id}' not found. It may have been already retried or removed.", 'error')
+            return redirect(url_for('notification_dashboard.dashboard'))
 
         success = retry_failed_notification(task_id)
-        message = f"Notification queued for retry." if success else f"Failed to retry notification. Check logs for details."
         if success:
-            flash(message, 'notice')
+            flash("Notification queued for retry.", 'notice')
         else:
-            flash(message, 'error')
+            flash("Failed to retry notification. The task may be missing notification data. Check logs for details.", 'error')
 
         return redirect(url_for('notification_dashboard.dashboard'))
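With the pre-flight checks above, a stale Retry button now degrades to a friendly flash message instead of a silent failure. A hedged test sketch, not part of this commit — it assumes a pytest `client` fixture for the Flask app, `WTF_CSRF_ENABLED = False` in the test config, and that flashed messages render on the dashboard template:

def test_retry_unknown_task_flashes_error(client):
    # POST matches the CSRF-protected form added in the dashboard JS below
    res = client.post('/notification-dashboard/retry/not-a-real-task-id',
                      follow_redirects=True)
    assert b"not found" in res.data  # flashed by retry_notification()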
@@ -193,15 +193,20 @@ $(function() {
         html += '<div class="detail-header">';
         html += `<h5>Event Details</h5>`;
         if (event.status === 'failed') {
-            html += `<a href="/notification-dashboard/retry/${event.id}" class="pure-button button-small resend-btn">
-                <svg width="14" height="14" viewBox="0 0 16 16" style="vertical-align: middle; margin-right: 4px;">
-                    <path d="M13.5 8a5.5 5.5 0 1 1-1.65-3.95" stroke="currentColor" stroke-width="1.5" fill="none"/>
-                    <path d="M13.5 2v4h-4" stroke="currentColor" stroke-width="1.5" fill="none"/>
-                </svg>
-                Retry
-            </a>`;
+            const retryUrl = '/notification-dashboard/retry/' + event.id;
+            html += `<form method="POST" action="${retryUrl}" style="display: inline;">
+                <input type="hidden" name="csrf_token" value="{{ csrf_token() }}"/>
+                <button type="submit" class="pure-button button-small resend-btn">
+                    <svg width="14" height="14" viewBox="0 0 16 16" style="vertical-align: middle; margin-right: 4px;">
+                        <path d="M13.5 8a5.5 5.5 0 1 1-1.65-3.95" stroke="currentColor" stroke-width="1.5" fill="none"/>
+                        <path d="M13.5 2v4h-4" stroke="currentColor" stroke-width="1.5" fill="none"/>
+                    </svg>
+                    Retry
+                </button>
+            </form>`;
         } else if (event.status === 'retrying') {
-            html += `<a href="/notification-dashboard/send-now/${event.id}" class="pure-button button-small resend-btn">Send Now</a>`;
+            const sendNowUrl = '/notification-dashboard/send-now/' + event.id;
+            html += `<a href="${sendNowUrl}" class="pure-button button-small resend-btn">Send Now</a>`;
         }
         html += '</div>';
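Replacing the plain link with a form means the retry action now arrives as a CSRF-protected POST rather than a state-changing GET. For that to work, the Flask route has to accept POST; a minimal sketch of the expected registration (the route decorator sits outside the first hunk, so the blueprint name here is a placeholder, not shown by this commit):

@notification_dashboard_blueprint.route('/retry/<string:task_id>', methods=['POST'])
@login_optionally_required
def retry_notification(task_id):
    ...  # body as in the first hunk above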
@@ -410,16 +410,17 @@ def get_pending_notifications(limit=50):
         if watch_uuid and huey is not None and hasattr(huey.storage, 'path'):
             try:
                 import os
-                import json
                 import glob
+                from .file_storage import _safe_json_load
+
                 attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
                 if os.path.exists(attempts_dir):
                     # Load retry attempt files to get notification_urls and payload
                     attempt_pattern = os.path.join(attempts_dir, f"{watch_uuid}.*.json")
                     for attempt_file in sorted(glob.glob(attempt_pattern)):
                         try:
-                            with open(attempt_file, 'r') as f:
-                                attempt_data = json.load(f)
+                            # Use safe JSON load with corruption handling
+                            attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
                             if attempt_data:
                                 # Format timestamp for display
                                 attempt_time = attempt_data.get('timestamp')
                                 if attempt_time:
@@ -692,7 +693,6 @@ def get_delivered_notifications(limit=50):
         return []
 
     import os
-    import json
 
     try:
         success_dir = os.path.join(storage_path, 'success')
@@ -709,11 +709,14 @@ def get_delivered_notifications(limit=50):
         # Load up to 'limit' files
         notifications = []
         from changedetectionio.notification_service import timestamp_to_localtime
+        from .file_storage import _safe_json_load
+
         for filename in success_files[:limit]:
             try:
                 file_path = os.path.join(success_dir, filename)
-                with open(file_path, 'r') as f:
-                    notif = json.load(f)
+                # Use safe JSON load with corruption handling
+                notif = _safe_json_load(file_path, 'success', storage_path)
+                if notif:
                     if notif.get('timestamp'):
                         notif['timestamp_formatted'] = timestamp_to_localtime(notif['timestamp'])
                     notifications.append(notif)
@@ -839,16 +842,17 @@ def get_failed_notifications(limit=100, max_age_days=30):
                 notification_watch_uuid = notification_data.get('uuid')
                 if notification_watch_uuid and hasattr(huey.storage, 'path'):
                     import os
-                    import json
                     import glob
+                    from .file_storage import _safe_json_load
+
                     attempts_dir = os.path.join(huey.storage.path, 'retry_attempts')
                     if os.path.exists(attempts_dir):
                         attempt_pattern = os.path.join(attempts_dir, f"{notification_watch_uuid}.*.json")
                         for attempt_file in sorted(glob.glob(attempt_pattern)):
                             try:
-                                with open(attempt_file, 'r') as f:
-                                    attempt_data = json.load(f)
+                                # Use safe JSON load with corruption handling
+                                attempt_data = _safe_json_load(attempt_file, 'retry_attempts', huey.storage.path)
                                 if attempt_data:
                                     # Format timestamp for display
                                     attempt_time = attempt_data.get('timestamp')
                                     if attempt_time:
@@ -1287,9 +1291,8 @@ def _extract_notification_urls(n_object):
 def _store_successful_notification(n_object, apprise_logs, payload=None):
-    """Store successful notification record and cleanup retry attempts."""
+    """Store successful notification record and cleanup retry attempts (with atomic write)."""
     import os
-    import json
     import time
 
     storage_path = _get_storage_path()
@@ -1319,19 +1322,22 @@ def _store_successful_notification(n_object, apprise_logs, payload=None):
     }
 
     success_file = os.path.join(success_dir, f"success-{unique_id}.json")
-    with open(success_file, 'w') as f:
-        json.dump(success_data, f, indent=2)
-    logger.debug(f"Stored delivered notification: {unique_id}")
+    # Use atomic write to prevent corruption on crash
+    from .file_storage import _atomic_json_write
+    try:
+        _atomic_json_write(success_file, success_data)
+        logger.debug(f"Stored delivered notification: {unique_id}")
+    except Exception as e:
+        logger.error(f"Failed to store successful notification atomically: {e}")
 
     # Cleanup old success files
     _cleanup_old_success_notifications(success_dir, keep=50)
 
 
 def _store_retry_attempt(n_object, error, payload=None):
-    """Store retry attempt details after failure."""
+    """Store retry attempt details after failure (with atomic write)."""
     import os
-    import json
     import time
     import uuid
@@ -1375,10 +1381,13 @@ def _store_retry_attempt(n_object, error, payload=None):
         'payload': payload  # What was attempted to be sent to Apprise
     }
 
-    with open(attempt_file, 'w') as f:
-        json.dump(attempt_data, f, indent=2)
-    logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+    # Use atomic write to prevent corruption on crash
+    from .file_storage import _atomic_json_write
+    try:
+        _atomic_json_write(attempt_file, attempt_data)
+        logger.debug(f"Stored retry attempt {attempt_number} for watch {watch_uuid}")
+    except Exception as e:
+        logger.error(f"Failed to store retry attempt atomically: {e}")
 
 
 def _handle_notification_error(watch_uuid, error, notification_debug_log, app, datastore):
@@ -1596,6 +1605,19 @@ def _get_task_metadata(task_id):
     return task_manager.get_task_metadata(task_id)
 
 
+def get_task_metadata(task_id):
+    """
+    Public wrapper to retrieve notification metadata by task ID.
+
+    Args:
+        task_id: Huey task ID
+
+    Returns:
+        Dict with task metadata if found, None otherwise
+    """
+    return _get_task_metadata(task_id)
+
+
 def _delete_task_metadata(task_id):
     """Delete task metadata using task manager."""
     task_manager = _get_task_manager()
@@ -1681,44 +1703,87 @@ def clear_all_notifications():
         return {'error': str(e)}
 
 
-def cleanup_old_failed_notifications(max_age_days=30):
+def cleanup_old_failed_notifications(max_age_days=30, max_failed_count=None):
     """
-    Clean up failed notifications and retry attempts older than max_age_days.
+    Clean up failed notifications with both age and count limits (INDUSTRIAL-GRADE: overflow protection).
+
+    This prevents unbounded growth of the dead letter queue which could cause
+    disk space issues in long-running aerospace/industrial systems.
 
     Called on startup to prevent indefinite accumulation of old failures.
 
     Args:
         max_age_days: Delete failed notifications older than this (default: 30 days)
+        max_failed_count: Maximum number of failed notifications to keep (default: None = no limit)
+                          Set to 1000 on startup for overflow protection.
 
     Returns:
-        Number of old retry attempts deleted
+        int: Total number of items deleted (failed notifications + retry attempts)
     """
     if huey is None:
         return 0
 
     import time
 
+    deleted_counts = {
+        'failed_by_age': 0,
+        'failed_by_overflow': 0,
+        'retry_attempts': 0
+    }
+
     try:
         cutoff_time = time.time() - (max_age_days * 86400)
 
-        # Trigger cleanup of old failed notifications
-        # The get_failed_notifications function already handles cleanup
-        get_failed_notifications(limit=1000, max_age_days=max_age_days)
+        # Step 1: Get all failed notifications (age-based cleanup happens inside this call)
+        failed_before_age_cleanup = get_failed_notifications(limit=10000, max_age_days=max_age_days)
 
-        # Clean up old retry attempts using task manager
+        # Step 2: If we still have too many failed notifications, delete oldest ones (overflow protection)
+        # Only applies if max_failed_count is explicitly set
+        if max_failed_count is not None and len(failed_before_age_cleanup) > max_failed_count:
+            # Sort by timestamp (oldest first)
+            failed_sorted = sorted(failed_before_age_cleanup, key=lambda x: x.get('timestamp', 0))
+
+            # Delete excess (oldest notifications beyond the limit)
+            to_delete = failed_sorted[:len(failed_before_age_cleanup) - max_failed_count]
+
+            logger.warning(f"Dead letter queue overflow: {len(failed_before_age_cleanup)} failed notifications exceeds limit of {max_failed_count}")
+            logger.warning(f"Deleting {len(to_delete)} oldest failed notifications for overflow protection")
+
+            for task in to_delete:
+                task_id = task.get('task_id')
+                try:
+                    # Delete result and metadata
+                    _delete_result(task_id)
+                    _delete_task_metadata(task_id)
+                    deleted_counts['failed_by_overflow'] += 1
+                    logger.debug(f"Deleted old failed notification {task_id[:20]}... (overflow protection)")
+                except Exception as e:
+                    logger.debug(f"Error deleting failed task {task_id}: {e}")
+
+            if deleted_counts['failed_by_overflow'] > 0:
+                logger.info(f"Overflow protection: deleted {deleted_counts['failed_by_overflow']} oldest failed notifications")
+
+        # Step 3: Clean up old retry attempts using task manager
         task_manager = _get_task_manager()
         if task_manager:
-            deleted_count = task_manager.cleanup_old_retry_attempts(cutoff_time)
-            if deleted_count > 0:
-                logger.info(f"Cleaned up {deleted_count} old retry attempt files (older than {max_age_days} days)")
-        else:
-            deleted_count = 0
+            deleted_counts['retry_attempts'] = task_manager.cleanup_old_retry_attempts(cutoff_time)
+            if deleted_counts['retry_attempts'] > 0:
+                logger.info(f"Cleaned up {deleted_counts['retry_attempts']} old retry attempt files (older than {max_age_days} days)")
 
-        logger.info(f"Completed cleanup check for failed notifications older than {max_age_days} days")
-        return deleted_count
+        total_deleted = deleted_counts['failed_by_overflow'] + deleted_counts['retry_attempts']
+        if total_deleted > 0:
+            logger.info(f"Cleanup completed - "
+                        f"failed_by_overflow: {deleted_counts['failed_by_overflow']}, "
+                        f"retry_attempts: {deleted_counts['retry_attempts']}, "
+                        f"total: {total_deleted}")
+
+        # Return total count for backward compatibility (tests expect int)
+        # Detailed breakdown is logged above
+        return total_deleted
 
     except Exception as e:
-        logger.debug(f"Unable to cleanup old failed notifications: {e}")
+        logger.error(f"Error during cleanup_old_failed_notifications: {e}")
        return 0
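The overflow slice in Step 2 deletes from the head of an oldest-first sort, so exactly `max_failed_count` of the newest failures survive. A quick worked example with invented records:

failed = [{'task_id': 'a', 'timestamp': 100},
          {'task_id': 'b', 'timestamp': 300},
          {'task_id': 'c', 'timestamp': 200}]
max_failed_count = 2

failed_sorted = sorted(failed, key=lambda x: x.get('timestamp', 0))  # a, c, b (oldest first)
to_delete = failed_sorted[:len(failed) - max_failed_count]           # only the oldest, 'a'
assert [t['task_id'] for t in to_delete] == ['a']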
@@ -1739,8 +1804,14 @@ def start_huey_consumer():
     logger.info(f"Starting Huey notification consumer (single-threaded)")
 
-    # Clean up old failed notifications on startup
-    cleanup_old_failed_notifications(max_age_days=30)
+    # Clean up old failed notifications on startup (with overflow protection)
+    # Age limit: 30 days, Count limit: 1000 failed notifications max
+    logger.info("Running startup cleanup: removing old failed notifications and enforcing limits...")
+    cleanup_deleted = cleanup_old_failed_notifications(max_age_days=30, max_failed_count=1000)
+    if cleanup_deleted > 0:
+        logger.info(f"Startup cleanup completed: deleted {cleanup_deleted} items")
+    else:
+        logger.info("Startup cleanup completed: no old items to delete")
 
     try:
         from huey.consumer import Consumer
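The try block continues past the end of this hunk; only the `Consumer` import is visible here. For orientation, a rough sketch of starting a single-threaded consumer with huey's public API (the worker settings are assumptions, not taken from this commit):

from huey.consumer import Consumer

consumer = Consumer(huey, workers=1)  # single-threaded, matching the log line above
consumer.run()                        # blocks, processing queued notification tasks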
@@ -2,6 +2,11 @@
 FileStorage backend task manager for Huey notifications.
 
 This is the default backend, optimized for NAS/CIFS compatibility.
+
+INDUSTRIAL-GRADE ENHANCEMENTS:
+- Atomic file writes (prevents corruption on crash/power failure)
+- JSON corruption detection and recovery
+- Lost-found directory for corrupted files
 """
 from loguru import logger
@@ -10,6 +15,107 @@ from .base import HueyTaskManager
 import os
 
 
+def _atomic_json_write(filepath, data):
+    """
+    Atomic JSON write - never leaves partial/corrupted files.
+
+    Critical for aerospace-grade reliability:
+    - Writes to a temp file first
+    - Forces data to disk (fsync)
+    - Atomic rename (POSIX guarantees atomicity)
+
+    If the process crashes mid-write, you get either:
+    - the old complete file (rename didn't happen), or
+    - the new complete file (rename succeeded).
+    Never a partial/corrupted file.
+
+    Args:
+        filepath: Target file path
+        data: Data to write (will be JSON encoded)
+
+    Returns:
+        True on success
+
+    Raises:
+        Exception on write failure
+    """
+    import tempfile
+    import json
+
+    directory = os.path.dirname(filepath)
+
+    # Write to a temp file in the same directory (same filesystem = atomic rename)
+    fd, temp_path = tempfile.mkstemp(dir=directory, prefix='.tmp_', suffix='.json')
+    try:
+        with os.fdopen(fd, 'w') as f:
+            json.dump(data, f, indent=2)
+            f.flush()
+            os.fsync(f.fileno())  # Force to disk before rename
+
+        # Atomic rename (POSIX guarantees atomicity)
+        # If a crash happens here, the worst case is an orphaned temp file
+        os.replace(temp_path, filepath)
+        return True
+    except Exception:
+        # Clean up the temp file on error
+        try:
+            os.unlink(temp_path)
+        except OSError:
+            pass
+        raise
+
+
+def _safe_json_load(filepath, schema_name, storage_path):
+    """
+    Load JSON with corruption detection and recovery.
+
+    Corrupted files are moved to lost-found for forensic analysis.
+
+    Args:
+        filepath: JSON file to load
+        schema_name: Schema type (for the lost-found directory)
+        storage_path: Root storage path
+
+    Returns:
+        Parsed JSON data, or None if corrupted
+    """
+    import json
+    import shutil
+    import time
+
+    try:
+        with open(filepath, 'r') as f:
+            return json.load(f)
+    except (json.JSONDecodeError, EOFError):
+        # Corrupted or incomplete JSON file
+        file_size = os.path.getsize(filepath) if os.path.exists(filepath) else 0
+        filename = os.path.basename(filepath)
+        logger.warning(f"Corrupted {schema_name} file: {filename} ({file_size} bytes) - moving to lost-found")
+
+        try:
+            lost_found_dir = os.path.join(storage_path, 'lost-found', schema_name)
+            os.makedirs(lost_found_dir, exist_ok=True)
+            timestamp = int(time.time())
+            lost_found_path = os.path.join(lost_found_dir, f"{filename}.{timestamp}.corrupted")
+            shutil.move(filepath, lost_found_path)
+            logger.info(f"Moved corrupted {schema_name} file to {lost_found_path}")
+        except Exception as move_err:
+            logger.error(f"Unable to move corrupted {schema_name} file: {move_err}")
+
+        return None
+    except Exception as e:
+        logger.debug(f"Unable to load {schema_name} file {filepath}: {e}")
+        return None
+
+
 class FileStorageTaskManager(HueyTaskManager):
     """Task manager for FileStorage backend (default, NAS-safe)."""
@@ -193,8 +299,7 @@ class FileStorageTaskManager(HueyTaskManager):
         return cleared
 
     def store_task_metadata(self, task_id, metadata):
-        """Store task metadata as JSON file."""
-        import json
+        """Store task metadata as JSON file with atomic write."""
         import time
 
         if not self.storage_path:
@@ -211,17 +316,16 @@ class FileStorageTaskManager(HueyTaskManager):
                 **metadata
             }
 
-            with open(metadata_file, 'w') as f:
-                json.dump(metadata_with_id, f, indent=2)
+            # Use atomic write to prevent corruption
+            _atomic_json_write(metadata_file, metadata_with_id)
             return True
         except Exception as e:
             logger.debug(f"Unable to store task metadata: {e}")
             return False
 
     def get_task_metadata(self, task_id):
-        """Retrieve task metadata from JSON file."""
-        import json
+        """Retrieve task metadata from JSON file with corruption handling."""
 
         if not self.storage_path:
             return None
@@ -230,8 +334,9 @@ class FileStorageTaskManager(HueyTaskManager):
             metadata_file = os.path.join(metadata_dir, f"{task_id}.json")
 
             if os.path.exists(metadata_file):
-                with open(metadata_file, 'r') as f:
-                    return json.load(f)
+                # Use safe JSON load with corruption handling
+                return _safe_json_load(metadata_file, 'task_metadata', self.storage_path)
         except Exception as e:
             logger.debug(f"Unable to load task metadata for {task_id}: {e}")