# Mirror of https://github.com/dgtlmoon/changedetection.io.git
# Synced 2026-04-03 09:38:04 +00:00
# 510 lines, 17 KiB, Python
"""
File-based datastore with individual watch persistence and immediate commits.

This module provides the FileSavingDataStore abstract class that implements:
- Individual watch.json file persistence
- Immediate commit-based persistence (watch.commit(), datastore.commit())
- Atomic file writes safe for NFS/NAS
"""

import errno
import glob
import json
import os
import tempfile
import time

from loguru import logger

from .base import DataStore
from .. import strtobool

# orjson is an optional dependency used for faster JSON serialization.
# HAS_ORJSON records whether the import succeeded so the rest of the module
# can fall back to the standard-library json module.
try:
    import orjson
except ImportError:
    HAS_ORJSON = False
else:
    HAS_ORJSON = True

# Fsync configuration: force file data to disk for crash safety.
# Defaults to False to match legacy behavior (write-and-rename without fsync).
# Set the environment variable to a true value for mission-critical
# deployments requiring crash consistency.
FORCE_FSYNC_DATA_IS_CRITICAL = bool(strtobool(os.environ.get('FORCE_FSYNC_DATA_IS_CRITICAL', 'False')))

# ============================================================================
# Helper Functions for Atomic File Operations
# ============================================================================

def _discard_temp_file(fd, fd_closed, temp_path):
    """Best-effort removal of a partially written temp file; never raises OSError."""
    if not fd_closed:
        try:
            os.close(fd)
        except OSError:
            pass
    try:
        os.unlink(temp_path)
    except OSError:
        # Already gone (or not removable) - nothing more we can do here
        pass


def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10):
    """
    Save JSON data to disk using atomic write pattern.

    Generic helper for saving any JSON data (settings, watches, etc.) with:
    - Atomic write (temp file in the same directory + os.replace)
    - Complete writes (short writes from os.write() are retried until every
      byte is written, so a truncated document is never renamed into place)
    - Optional file fsync, controlled by FORCE_FSYNC_DATA_IS_CRITICAL
    - Directory fsync for crash consistency (only for new files)
    - Size validation
    - Temp file cleanup on any failure

    Thread safety: Caller must hold datastore.lock to prevent concurrent modifications.
    Multi-process safety: Not supported - run only one app instance per datastore.

    Args:
        file_path: Path to target JSON file. A bare filename (no directory
            component) is written to the current working directory.
        data_dict: Dictionary to serialize
        label: Human-readable label for error messages (e.g., "watch", "settings")
        max_size_mb: Maximum allowed file size in MB

    Raises:
        ValueError: If serialized data exceeds max_size_mb
        OSError: If disk is full (ENOSPC), quota is exceeded (EDQUOT) or any
            other I/O error occurs; the original error is chained as __cause__
    """
    # Check if file already exists (before we start writing).
    # Directory fsync is only needed for NEW files to persist the filename.
    file_exists = os.path.exists(file_path)

    # os.path.dirname() returns '' for a bare filename, which os.makedirs()
    # rejects - fall back to the current directory in that case.
    parent_dir = os.path.dirname(file_path) or os.curdir
    os.makedirs(parent_dir, exist_ok=True)

    # Create temp file in same directory (required for NFS atomicity)
    fd, temp_path = tempfile.mkstemp(
        suffix='.tmp',
        prefix='json-',
        dir=parent_dir,
        text=False
    )

    fd_closed = False
    try:
        # Serialize data
        if HAS_ORJSON:
            data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2)
        else:
            data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8')

        # Safety check: validate size
        max_size = max_size_mb * 1024 * 1024
        data_size = len(data)
        if data_size > max_size:
            raise ValueError(
                f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB "
                f"(max: {max_size_mb}MB). This indicates a bug or data corruption."
            )

        # Write to temp file. os.write() returns the number of bytes actually
        # written, which may be fewer than requested, so keep writing until
        # the whole buffer is on the descriptor.
        view = memoryview(data)
        offset = 0
        while offset < data_size:
            written = os.write(fd, view[offset:])
            if written <= 0:
                # Defensive: avoid spinning forever if no progress is made
                raise OSError(errno.EIO, "write made no progress")
            offset += written

        # Optional fsync: Force file data to disk for crash safety
        # Only if FORCE_FSYNC_DATA_IS_CRITICAL=True (default: False, matches legacy behavior)
        if FORCE_FSYNC_DATA_IS_CRITICAL:
            os.fsync(fd)

        os.close(fd)
        fd_closed = True

        # Atomic rename
        os.replace(temp_path, file_path)

        # Sync directory to ensure filename metadata is durable.
        # Only done for NEW files; an existing file already has its
        # directory entry.
        if not file_exists:
            try:
                dir_fd = os.open(parent_dir, os.O_RDONLY)
                try:
                    os.fsync(dir_fd)
                finally:
                    os.close(dir_fd)
            except (OSError, AttributeError):
                # Windows doesn't support fsync on directories
                pass

    except OSError as e:
        _discard_temp_file(fd, fd_closed, temp_path)

        # Provide helpful error messages. Use the platform's errno constants:
        # the numeric values (especially EDQUOT) differ between systems.
        edquot = getattr(errno, 'EDQUOT', None)
        if e.errno == errno.ENOSPC:
            raise OSError(f"Disk full: Cannot save {label}") from e
        elif edquot is not None and e.errno == edquot:
            raise OSError(f"Disk quota exceeded: Cannot save {label}") from e
        else:
            raise OSError(f"I/O error saving {label}: {e}") from e

    except Exception:
        # Non-I/O failure (e.g. oversize or unserializable data): clean up
        # the temp file and let the original exception propagate unchanged.
        _discard_temp_file(fd, fd_closed, temp_path)
        raise


def save_entity_atomic(entity_dir, uuid, entity_dict, filename, entity_type, max_size_mb):
    """
    Persist a single entity (watch/tag) as JSON inside its own directory.

    Builds the target path from ``entity_dir`` and ``filename`` and hands the
    actual write to save_json_atomic, labelling any error with the entity
    type and UUID.

    Args:
        entity_dir: Directory for this entity (e.g., /datastore/{uuid})
        uuid: Entity UUID (only used in the error label)
        entity_dict: Dictionary representation of the entity
        filename: JSON filename (e.g., 'watch.json', 'tag.json')
        entity_type: Type label used in error messages (e.g., 'watch', 'tag')
        max_size_mb: Maximum allowed file size in MB

    Raises:
        ValueError: If serialized data exceeds max_size_mb
        OSError: If disk is full (ENOSPC) or other I/O error
    """
    save_json_atomic(
        os.path.join(entity_dir, filename),
        entity_dict,
        label=f"{entity_type} {uuid}",
        max_size_mb=max_size_mb,
    )


def save_watch_atomic(watch_dir, uuid, watch_dict):
    """
    Save a watch to ``watch.json`` in ``watch_dir`` using the atomic write pattern.

    Backwards-compatible convenience wrapper: delegates to save_entity_atomic
    with the watch filename, the "watch" label and a 10MB size limit.
    """
    save_entity_atomic(
        watch_dir,
        uuid,
        watch_dict,
        filename="watch.json",
        entity_type="watch",
        max_size_mb=10,
    )


def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
    """
    Load a watch from its JSON file.

    Never raises for an unreadable/corrupted watch: every failure is logged
    and reported as None, regardless of whether orjson is installed.

    Args:
        watch_json: Path to the watch.json file
        uuid: Watch UUID
        rehydrate_entity_func: Function called as ``func(uuid, watch_data)``
            to convert the parsed dict to a Watch object

    Returns:
        Watch object or None if failed (file missing, too large, not valid
        JSON/UTF-8, or rehydration raised)
    """
    MAX_WATCH_SIZE = 10 * 1024 * 1024  # 10MB

    try:
        # Check file size before reading
        file_size = os.path.getsize(watch_json)
        if file_size > MAX_WATCH_SIZE:
            logger.critical(
                f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: "
                f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). "
                f"File: {watch_json}. This indicates a bug or data corruption. "
                f"Watch will be skipped."
            )
            return None

        # Parsing is isolated in its own try block so that only genuine
        # decode failures are reported as corrupted data.
        try:
            if HAS_ORJSON:
                with open(watch_json, 'rb') as f:
                    watch_data = orjson.loads(f.read())
            else:
                with open(watch_json, 'r', encoding='utf-8') as f:
                    watch_data = json.load(f)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError, the ValueError orjson
            # raises for invalid JSON, and UnicodeDecodeError (invalid UTF-8
            # on the stdlib json path).
            logger.critical(
                f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
                f"File: {watch_json}. Error: {e}. "
                f"Watch will be skipped and may need manual recovery from backup."
            )
            return None

        # Rehydrate and return watch object
        return rehydrate_entity_func(uuid, watch_data)

    except FileNotFoundError:
        logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
        return None
    except Exception as e:
        # Any other failure (I/O error, rehydration error, ...) skips the watch
        logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
        return None


def load_all_watches(datastore_path, rehydrate_entity_func):
    """
    Load all watches from individual watch.json files.

    SYNCHRONOUS loading: Blocks until all watches are loaded.
    Progress is logged every 100 successfully loaded watches, and a summary
    (with throughput) is logged at the end.

    Args:
        datastore_path: Path to the datastore directory
        rehydrate_entity_func: Function to convert dict to Watch object

    Returns:
        Dictionary of uuid -> Watch object (empty if the datastore directory
        does not exist or contains no watch.json files)
    """
    # perf_counter() is monotonic and high resolution, unlike time.time(),
    # so the measured duration cannot be negative.
    start_time = time.perf_counter()
    logger.info("Loading watches from individual watch.json files...")

    watching = {}

    if not os.path.exists(datastore_path):
        return watching

    # Find all watch.json files using glob (faster than manual directory traversal)
    glob_start = time.perf_counter()
    watch_files = glob.glob(os.path.join(datastore_path, "*", "watch.json"))
    glob_time = time.perf_counter() - glob_start

    total = len(watch_files)
    logger.debug(f"Found {total} watch.json files in {glob_time:.3f}s")

    loaded = 0
    failed = 0

    for watch_json in watch_files:
        # Extract UUID from path: /datastore/{uuid}/watch.json
        uuid_dir = os.path.basename(os.path.dirname(watch_json))
        watch = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
        if watch:
            watching[uuid_dir] = watch
            loaded += 1

            if loaded % 100 == 0:
                logger.info(f"Loaded {loaded}/{total} watches...")
        else:
            # load_watch_from_file already logged the specific error
            failed += 1

    elapsed = time.perf_counter() - start_time
    # Guard the throughput figure: with an empty datastore the elapsed time
    # can round to zero, which would otherwise raise ZeroDivisionError.
    rate = loaded / elapsed if elapsed > 0 else 0.0

    if failed > 0:
        logger.critical(
            f"LOAD COMPLETE: {loaded} watches loaded successfully, "
            f"{failed} watches FAILED to load (corrupted or invalid) "
            f"in {elapsed:.2f}s ({rate:.0f} watches/sec)"
        )
    else:
        logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({rate:.0f} watches/sec)")

    return watching


def load_tag_from_file(tag_json, uuid, rehydrate_entity_func):
    """
    Load a tag from its JSON file.

    Never raises for an unreadable/corrupted tag: every failure is logged
    and reported as None, regardless of whether orjson is installed.

    Args:
        tag_json: Path to the tag.json file
        uuid: Tag UUID
        rehydrate_entity_func: Function called as ``func(uuid, tag_data)``
            to convert the parsed dict to a Tag object

    Returns:
        Tag object or None if failed (file missing, too large, not valid
        JSON/UTF-8, or rehydration raised)
    """
    MAX_TAG_SIZE = 1 * 1024 * 1024  # 1MB

    try:
        # Check file size before reading
        file_size = os.path.getsize(tag_json)
        if file_size > MAX_TAG_SIZE:
            logger.critical(
                f"CORRUPTED TAG DATA: Tag {uuid} file is unexpectedly large: "
                f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_TAG_SIZE / 1024 / 1024}MB). "
                f"File: {tag_json}. This indicates a bug or data corruption. "
                f"Tag will be skipped."
            )
            return None

        # Parsing is isolated in its own try block so that only genuine
        # decode failures are reported as corrupted data.
        try:
            if HAS_ORJSON:
                with open(tag_json, 'rb') as f:
                    tag_data = orjson.loads(f.read())
            else:
                with open(tag_json, 'r', encoding='utf-8') as f:
                    tag_data = json.load(f)
        except ValueError as e:
            # ValueError covers json.JSONDecodeError, the ValueError orjson
            # raises for invalid JSON, and UnicodeDecodeError (invalid UTF-8
            # on the stdlib json path).
            logger.critical(
                f"CORRUPTED TAG DATA: Failed to parse JSON for tag {uuid}. "
                f"File: {tag_json}. Error: {e}. "
                f"Tag will be skipped and may need manual recovery from backup."
            )
            return None

        # Every tag is loaded with the restock_diff processor set
        tag_data['processor'] = 'restock_diff'
        # Rehydrate tag (convert dict to Tag object).
        # Per the original note, processor_override is set inside the
        # rehydration function (not visible from this module).
        return rehydrate_entity_func(uuid, tag_data)

    except FileNotFoundError:
        logger.debug(f"Tag file not found: {tag_json} for tag {uuid}")
        return None
    except Exception as e:
        # Any other failure (I/O error, non-dict payload, rehydration error, ...)
        logger.error(f"Failed to load tag {uuid} from {tag_json}: {e}")
        return None


def load_all_tags(datastore_path, rehydrate_entity_func):
    """
    Load every tag stored as ``{uuid}/tag.json`` under the datastore directory.

    Tags are stored separately from settings, one tag.json per tag directory.
    Files that fail to load are counted and skipped.

    Args:
        datastore_path: Path to the datastore directory
        rehydrate_entity_func: Function to convert dict to Tag object

    Returns:
        Dictionary of uuid -> Tag object
    """
    logger.info("Loading tags from individual tag.json files...")

    tags = {}
    if not os.path.exists(datastore_path):
        return tags

    # One tag.json per immediate sub-directory of the datastore
    pattern = os.path.join(datastore_path, "*", "tag.json")
    tag_files = glob.glob(pattern)

    total = len(tag_files)
    if total == 0:
        logger.debug("No tag.json files found")
        return tags

    logger.debug(f"Found {total} tag.json files")

    loaded = failed = 0
    for tag_path in tag_files:
        # The directory name is the tag UUID: /datastore/{uuid}/tag.json
        tag_uuid = os.path.basename(os.path.dirname(tag_path))
        tag = load_tag_from_file(tag_path, tag_uuid, rehydrate_entity_func)
        if not tag:
            # load_tag_from_file already logged the specific error
            failed += 1
            continue
        tags[tag_uuid] = tag
        loaded += 1

    if failed > 0:
        logger.warning(f"Loaded {loaded} tags, {failed} tags FAILED to load")
    else:
        logger.info(f"Loaded {loaded} tags from disk")

    return tags


# ============================================================================
# FileSavingDataStore Class
# ============================================================================

class FileSavingDataStore(DataStore):
    """
    Abstract datastore base providing file persistence with immediate commits.

    Features:
    - Individual watch.json files (one per watch)
    - Immediate persistence via watch.commit() and datastore.commit()
    - Atomic file writes for crash safety

    The storage hooks below all raise NotImplementedError here.
    Subclasses must implement:
    - rehydrate_entity(): Convert dict to Watch object
    - Access to internal __data structure for watch management
    """

    def __init__(self):
        # No state of its own; just run the base DataStore initialisation
        super().__init__()

    def _save_settings(self):
        """
        Persist settings to the backing storage (polymorphic hook).

        Each backend supplies its own implementation, e.g.:
        - File: changedetection.json
        - Redis: SET settings
        - SQL: UPDATE settings table

        Raises:
            NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError("Subclass must implement _save_settings")

    def _load_watches(self):
        """
        Load every watch from the backing storage (polymorphic hook).

        Each backend supplies its own implementation, e.g.:
        - File: Read individual watch.json files
        - Redis: SCAN watch:* keys
        - SQL: SELECT * FROM watches

        Raises:
            NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError("Subclass must implement _load_watches")

    def _delete_watch(self, uuid):
        """
        Remove a watch from the backing storage (polymorphic hook).

        Each backend supplies its own implementation, e.g.:
        - File: Delete {uuid}/ directory recursively
        - Redis: DEL watch:{uuid}
        - SQL: DELETE FROM watches WHERE uuid=?

        Args:
            uuid: Watch UUID to delete

        Raises:
            NotImplementedError: Always, unless overridden by a subclass
        """
        raise NotImplementedError("Subclass must implement _delete_watch")