""" File-based datastore with individual watch persistence and dirty tracking. This module provides the FileSavingDataStore abstract class that implements: - Individual watch.json file persistence - Hash-based change detection (only save what changed) - Periodic audit scan (catches unmarked changes) - Background save thread with batched parallel saves - Atomic file writes safe for NFS/NAS """ import glob import hashlib import json import os import tempfile import time from concurrent.futures import ThreadPoolExecutor, as_completed from threading import Thread from loguru import logger from .base import DataStore from .. import strtobool # Try to import orjson for faster JSON serialization try: import orjson HAS_ORJSON = True except ImportError: HAS_ORJSON = False # Fsync configuration: Force file data to disk for crash safety # Default False to match legacy behavior (write-and-rename without fsync) # Set to True for mission-critical deployments requiring crash consistency FORCE_FSYNC_DATA_IS_CRITICAL = bool(strtobool(os.getenv('FORCE_FSYNC_DATA_IS_CRITICAL', 'False'))) # Save interval configuration: How often the background thread saves dirty items # Default 10 seconds - increase for less frequent saves, decrease for more frequent DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS = int(os.getenv('DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS', '10')) # Rolling audit configuration: Scans a fraction of watches each cycle # Default: Run audit every 10s, split into 5 shards # Full audit completes every 50s (10s × 5 shards) # With 56k watches: 56k / 5 = ~11k watches per cycle (~60ms vs 316ms for all) # Handles dynamic watch count - recalculates shard boundaries each cycle DATASTORE_AUDIT_INTERVAL_SECONDS = int(os.getenv('DATASTORE_AUDIT_INTERVAL_SECONDS', '10')) DATASTORE_AUDIT_SHARDS = int(os.getenv('DATASTORE_AUDIT_SHARDS', '5')) # ============================================================================ # Helper Functions for Atomic File Operations # ============================================================================ def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10): """ Save JSON data to disk using atomic write pattern. Generic helper for saving any JSON data (settings, watches, etc.) 
with: - Atomic write (temp file + rename) - Directory fsync for crash consistency (only for new files) - Size validation - Proper error handling Args: file_path: Full path to target JSON file data_dict: Dictionary to serialize label: Human-readable label for error messages (e.g., "watch", "settings") max_size_mb: Maximum allowed file size in MB Raises: ValueError: If serialized data exceeds max_size_mb OSError: If disk is full (ENOSPC) or other I/O error """ # Check if file already exists (before we start writing) # Directory fsync only needed for NEW files to persist the filename file_exists = os.path.exists(file_path) # Ensure parent directory exists parent_dir = os.path.dirname(file_path) os.makedirs(parent_dir, exist_ok=True) # Create temp file in same directory (required for NFS atomicity) fd, temp_path = tempfile.mkstemp( suffix='.tmp', prefix='json-', dir=parent_dir, text=False ) fd_closed = False try: # Serialize data t0 = time.time() if HAS_ORJSON: data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2) else: data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8') serialize_ms = (time.time() - t0) * 1000 # Safety check: validate size MAX_SIZE = max_size_mb * 1024 * 1024 data_size = len(data) if data_size > MAX_SIZE: raise ValueError( f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB " f"(max: {max_size_mb}MB). This indicates a bug or data corruption." ) # Write to temp file t1 = time.time() os.write(fd, data) write_ms = (time.time() - t1) * 1000 # Optional fsync: Force file data to disk for crash safety # Only if FORCE_FSYNC_DATA_IS_CRITICAL=True (default: False, matches legacy behavior) t2 = time.time() if FORCE_FSYNC_DATA_IS_CRITICAL: os.fsync(fd) file_fsync_ms = (time.time() - t2) * 1000 os.close(fd) fd_closed = True # Atomic rename t3 = time.time() os.replace(temp_path, file_path) rename_ms = (time.time() - t3) * 1000 # Sync directory to ensure filename metadata is durable # OPTIMIZATION: Only needed for NEW files. Existing files already have # directory entry persisted, so we only need file fsync for data durability. 
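        # A minimal sketch of the durability reasoning here (assumes POSIX
        # rename semantics; illustrative, not an exact trace of this function):
        #
        #   os.replace(tmp, target)  # atomically swaps the directory entry
        #   os.fsync(dir_fd)         # persists that entry for NEW filenames
        #
        # Without the directory fsync, a crash shortly after replace() can
        # leave a directory that does not yet list a newly created file.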
        dir_fsync_ms = 0
        if not file_exists:
            try:
                dir_fd = os.open(parent_dir, os.O_RDONLY)
                try:
                    t4 = time.time()
                    os.fsync(dir_fd)
                    dir_fsync_ms = (time.time() - t4) * 1000
                finally:
                    os.close(dir_fd)
            except (OSError, AttributeError):
                # Windows doesn't support fsync on directories
                pass

        # Log timing breakdown for slow saves
        # total_ms = serialize_ms + write_ms + file_fsync_ms + rename_ms + dir_fsync_ms
        # if total_ms > 10:  # Log if save took more than 10ms
        #     file_status = "new" if not file_exists else "update"
        #     logger.trace(
        #         f"Save timing breakdown ({total_ms:.1f}ms total, {file_status}): "
        #         f"serialize={serialize_ms:.1f}ms, write={write_ms:.1f}ms, "
        #         f"file_fsync={file_fsync_ms:.1f}ms, rename={rename_ms:.1f}ms, "
        #         f"dir_fsync={dir_fsync_ms:.1f}ms, using_orjson={HAS_ORJSON}"
        #     )

    except OSError as e:
        # Cleanup temp file
        if not fd_closed:
            try:
                os.close(fd)
            except OSError:
                pass
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass

        # Provide helpful error messages
        if e.errno == 28:  # ENOSPC
            raise OSError(f"Disk full: Cannot save {label}") from e
        elif e.errno == 122:  # EDQUOT
            raise OSError(f"Disk quota exceeded: Cannot save {label}") from e
        else:
            raise OSError(f"I/O error saving {label}: {e}") from e

    except Exception:
        # Cleanup temp file, then re-raise with the original traceback
        if not fd_closed:
            try:
                os.close(fd)
            except OSError:
                pass
        if os.path.exists(temp_path):
            try:
                os.unlink(temp_path)
            except OSError:
                pass
        raise


def save_watch_atomic(watch_dir, uuid, watch_dict):
    """
    Save a watch to disk using atomic write pattern.

    Convenience wrapper around save_json_atomic for watches.

    Args:
        watch_dir: Directory for this watch (e.g., /datastore/{uuid})
        uuid: Watch UUID (for logging)
        watch_dict: Dictionary representation of the watch

    Raises:
        ValueError: If serialized data exceeds 10MB (indicates bug or corruption)
        OSError: If disk is full (ENOSPC) or other I/O error
    """
    watch_json = os.path.join(watch_dir, "watch.json")
    save_json_atomic(watch_json, watch_dict, label=f"watch {uuid}", max_size_mb=10)


def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
    """
    Load a watch from its JSON file.

    Args:
        watch_json: Path to the watch.json file
        uuid: Watch UUID
        rehydrate_entity_func: Function to convert dict to Watch object

    Returns:
        Tuple of (Watch object, raw_data_dict) or (None, None) if failed
        The raw_data_dict is available to callers that need the watch's
        pre-rehydration data
    """
    try:
        # Check file size before reading
        file_size = os.path.getsize(watch_json)
        MAX_WATCH_SIZE = 10 * 1024 * 1024  # 10MB
        if file_size > MAX_WATCH_SIZE:
            logger.critical(
                f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: "
                f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). "
                f"File: {watch_json}. This indicates a bug or data corruption. "
                f"Watch will be skipped."
            )
            return None, None

        if HAS_ORJSON:
            with open(watch_json, 'rb') as f:
                watch_data = orjson.loads(f.read())
        else:
            with open(watch_json, 'r', encoding='utf-8') as f:
                watch_data = json.load(f)

        # Drop these keys if present, even when their values are falsy
        # (a truthiness check via .get() would leave empty values behind)
        watch_data.pop('time_schedule_limit', None)
        watch_data.pop('time_between_check', None)

        # Return both the raw data and the rehydrated watch
        watch_obj = rehydrate_entity_func(uuid, watch_data)
        return watch_obj, watch_data

    except json.JSONDecodeError as e:
        logger.critical(
            f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
            f"File: {watch_json}. Error: {e}. "
            f"Watch will be skipped and may need manual recovery from backup."
        )
        return None, None
    except ValueError as e:
        # orjson raises orjson.JSONDecodeError, a ValueError subclass, for invalid JSON
        if "invalid json" in str(e).lower() or HAS_ORJSON:
            logger.critical(
                f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
                f"File: {watch_json}. Error: {e}. "
                f"Watch will be skipped and may need manual recovery from backup."
            )
            return None, None
        # Re-raise if it's not a JSON parsing error
        raise
    except FileNotFoundError:
        logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
        return None, None
    except Exception as e:
        logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
        return None, None


def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
    """
    Load all watches from individual watch.json files.

    SYNCHRONOUS loading: Blocks until all watches are loaded.
    This ensures data consistency - the web server won't accept requests
    until all watches are available. Progress is logged every 100 watches.

    Args:
        datastore_path: Path to the datastore directory
        rehydrate_entity_func: Function to convert dict to Watch object
        compute_hash_func: Function to compute hash from a watch dict

    Returns:
        Tuple of (watching_dict, hashes_dict)
        - watching_dict: uuid -> Watch object
        - hashes_dict: uuid -> hash string (computed from the rehydrated
          watch dict, so it matches the hashes computed on save)
    """
    start_time = time.time()
    logger.info("Loading watches from individual watch.json files...")

    watching = {}
    watch_hashes = {}

    if not os.path.exists(datastore_path):
        return watching, watch_hashes

    # Find all watch.json files using glob (faster than manual directory traversal)
    glob_start = time.time()
    watch_files = glob.glob(os.path.join(datastore_path, "*", "watch.json"))
    glob_time = time.time() - glob_start

    total = len(watch_files)
    logger.debug(f"Found {total} watch.json files in {glob_time:.3f}s")

    loaded = 0
    failed = 0

    for watch_json in watch_files:
        # Extract UUID from path: /datastore/{uuid}/watch.json
        uuid_dir = os.path.basename(os.path.dirname(watch_json))

        watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
        if watch and raw_data:
            watching[uuid_dir] = watch
            # Compute hash from the rehydrated Watch object (as dict) to match
            # how hashes are computed on save and during audit scans
            watch_hashes[uuid_dir] = compute_hash_func(dict(watch))
            loaded += 1
            if loaded % 100 == 0:
                logger.info(f"Loaded {loaded}/{total} watches...")
        else:
            # load_watch_from_file already logged the specific error
            failed += 1

    elapsed = max(time.time() - start_time, 1e-6)  # guard the rate calculation below
    if failed > 0:
        logger.critical(
            f"LOAD COMPLETE: {loaded} watches loaded successfully, "
            f"{failed} watches FAILED to load (corrupted or invalid) "
            f"in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)"
        )
    else:
        logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)")

    return watching, watch_hashes


# ============================================================================
# FileSavingDataStore Class
# ============================================================================

class FileSavingDataStore(DataStore):
    """
    Abstract datastore that provides file persistence with change tracking.
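
    Typical write path (illustrative):

        store.mark_watch_dirty(uuid)   # O(1): adds the UUID to the dirty set
        # ...within DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS, the background
        # thread runs _save_dirty_items() -> save_watch(uuid), which hash-checks
        # the watch and writes {uuid}/watch.json atomically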
    Features:
    - Individual watch.json files (one per watch)
    - Dirty tracking: only saves items that have changed
    - Hash-based change detection: prevents unnecessary writes
    - Background save thread: non-blocking persistence
    - Two-tier urgency: standard (every DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS,
      default 10s) and urgent (immediate) saves

    Subclasses must implement:
    - rehydrate_entity(): Convert dict to Watch object
    - Access to internal __data structure for watch management
    """
    needs_write = False
    needs_write_urgent = False
    stop_thread = False

    def __init__(self):
        super().__init__()
        self.save_data_thread = None

        # Change tracking (per-instance: defining the set/dict at class level
        # would share mutable state across all datastore instances)
        self._dirty_watches = set()    # Watch UUIDs that need saving
        self._dirty_settings = False   # Settings changed
        self._watch_hashes = {}        # UUID -> SHA256 hash for change detection

        # Health monitoring
        self._last_save_time = time.time()   # Timestamp of last successful save
        self._last_audit_time = time.time()  # Timestamp of last audit scan
        self._save_cycle_count = 0     # Number of save cycles completed
        self._total_saves = 0          # Total watches saved (lifetime)
        self._save_errors = 0          # Total save errors (lifetime)
        self._audit_count = 0          # Number of audit scans completed
        self._audit_found_changes = 0  # Total unmarked changes found by audits
        self._audit_shard_index = 0    # Current shard being audited (rolling audit)

    def mark_watch_dirty(self, uuid):
        """
        Mark a watch as needing save.

        Args:
            uuid: Watch UUID
        """
        with self.lock:
            self._dirty_watches.add(uuid)
            dirty_count = len(self._dirty_watches)

        # Backpressure detection - warn if the dirty set grows too large
        if dirty_count > 1000:
            logger.critical(
                f"BACKPRESSURE WARNING: {dirty_count} watches pending save! "
                f"Save thread may not be keeping up with the write rate. "
                f"This could indicate a disk I/O bottleneck or save thread failure."
            )
        elif dirty_count > 500:
            logger.warning(
                f"Dirty watch count high: {dirty_count} watches pending save. "
                f"Monitoring for potential backpressure."
            )

        self.needs_write = True

    def mark_settings_dirty(self):
        """Mark settings as needing save."""
        with self.lock:
            self._dirty_settings = True
        self.needs_write = True

    def _compute_hash(self, watch_dict):
        """
        Compute SHA256 hash of watch for change detection.

        Args:
            watch_dict: Dictionary representation of watch

        Returns:
            Hex string of SHA256 hash
        """
        # Use orjson for deterministic serialization if available
        if HAS_ORJSON:
            json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS)
        else:
            json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False)
            json_bytes = json_str.encode('utf-8')
        return hashlib.sha256(json_bytes).hexdigest()

    def save_watch(self, uuid, force=False, watch_dict=None, current_hash=None):
        """
        Save a single watch if it has changed (polymorphic method).
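
        Example (illustrative; assumes the watch exists):

            store.save_watch(uuid)              # writes watch.json, returns True
            store.save_watch(uuid)              # SHA256 unchanged, returns False
            store.save_watch(uuid, force=True)  # skips the hash check, returns True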
Args: uuid: Watch UUID force: If True, skip hash check and save anyway watch_dict: Pre-computed watch dictionary (optimization) current_hash: Pre-computed hash (optimization) Returns: True if saved, False if skipped (unchanged) """ if not self._watch_exists(uuid): logger.warning(f"Cannot save watch {uuid} - does not exist") return False # Get watch dict if not provided if watch_dict is None: watch_dict = self._get_watch_dict(uuid) # Compute hash if not provided if current_hash is None: current_hash = self._compute_hash(watch_dict) # Skip save if unchanged (unless forced) if not force and current_hash == self._watch_hashes.get(uuid): return False try: self._save_watch(uuid, watch_dict) self._watch_hashes[uuid] = current_hash logger.debug(f"Saved watch {uuid}") return True except Exception as e: logger.error(f"Failed to save watch {uuid}: {e}") raise def _save_watch(self, uuid, watch_dict): """ Save a single watch to storage (polymorphic). Backend-specific implementation. Subclasses override for different storage: - File backend: Writes to {uuid}/watch.json - Redis backend: SET watch:{uuid} - SQL backend: UPDATE watches WHERE uuid=? Args: uuid: Watch UUID watch_dict: Dictionary representation of watch """ # Default file implementation watch_dir = os.path.join(self.datastore_path, uuid) save_watch_atomic(watch_dir, uuid, watch_dict) def _save_settings(self): """ Save settings to storage (polymorphic). Subclasses must implement for their backend. - File: changedetection.json - Redis: SET settings - SQL: UPDATE settings table """ raise NotImplementedError("Subclass must implement _save_settings") def _load_watches(self): """ Load all watches from storage (polymorphic). Subclasses must implement for their backend. - File: Read individual watch.json files - Redis: SCAN watch:* keys - SQL: SELECT * FROM watches """ raise NotImplementedError("Subclass must implement _load_watches") def _delete_watch(self, uuid): """ Delete a watch from storage (polymorphic). Subclasses must implement for their backend. - File: Delete {uuid}/ directory recursively - Redis: DEL watch:{uuid} - SQL: DELETE FROM watches WHERE uuid=? Args: uuid: Watch UUID to delete """ raise NotImplementedError("Subclass must implement _delete_watch") def _save_dirty_items(self): """ Save dirty watches and settings. This is the core optimization: instead of saving the entire datastore, we only save watches that were marked dirty and settings if changed. 
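
        Worked example: 120 dirty watches are processed as three batches of
        50, 50, and 20; each batch is written by a pool of up to MAX_WORKERS
        threads, and any watch whose recomputed hash still matches its stored
        hash is skipped without touching disk.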
""" start_time = time.time() # Capture dirty sets under lock with self.lock: dirty_watches = list(self._dirty_watches) dirty_settings = self._dirty_settings self._dirty_watches.clear() self._dirty_settings = False if not dirty_watches and not dirty_settings: return logger.trace(f"Saving {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}") # Save each dirty watch using the polymorphic save method saved_count = 0 error_count = 0 skipped_unchanged = 0 # Process in batches of 50, using thread pool for parallel saves BATCH_SIZE = 50 MAX_WORKERS = 20 # Number of parallel save threads def save_single_watch(uuid): """Helper function for thread pool execution.""" try: # Check if watch still exists (might have been deleted) if not self._watch_exists(uuid): # Watch was deleted, remove hash self._watch_hashes.pop(uuid, None) return {'status': 'deleted', 'uuid': uuid} # Pre-check hash to avoid unnecessary save_watch() calls watch_dict = self._get_watch_dict(uuid) current_hash = self._compute_hash(watch_dict) if current_hash == self._watch_hashes.get(uuid): # Watch hasn't actually changed, skip return {'status': 'unchanged', 'uuid': uuid} # Pass pre-computed values to avoid redundant serialization/hashing if self.save_watch(uuid, force=True, watch_dict=watch_dict, current_hash=current_hash): return {'status': 'saved', 'uuid': uuid} else: return {'status': 'skipped', 'uuid': uuid} except Exception as e: logger.error(f"Error saving watch {uuid}: {e}") return {'status': 'error', 'uuid': uuid, 'error': e} # Process dirty watches in batches for batch_start in range(0, len(dirty_watches), BATCH_SIZE): batch = dirty_watches[batch_start:batch_start + BATCH_SIZE] batch_num = (batch_start // BATCH_SIZE) + 1 total_batches = (len(dirty_watches) + BATCH_SIZE - 1) // BATCH_SIZE if len(dirty_watches) > BATCH_SIZE: logger.trace(f"Save batch {batch_num}/{total_batches} ({len(batch)} watches)") # Use thread pool to save watches in parallel with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: # Submit all save tasks future_to_uuid = {executor.submit(save_single_watch, uuid): uuid for uuid in batch} # Collect results as they complete for future in as_completed(future_to_uuid): result = future.result() status = result['status'] if status == 'saved': saved_count += 1 elif status == 'unchanged': skipped_unchanged += 1 elif status == 'error': error_count += 1 # Re-mark for retry with self.lock: self._dirty_watches.add(result['uuid']) # 'deleted' and 'skipped' don't need special handling # Save settings if changed if dirty_settings: try: self._save_settings() logger.debug("Saved settings") except Exception as e: logger.error(f"Failed to save settings: {e}") error_count += 1 with self.lock: self._dirty_settings = True # Update metrics elapsed = time.time() - start_time self._save_cycle_count += 1 self._total_saves += saved_count self._save_errors += error_count self._last_save_time = time.time() # Log performance metrics if saved_count > 0: avg_time_per_watch = (elapsed / saved_count) * 1000 # milliseconds skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else "" parallel_msg = f" [parallel: {MAX_WORKERS} workers]" if saved_count > 1 else "" logger.info( f"Successfully saved {saved_count} watches in {elapsed:.2f}s " f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}){parallel_msg}. 
" f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)" ) elif skipped_unchanged > 0: logger.debug(f"Save cycle: {skipped_unchanged} watches verified unchanged (hash match), nothing saved") if error_count > 0: logger.error(f"Save cycle completed with {error_count} errors") self.needs_write = False self.needs_write_urgent = False def _watch_exists(self, uuid): """ Check if watch exists. Subclass must implement. Args: uuid: Watch UUID Returns: bool """ raise NotImplementedError("Subclass must implement _watch_exists") def _get_watch_dict(self, uuid): """ Get watch as dictionary. Subclass must implement. Args: uuid: Watch UUID Returns: Dictionary representation of watch """ raise NotImplementedError("Subclass must implement _get_watch_dict") def _audit_all_watches(self): """ Rolling audit: Scans a fraction of watches to detect unmarked changes. Instead of scanning ALL watches at once, this scans 1/N shards per cycle. The shard rotates each cycle, completing a full audit every N cycles. Handles dynamic watch count - recalculates shard boundaries each cycle, so newly added watches will be audited in subsequent cycles. Benefits: - Lower CPU per cycle (56k / 5 = ~11k watches vs all 56k) - More frequent audits overall (every 50s vs every 10s) - Spreads load evenly across time """ audit_start = time.time() # Get list of all watch UUIDs (read-only, no lock needed) try: all_uuids = list(self.data['watching'].keys()) except (KeyError, AttributeError, RuntimeError): # Data structure not ready or being modified return if not all_uuids: return total_watches = len(all_uuids) # Calculate this cycle's shard boundaries # Example: 56,278 watches / 5 shards = 11,255 watches per shard # Shard 0: [0:11255], Shard 1: [11255:22510], etc. shard_size = (total_watches + DATASTORE_AUDIT_SHARDS - 1) // DATASTORE_AUDIT_SHARDS start_idx = self._audit_shard_index * shard_size end_idx = min(start_idx + shard_size, total_watches) # Handle wrap-around (shouldn't happen normally, but defensive) if start_idx >= total_watches: self._audit_shard_index = 0 start_idx = 0 end_idx = min(shard_size, total_watches) # Audit only this shard's watches shard_uuids = all_uuids[start_idx:end_idx] changes_found = 0 errors = 0 for uuid in shard_uuids: try: # Get current watch dict and compute hash watch_dict = self._get_watch_dict(uuid) current_hash = self._compute_hash(watch_dict) stored_hash = self._watch_hashes.get(uuid) # If hash changed and not already marked dirty, mark it if current_hash != stored_hash: with self.lock: if uuid not in self._dirty_watches: self._dirty_watches.add(uuid) changes_found += 1 logger.warning( f"Audit detected unmarked change in watch {uuid[:8]}... 
current hash {current_hash[:8]}, stored hash {str(stored_hash)[:8]} "
                                f"(hash changed but not marked dirty)"
                            )
                            self.needs_write = True
            except Exception as e:
                errors += 1
                logger.trace(f"Audit error for watch {uuid[:8]}...: {e}")

        audit_elapsed = (time.time() - audit_start) * 1000  # milliseconds

        # Report the shard that was just completed (1-based), then advance
        # to the next shard, wrapping around after the last one
        completed_shard = self._audit_shard_index + 1
        self._audit_shard_index = (self._audit_shard_index + 1) % DATASTORE_AUDIT_SHARDS

        # Update metrics
        self._audit_count += 1
        self._audit_found_changes += changes_found
        self._last_audit_time = time.time()

        if changes_found > 0:
            logger.warning(
                f"Audit shard {completed_shard}/{DATASTORE_AUDIT_SHARDS} found {changes_found} "
                f"unmarked changes in {len(shard_uuids)}/{total_watches} watches ({audit_elapsed:.1f}ms)"
            )
        else:
            logger.trace(
                f"Audit shard {completed_shard}/{DATASTORE_AUDIT_SHARDS}: "
                f"{len(shard_uuids)}/{total_watches} watches checked, 0 changes ({audit_elapsed:.1f}ms)"
            )

    def save_datastore(self):
        """
        Background thread that periodically saves dirty items and audits watches.

        Runs two independent cycles:
        1. Save dirty items every DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS (default 10s)
        2. Rolling audit: every DATASTORE_AUDIT_INTERVAL_SECONDS (default 10s)
           - Scans 1/DATASTORE_AUDIT_SHARDS of the watches per cycle (default 1/5)
           - Full audit completes every 50s (10s × 5 shards)
           - Automatically handles new/deleted watches

        Uses 0.5s sleep intervals for responsiveness to urgent saves.
        """
        while True:
            if self.stop_thread:
                # Graceful shutdown: flush any remaining dirty items before stopping
                if self.needs_write or self._dirty_watches or self._dirty_settings:
                    logger.warning("Datastore save thread stopping - flushing remaining dirty items...")
                    try:
                        self._save_dirty_items()
                        logger.info("Graceful shutdown complete - all data saved")
                    except Exception as e:
                        logger.critical(f"FAILED to save dirty items during shutdown: {e}")
                else:
                    logger.info("Datastore save thread stopping - no dirty items")
                return

            # Check if it's time to run the audit scan (every N seconds)
            if time.time() - self._last_audit_time >= DATASTORE_AUDIT_INTERVAL_SECONDS:
                try:
                    self._audit_all_watches()
                except Exception as e:
                    logger.error(f"Error in audit cycle: {e}")

            # Save dirty items if needed
            if self.needs_write or self.needs_write_urgent:
                try:
                    self._save_dirty_items()
                except Exception as e:
                    logger.error(f"Error in save cycle: {e}")

            # Timer with early break for urgent saves
            # Each iteration is 0.5 seconds, so iterations = DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2
            for _ in range(DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2):
                time.sleep(0.5)
                if self.stop_thread or self.needs_write_urgent:
                    break

    def start_save_thread(self):
        """Start the background save thread."""
        if not self.save_data_thread or not self.save_data_thread.is_alive():
            self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
            self.save_data_thread.start()
            logger.info("Datastore save thread started")

    def force_save_all(self):
        """
        Force immediate synchronous save of all changes to storage.

        File backend implementation of the abstract force_save_all() method.
        Marks all watches and settings as dirty, then saves immediately.
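
        Example (illustrative):

            store.force_save_all()  # returns once every watch.json and settings are written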
        Used by:
        - Backup creation (ensure everything is saved before backup)
        - Shutdown (ensure all changes are persisted)
        - Manual save operations
        """
        logger.info("Force saving all data to storage...")

        # Mark everything as dirty to ensure a complete save
        for uuid in self.data['watching'].keys():
            self.mark_watch_dirty(uuid)
        self.mark_settings_dirty()

        # Save immediately (synchronous)
        self._save_dirty_items()

        logger.success("All data saved to storage")

    def get_health_status(self):
        """
        Get datastore health status for monitoring.

        Returns:
            dict with health metrics and status
        """
        now = time.time()
        time_since_last_save = now - self._last_save_time

        with self.lock:
            dirty_count = len(self._dirty_watches)
            dirty_settings = self._dirty_settings

        is_thread_alive = bool(self.save_data_thread and self.save_data_thread.is_alive())

        # Determine health status
        if not is_thread_alive:
            status = "CRITICAL"
            message = "Save thread is DEAD"
        elif time_since_last_save > 300:  # 5 minutes
            status = "WARNING"
            message = f"No save activity for {time_since_last_save:.0f}s"
        elif dirty_count > 1000:
            status = "WARNING"
            message = f"High backpressure: {dirty_count} watches pending"
        elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01:
            status = "WARNING"
            message = f"High error rate: {self._save_errors} errors"
        else:
            status = "HEALTHY"
            message = "Operating normally"

        return {
            "status": status,
            "message": message,
            "thread_alive": is_thread_alive,
            "dirty_watches": dirty_count,
            "dirty_settings": dirty_settings,
            "last_save_seconds_ago": int(time_since_last_save),
            "save_cycles": self._save_cycle_count,
            "total_saves": self._total_saves,
            "total_errors": self._save_errors,
            "error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2)
        }
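

# Illustrative lifecycle of a concrete subclass (the subclass name below is
# hypothetical; real implementations live elsewhere in the package):
#
#     store = MyFileStore(datastore_path="/datastore")  # subclass of FileSavingDataStore
#     store.start_save_thread()      # begins the dirty-save + rolling-audit loop
#     store.mark_watch_dirty(uuid)   # persisted within ~DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS
#     store.get_health_status()      # {'status': 'HEALTHY', 'dirty_watches': 0, ...}
#     store.stop_thread = True       # save thread flushes remaining dirty items and exits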