mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-19 13:40:34 +00:00
Compare commits
5 Commits
datastore-
...
translatio
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
b3af9ae288 | ||
|
|
eb8470f5cb | ||
|
|
20b15484a9 | ||
|
|
7968b83f6d | ||
|
|
248daffb1b |
@@ -9,7 +9,6 @@ recursive-include changedetectionio/notification *
|
||||
recursive-include changedetectionio/processors *
|
||||
recursive-include changedetectionio/realtime *
|
||||
recursive-include changedetectionio/static *
|
||||
recursive-include changedetectionio/store *
|
||||
recursive-include changedetectionio/templates *
|
||||
recursive-include changedetectionio/tests *
|
||||
recursive-include changedetectionio/translations *
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
|
||||
# Semver means never use .01, or 00. Should be .1.
|
||||
__version__ = '0.52.7'
|
||||
__version__ = '0.52.6'
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
from json.decoder import JSONDecodeError
|
||||
@@ -112,12 +112,12 @@ def sigshutdown_handler(_signo, _stack_frame):
|
||||
except Exception as e:
|
||||
logger.error(f"Error shutting down Socket.IO server: {str(e)}")
|
||||
|
||||
# Save data quickly - force immediate save using abstract method
|
||||
# Save data quickly
|
||||
try:
|
||||
datastore.force_save_all()
|
||||
logger.success('Fast sync to storage complete.')
|
||||
datastore.sync_to_json()
|
||||
logger.success('Fast sync to disk complete.')
|
||||
except Exception as e:
|
||||
logger.error(f"Error syncing to storage: {str(e)}")
|
||||
logger.error(f"Error syncing to disk: {str(e)}")
|
||||
|
||||
sys.exit()
|
||||
|
||||
|
||||
@@ -27,23 +27,11 @@ def create_backup(datastore_path, watches: dict):
|
||||
compression=zipfile.ZIP_DEFLATED,
|
||||
compresslevel=8) as zipObj:
|
||||
|
||||
# Add the settings file (supports both formats)
|
||||
# New format: changedetection.json
|
||||
changedetection_json = os.path.join(datastore_path, "changedetection.json")
|
||||
if os.path.isfile(changedetection_json):
|
||||
zipObj.write(changedetection_json, arcname="changedetection.json")
|
||||
logger.debug("Added changedetection.json to backup")
|
||||
# Add the index
|
||||
zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json")
|
||||
|
||||
# Legacy format: url-watches.json (for backward compatibility)
|
||||
url_watches_json = os.path.join(datastore_path, "url-watches.json")
|
||||
if os.path.isfile(url_watches_json):
|
||||
zipObj.write(url_watches_json, arcname="url-watches.json")
|
||||
logger.debug("Added url-watches.json to backup")
|
||||
|
||||
# Add the flask app secret (if it exists)
|
||||
secret_file = os.path.join(datastore_path, "secret.txt")
|
||||
if os.path.isfile(secret_file):
|
||||
zipObj.write(secret_file, arcname="secret.txt")
|
||||
# Add the flask app secret
|
||||
zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt")
|
||||
|
||||
# Add any data in the watch data directory.
|
||||
for uuid, w in watches.items():
|
||||
@@ -102,8 +90,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
flash(gettext("Maximum number of backups reached, please remove some"), "error")
|
||||
return redirect(url_for('backups.index'))
|
||||
|
||||
# Be sure we're written fresh - force immediate save using abstract method
|
||||
datastore.force_save_all()
|
||||
# Be sure we're written fresh
|
||||
datastore.sync_to_json()
|
||||
zip_thread = threading.Thread(
|
||||
target=create_backup,
|
||||
args=(datastore.datastore_path, datastore.data.get("watching")),
|
||||
|
||||
@@ -62,7 +62,7 @@ class import_url_list(Importer):
|
||||
extras = None
|
||||
if processor:
|
||||
extras = {'processor': processor}
|
||||
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, save_immediately=False, extras=extras)
|
||||
new_uuid = datastore.add_watch(url=url.strip(), tag=tags, write_to_disk_now=False, extras=extras)
|
||||
|
||||
if new_uuid:
|
||||
# Straight into the queue.
|
||||
@@ -129,7 +129,7 @@ class import_distill_io_json(Importer):
|
||||
new_uuid = datastore.add_watch(url=d['uri'].strip(),
|
||||
tag=",".join(d.get('tags', [])),
|
||||
extras=extras,
|
||||
save_immediately=False)
|
||||
write_to_disk_now=False)
|
||||
|
||||
if new_uuid:
|
||||
# Straight into the queue.
|
||||
@@ -204,7 +204,7 @@ class import_xlsx_wachete(Importer):
|
||||
new_uuid = datastore.add_watch(url=data['url'].strip(),
|
||||
extras=extras,
|
||||
tag=data.get('folder'),
|
||||
save_immediately=False)
|
||||
write_to_disk_now=False)
|
||||
if new_uuid:
|
||||
# Straight into the queue.
|
||||
self.new_uuids.append(new_uuid)
|
||||
@@ -287,7 +287,7 @@ class import_xlsx_custom(Importer):
|
||||
new_uuid = datastore.add_watch(url=url,
|
||||
extras=extras,
|
||||
tag=tags,
|
||||
save_immediately=False)
|
||||
write_to_disk_now=False)
|
||||
if new_uuid:
|
||||
# Straight into the queue.
|
||||
self.new_uuids.append(new_uuid)
|
||||
|
||||
1139
changedetectionio/store.py
Normal file
1139
changedetectionio/store.py
Normal file
File diff suppressed because it is too large
Load Diff
@@ -1,958 +0,0 @@
|
||||
import shutil
|
||||
|
||||
from changedetectionio.strtobool import strtobool
|
||||
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
|
||||
from flask import (
|
||||
flash
|
||||
)
|
||||
from flask_babel import gettext
|
||||
|
||||
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
from ..model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from copy import deepcopy, copy
|
||||
from os import path, unlink
|
||||
from threading import Lock
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid as uuid_builder
|
||||
from loguru import logger
|
||||
from blinker import signal
|
||||
|
||||
# Try to import orjson for faster JSON serialization
|
||||
try:
|
||||
import orjson
|
||||
|
||||
HAS_ORJSON = True
|
||||
except ImportError:
|
||||
HAS_ORJSON = False
|
||||
|
||||
from ..processors import get_custom_watch_obj_for_processor
|
||||
from ..processors.restock_diff import Restock
|
||||
|
||||
# Import the base class and helpers
|
||||
from .file_saving_datastore import FileSavingDataStore, load_all_watches, save_watch_atomic, save_json_atomic
|
||||
from .updates import DatastoreUpdatesMixin
|
||||
from .legacy_loader import detect_format, has_legacy_datastore
|
||||
|
||||
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
|
||||
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
|
||||
|
||||
dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)])
|
||||
|
||||
|
||||
# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
|
||||
# Open a github issue if you know something :)
|
||||
# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change
|
||||
class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
__version_check = True
|
||||
|
||||
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
|
||||
# Initialize parent class
|
||||
super().__init__()
|
||||
|
||||
# Should only be active for docker
|
||||
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
|
||||
self.datastore_path = datastore_path
|
||||
self.needs_write = False
|
||||
self.start_time = time.time()
|
||||
self.stop_thread = False
|
||||
self.save_version_copy_json_db(version_tag)
|
||||
self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag)
|
||||
|
||||
def save_version_copy_json_db(self, version_tag):
|
||||
"""
|
||||
Create version-tagged backup of changedetection.json.
|
||||
|
||||
This is called on version upgrades to preserve a backup in case
|
||||
the new version has issues.
|
||||
"""
|
||||
import re
|
||||
|
||||
version_text = re.sub(r'\D+', '-', version_tag)
|
||||
db_path = os.path.join(self.datastore_path, "changedetection.json")
|
||||
db_path_version_backup = os.path.join(self.datastore_path, f"changedetection-{version_text}.json")
|
||||
|
||||
if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
|
||||
from shutil import copyfile
|
||||
logger.info(f"Backing up changedetection.json due to new version to '{db_path_version_backup}'.")
|
||||
copyfile(db_path, db_path_version_backup)
|
||||
|
||||
def _load_settings(self):
|
||||
"""
|
||||
Load settings from storage.
|
||||
|
||||
File backend implementation: reads from changedetection.json
|
||||
|
||||
Returns:
|
||||
dict: Settings data loaded from storage
|
||||
"""
|
||||
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
|
||||
|
||||
logger.info(f"Loading settings from {changedetection_json}")
|
||||
|
||||
if HAS_ORJSON:
|
||||
with open(changedetection_json, 'rb') as f:
|
||||
return orjson.loads(f.read())
|
||||
else:
|
||||
with open(changedetection_json, 'r', encoding='utf-8') as f:
|
||||
return json.load(f)
|
||||
|
||||
def _apply_settings(self, settings_data):
|
||||
"""
|
||||
Apply loaded settings data to internal data structure.
|
||||
|
||||
Args:
|
||||
settings_data: Dictionary loaded from changedetection.json
|
||||
"""
|
||||
# Apply top-level fields
|
||||
if 'app_guid' in settings_data:
|
||||
self.__data['app_guid'] = settings_data['app_guid']
|
||||
if 'build_sha' in settings_data:
|
||||
self.__data['build_sha'] = settings_data['build_sha']
|
||||
if 'version_tag' in settings_data:
|
||||
self.__data['version_tag'] = settings_data['version_tag']
|
||||
|
||||
# Apply settings sections
|
||||
if 'settings' in settings_data:
|
||||
if 'headers' in settings_data['settings']:
|
||||
self.__data['settings']['headers'].update(settings_data['settings']['headers'])
|
||||
if 'requests' in settings_data['settings']:
|
||||
self.__data['settings']['requests'].update(settings_data['settings']['requests'])
|
||||
if 'application' in settings_data['settings']:
|
||||
self.__data['settings']['application'].update(settings_data['settings']['application'])
|
||||
|
||||
def _rehydrate_tags(self):
|
||||
"""Rehydrate tag entities from stored data."""
|
||||
for uuid, tag in self.__data['settings']['application']['tags'].items():
|
||||
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(
|
||||
uuid, tag, processor_override='restock_diff'
|
||||
)
|
||||
logger.info(f"Tag: {uuid} {tag['title']}")
|
||||
|
||||
|
||||
def _load_state(self):
|
||||
"""
|
||||
Load complete datastore state from storage.
|
||||
|
||||
Orchestrates loading of settings and watches using polymorphic methods.
|
||||
"""
|
||||
# Load settings
|
||||
settings_data = self._load_settings()
|
||||
self._apply_settings(settings_data)
|
||||
|
||||
# Load watches (polymorphic - parent class method)
|
||||
self._load_watches()
|
||||
|
||||
# Rehydrate tags
|
||||
self._rehydrate_tags()
|
||||
|
||||
def reload_state(self, datastore_path, include_default_watches, version_tag):
|
||||
"""
|
||||
Load datastore from storage or create new one.
|
||||
|
||||
Supports two scenarios:
|
||||
1. NEW format: changedetection.json exists → load and run updates if needed
|
||||
2. EMPTY: No changedetection.json → create new OR trigger migration from legacy
|
||||
|
||||
Note: Legacy url-watches.json migration happens in update_26, not here.
|
||||
"""
|
||||
logger.info(f"Datastore path is '{datastore_path}'")
|
||||
|
||||
# Initialize data structure
|
||||
self.__data = App.model()
|
||||
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
|
||||
|
||||
# Base definition for all watchers (deepcopy part of #569)
|
||||
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))
|
||||
|
||||
# Load build SHA if available (Docker deployments)
|
||||
if path.isfile('changedetectionio/source.txt'):
|
||||
with open('changedetectionio/source.txt') as f:
|
||||
self.__data['build_sha'] = f.read()
|
||||
|
||||
# Detect which format to load
|
||||
format_type = detect_format(self.datastore_path)
|
||||
logger.info(f"Detected datastore format: {format_type}")
|
||||
|
||||
if format_type == 'new':
|
||||
# Load from new format (changedetection.json + watch.json files)
|
||||
logger.info("Loading from new format (changedetection.json + individual watch.json)")
|
||||
try:
|
||||
self._load_state()
|
||||
except Exception as e:
|
||||
logger.critical(f"Failed to load datastore: {e}")
|
||||
raise
|
||||
|
||||
# Run schema updates if needed
|
||||
self.run_updates()
|
||||
|
||||
else:
|
||||
# Empty - check if this is a fresh install or migration needed
|
||||
# Generate app_guid FIRST (required for all operations)
|
||||
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
|
||||
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
|
||||
else:
|
||||
self.__data['app_guid'] = str(uuid_builder.uuid4())
|
||||
|
||||
# Generate RSS access token
|
||||
self.__data['settings']['application']['rss_access_token'] = secrets.token_hex(16)
|
||||
|
||||
# Generate API access token
|
||||
self.__data['settings']['application']['api_access_token'] = secrets.token_hex(16)
|
||||
|
||||
# Check if legacy datastore exists (url-watches.json)
|
||||
if has_legacy_datastore(self.datastore_path):
|
||||
# Legacy datastore detected - trigger migration
|
||||
logger.critical(f"Legacy datastore detected at {self.datastore_path}/url-watches.json")
|
||||
logger.critical("Migration will be triggered via update_26")
|
||||
|
||||
# Set schema version to 0 to trigger ALL updates including update_26
|
||||
self.__data['settings']['application']['schema_version'] = 0
|
||||
|
||||
# update_26 will load the legacy data and migrate to new format
|
||||
# Data will be loaded into memory during update_26, no need to add default watches
|
||||
self.run_updates()
|
||||
|
||||
else:
|
||||
# Fresh install - create new datastore
|
||||
logger.critical(f"No datastore found, creating new datastore at {self.datastore_path}")
|
||||
|
||||
# Set schema version to latest (no updates needed)
|
||||
updates_available = self.get_updates_available()
|
||||
self.__data['settings']['application']['schema_version'] = updates_available.pop() if updates_available else 26
|
||||
|
||||
# Add default watches if requested
|
||||
if include_default_watches:
|
||||
self.add_watch(
|
||||
url='https://news.ycombinator.com/',
|
||||
tag='Tech news',
|
||||
extras={'fetch_backend': 'html_requests'}
|
||||
)
|
||||
self.add_watch(
|
||||
url='https://changedetection.io/CHANGELOG.txt',
|
||||
tag='changedetection.io',
|
||||
extras={'fetch_backend': 'html_requests'}
|
||||
)
|
||||
|
||||
# Create changedetection.json immediately
|
||||
try:
|
||||
self._save_settings()
|
||||
logger.info("Created changedetection.json for new datastore")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create initial changedetection.json: {e}")
|
||||
|
||||
# Set version tag
|
||||
self.__data['version_tag'] = version_tag
|
||||
|
||||
# Validate proxies.json if it exists
|
||||
_ = self.proxy_list # Just to test parsing
|
||||
|
||||
# Ensure app_guid exists (for datastores loaded from existing files)
|
||||
if 'app_guid' not in self.__data:
|
||||
if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
|
||||
self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4())
|
||||
else:
|
||||
self.__data['app_guid'] = str(uuid_builder.uuid4())
|
||||
self.mark_settings_dirty()
|
||||
|
||||
# Ensure RSS access token exists
|
||||
if not self.__data['settings']['application'].get('rss_access_token'):
|
||||
secret = secrets.token_hex(16)
|
||||
self.__data['settings']['application']['rss_access_token'] = secret
|
||||
self.mark_settings_dirty()
|
||||
|
||||
# Ensure API access token exists
|
||||
if not self.__data['settings']['application'].get('api_access_token'):
|
||||
secret = secrets.token_hex(16)
|
||||
self.__data['settings']['application']['api_access_token'] = secret
|
||||
self.mark_settings_dirty()
|
||||
|
||||
# Handle password reset lockfile
|
||||
password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock")
|
||||
if path.isfile(password_reset_lockfile):
|
||||
self.remove_password()
|
||||
unlink(password_reset_lockfile)
|
||||
|
||||
# Start the background save thread
|
||||
self.start_save_thread()
|
||||
|
||||
def rehydrate_entity(self, uuid, entity, processor_override=None):
|
||||
"""Set the dict back to the dict Watch object"""
|
||||
entity['uuid'] = uuid
|
||||
|
||||
if processor_override:
|
||||
watch_class = get_custom_watch_obj_for_processor(processor_override)
|
||||
entity['processor'] = processor_override
|
||||
else:
|
||||
watch_class = get_custom_watch_obj_for_processor(entity.get('processor'))
|
||||
|
||||
if entity.get('uuid') != 'text_json_diff':
|
||||
logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")
|
||||
|
||||
entity = watch_class(datastore_path=self.datastore_path, default=entity)
|
||||
return entity
|
||||
|
||||
# ============================================================================
|
||||
# FileSavingDataStore Abstract Method Implementations
|
||||
# ============================================================================
|
||||
|
||||
def _watch_exists(self, uuid):
|
||||
"""Check if watch exists in datastore."""
|
||||
return uuid in self.__data['watching']
|
||||
|
||||
def _get_watch_dict(self, uuid):
|
||||
"""Get watch as dictionary."""
|
||||
return dict(self.__data['watching'][uuid])
|
||||
|
||||
def _build_settings_data(self):
|
||||
"""
|
||||
Build settings data structure for saving.
|
||||
|
||||
Returns:
|
||||
dict: Settings data ready for serialization
|
||||
"""
|
||||
return {
|
||||
'note': 'Settings file - watches are stored in individual {uuid}/watch.json files',
|
||||
'app_guid': self.__data['app_guid'],
|
||||
'settings': self.__data['settings'],
|
||||
'build_sha': self.__data.get('build_sha'),
|
||||
'version_tag': self.__data.get('version_tag')
|
||||
}
|
||||
|
||||
def _save_settings(self):
|
||||
"""
|
||||
Save settings to storage.
|
||||
|
||||
File backend implementation: saves to changedetection.json
|
||||
Implementation of abstract method from FileSavingDataStore.
|
||||
Uses the generic save_json_atomic helper.
|
||||
|
||||
Raises:
|
||||
OSError: If disk is full or other I/O error
|
||||
"""
|
||||
settings_data = self._build_settings_data()
|
||||
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
|
||||
save_json_atomic(changedetection_json, settings_data, label="settings", max_size_mb=10)
|
||||
|
||||
def _load_watches(self):
|
||||
"""
|
||||
Load all watches from storage.
|
||||
|
||||
File backend implementation: reads individual watch.json files
|
||||
Implementation of abstract method from FileSavingDataStore.
|
||||
Delegates to helper function and stores results in internal data structure.
|
||||
"""
|
||||
watching, watch_hashes = load_all_watches(
|
||||
self.datastore_path,
|
||||
self.rehydrate_entity,
|
||||
self._compute_hash
|
||||
)
|
||||
|
||||
# Store loaded data
|
||||
self.__data['watching'] = watching
|
||||
self._watch_hashes = watch_hashes
|
||||
|
||||
def _delete_watch(self, uuid):
|
||||
"""
|
||||
Delete a watch from storage.
|
||||
|
||||
File backend implementation: deletes entire {uuid}/ directory recursively.
|
||||
Implementation of abstract method from FileSavingDataStore.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID to delete
|
||||
"""
|
||||
watch_dir = os.path.join(self.datastore_path, uuid)
|
||||
if os.path.exists(watch_dir):
|
||||
shutil.rmtree(watch_dir)
|
||||
logger.info(f"Deleted watch directory: {watch_dir}")
|
||||
|
||||
# ============================================================================
|
||||
# Watch Management Methods
|
||||
# ============================================================================
|
||||
|
||||
def set_last_viewed(self, uuid, timestamp):
|
||||
logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}")
|
||||
self.data['watching'][uuid].update({'last_viewed': int(timestamp)})
|
||||
self.mark_watch_dirty(uuid)
|
||||
|
||||
watch_check_update = signal('watch_check_update')
|
||||
if watch_check_update:
|
||||
watch_check_update.send(watch_uuid=uuid)
|
||||
|
||||
def remove_password(self):
|
||||
self.__data['settings']['application']['password'] = False
|
||||
self.mark_settings_dirty()
|
||||
|
||||
def update_watch(self, uuid, update_obj):
|
||||
|
||||
# It's possible that the watch could be deleted before update
|
||||
if not self.__data['watching'].get(uuid):
|
||||
return
|
||||
|
||||
with self.lock:
|
||||
|
||||
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
|
||||
for dict_key, d in self.generic_definition.items():
|
||||
if isinstance(d, dict):
|
||||
if update_obj is not None and dict_key in update_obj:
|
||||
self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
|
||||
del (update_obj[dict_key])
|
||||
|
||||
self.__data['watching'][uuid].update(update_obj)
|
||||
|
||||
self.mark_watch_dirty(uuid)
|
||||
|
||||
@property
|
||||
def threshold_seconds(self):
|
||||
seconds = 0
|
||||
for m, n in Watch.mtable.items():
|
||||
x = self.__data['settings']['requests']['time_between_check'].get(m)
|
||||
if x:
|
||||
seconds += x * n
|
||||
return seconds
|
||||
|
||||
@property
|
||||
def unread_changes_count(self):
|
||||
unread_changes_count = 0
|
||||
for uuid, watch in self.__data['watching'].items():
|
||||
if watch.history_n >= 2 and watch.viewed == False:
|
||||
unread_changes_count += 1
|
||||
|
||||
return unread_changes_count
|
||||
|
||||
@property
|
||||
def data(self):
|
||||
# Re #152, Return env base_url if not overriden
|
||||
# Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services
|
||||
# like 'Join', so it's always best to atleast set something obvious so that they are not broken.
|
||||
|
||||
active_base_url = BASE_URL_NOT_SET_TEXT
|
||||
if self.__data['settings']['application'].get('base_url'):
|
||||
active_base_url = self.__data['settings']['application'].get('base_url')
|
||||
elif os.getenv('BASE_URL'):
|
||||
active_base_url = os.getenv('BASE_URL')
|
||||
|
||||
# I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable
|
||||
# even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. or something
|
||||
d = self.__data
|
||||
d['settings']['application']['active_base_url'] = active_base_url.strip('" ')
|
||||
return d
|
||||
|
||||
# Delete a single watch by UUID
|
||||
def delete(self, uuid):
|
||||
"""
|
||||
Delete a watch by UUID.
|
||||
|
||||
Uses abstracted storage method for backend-agnostic deletion.
|
||||
Supports 'all' to delete all watches (mainly for testing).
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID to delete, or 'all' to delete all watches
|
||||
"""
|
||||
with self.lock:
|
||||
if uuid == 'all':
|
||||
# Delete all watches - capture UUIDs first before modifying dict
|
||||
all_uuids = list(self.__data['watching'].keys())
|
||||
|
||||
for watch_uuid in all_uuids:
|
||||
# Delete from storage using polymorphic method
|
||||
try:
|
||||
self._delete_watch(watch_uuid)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}")
|
||||
|
||||
# Clean up tracking data
|
||||
self._watch_hashes.pop(watch_uuid, None)
|
||||
self._dirty_watches.discard(watch_uuid)
|
||||
|
||||
# Send delete signal
|
||||
watch_delete_signal = signal('watch_deleted')
|
||||
if watch_delete_signal:
|
||||
watch_delete_signal.send(watch_uuid=watch_uuid)
|
||||
|
||||
# Clear the dict
|
||||
self.__data['watching'] = {}
|
||||
|
||||
# Mainly used for testing to allow all items to flush before running next test
|
||||
time.sleep(1)
|
||||
|
||||
else:
|
||||
# Delete single watch from storage using polymorphic method
|
||||
try:
|
||||
self._delete_watch(uuid)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete watch {uuid} from storage: {e}")
|
||||
|
||||
# Remove from watching dict
|
||||
del self.data['watching'][uuid]
|
||||
|
||||
# Clean up tracking data
|
||||
self._watch_hashes.pop(uuid, None)
|
||||
self._dirty_watches.discard(uuid)
|
||||
|
||||
# Send delete signal
|
||||
watch_delete_signal = signal('watch_deleted')
|
||||
if watch_delete_signal:
|
||||
watch_delete_signal.send(watch_uuid=uuid)
|
||||
|
||||
self.needs_write_urgent = True
|
||||
|
||||
# Clone a watch by UUID
|
||||
def clone(self, uuid):
|
||||
url = self.data['watching'][uuid].get('url')
|
||||
extras = deepcopy(self.data['watching'][uuid])
|
||||
new_uuid = self.add_watch(url=url, extras=extras)
|
||||
watch = self.data['watching'][new_uuid]
|
||||
return new_uuid
|
||||
|
||||
def url_exists(self, url):
|
||||
|
||||
# Probably their should be dict...
|
||||
for watch in self.data['watching'].values():
|
||||
if watch['url'].lower() == url.lower():
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
# Remove a watchs data but keep the entry (URL etc)
|
||||
def clear_watch_history(self, uuid):
|
||||
self.__data['watching'][uuid].clear_watch()
|
||||
self.needs_write_urgent = True
|
||||
|
||||
def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True):
|
||||
import requests
|
||||
|
||||
if extras is None:
|
||||
extras = {}
|
||||
|
||||
# Incase these are copied across, assume it's a reference and deepcopy()
|
||||
apply_extras = deepcopy(extras)
|
||||
apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags')
|
||||
|
||||
# Was it a share link? try to fetch the data
|
||||
if (url.startswith("https://changedetection.io/share/")):
|
||||
try:
|
||||
r = requests.request(method="GET",
|
||||
url=url,
|
||||
# So we know to return the JSON instead of the human-friendly "help" page
|
||||
headers={'App-Guid': self.__data['app_guid']})
|
||||
res = r.json()
|
||||
|
||||
# List of permissible attributes we accept from the wild internet
|
||||
for k in [
|
||||
'body',
|
||||
'browser_steps',
|
||||
'css_filter',
|
||||
'extract_text',
|
||||
'headers',
|
||||
'ignore_text',
|
||||
'include_filters',
|
||||
'method',
|
||||
'paused',
|
||||
'previous_md5',
|
||||
'processor',
|
||||
'subtractive_selectors',
|
||||
'tag',
|
||||
'tags',
|
||||
'text_should_not_be_present',
|
||||
'title',
|
||||
'trigger_text',
|
||||
'url',
|
||||
'use_page_title_in_list',
|
||||
'webdriver_js_execute_code',
|
||||
]:
|
||||
if res.get(k):
|
||||
if k != 'css_filter':
|
||||
apply_extras[k] = res[k]
|
||||
else:
|
||||
# We renamed the field and made it a list
|
||||
apply_extras['include_filters'] = [res['css_filter']]
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
|
||||
flash(gettext("Error fetching metadata for {}").format(url), 'error')
|
||||
return False
|
||||
|
||||
if not is_safe_valid_url(url):
|
||||
flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error')
|
||||
|
||||
return None
|
||||
|
||||
if tag and type(tag) == str:
|
||||
# Then it's probably a string of the actual tag by name, split and add it
|
||||
for t in tag.split(','):
|
||||
# for each stripped tag, add tag as UUID
|
||||
for a_t in t.split(','):
|
||||
tag_uuid = self.add_tag(a_t)
|
||||
apply_extras['tags'].append(tag_uuid)
|
||||
|
||||
# Or if UUIDs given directly
|
||||
if tag_uuids:
|
||||
for t in tag_uuids:
|
||||
apply_extras['tags'] = list(set(apply_extras['tags'] + [t.strip()]))
|
||||
|
||||
# Make any uuids unique
|
||||
if apply_extras.get('tags'):
|
||||
apply_extras['tags'] = list(set(apply_extras.get('tags')))
|
||||
|
||||
# If the processor also has its own Watch implementation
|
||||
watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor'))
|
||||
new_watch = watch_class(datastore_path=self.datastore_path, url=url)
|
||||
|
||||
new_uuid = new_watch.get('uuid')
|
||||
|
||||
logger.debug(f"Adding URL '{url}' - {new_uuid}")
|
||||
|
||||
for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']:
|
||||
if k in apply_extras:
|
||||
del apply_extras[k]
|
||||
|
||||
if not apply_extras.get('date_created'):
|
||||
apply_extras['date_created'] = int(time.time())
|
||||
|
||||
new_watch.update(apply_extras)
|
||||
new_watch.ensure_data_dir_exists()
|
||||
self.__data['watching'][new_uuid] = new_watch
|
||||
|
||||
if save_immediately:
|
||||
# Save immediately using polymorphic method
|
||||
try:
|
||||
self.save_watch(new_uuid, force=True)
|
||||
logger.debug(f"Saved new watch {new_uuid}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save new watch {new_uuid}: {e}")
|
||||
# Mark dirty for retry
|
||||
self.mark_watch_dirty(new_uuid)
|
||||
else:
|
||||
self.mark_watch_dirty(new_uuid)
|
||||
|
||||
logger.debug(f"Added '{url}'")
|
||||
|
||||
return new_uuid
|
||||
|
||||
def _watch_resource_exists(self, watch_uuid, resource_name):
|
||||
"""
|
||||
Check if a watch-related resource exists.
|
||||
|
||||
File backend implementation: checks if file exists in watch directory.
|
||||
|
||||
Args:
|
||||
watch_uuid: Watch UUID
|
||||
resource_name: Name of resource (e.g., "last-screenshot.png")
|
||||
|
||||
Returns:
|
||||
bool: True if resource exists
|
||||
"""
|
||||
resource_path = os.path.join(self.datastore_path, watch_uuid, resource_name)
|
||||
return path.isfile(resource_path)
|
||||
|
||||
def visualselector_data_is_ready(self, watch_uuid):
|
||||
"""
|
||||
Check if visual selector data (screenshot + elements) is ready.
|
||||
|
||||
Returns:
|
||||
bool: True if both screenshot and elements data exist
|
||||
"""
|
||||
has_screenshot = self._watch_resource_exists(watch_uuid, "last-screenshot.png")
|
||||
has_elements = self._watch_resource_exists(watch_uuid, "elements.deflate")
|
||||
return has_screenshot and has_elements
|
||||
|
||||
# Old sync_to_json and save_datastore methods removed - now handled by FileSavingDataStore parent class
|
||||
|
||||
# Go through the datastore path and remove any snapshots that are not mentioned in the index
|
||||
# This usually is not used, but can be handy.
|
||||
def remove_unused_snapshots(self):
|
||||
logger.info("Removing snapshots from datastore that are not in the index..")
|
||||
|
||||
index = []
|
||||
for uuid in self.data['watching']:
|
||||
for id in self.data['watching'][uuid].history:
|
||||
index.append(self.data['watching'][uuid].history[str(id)])
|
||||
|
||||
import pathlib
|
||||
|
||||
# Only in the sub-directories
|
||||
for uuid in self.data['watching']:
|
||||
for item in pathlib.Path(self.datastore_path).rglob(uuid + "/*.txt"):
|
||||
if not str(item) in index:
|
||||
logger.info(f"Removing {item}")
|
||||
unlink(item)
|
||||
|
||||
@property
|
||||
def proxy_list(self):
|
||||
proxy_list = {}
|
||||
proxy_list_file = os.path.join(self.datastore_path, 'proxies.json')
|
||||
|
||||
# Load from external config file
|
||||
if path.isfile(proxy_list_file):
|
||||
if HAS_ORJSON:
|
||||
# orjson.loads() expects UTF-8 encoded bytes #3611
|
||||
with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f:
|
||||
proxy_list = orjson.loads(f.read())
|
||||
else:
|
||||
with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f:
|
||||
proxy_list = json.load(f)
|
||||
|
||||
# Mapping from UI config if available
|
||||
extras = self.data['settings']['requests'].get('extra_proxies')
|
||||
if extras:
|
||||
i = 0
|
||||
for proxy in extras:
|
||||
i += 0
|
||||
if proxy.get('proxy_name') and proxy.get('proxy_url'):
|
||||
k = "ui-" + str(i) + proxy.get('proxy_name')
|
||||
proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')}
|
||||
|
||||
if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')):
|
||||
proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''}
|
||||
|
||||
return proxy_list if len(proxy_list) else None
|
||||
|
||||
def get_preferred_proxy_for_watch(self, uuid):
|
||||
"""
|
||||
Returns the preferred proxy by ID key
|
||||
:param uuid: UUID
|
||||
:return: proxy "key" id
|
||||
"""
|
||||
|
||||
if self.proxy_list is None:
|
||||
return None
|
||||
|
||||
# If it's a valid one
|
||||
watch = self.data['watching'].get(uuid)
|
||||
|
||||
if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch.get('proxy') == "no-proxy":
|
||||
return None
|
||||
|
||||
if watch.get('proxy') and watch.get('proxy') in list(self.proxy_list.keys()):
|
||||
return watch.get('proxy')
|
||||
|
||||
# not valid (including None), try the system one
|
||||
else:
|
||||
system_proxy_id = self.data['settings']['requests'].get('proxy')
|
||||
# Is not None and exists
|
||||
if self.proxy_list.get(system_proxy_id):
|
||||
return system_proxy_id
|
||||
|
||||
# Fallback - Did not resolve anything, or doesnt exist, use the first available
|
||||
if system_proxy_id is None or not self.proxy_list.get(system_proxy_id):
|
||||
first_default = list(self.proxy_list)[0]
|
||||
return first_default
|
||||
|
||||
return None
|
||||
|
||||
@property
|
||||
def has_extra_headers_file(self):
|
||||
filepath = os.path.join(self.datastore_path, 'headers.txt')
|
||||
return os.path.isfile(filepath)
|
||||
|
||||
def get_all_base_headers(self):
|
||||
headers = {}
|
||||
# Global app settings
|
||||
headers.update(self.data['settings'].get('headers', {}))
|
||||
|
||||
return headers
|
||||
|
||||
def get_all_headers_in_textfile_for_watch(self, uuid):
|
||||
from ..model.App import parse_headers_from_text_file
|
||||
headers = {}
|
||||
|
||||
# Global in /datastore/headers.txt
|
||||
filepath = os.path.join(self.datastore_path, 'headers.txt')
|
||||
try:
|
||||
if os.path.isfile(filepath):
|
||||
headers.update(parse_headers_from_text_file(filepath))
|
||||
except Exception as e:
|
||||
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
|
||||
|
||||
watch = self.data['watching'].get(uuid)
|
||||
if watch:
|
||||
|
||||
# In /datastore/xyz-xyz/headers.txt
|
||||
filepath = os.path.join(watch.watch_data_dir, 'headers.txt')
|
||||
try:
|
||||
if os.path.isfile(filepath):
|
||||
headers.update(parse_headers_from_text_file(filepath))
|
||||
except Exception as e:
|
||||
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
|
||||
|
||||
# In /datastore/tag-name.txt
|
||||
tags = self.get_all_tags_for_watch(uuid=uuid)
|
||||
for tag_uuid, tag in tags.items():
|
||||
fname = "headers-" + re.sub(r'[\W_]', '', tag.get('title')).lower().strip() + ".txt"
|
||||
filepath = os.path.join(self.datastore_path, fname)
|
||||
try:
|
||||
if os.path.isfile(filepath):
|
||||
headers.update(parse_headers_from_text_file(filepath))
|
||||
except Exception as e:
|
||||
logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")
|
||||
|
||||
return headers
|
||||
|
||||
def get_tag_overrides_for_watch(self, uuid, attr):
|
||||
tags = self.get_all_tags_for_watch(uuid=uuid)
|
||||
ret = []
|
||||
|
||||
if tags:
|
||||
for tag_uuid, tag in tags.items():
|
||||
if attr in tag and tag[attr]:
|
||||
ret = [*ret, *tag[attr]]
|
||||
|
||||
return ret
|
||||
|
||||
def add_tag(self, title):
|
||||
# If name exists, return that
|
||||
n = title.strip().lower()
|
||||
logger.debug(f">>> Adding new tag - '{n}'")
|
||||
if not n:
|
||||
return False
|
||||
|
||||
for uuid, tag in self.__data['settings']['application'].get('tags', {}).items():
|
||||
if n == tag.get('title', '').lower().strip():
|
||||
logger.warning(f"Tag '{title}' already exists, skipping creation.")
|
||||
return uuid
|
||||
|
||||
# Eventually almost everything todo with a watch will apply as a Tag
|
||||
# So we use the same model as a Watch
|
||||
with self.lock:
|
||||
from ..model import Tag
|
||||
new_tag = Tag.model(datastore_path=self.datastore_path, default={
|
||||
'title': title.strip(),
|
||||
'date_created': int(time.time())
|
||||
})
|
||||
|
||||
new_uuid = new_tag.get('uuid')
|
||||
|
||||
self.__data['settings']['application']['tags'][new_uuid] = new_tag
|
||||
|
||||
self.mark_settings_dirty()
|
||||
return new_uuid
|
||||
|
||||
def get_all_tags_for_watch(self, uuid):
|
||||
"""This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet"""
|
||||
watch = self.data['watching'].get(uuid)
|
||||
|
||||
# Should return a dict of full tag info linked by UUID
|
||||
if watch:
|
||||
return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags', []))
|
||||
|
||||
return {}
|
||||
|
||||
@property
|
||||
def extra_browsers(self):
|
||||
res = []
|
||||
p = list(filter(
|
||||
lambda s: (s.get('browser_name') and s.get('browser_connection_url')),
|
||||
self.__data['settings']['requests'].get('extra_browsers', [])))
|
||||
if p:
|
||||
for i in p:
|
||||
res.append(("extra_browser_" + i['browser_name'], i['browser_name']))
|
||||
|
||||
return res
|
||||
|
||||
def tag_exists_by_name(self, tag_name):
|
||||
# Check if any tag dictionary has a 'title' attribute matching the provided tag_name
|
||||
tags = self.__data['settings']['application']['tags'].values()
|
||||
return next((v for v in tags if v.get('title', '').lower() == tag_name.lower()),
|
||||
None)
|
||||
|
||||
def any_watches_have_processor_by_name(self, processor_name):
|
||||
for watch in self.data['watching'].values():
|
||||
if watch.get('processor') == processor_name:
|
||||
return True
|
||||
return False
|
||||
|
||||
def search_watches_for_url(self, query, tag_limit=None, partial=False):
|
||||
"""Search watches by URL, title, or error messages
|
||||
|
||||
Args:
|
||||
query (str): Search term to match against watch URLs, titles, and error messages
|
||||
tag_limit (str, optional): Optional tag name to limit search results
|
||||
partial: (bool, optional): sub-string matching
|
||||
|
||||
Returns:
|
||||
list: List of UUIDs of watches that match the search criteria
|
||||
"""
|
||||
matching_uuids = []
|
||||
query = query.lower().strip()
|
||||
tag = self.tag_exists_by_name(tag_limit) if tag_limit else False
|
||||
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
# Filter by tag if requested
|
||||
if tag_limit:
|
||||
if not tag.get('uuid') in watch.get('tags', []):
|
||||
continue
|
||||
|
||||
# Search in URL, title, or error messages
|
||||
if partial:
|
||||
if ((watch.get('title') and query in watch.get('title').lower()) or
|
||||
query in watch.get('url', '').lower() or
|
||||
(watch.get('last_error') and query in watch.get('last_error').lower())):
|
||||
matching_uuids.append(uuid)
|
||||
else:
|
||||
if ((watch.get('title') and query == watch.get('title').lower()) or
|
||||
query == watch.get('url', '').lower() or
|
||||
(watch.get('last_error') and query == watch.get('last_error').lower())):
|
||||
matching_uuids.append(uuid)
|
||||
|
||||
return matching_uuids
|
||||
|
||||
def get_unique_notification_tokens_available(self):
|
||||
# Ask each type of watch if they have any extra notification token to add to the validation
|
||||
extra_notification_tokens = {}
|
||||
watch_processors_checked = set()
|
||||
|
||||
for watch_uuid, watch in self.__data['watching'].items():
|
||||
processor = watch.get('processor')
|
||||
if processor not in watch_processors_checked:
|
||||
extra_notification_tokens.update(watch.extra_notification_token_values())
|
||||
watch_processors_checked.add(processor)
|
||||
|
||||
return extra_notification_tokens
|
||||
|
||||
def get_unique_notification_token_placeholders_available(self):
|
||||
# The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice
|
||||
extra_notification_tokens = []
|
||||
watch_processors_checked = set()
|
||||
|
||||
for watch_uuid, watch in self.__data['watching'].items():
|
||||
processor = watch.get('processor')
|
||||
if processor not in watch_processors_checked:
|
||||
extra_notification_tokens += watch.extra_notification_token_placeholder_info()
|
||||
watch_processors_checked.add(processor)
|
||||
|
||||
return extra_notification_tokens
|
||||
|
||||
def add_notification_url(self, notification_url):
|
||||
|
||||
logger.debug(f">>> Adding new notification_url - '{notification_url}'")
|
||||
|
||||
notification_urls = self.data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
with self.lock:
|
||||
notification_urls = self.__data['settings']['application'].get('notification_urls', [])
|
||||
|
||||
if notification_url in notification_urls:
|
||||
return notification_url
|
||||
|
||||
# Append and update the datastore
|
||||
notification_urls.append(notification_url)
|
||||
self.__data['settings']['application']['notification_urls'] = notification_urls
|
||||
|
||||
self.mark_settings_dirty()
|
||||
return notification_url
|
||||
|
||||
# Schema update methods moved to store/updates.py (DatastoreUpdatesMixin)
|
||||
# This includes: get_updates_available(), run_updates(), and update_1() through update_26()
|
||||
@@ -1,100 +0,0 @@
|
||||
"""
|
||||
Base classes for the datastore.
|
||||
|
||||
This module defines the abstract interfaces that all datastore implementations must follow.
|
||||
"""
|
||||
|
||||
from abc import ABC, abstractmethod
|
||||
from threading import Lock
|
||||
from loguru import logger
|
||||
|
||||
|
||||
class DataStore(ABC):
|
||||
"""
|
||||
Abstract base class for all datastore implementations.
|
||||
|
||||
Defines the core interface that all datastores must implement for:
|
||||
- Loading and saving data
|
||||
- Managing watches
|
||||
- Handling settings
|
||||
- Providing data access
|
||||
"""
|
||||
|
||||
lock = Lock()
|
||||
datastore_path = None
|
||||
|
||||
@abstractmethod
|
||||
def reload_state(self, datastore_path, include_default_watches, version_tag):
|
||||
"""
|
||||
Load data from persistent storage.
|
||||
|
||||
Args:
|
||||
datastore_path: Path to the datastore directory
|
||||
include_default_watches: Whether to create default watches if none exist
|
||||
version_tag: Application version string
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def add_watch(self, url, **kwargs):
|
||||
"""
|
||||
Add a new watch.
|
||||
|
||||
Args:
|
||||
url: URL to watch
|
||||
**kwargs: Additional watch parameters
|
||||
|
||||
Returns:
|
||||
UUID of the created watch
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def update_watch(self, uuid, update_obj):
|
||||
"""
|
||||
Update an existing watch.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
update_obj: Dictionary of fields to update
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def delete(self, uuid):
|
||||
"""
|
||||
Delete a watch.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID to delete
|
||||
"""
|
||||
pass
|
||||
|
||||
@property
|
||||
@abstractmethod
|
||||
def data(self):
|
||||
"""
|
||||
Access to the underlying data structure.
|
||||
|
||||
Returns:
|
||||
Dictionary containing all datastore data
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def force_save_all(self):
|
||||
"""
|
||||
Force immediate synchronous save of all data to storage.
|
||||
|
||||
This is the abstract method for forcing a complete save.
|
||||
Different backends implement this differently:
|
||||
- File backend: Mark all watches/settings dirty, then save
|
||||
- Redis backend: SAVE command or pipeline flush
|
||||
- SQL backend: COMMIT transaction
|
||||
|
||||
Used by:
|
||||
- Backup creation (ensure everything is saved before backup)
|
||||
- Shutdown (ensure all changes are persisted)
|
||||
- Manual save operations
|
||||
"""
|
||||
pass
|
||||
@@ -1,698 +0,0 @@
|
||||
"""
|
||||
File-based datastore with individual watch persistence and dirty tracking.
|
||||
|
||||
This module provides the FileSavingDataStore abstract class that implements:
|
||||
- Individual watch.json file persistence
|
||||
- Hash-based change detection (only save what changed)
|
||||
- Background save thread with dirty tracking
|
||||
- Atomic file writes safe for NFS/NAS
|
||||
"""
|
||||
|
||||
import hashlib
|
||||
import json
|
||||
import os
|
||||
import tempfile
|
||||
import time
|
||||
from threading import Thread
|
||||
from loguru import logger
|
||||
|
||||
from .base import DataStore
|
||||
|
||||
# Try to import orjson for faster JSON serialization
|
||||
try:
|
||||
import orjson
|
||||
HAS_ORJSON = True
|
||||
except ImportError:
|
||||
HAS_ORJSON = False
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# Helper Functions for Atomic File Operations
|
||||
# ============================================================================
|
||||
|
||||
def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10):
|
||||
"""
|
||||
Save JSON data to disk using atomic write pattern.
|
||||
|
||||
Generic helper for saving any JSON data (settings, watches, etc.) with:
|
||||
- Atomic write (temp file + rename)
|
||||
- Directory fsync for crash consistency
|
||||
- Size validation
|
||||
- Proper error handling
|
||||
|
||||
Args:
|
||||
file_path: Full path to target JSON file
|
||||
data_dict: Dictionary to serialize
|
||||
label: Human-readable label for error messages (e.g., "watch", "settings")
|
||||
max_size_mb: Maximum allowed file size in MB
|
||||
|
||||
Raises:
|
||||
ValueError: If serialized data exceeds max_size_mb
|
||||
OSError: If disk is full (ENOSPC) or other I/O error
|
||||
"""
|
||||
# Ensure parent directory exists
|
||||
parent_dir = os.path.dirname(file_path)
|
||||
os.makedirs(parent_dir, exist_ok=True)
|
||||
|
||||
# Create temp file in same directory (required for NFS atomicity)
|
||||
fd, temp_path = tempfile.mkstemp(
|
||||
suffix='.tmp',
|
||||
prefix='json-',
|
||||
dir=parent_dir,
|
||||
text=False
|
||||
)
|
||||
|
||||
fd_closed = False
|
||||
try:
|
||||
# Serialize data
|
||||
if HAS_ORJSON:
|
||||
data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2)
|
||||
else:
|
||||
data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8')
|
||||
|
||||
# Safety check: validate size
|
||||
MAX_SIZE = max_size_mb * 1024 * 1024
|
||||
data_size = len(data)
|
||||
if data_size > MAX_SIZE:
|
||||
raise ValueError(
|
||||
f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB "
|
||||
f"(max: {max_size_mb}MB). This indicates a bug or data corruption."
|
||||
)
|
||||
|
||||
# Write to temp file
|
||||
os.write(fd, data)
|
||||
os.fsync(fd) # Force file data to disk
|
||||
os.close(fd)
|
||||
fd_closed = True
|
||||
|
||||
# Atomic rename
|
||||
os.replace(temp_path, file_path)
|
||||
|
||||
# Sync directory to ensure filename metadata is durable
|
||||
try:
|
||||
dir_fd = os.open(parent_dir, os.O_RDONLY)
|
||||
try:
|
||||
os.fsync(dir_fd)
|
||||
finally:
|
||||
os.close(dir_fd)
|
||||
except (OSError, AttributeError):
|
||||
# Windows doesn't support fsync on directories
|
||||
pass
|
||||
|
||||
except OSError as e:
|
||||
# Cleanup temp file
|
||||
if not fd_closed:
|
||||
try:
|
||||
os.close(fd)
|
||||
except:
|
||||
pass
|
||||
if os.path.exists(temp_path):
|
||||
try:
|
||||
os.unlink(temp_path)
|
||||
except:
|
||||
pass
|
||||
|
||||
# Provide helpful error messages
|
||||
if e.errno == 28: # ENOSPC
|
||||
raise OSError(f"Disk full: Cannot save {label}") from e
|
||||
elif e.errno == 122: # EDQUOT
|
||||
raise OSError(f"Disk quota exceeded: Cannot save {label}") from e
|
||||
else:
|
||||
raise OSError(f"I/O error saving {label}: {e}") from e
|
||||
|
||||
except Exception as e:
|
||||
# Cleanup temp file
|
||||
if not fd_closed:
|
||||
try:
|
||||
os.close(fd)
|
||||
except:
|
||||
pass
|
||||
if os.path.exists(temp_path):
|
||||
try:
|
||||
os.unlink(temp_path)
|
||||
except:
|
||||
pass
|
||||
raise e
|
||||
|
||||
|
||||
def save_watch_atomic(watch_dir, uuid, watch_dict):
|
||||
"""
|
||||
Save a watch to disk using atomic write pattern.
|
||||
|
||||
Convenience wrapper around save_json_atomic for watches.
|
||||
|
||||
Args:
|
||||
watch_dir: Directory for this watch (e.g., /datastore/{uuid})
|
||||
uuid: Watch UUID (for logging)
|
||||
watch_dict: Dictionary representation of the watch
|
||||
|
||||
Raises:
|
||||
ValueError: If serialized data exceeds 10MB (indicates bug or corruption)
|
||||
OSError: If disk is full (ENOSPC) or other I/O error
|
||||
"""
|
||||
watch_json = os.path.join(watch_dir, "watch.json")
|
||||
save_json_atomic(watch_json, watch_dict, label=f"watch {uuid}", max_size_mb=10)
|
||||
|
||||
|
||||
def load_watch_from_file(watch_json, uuid, rehydrate_entity_func):
|
||||
"""
|
||||
Load a watch from its JSON file.
|
||||
|
||||
Args:
|
||||
watch_json: Path to the watch.json file
|
||||
uuid: Watch UUID
|
||||
rehydrate_entity_func: Function to convert dict to Watch object
|
||||
|
||||
Returns:
|
||||
Tuple of (Watch object, raw_data_dict) or (None, None) if failed
|
||||
The raw_data_dict is needed to compute the hash before rehydration
|
||||
"""
|
||||
try:
|
||||
# Check file size before reading
|
||||
file_size = os.path.getsize(watch_json)
|
||||
MAX_WATCH_SIZE = 10 * 1024 * 1024 # 10MB
|
||||
if file_size > MAX_WATCH_SIZE:
|
||||
logger.critical(
|
||||
f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: "
|
||||
f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). "
|
||||
f"File: {watch_json}. This indicates a bug or data corruption. "
|
||||
f"Watch will be skipped."
|
||||
)
|
||||
return None, None
|
||||
|
||||
if HAS_ORJSON:
|
||||
with open(watch_json, 'rb') as f:
|
||||
watch_data = orjson.loads(f.read())
|
||||
else:
|
||||
with open(watch_json, 'r', encoding='utf-8') as f:
|
||||
watch_data = json.load(f)
|
||||
|
||||
# Return both the raw data and the rehydrated watch
|
||||
# Raw data is needed to compute hash before rehydration changes anything
|
||||
watch_obj = rehydrate_entity_func(uuid, watch_data)
|
||||
return watch_obj, watch_data
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.critical(
|
||||
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
|
||||
f"File: {watch_json}. Error: {e}. "
|
||||
f"Watch will be skipped and may need manual recovery from backup."
|
||||
)
|
||||
return None, None
|
||||
except ValueError as e:
|
||||
# orjson raises ValueError for invalid JSON
|
||||
if "invalid json" in str(e).lower() or HAS_ORJSON:
|
||||
logger.critical(
|
||||
f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. "
|
||||
f"File: {watch_json}. Error: {e}. "
|
||||
f"Watch will be skipped and may need manual recovery from backup."
|
||||
)
|
||||
return None, None
|
||||
# Re-raise if it's not a JSON parsing error
|
||||
raise
|
||||
except FileNotFoundError:
|
||||
logger.error(f"Watch file not found: {watch_json} for watch {uuid}")
|
||||
return None, None
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}")
|
||||
return None, None
|
||||
|
||||
|
||||
def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func):
|
||||
"""
|
||||
Load all watches from individual watch.json files.
|
||||
|
||||
SYNCHRONOUS loading: Blocks until all watches are loaded.
|
||||
This ensures data consistency - web server won't accept requests
|
||||
until all watches are available. Progress logged every 100 watches.
|
||||
|
||||
Args:
|
||||
datastore_path: Path to the datastore directory
|
||||
rehydrate_entity_func: Function to convert dict to Watch object
|
||||
compute_hash_func: Function to compute hash from raw watch dict
|
||||
|
||||
Returns:
|
||||
Tuple of (watching_dict, hashes_dict)
|
||||
- watching_dict: uuid -> Watch object
|
||||
- hashes_dict: uuid -> hash string (computed from raw data)
|
||||
"""
|
||||
logger.info("Loading watches from individual watch.json files...")
|
||||
|
||||
watching = {}
|
||||
watch_hashes = {}
|
||||
|
||||
# Find all UUID directories
|
||||
if not os.path.exists(datastore_path):
|
||||
return watching, watch_hashes
|
||||
|
||||
# Get all directories that look like UUIDs
|
||||
try:
|
||||
all_items = os.listdir(datastore_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to list datastore directory: {e}")
|
||||
return watching, watch_hashes
|
||||
|
||||
uuid_dirs = [
|
||||
d for d in all_items
|
||||
if os.path.isdir(os.path.join(datastore_path, d))
|
||||
and not d.startswith('.') # Skip hidden dirs
|
||||
and d not in ['__pycache__'] # Skip Python cache dirs
|
||||
]
|
||||
|
||||
# First pass: count directories with watch.json files
|
||||
watch_dirs = []
|
||||
for uuid_dir in uuid_dirs:
|
||||
watch_json = os.path.join(datastore_path, uuid_dir, "watch.json")
|
||||
if os.path.isfile(watch_json):
|
||||
watch_dirs.append(uuid_dir)
|
||||
|
||||
total = len(watch_dirs)
|
||||
loaded = 0
|
||||
failed = 0
|
||||
|
||||
for uuid_dir in watch_dirs:
|
||||
watch_json = os.path.join(datastore_path, uuid_dir, "watch.json")
|
||||
watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func)
|
||||
if watch and raw_data:
|
||||
watching[uuid_dir] = watch
|
||||
# Compute hash from raw data BEFORE rehydration to match saved hash
|
||||
watch_hashes[uuid_dir] = compute_hash_func(raw_data)
|
||||
loaded += 1
|
||||
|
||||
if loaded % 100 == 0:
|
||||
logger.info(f"Loaded {loaded}/{total} watches...")
|
||||
else:
|
||||
# load_watch_from_file already logged the specific error
|
||||
failed += 1
|
||||
|
||||
if failed > 0:
|
||||
logger.critical(
|
||||
f"LOAD COMPLETE: {loaded} watches loaded successfully, "
|
||||
f"{failed} watches FAILED to load (corrupted or invalid)"
|
||||
)
|
||||
else:
|
||||
logger.info(f"Loaded {loaded} watches from disk")
|
||||
|
||||
return watching, watch_hashes
|
||||
|
||||
|
||||
# ============================================================================
|
||||
# FileSavingDataStore Class
|
||||
# ============================================================================
|
||||
|
||||
class FileSavingDataStore(DataStore):
|
||||
"""
|
||||
Abstract datastore that provides file persistence with change tracking.
|
||||
|
||||
Features:
|
||||
- Individual watch.json files (one per watch)
|
||||
- Dirty tracking: Only saves items that have changed
|
||||
- Hash-based change detection: Prevents unnecessary writes
|
||||
- Background save thread: Non-blocking persistence
|
||||
- Two-tier urgency: Standard (60s) and urgent (immediate) saves
|
||||
|
||||
Subclasses must implement:
|
||||
- rehydrate_entity(): Convert dict to Watch object
|
||||
- Access to internal __data structure for watch management
|
||||
"""
|
||||
|
||||
needs_write = False
|
||||
needs_write_urgent = False
|
||||
stop_thread = False
|
||||
|
||||
# Change tracking
|
||||
_dirty_watches = set() # Watch UUIDs that need saving
|
||||
_dirty_settings = False # Settings changed
|
||||
_watch_hashes = {} # UUID -> SHA256 hash for change detection
|
||||
|
||||
# Health monitoring
|
||||
_last_save_time = 0 # Timestamp of last successful save
|
||||
_save_cycle_count = 0 # Number of save cycles completed
|
||||
_total_saves = 0 # Total watches saved (lifetime)
|
||||
_save_errors = 0 # Total save errors (lifetime)
|
||||
|
||||
def __init__(self):
|
||||
super().__init__()
|
||||
self.save_data_thread = None
|
||||
self._last_save_time = time.time()
|
||||
|
||||
def _compute_hash(self, watch_dict):
|
||||
"""
|
||||
Compute SHA256 hash of watch for change detection.
|
||||
|
||||
Args:
|
||||
watch_dict: Dictionary representation of watch
|
||||
|
||||
Returns:
|
||||
Hex string of SHA256 hash
|
||||
"""
|
||||
# Use orjson for deterministic serialization if available
|
||||
if HAS_ORJSON:
|
||||
json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS)
|
||||
else:
|
||||
json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False)
|
||||
json_bytes = json_str.encode('utf-8')
|
||||
|
||||
return hashlib.sha256(json_bytes).hexdigest()
|
||||
|
||||
def mark_watch_dirty(self, uuid):
|
||||
"""
|
||||
Mark a watch as needing save.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
"""
|
||||
with self.lock:
|
||||
self._dirty_watches.add(uuid)
|
||||
dirty_count = len(self._dirty_watches)
|
||||
|
||||
# Backpressure detection - warn if dirty set grows too large
|
||||
if dirty_count > 1000:
|
||||
logger.critical(
|
||||
f"BACKPRESSURE WARNING: {dirty_count} watches pending save! "
|
||||
f"Save thread may not be keeping up with write rate. "
|
||||
f"This could indicate disk I/O bottleneck or save thread failure."
|
||||
)
|
||||
elif dirty_count > 500:
|
||||
logger.warning(
|
||||
f"Dirty watch count high: {dirty_count} watches pending save. "
|
||||
f"Monitoring for potential backpressure."
|
||||
)
|
||||
|
||||
self.needs_write = True
|
||||
|
||||
def mark_settings_dirty(self):
|
||||
"""Mark settings as needing save."""
|
||||
with self.lock:
|
||||
self._dirty_settings = True
|
||||
self.needs_write = True
|
||||
|
||||
def save_watch(self, uuid, force=False):
|
||||
"""
|
||||
Save a single watch if it has changed (polymorphic method).
|
||||
|
||||
This is the high-level save method that handles:
|
||||
- Hash computation and change detection
|
||||
- Calling the backend-specific save implementation
|
||||
- Updating the hash cache
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
force: If True, skip hash check and save anyway
|
||||
|
||||
Returns:
|
||||
True if saved, False if skipped (unchanged)
|
||||
"""
|
||||
if not self._watch_exists(uuid):
|
||||
logger.warning(f"Cannot save watch {uuid} - does not exist")
|
||||
return False
|
||||
|
||||
watch_dict = self._get_watch_dict(uuid)
|
||||
current_hash = self._compute_hash(watch_dict)
|
||||
|
||||
# Skip save if unchanged (unless forced)
|
||||
if not force and current_hash == self._watch_hashes.get(uuid):
|
||||
#logger.debug(f"Watch {uuid} unchanged, skipping save")
|
||||
return False
|
||||
|
||||
try:
|
||||
self._save_watch(uuid, watch_dict)
|
||||
self._watch_hashes[uuid] = current_hash
|
||||
logger.debug(f"Saved watch {uuid}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save watch {uuid}: {e}")
|
||||
raise
|
||||
|
||||
def _save_watch(self, uuid, watch_dict):
|
||||
"""
|
||||
Save a single watch to storage (polymorphic).
|
||||
|
||||
Backend-specific implementation. Subclasses override for different storage:
|
||||
- File backend: Writes to {uuid}/watch.json
|
||||
- Redis backend: SET watch:{uuid}
|
||||
- SQL backend: UPDATE watches WHERE uuid=?
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
watch_dict: Dictionary representation of watch
|
||||
"""
|
||||
# Default file implementation
|
||||
watch_dir = os.path.join(self.datastore_path, uuid)
|
||||
save_watch_atomic(watch_dir, uuid, watch_dict)
|
||||
|
||||
def _save_settings(self):
|
||||
"""
|
||||
Save settings to storage (polymorphic).
|
||||
|
||||
Subclasses must implement for their backend.
|
||||
- File: changedetection.json
|
||||
- Redis: SET settings
|
||||
- SQL: UPDATE settings table
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _save_settings")
|
||||
|
||||
def _load_watches(self):
|
||||
"""
|
||||
Load all watches from storage (polymorphic).
|
||||
|
||||
Subclasses must implement for their backend.
|
||||
- File: Read individual watch.json files
|
||||
- Redis: SCAN watch:* keys
|
||||
- SQL: SELECT * FROM watches
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _load_watches")
|
||||
|
||||
def _delete_watch(self, uuid):
|
||||
"""
|
||||
Delete a watch from storage (polymorphic).
|
||||
|
||||
Subclasses must implement for their backend.
|
||||
- File: Delete {uuid}/ directory recursively
|
||||
- Redis: DEL watch:{uuid}
|
||||
- SQL: DELETE FROM watches WHERE uuid=?
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID to delete
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _delete_watch")
|
||||
|
||||
def _save_dirty_items(self):
|
||||
"""
|
||||
Save only items that have changed.
|
||||
|
||||
This is the core optimization: instead of saving the entire datastore,
|
||||
we only save watches that were marked dirty and settings if changed.
|
||||
"""
|
||||
start_time = time.time()
|
||||
|
||||
# Capture dirty sets under lock
|
||||
with self.lock:
|
||||
dirty_watches = list(self._dirty_watches)
|
||||
dirty_settings = self._dirty_settings
|
||||
self._dirty_watches.clear()
|
||||
self._dirty_settings = False
|
||||
|
||||
if not dirty_watches and not dirty_settings:
|
||||
return
|
||||
|
||||
logger.debug(f"Checking {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}")
|
||||
|
||||
# Save each dirty watch using the polymorphic save method
|
||||
saved_count = 0
|
||||
error_count = 0
|
||||
skipped_unchanged = 0
|
||||
|
||||
for uuid in dirty_watches:
|
||||
# Check if watch still exists (might have been deleted)
|
||||
if not self._watch_exists(uuid):
|
||||
# Watch was deleted, remove hash
|
||||
self._watch_hashes.pop(uuid, None)
|
||||
continue
|
||||
|
||||
# Pre-check hash to avoid unnecessary save_watch() calls
|
||||
watch_dict = self._get_watch_dict(uuid)
|
||||
current_hash = self._compute_hash(watch_dict)
|
||||
|
||||
if current_hash == self._watch_hashes.get(uuid):
|
||||
# Watch hasn't actually changed, skip
|
||||
skipped_unchanged += 1
|
||||
continue
|
||||
|
||||
try:
|
||||
if self.save_watch(uuid, force=True): # force=True since we already checked hash
|
||||
saved_count += 1
|
||||
except Exception as e:
|
||||
error_count += 1
|
||||
# Re-mark for retry
|
||||
with self.lock:
|
||||
self._dirty_watches.add(uuid)
|
||||
|
||||
# Save settings if changed
|
||||
if dirty_settings:
|
||||
try:
|
||||
self._save_settings()
|
||||
logger.debug("Saved settings")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save settings: {e}")
|
||||
error_count += 1
|
||||
with self.lock:
|
||||
self._dirty_settings = True
|
||||
|
||||
# Update metrics
|
||||
elapsed = time.time() - start_time
|
||||
self._save_cycle_count += 1
|
||||
self._total_saves += saved_count
|
||||
self._save_errors += error_count
|
||||
self._last_save_time = time.time()
|
||||
|
||||
# Log performance metrics
|
||||
if saved_count > 0:
|
||||
avg_time_per_watch = (elapsed / saved_count) * 1000 # milliseconds
|
||||
skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else ""
|
||||
logger.info(
|
||||
f"Successfully saved {saved_count} watches in {elapsed:.2f}s "
|
||||
f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}). "
|
||||
f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)"
|
||||
)
|
||||
elif skipped_unchanged > 0:
|
||||
logger.debug(f"Save cycle: {skipped_unchanged} watches unchanged, nothing saved")
|
||||
|
||||
if error_count > 0:
|
||||
logger.error(f"Save cycle completed with {error_count} errors")
|
||||
|
||||
self.needs_write = False
|
||||
self.needs_write_urgent = False
|
||||
|
||||
def _watch_exists(self, uuid):
|
||||
"""
|
||||
Check if watch exists. Subclass must implement.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
|
||||
Returns:
|
||||
bool
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _watch_exists")
|
||||
|
||||
def _get_watch_dict(self, uuid):
|
||||
"""
|
||||
Get watch as dictionary. Subclass must implement.
|
||||
|
||||
Args:
|
||||
uuid: Watch UUID
|
||||
|
||||
Returns:
|
||||
Dictionary representation of watch
|
||||
"""
|
||||
raise NotImplementedError("Subclass must implement _get_watch_dict")
|
||||
|
||||
def save_datastore(self):
|
||||
"""
|
||||
Background thread that periodically saves dirty items.
|
||||
|
||||
Runs every 60 seconds (with 0.5s sleep intervals for responsiveness),
|
||||
or immediately when needs_write_urgent is set.
|
||||
"""
|
||||
while True:
|
||||
if self.stop_thread:
|
||||
# Graceful shutdown: flush any remaining dirty items before stopping
|
||||
if self.needs_write or self._dirty_watches or self._dirty_settings:
|
||||
logger.warning("Datastore save thread stopping - flushing remaining dirty items...")
|
||||
try:
|
||||
self._save_dirty_items()
|
||||
logger.info("Graceful shutdown complete - all data saved")
|
||||
except Exception as e:
|
||||
logger.critical(f"FAILED to save dirty items during shutdown: {e}")
|
||||
else:
|
||||
logger.info("Datastore save thread stopping - no dirty items")
|
||||
return
|
||||
|
||||
if self.needs_write or self.needs_write_urgent:
|
||||
try:
|
||||
self._save_dirty_items()
|
||||
except Exception as e:
|
||||
logger.error(f"Error in save cycle: {e}")
|
||||
|
||||
# 60 second timer with early break for urgent saves
|
||||
for i in range(120):
|
||||
time.sleep(0.5)
|
||||
if self.stop_thread or self.needs_write_urgent:
|
||||
break
|
||||
|
||||
def start_save_thread(self):
|
||||
"""Start the background save thread."""
|
||||
if not self.save_data_thread or not self.save_data_thread.is_alive():
|
||||
self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver")
|
||||
self.save_data_thread.start()
|
||||
logger.info("Datastore save thread started")
|
||||
|
||||
def force_save_all(self):
|
||||
"""
|
||||
Force immediate synchronous save of all changes to storage.
|
||||
|
||||
File backend implementation of the abstract force_save_all() method.
|
||||
Marks all watches and settings as dirty, then saves immediately.
|
||||
|
||||
Used by:
|
||||
- Backup creation (ensure everything is saved before backup)
|
||||
- Shutdown (ensure all changes are persisted)
|
||||
- Manual save operations
|
||||
"""
|
||||
logger.info("Force saving all data to storage...")
|
||||
|
||||
# Mark everything as dirty to ensure complete save
|
||||
for uuid in self.data['watching'].keys():
|
||||
self.mark_watch_dirty(uuid)
|
||||
self.mark_settings_dirty()
|
||||
|
||||
# Save immediately (synchronous)
|
||||
self._save_dirty_items()
|
||||
|
||||
logger.success("All data saved to storage")
|
||||
|
||||
def get_health_status(self):
|
||||
"""
|
||||
Get datastore health status for monitoring.
|
||||
|
||||
Returns:
|
||||
dict with health metrics and status
|
||||
"""
|
||||
now = time.time()
|
||||
time_since_last_save = now - self._last_save_time
|
||||
|
||||
with self.lock:
|
||||
dirty_count = len(self._dirty_watches)
|
||||
|
||||
is_thread_alive = self.save_data_thread and self.save_data_thread.is_alive()
|
||||
|
||||
# Determine health status
|
||||
if not is_thread_alive:
|
||||
status = "CRITICAL"
|
||||
message = "Save thread is DEAD"
|
||||
elif time_since_last_save > 300: # 5 minutes
|
||||
status = "WARNING"
|
||||
message = f"No save activity for {time_since_last_save:.0f}s"
|
||||
elif dirty_count > 1000:
|
||||
status = "WARNING"
|
||||
message = f"High backpressure: {dirty_count} watches pending"
|
||||
elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01:
|
||||
status = "WARNING"
|
||||
message = f"High error rate: {self._save_errors} errors"
|
||||
else:
|
||||
status = "HEALTHY"
|
||||
message = "Operating normally"
|
||||
|
||||
return {
|
||||
"status": status,
|
||||
"message": message,
|
||||
"thread_alive": is_thread_alive,
|
||||
"dirty_watches": dirty_count,
|
||||
"dirty_settings": self._dirty_settings,
|
||||
"last_save_seconds_ago": int(time_since_last_save),
|
||||
"save_cycles": self._save_cycle_count,
|
||||
"total_saves": self._total_saves,
|
||||
"total_errors": self._save_errors,
|
||||
"error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2)
|
||||
}
|
||||
@@ -1,91 +0,0 @@
|
||||
"""
|
||||
Legacy format loader for url-watches.json.
|
||||
|
||||
Provides functions to detect and load from the legacy monolithic JSON format.
|
||||
Used during migration (update_26) to transition to individual watch.json files.
|
||||
"""
|
||||
|
||||
import os
|
||||
import json
|
||||
from loguru import logger
|
||||
|
||||
# Try to import orjson for faster JSON serialization
|
||||
try:
|
||||
import orjson
|
||||
HAS_ORJSON = True
|
||||
except ImportError:
|
||||
HAS_ORJSON = False
|
||||
|
||||
|
||||
def detect_format(datastore_path):
|
||||
"""
|
||||
Detect which datastore format is in use.
|
||||
|
||||
Returns:
|
||||
- 'new': changedetection.json exists (new format)
|
||||
- 'empty': No changedetection.json (first run or needs migration)
|
||||
|
||||
Note: Legacy url-watches.json detection is handled by update_26 during migration.
|
||||
Runtime only distinguishes between 'new' (already migrated) and 'empty' (needs setup/migration).
|
||||
|
||||
Args:
|
||||
datastore_path: Path to datastore directory
|
||||
|
||||
Returns:
|
||||
str: 'new' or 'empty'
|
||||
"""
|
||||
changedetection_json = os.path.join(datastore_path, "changedetection.json")
|
||||
|
||||
if os.path.exists(changedetection_json):
|
||||
return 'new'
|
||||
else:
|
||||
return 'empty'
|
||||
|
||||
|
||||
def has_legacy_datastore(datastore_path):
|
||||
"""
|
||||
Check if a legacy url-watches.json file exists.
|
||||
|
||||
This is used by update_26 to determine if migration is needed.
|
||||
|
||||
Args:
|
||||
datastore_path: Path to datastore directory
|
||||
|
||||
Returns:
|
||||
bool: True if url-watches.json exists
|
||||
"""
|
||||
url_watches_json = os.path.join(datastore_path, "url-watches.json")
|
||||
return os.path.exists(url_watches_json)
|
||||
|
||||
|
||||
def load_legacy_format(json_store_path):
|
||||
"""
|
||||
Load datastore from legacy url-watches.json format.
|
||||
|
||||
Args:
|
||||
json_store_path: Full path to url-watches.json file
|
||||
|
||||
Returns:
|
||||
dict: Loaded datastore data with 'watching', 'settings', etc.
|
||||
None: If file doesn't exist or loading failed
|
||||
"""
|
||||
logger.info(f"Loading from legacy format: {json_store_path}")
|
||||
|
||||
if not os.path.isfile(json_store_path):
|
||||
logger.warning(f"Legacy file not found: {json_store_path}")
|
||||
return None
|
||||
|
||||
try:
|
||||
if HAS_ORJSON:
|
||||
with open(json_store_path, 'rb') as f:
|
||||
data = orjson.loads(f.read())
|
||||
else:
|
||||
with open(json_store_path, 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
logger.info(f"Loaded {len(data.get('watching', {}))} watches from legacy format")
|
||||
return data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load legacy format: {e}")
|
||||
return None
|
||||
@@ -1,660 +0,0 @@
|
||||
"""
|
||||
Schema update migrations for the datastore.
|
||||
|
||||
This module contains all schema version upgrade methods (update_1 through update_N).
|
||||
These are mixed into ChangeDetectionStore to keep the main store file focused.
|
||||
|
||||
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
|
||||
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import shutil
|
||||
import tarfile
|
||||
import time
|
||||
from loguru import logger
|
||||
from copy import deepcopy
|
||||
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
from ..processors.restock_diff import Restock
|
||||
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
|
||||
from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from .file_saving_datastore import save_watch_atomic
|
||||
|
||||
|
||||
def create_backup_tarball(datastore_path, update_number):
|
||||
"""
|
||||
Create a tarball backup of the entire datastore structure before running an update.
|
||||
|
||||
Includes:
|
||||
- All {uuid}/watch.json files
|
||||
- changedetection.json (settings, if it exists)
|
||||
- url-watches.json (legacy format, if it exists)
|
||||
- Directory structure preserved
|
||||
|
||||
Args:
|
||||
datastore_path: Path to datastore directory
|
||||
update_number: Update number being applied
|
||||
|
||||
Returns:
|
||||
str: Path to created tarball, or None if backup failed
|
||||
|
||||
Restoration:
|
||||
To restore from a backup:
|
||||
cd /path/to/datastore
|
||||
tar -xzf before-update-N-timestamp.tar.gz
|
||||
This will restore all watch.json files and settings to their pre-update state.
|
||||
"""
|
||||
timestamp = int(time.time())
|
||||
backup_filename = f"before-update-{update_number}-{timestamp}.tar.gz"
|
||||
backup_path = os.path.join(datastore_path, backup_filename)
|
||||
|
||||
try:
|
||||
logger.info(f"Creating backup tarball: {backup_filename}")
|
||||
|
||||
with tarfile.open(backup_path, "w:gz") as tar:
|
||||
# Backup changedetection.json if it exists (new format)
|
||||
changedetection_json = os.path.join(datastore_path, "changedetection.json")
|
||||
if os.path.isfile(changedetection_json):
|
||||
tar.add(changedetection_json, arcname="changedetection.json")
|
||||
logger.debug("Added changedetection.json to backup")
|
||||
|
||||
# Backup url-watches.json if it exists (legacy format)
|
||||
url_watches_json = os.path.join(datastore_path, "url-watches.json")
|
||||
if os.path.isfile(url_watches_json):
|
||||
tar.add(url_watches_json, arcname="url-watches.json")
|
||||
logger.debug("Added url-watches.json to backup")
|
||||
|
||||
# Backup all watch directories with their watch.json files
|
||||
# This preserves the UUID directory structure
|
||||
watch_count = 0
|
||||
for entry in os.listdir(datastore_path):
|
||||
entry_path = os.path.join(datastore_path, entry)
|
||||
|
||||
# Skip if not a directory
|
||||
if not os.path.isdir(entry_path):
|
||||
continue
|
||||
|
||||
# Skip hidden directories and backup directories
|
||||
if entry.startswith('.') or entry.startswith('before-update-'):
|
||||
continue
|
||||
|
||||
# Check if this directory has a watch.json (indicates it's a watch UUID directory)
|
||||
watch_json = os.path.join(entry_path, "watch.json")
|
||||
if os.path.isfile(watch_json):
|
||||
# Add the watch.json file preserving directory structure
|
||||
tar.add(watch_json, arcname=f"{entry}/watch.json")
|
||||
watch_count += 1
|
||||
|
||||
if watch_count % 100 == 0:
|
||||
logger.debug(f"Backed up {watch_count} watch.json files...")
|
||||
|
||||
logger.success(f"Backup created: {backup_filename} ({watch_count} watches)")
|
||||
return backup_path
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create backup tarball: {e}")
|
||||
# Try to clean up partial backup
|
||||
if os.path.exists(backup_path):
|
||||
try:
|
||||
os.unlink(backup_path)
|
||||
except:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
class DatastoreUpdatesMixin:
|
||||
"""
|
||||
Mixin class containing all schema update methods.
|
||||
|
||||
This class is inherited by ChangeDetectionStore to provide schema migration functionality.
|
||||
Each update_N method upgrades the schema from version N-1 to version N.
|
||||
"""
|
||||
|
||||
def get_updates_available(self):
|
||||
"""
|
||||
Discover all available update methods.
|
||||
|
||||
Returns:
|
||||
list: Sorted list of update version numbers (e.g., [1, 2, 3, ..., 26])
|
||||
"""
|
||||
import inspect
|
||||
updates_available = []
|
||||
for i, o in inspect.getmembers(self, predicate=inspect.ismethod):
|
||||
m = re.search(r'update_(\d+)$', i)
|
||||
if m:
|
||||
updates_available.append(int(m.group(1)))
|
||||
updates_available.sort()
|
||||
|
||||
return updates_available
|
||||
|
||||
def run_updates(self):
|
||||
"""
|
||||
Run all pending schema updates sequentially.
|
||||
|
||||
IMPORTANT: Each update could be run even when they have a new install and the schema is correct.
|
||||
Therefore - each `update_n` should be very careful about checking if it needs to actually run.
|
||||
|
||||
Process:
|
||||
1. Get list of available updates
|
||||
2. For each update > current schema version:
|
||||
- Create backup of datastore
|
||||
- Run update method
|
||||
- Update schema version
|
||||
- Mark settings and watches dirty
|
||||
3. If any update fails, stop processing
|
||||
4. Save all changes immediately
|
||||
"""
|
||||
updates_available = self.get_updates_available()
|
||||
updates_ran = []
|
||||
|
||||
for update_n in updates_available:
|
||||
if update_n > self.data['settings']['application']['schema_version']:
|
||||
logger.critical(f"Applying update_{update_n}")
|
||||
|
||||
# Create tarball backup of entire datastore structure
|
||||
# This includes all watch.json files, settings, and preserves directory structure
|
||||
backup_path = create_backup_tarball(self.datastore_path, update_n)
|
||||
if backup_path:
|
||||
logger.info(f"Backup created at: {backup_path}")
|
||||
else:
|
||||
logger.warning("Backup creation failed, but continuing with update")
|
||||
|
||||
try:
|
||||
update_method = getattr(self, f"update_{update_n}")()
|
||||
except Exception as e:
|
||||
logger.error(f"Error while trying update_{update_n}")
|
||||
logger.error(e)
|
||||
# Don't run any more updates
|
||||
return
|
||||
else:
|
||||
# Bump the version, important
|
||||
self.data['settings']['application']['schema_version'] = update_n
|
||||
self.mark_settings_dirty()
|
||||
|
||||
# CRITICAL: Mark all watches as dirty so changes are persisted
|
||||
# Most updates modify watches, and in the new individual watch.json structure,
|
||||
# we need to ensure those changes are saved
|
||||
logger.info(f"Marking all {len(self.data['watching'])} watches as dirty after update_{update_n}")
|
||||
for uuid in self.data['watching'].keys():
|
||||
self.mark_watch_dirty(uuid)
|
||||
|
||||
# Track which updates ran
|
||||
updates_ran.append(update_n)
|
||||
|
||||
# If any updates ran, save all changes immediately
|
||||
if updates_ran:
|
||||
logger.critical(f"Saving all changes after running {len(updates_ran)} update(s): {updates_ran}")
|
||||
try:
|
||||
self._save_dirty_items()
|
||||
logger.success("All update changes saved successfully")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save updates: {e}")
|
||||
# Don't raise - updates already ran, just log the error
|
||||
|
||||
# ============================================================================
|
||||
# Individual Update Methods
|
||||
# ============================================================================
|
||||
|
||||
def update_1(self):
|
||||
"""Convert minutes to seconds on settings and each watch."""
|
||||
if self.data['settings']['requests'].get('minutes_between_check'):
|
||||
self.data['settings']['requests']['time_between_check']['minutes'] = self.data['settings']['requests']['minutes_between_check']
|
||||
# Remove the default 'hours' that is set from the model
|
||||
self.data['settings']['requests']['time_between_check']['hours'] = None
|
||||
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if 'minutes_between_check' in watch:
|
||||
# Only upgrade individual watch time if it was set
|
||||
if watch.get('minutes_between_check', False):
|
||||
self.data['watching'][uuid]['time_between_check']['minutes'] = watch['minutes_between_check']
|
||||
|
||||
def update_2(self):
|
||||
"""
|
||||
Move the history list to a flat text file index.
|
||||
Better than SQLite because this list is only appended to, and works across NAS / NFS type setups.
|
||||
"""
|
||||
# @todo test running this on a newly updated one (when this already ran)
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
history = []
|
||||
|
||||
if watch.get('history', False):
|
||||
for d, p in watch['history'].items():
|
||||
d = int(d) # Used to be keyed as str, we'll fix this now too
|
||||
history.append("{},{}\n".format(d, p))
|
||||
|
||||
if len(history):
|
||||
target_path = os.path.join(self.datastore_path, uuid)
|
||||
if os.path.exists(target_path):
|
||||
with open(os.path.join(target_path, "history.txt"), "w") as f:
|
||||
f.writelines(history)
|
||||
else:
|
||||
logger.warning(f"Datastore history directory {target_path} does not exist, skipping history import.")
|
||||
|
||||
# No longer needed, dynamically pulled from the disk when needed.
|
||||
# But we should set it back to a empty dict so we don't break if this schema runs on an earlier version.
|
||||
# In the distant future we can remove this entirely
|
||||
self.data['watching'][uuid]['history'] = {}
|
||||
|
||||
def update_3(self):
|
||||
"""We incorrectly stored last_changed when there was not a change, and then confused the output list table."""
|
||||
# see https://github.com/dgtlmoon/changedetection.io/pull/835
|
||||
return
|
||||
|
||||
def update_4(self):
|
||||
"""`last_changed` not needed, we pull that information from the history.txt index."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
# Remove it from the struct
|
||||
del(watch['last_changed'])
|
||||
except:
|
||||
continue
|
||||
return
|
||||
|
||||
def update_5(self):
|
||||
"""
|
||||
If the watch notification body, title look the same as the global one, unset it, so the watch defaults back to using the main settings.
|
||||
In other words - the watch notification_title and notification_body are not needed if they are the same as the default one.
|
||||
"""
|
||||
current_system_body = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
|
||||
current_system_title = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE)
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
watch_body = watch.get('notification_body', '')
|
||||
if watch_body and watch_body.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_body:
|
||||
# Looks the same as the default one, so unset it
|
||||
watch['notification_body'] = None
|
||||
|
||||
watch_title = watch.get('notification_title', '')
|
||||
if watch_title and watch_title.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_title:
|
||||
# Looks the same as the default one, so unset it
|
||||
watch['notification_title'] = None
|
||||
except Exception as e:
|
||||
continue
|
||||
return
|
||||
|
||||
def update_7(self):
|
||||
"""
|
||||
We incorrectly used common header overrides that should only apply to Requests.
|
||||
These are now handled in content_fetcher::html_requests and shouldnt be passed to Playwright/Selenium.
|
||||
"""
|
||||
# These were hard-coded in early versions
|
||||
for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']:
|
||||
if self.data['settings']['headers'].get(v):
|
||||
del self.data['settings']['headers'][v]
|
||||
|
||||
def update_8(self):
|
||||
"""Convert filters to a list of filters css_filter -> include_filters."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
existing_filter = watch.get('css_filter', '')
|
||||
if existing_filter:
|
||||
watch['include_filters'] = [existing_filter]
|
||||
except:
|
||||
continue
|
||||
return
|
||||
|
||||
def update_9(self):
|
||||
"""Convert old static notification tokens to jinja2 tokens."""
|
||||
# Each watch
|
||||
# only { } not {{ or }}
|
||||
r = r'(?<!{){(?!{)(\w+)(?<!})}(?!})'
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
n_body = watch.get('notification_body', '')
|
||||
if n_body:
|
||||
watch['notification_body'] = re.sub(r, r'{{\1}}', n_body)
|
||||
|
||||
n_title = watch.get('notification_title')
|
||||
if n_title:
|
||||
watch['notification_title'] = re.sub(r, r'{{\1}}', n_title)
|
||||
|
||||
n_urls = watch.get('notification_urls')
|
||||
if n_urls:
|
||||
for i, url in enumerate(n_urls):
|
||||
watch['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
|
||||
|
||||
except:
|
||||
continue
|
||||
|
||||
# System wide
|
||||
n_body = self.data['settings']['application'].get('notification_body')
|
||||
if n_body:
|
||||
self.data['settings']['application']['notification_body'] = re.sub(r, r'{{\1}}', n_body)
|
||||
|
||||
n_title = self.data['settings']['application'].get('notification_title')
|
||||
if n_body:
|
||||
self.data['settings']['application']['notification_title'] = re.sub(r, r'{{\1}}', n_title)
|
||||
|
||||
n_urls = self.data['settings']['application'].get('notification_urls')
|
||||
if n_urls:
|
||||
for i, url in enumerate(n_urls):
|
||||
self.data['settings']['application']['notification_urls'][i] = re.sub(r, r'{{\1}}', url)
|
||||
|
||||
return
|
||||
|
||||
def update_10(self):
|
||||
"""Some setups may have missed the correct default, so it shows the wrong config in the UI, although it will default to system-wide."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
if not watch.get('fetch_backend', ''):
|
||||
watch['fetch_backend'] = 'system'
|
||||
except:
|
||||
continue
|
||||
return
|
||||
|
||||
def update_12(self):
|
||||
"""Create tag objects and their references from existing tag text."""
|
||||
i = 0
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
# Split out and convert old tag string
|
||||
tag = watch.get('tag')
|
||||
if tag:
|
||||
tag_uuids = []
|
||||
for t in tag.split(','):
|
||||
tag_uuids.append(self.add_tag(title=t))
|
||||
|
||||
self.data['watching'][uuid]['tags'] = tag_uuids
|
||||
|
||||
def update_13(self):
|
||||
"""#1775 - Update 11 did not update the records correctly when adding 'date_created' values for sorting."""
|
||||
i = 0
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if not watch.get('date_created'):
|
||||
self.data['watching'][uuid]['date_created'] = i
|
||||
i += 1
|
||||
return
|
||||
|
||||
def update_14(self):
|
||||
"""#1774 - protect xpath1 against migration."""
|
||||
for awatch in self.data["watching"]:
|
||||
if self.data["watching"][awatch]['include_filters']:
|
||||
for num, selector in enumerate(self.data["watching"][awatch]['include_filters']):
|
||||
if selector.startswith('/'):
|
||||
self.data["watching"][awatch]['include_filters'][num] = 'xpath1:' + selector
|
||||
if selector.startswith('xpath:'):
|
||||
self.data["watching"][awatch]['include_filters'][num] = selector.replace('xpath:', 'xpath1:', 1)
|
||||
|
||||
def update_15(self):
|
||||
"""Use more obvious default time setting."""
|
||||
for uuid in self.data["watching"]:
|
||||
if self.data["watching"][uuid]['time_between_check'] == self.data['settings']['requests']['time_between_check']:
|
||||
# What the old logic was, which was pretty confusing
|
||||
self.data["watching"][uuid]['time_between_check_use_default'] = True
|
||||
elif all(value is None or value == 0 for value in self.data["watching"][uuid]['time_between_check'].values()):
|
||||
self.data["watching"][uuid]['time_between_check_use_default'] = True
|
||||
else:
|
||||
# Something custom here
|
||||
self.data["watching"][uuid]['time_between_check_use_default'] = False
|
||||
|
||||
def update_16(self):
|
||||
"""Correctly set datatype for older installs where 'tag' was string and update_12 did not catch it."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if isinstance(watch.get('tags'), str):
|
||||
self.data['watching'][uuid]['tags'] = []
|
||||
|
||||
def update_17(self):
|
||||
"""Migrate old 'in_stock' values to the new Restock."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if 'in_stock' in watch:
|
||||
watch['restock'] = Restock({'in_stock': watch.get('in_stock')})
|
||||
del watch['in_stock']
|
||||
|
||||
def update_18(self):
|
||||
"""Migrate old restock settings."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if not watch.get('restock_settings'):
|
||||
# So we enable price following by default
|
||||
self.data['watching'][uuid]['restock_settings'] = {'follow_price_changes': True}
|
||||
|
||||
# Migrate and cleanoff old value
|
||||
self.data['watching'][uuid]['restock_settings']['in_stock_processing'] = 'in_stock_only' if watch.get(
|
||||
'in_stock_only') else 'all_changes'
|
||||
|
||||
if self.data['watching'][uuid].get('in_stock_only'):
|
||||
del (self.data['watching'][uuid]['in_stock_only'])
|
||||
|
||||
def update_19(self):
|
||||
"""Compress old elements.json to elements.deflate, saving disk, this compression is pretty fast."""
|
||||
import zlib
|
||||
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
json_path = os.path.join(self.datastore_path, uuid, "elements.json")
|
||||
deflate_path = os.path.join(self.datastore_path, uuid, "elements.deflate")
|
||||
|
||||
if os.path.exists(json_path):
|
||||
with open(json_path, "rb") as f_j:
|
||||
with open(deflate_path, "wb") as f_d:
|
||||
logger.debug(f"Compressing {str(json_path)} to {str(deflate_path)}..")
|
||||
f_d.write(zlib.compress(f_j.read()))
|
||||
os.unlink(json_path)
|
||||
|
||||
def update_20(self):
|
||||
"""Migrate extract_title_as_title to use_page_title_in_list."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
if self.data['watching'][uuid].get('extract_title_as_title'):
|
||||
self.data['watching'][uuid]['use_page_title_in_list'] = self.data['watching'][uuid].get('extract_title_as_title')
|
||||
del self.data['watching'][uuid]['extract_title_as_title']
|
||||
|
||||
if self.data['settings']['application'].get('extract_title_as_title'):
|
||||
self.data['settings']['application']['ui']['use_page_title_in_list'] = self.data['settings']['application'].get('extract_title_as_title')
|
||||
|
||||
def update_21(self):
|
||||
"""Migrate timezone to scheduler_timezone_default."""
|
||||
if self.data['settings']['application'].get('timezone'):
|
||||
self.data['settings']['application']['scheduler_timezone_default'] = self.data['settings']['application'].get('timezone')
|
||||
del self.data['settings']['application']['timezone']
|
||||
|
||||
def update_23(self):
|
||||
"""Some notification formats got the wrong name type."""
|
||||
|
||||
def re_run(formats):
|
||||
sys_n_format = self.data['settings']['application'].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in formats.items() if v == sys_n_format), None)
|
||||
if key_exists_as_value: # key of "Plain text"
|
||||
logger.success(f"['settings']['application']['notification_format'] '{sys_n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['settings']['application']['notification_format'] = key_exists_as_value
|
||||
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
n_format = self.data['watching'][uuid].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
|
||||
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
|
||||
logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever
|
||||
|
||||
for uuid, tag in self.data['settings']['application']['tags'].items():
|
||||
n_format = self.data['settings']['application']['tags'][uuid].get('notification_format')
|
||||
key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None)
|
||||
if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text"
|
||||
logger.success(
|
||||
f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'")
|
||||
self.data['settings']['application']['tags'][uuid][
|
||||
'notification_format'] = key_exists_as_value # should be 'text' or whatever
|
||||
|
||||
from ..notification import valid_notification_formats
|
||||
formats = deepcopy(valid_notification_formats)
|
||||
re_run(formats)
|
||||
# And in previous versions, it was "text" instead of Plain text, Markdown instead of "Markdown to HTML"
|
||||
formats['text'] = 'Text'
|
||||
formats['markdown'] = 'Markdown'
|
||||
re_run(formats)
|
||||
|
||||
def update_24(self):
|
||||
"""RSS types should be inline with the same names as notification types."""
|
||||
rss_format = self.data['settings']['application'].get('rss_content_format')
|
||||
if not rss_format or 'text' in rss_format:
|
||||
# might have been 'plaintext, 'plain text' or something
|
||||
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
|
||||
elif 'html' in rss_format:
|
||||
self.data['settings']['application']['rss_content_format'] = 'htmlcolor'
|
||||
else:
|
||||
# safe fallback to text
|
||||
self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT
|
||||
|
||||
def update_25(self):
|
||||
"""Different processors now hold their own history.txt."""
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
processor = self.data['watching'][uuid].get('processor')
|
||||
if processor != 'text_json_diff':
|
||||
old_history_txt = os.path.join(self.datastore_path, "history.txt")
|
||||
target_history_name = f"history-{processor}.txt"
|
||||
if os.path.isfile(old_history_txt) and not os.path.isfile(target_history_name):
|
||||
new_history_txt = os.path.join(self.datastore_path, target_history_name)
|
||||
logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...")
|
||||
shutil.move(old_history_txt, new_history_txt)
|
||||
|
||||
def update_26(self):
|
||||
"""
|
||||
Migration: Individual watch persistence (COPY-based, safe rollback).
|
||||
|
||||
Loads legacy url-watches.json format and migrates to:
|
||||
- {uuid}/watch.json (per watch)
|
||||
- changedetection.json (settings only)
|
||||
|
||||
IMPORTANT:
|
||||
- A tarball backup (before-update-26-timestamp.tar.gz) is created before migration
|
||||
- url-watches.json is LEFT INTACT for rollback safety
|
||||
- Users can roll back by simply downgrading to the previous version
|
||||
- Or restore from tarball: tar -xzf before-update-26-*.tar.gz
|
||||
|
||||
This is a dedicated migration release - users upgrade at their own pace.
|
||||
"""
|
||||
logger.critical("=" * 80)
|
||||
logger.critical("Running migration: Individual watch persistence (update_26)")
|
||||
logger.critical("COPY-based migration: url-watches.json will remain intact for rollback")
|
||||
logger.critical("=" * 80)
|
||||
|
||||
# Check if already migrated
|
||||
changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
|
||||
if os.path.exists(changedetection_json):
|
||||
logger.info("Migration already completed (changedetection.json exists), skipping")
|
||||
return
|
||||
|
||||
# Check if we need to load legacy data
|
||||
from .legacy_loader import has_legacy_datastore, load_legacy_format
|
||||
|
||||
if not has_legacy_datastore(self.datastore_path):
|
||||
logger.info("No legacy datastore found, nothing to migrate")
|
||||
return
|
||||
|
||||
# Load legacy data from url-watches.json
|
||||
logger.critical("Loading legacy datastore from url-watches.json...")
|
||||
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
|
||||
legacy_data = load_legacy_format(legacy_path)
|
||||
|
||||
if not legacy_data:
|
||||
raise Exception("Failed to load legacy datastore from url-watches.json")
|
||||
|
||||
# Populate settings from legacy data
|
||||
logger.info("Populating settings from legacy data...")
|
||||
if 'settings' in legacy_data:
|
||||
self.data['settings'] = legacy_data['settings']
|
||||
if 'app_guid' in legacy_data:
|
||||
self.data['app_guid'] = legacy_data['app_guid']
|
||||
if 'build_sha' in legacy_data:
|
||||
self.data['build_sha'] = legacy_data['build_sha']
|
||||
if 'version_tag' in legacy_data:
|
||||
self.data['version_tag'] = legacy_data['version_tag']
|
||||
|
||||
# Rehydrate watches from legacy data
|
||||
logger.info("Rehydrating watches from legacy data...")
|
||||
self.data['watching'] = {}
|
||||
for uuid, watch_data in legacy_data.get('watching', {}).items():
|
||||
try:
|
||||
self.data['watching'][uuid] = self.rehydrate_entity(uuid, watch_data)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to rehydrate watch {uuid}: {e}")
|
||||
raise Exception(f"Migration failed: Could not rehydrate watch {uuid}. Error: {e}")
|
||||
|
||||
watch_count = len(self.data['watching'])
|
||||
logger.success(f"Loaded {watch_count} watches from legacy format")
|
||||
|
||||
# Phase 1: Save all watches to individual files
|
||||
logger.critical(f"Phase 1/4: Saving {watch_count} watches to individual watch.json files...")
|
||||
|
||||
saved_count = 0
|
||||
for uuid, watch in self.data['watching'].items():
|
||||
try:
|
||||
watch_dict = dict(watch)
|
||||
watch_dir = os.path.join(self.datastore_path, uuid)
|
||||
save_watch_atomic(watch_dir, uuid, watch_dict)
|
||||
# Initialize hash
|
||||
self._watch_hashes[uuid] = self._compute_hash(watch_dict)
|
||||
saved_count += 1
|
||||
|
||||
if saved_count % 100 == 0:
|
||||
logger.info(f" Progress: {saved_count}/{watch_count} watches saved...")
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save watch {uuid}: {e}")
|
||||
raise Exception(
|
||||
f"Migration failed: Could not save watch {uuid}. "
|
||||
f"url-watches.json remains intact, safe to retry. Error: {e}"
|
||||
)
|
||||
|
||||
logger.critical(f"Phase 1 complete: Saved {saved_count} watches")
|
||||
|
||||
# Phase 2: Verify all files exist
|
||||
logger.critical("Phase 2/4: Verifying all watch.json files were created...")
|
||||
|
||||
missing = []
|
||||
for uuid in self.data['watching'].keys():
|
||||
watch_json = os.path.join(self.datastore_path, uuid, "watch.json")
|
||||
if not os.path.isfile(watch_json):
|
||||
missing.append(uuid)
|
||||
|
||||
if missing:
|
||||
raise Exception(
|
||||
f"Migration failed: {len(missing)} watch files missing: {missing[:5]}... "
|
||||
f"url-watches.json remains intact, safe to retry."
|
||||
)
|
||||
|
||||
logger.critical(f"Phase 2 complete: Verified {watch_count} watch files")
|
||||
|
||||
# Phase 3: Create new settings file
|
||||
logger.critical("Phase 3/4: Creating changedetection.json...")
|
||||
|
||||
try:
|
||||
self._save_settings()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to create changedetection.json: {e}")
|
||||
raise Exception(
|
||||
f"Migration failed: Could not create changedetection.json. "
|
||||
f"url-watches.json remains intact, safe to retry. Error: {e}"
|
||||
)
|
||||
|
||||
# Phase 4: Verify settings file exists
|
||||
logger.critical("Phase 4/4: Verifying changedetection.json exists...")
|
||||
|
||||
if not os.path.isfile(changedetection_json):
|
||||
raise Exception(
|
||||
"Migration failed: changedetection.json not found after save. "
|
||||
"url-watches.json remains intact, safe to retry."
|
||||
)
|
||||
|
||||
logger.critical("Phase 4 complete: Verified changedetection.json exists")
|
||||
|
||||
# Success! Now reload from new format
|
||||
logger.critical("Reloading datastore from new format...")
|
||||
self._load_state()
|
||||
logger.success("Datastore reloaded from new format successfully")
|
||||
|
||||
logger.critical("=" * 80)
|
||||
logger.critical("MIGRATION COMPLETED SUCCESSFULLY!")
|
||||
logger.critical("=" * 80)
|
||||
logger.info("")
|
||||
logger.info("New format:")
|
||||
logger.info(f" - {watch_count} individual watch.json files created")
|
||||
logger.info(f" - changedetection.json created (settings only)")
|
||||
logger.info("")
|
||||
logger.info("Rollback safety:")
|
||||
logger.info(" - url-watches.json preserved for rollback")
|
||||
logger.info(" - To rollback: downgrade to previous version and restart")
|
||||
logger.info(" - No manual file operations needed")
|
||||
logger.info("")
|
||||
logger.info("Optional cleanup (after testing new version):")
|
||||
logger.info(f" - rm {os.path.join(self.datastore_path, 'url-watches.json')}")
|
||||
logger.info("")
|
||||
|
||||
# Schema version will be updated by run_updates()
|
||||
@@ -53,21 +53,11 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
backup = ZipFile(io.BytesIO(res.data))
|
||||
l = backup.namelist()
|
||||
uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
|
||||
newlist = list(filter(uuid4hex.match, l)) # Read Note below
|
||||
|
||||
# Check for UUID-based txt files (history and snapshot)
|
||||
uuid4hex_txt = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I)
|
||||
txt_files = list(filter(uuid4hex_txt.match, l))
|
||||
# Should be two txt files in the archive (history and the snapshot)
|
||||
assert len(txt_files) == 2
|
||||
|
||||
# Check for watch.json files (new format)
|
||||
uuid4hex_json = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}/watch\.json$', re.I)
|
||||
json_files = list(filter(uuid4hex_json.match, l))
|
||||
# Should be one watch.json file in the archive (the imported watch)
|
||||
assert len(json_files) == 1, f"Expected 1 watch.json file, found {len(json_files)}: {json_files}"
|
||||
|
||||
# Check for changedetection.json (settings file)
|
||||
assert 'changedetection.json' in l, "changedetection.json should be in backup"
|
||||
assert len(newlist) == 2
|
||||
|
||||
# Get the latest one
|
||||
res = client.get(
|
||||
|
||||
@@ -59,29 +59,11 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
# Wait for the sync DB save to happen
|
||||
time.sleep(2)
|
||||
|
||||
# Check which format is being used
|
||||
datastore_path = live_server.app.config['DATASTORE'].datastore_path
|
||||
changedetection_json = os.path.join(datastore_path, 'changedetection.json')
|
||||
url_watches_json = os.path.join(datastore_path, 'url-watches.json')
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
|
||||
json_obj = {'watching': {}}
|
||||
|
||||
if os.path.exists(changedetection_json):
|
||||
# New format: individual watch.json files
|
||||
logger.info("Testing with new format (changedetection.json + individual watch.json)")
|
||||
|
||||
# Load each watch.json file
|
||||
for uuid in live_server.app.config['DATASTORE'].data['watching'].keys():
|
||||
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
|
||||
assert os.path.isfile(watch_json_file), f"watch.json should exist at {watch_json_file}"
|
||||
|
||||
with open(watch_json_file, 'r', encoding='utf-8') as f:
|
||||
json_obj['watching'][uuid] = json.load(f)
|
||||
else:
|
||||
# Legacy format: url-watches.json
|
||||
logger.info("Testing with legacy format (url-watches.json)")
|
||||
with open(url_watches_json, 'r', encoding='utf-8') as f:
|
||||
json_obj = json.load(f)
|
||||
json_obj = None
|
||||
with open(json_db_file, 'r', encoding='utf-8') as f:
|
||||
json_obj = json.load(f)
|
||||
|
||||
# assert the right amount of watches was found in the JSON
|
||||
assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON"
|
||||
@@ -106,7 +88,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
|
||||
# Find the snapshot one
|
||||
for fname in files_in_watch_dir:
|
||||
if fname != 'history.txt' and fname != 'watch.json' and 'html' not in fname:
|
||||
if fname != 'history.txt' and 'html' not in fname:
|
||||
if strtobool(os.getenv("TEST_WITH_BROTLI")):
|
||||
assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename"
|
||||
|
||||
@@ -123,23 +105,11 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore
|
||||
assert json_obj['watching'][w]['title'], "Watch should have a title set"
|
||||
assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'"
|
||||
|
||||
# With new format, we also have watch.json, so 4 files total
|
||||
if os.path.exists(changedetection_json):
|
||||
assert len(files_in_watch_dir) == 4, "Should be four files in the dir with new format: watch.json, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
else:
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir with legacy format: html.br snapshot, history.txt and the extracted text snapshot"
|
||||
assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot"
|
||||
|
||||
# Check that 'default' Watch vars aren't accidentally being saved
|
||||
if os.path.exists(changedetection_json):
|
||||
# New format: check all individual watch.json files
|
||||
for uuid in json_obj['watching'].keys():
|
||||
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
|
||||
with open(watch_json_file, 'r', encoding='utf-8') as f:
|
||||
assert '"default"' not in f.read(), f"'default' probably shouldnt be here in {watch_json_file}, it came from when the 'default' Watch vars were accidently being saved"
|
||||
else:
|
||||
# Legacy format: check url-watches.json
|
||||
with open(url_watches_json, 'r', encoding='utf-8') as f:
|
||||
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
|
||||
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json')
|
||||
with open(json_db_file, 'r', encoding='utf-8') as f:
|
||||
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
|
||||
|
||||
|
||||
def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
@@ -142,14 +142,10 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
watches_with_body = 0
|
||||
|
||||
# Read individual watch.json files
|
||||
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
|
||||
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
|
||||
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
|
||||
with open(watch_json_file, 'r', encoding='utf-8') as f:
|
||||
watch_data = json.load(f)
|
||||
if watch_data.get('body') == body_value:
|
||||
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
|
||||
app_struct = json.load(f)
|
||||
for uuid in app_struct['watching']:
|
||||
if app_struct['watching'][uuid]['body']==body_value:
|
||||
watches_with_body += 1
|
||||
|
||||
# Should be only one with body set
|
||||
@@ -229,14 +225,10 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_
|
||||
wait_for_all_checks(client)
|
||||
|
||||
watches_with_method = 0
|
||||
|
||||
# Read individual watch.json files
|
||||
for uuid in client.application.config.get('DATASTORE').data['watching'].keys():
|
||||
watch_json_file = os.path.join(datastore_path, uuid, 'watch.json')
|
||||
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}"
|
||||
with open(watch_json_file, 'r', encoding='utf-8') as f:
|
||||
watch_data = json.load(f)
|
||||
if watch_data.get('method') == 'PATCH':
|
||||
with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f:
|
||||
app_struct = json.load(f)
|
||||
for uuid in app_struct['watching']:
|
||||
if app_struct['watching'][uuid]['method'] == 'PATCH':
|
||||
watches_with_method += 1
|
||||
|
||||
# Should be only one with method set to PATCH
|
||||
|
||||
Binary file not shown.
File diff suppressed because it is too large
Load Diff
Reference in New Issue
Block a user