diff --git a/.github/workflows/pypi-release.yml b/.github/workflows/pypi-release.yml index 8ff60552..4dd260c8 100644 --- a/.github/workflows/pypi-release.yml +++ b/.github/workflows/pypi-release.yml @@ -61,8 +61,8 @@ jobs: # --- API test --- # This also means that the docs/api-spec.yml was shipped and could be read - test -f /tmp/url-watches.json - API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/url-watches.json) + test -f /tmp/changedetection.json + API_KEY=$(jq -r '.. | .api_access_token? // empty' /tmp/changedetection.json) echo Test API KEY is $API_KEY curl -X POST "http://127.0.0.1:10000/api/v1/watch" \ -H "x-api-key: ${API_KEY}" \ diff --git a/MANIFEST.in b/MANIFEST.in index b3e17ce6..dfcf0cdd 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -9,6 +9,7 @@ recursive-include changedetectionio/notification * recursive-include changedetectionio/processors * recursive-include changedetectionio/realtime * recursive-include changedetectionio/static * +recursive-include changedetectionio/store * recursive-include changedetectionio/templates * recursive-include changedetectionio/tests * recursive-include changedetectionio/translations * diff --git a/changedetectionio/__init__.py b/changedetectionio/__init__.py index ab6080bb..64018dec 100644 --- a/changedetectionio/__init__.py +++ b/changedetectionio/__init__.py @@ -124,12 +124,12 @@ def sigshutdown_handler(_signo, _stack_frame): except Exception as e: logger.error(f"Error shutting down Socket.IO server: {str(e)}") - # Save data quickly + # Save data quickly - force immediate save using abstract method try: - datastore.sync_to_json() - logger.success('Fast sync to disk complete.') + datastore.force_save_all() + logger.success('Fast sync to storage complete.') except Exception as e: - logger.error(f"Error syncing to disk: {str(e)}") + logger.error(f"Error syncing to storage: {str(e)}") sys.exit() diff --git a/changedetectionio/blueprint/backups/__init__.py b/changedetectionio/blueprint/backups/__init__.py index 
adcd775f..646b8037 100644 --- a/changedetectionio/blueprint/backups/__init__.py +++ b/changedetectionio/blueprint/backups/__init__.py @@ -27,11 +27,23 @@ def create_backup(datastore_path, watches: dict): compression=zipfile.ZIP_DEFLATED, compresslevel=8) as zipObj: - # Add the index - zipObj.write(os.path.join(datastore_path, "url-watches.json"), arcname="url-watches.json") + # Add the settings file (supports both formats) + # New format: changedetection.json + changedetection_json = os.path.join(datastore_path, "changedetection.json") + if os.path.isfile(changedetection_json): + zipObj.write(changedetection_json, arcname="changedetection.json") + logger.debug("Added changedetection.json to backup") - # Add the flask app secret - zipObj.write(os.path.join(datastore_path, "secret.txt"), arcname="secret.txt") + # Legacy format: url-watches.json (for backward compatibility) + url_watches_json = os.path.join(datastore_path, "url-watches.json") + if os.path.isfile(url_watches_json): + zipObj.write(url_watches_json, arcname="url-watches.json") + logger.debug("Added url-watches.json to backup") + + # Add the flask app secret (if it exists) + secret_file = os.path.join(datastore_path, "secret.txt") + if os.path.isfile(secret_file): + zipObj.write(secret_file, arcname="secret.txt") # Add any data in the watch data directory. 
for uuid, w in watches.items(): @@ -90,8 +102,8 @@ def construct_blueprint(datastore: ChangeDetectionStore): flash(gettext("Maximum number of backups reached, please remove some"), "error") return redirect(url_for('backups.index')) - # Be sure we're written fresh - datastore.sync_to_json() + # Be sure we're written fresh - force immediate save using abstract method + datastore.force_save_all() zip_thread = threading.Thread( target=create_backup, args=(datastore.datastore_path, datastore.data.get("watching")), diff --git a/changedetectionio/blueprint/imports/importer.py b/changedetectionio/blueprint/imports/importer.py index c0807ade..663a5c42 100644 --- a/changedetectionio/blueprint/imports/importer.py +++ b/changedetectionio/blueprint/imports/importer.py @@ -62,7 +62,7 @@ class import_url_list(Importer): extras = None if processor: extras = {'processor': processor} - new_uuid = datastore.add_watch(url=url.strip(), tag=tags, write_to_disk_now=False, extras=extras) + new_uuid = datastore.add_watch(url=url.strip(), tag=tags, save_immediately=False, extras=extras) if new_uuid: # Straight into the queue. @@ -129,7 +129,7 @@ class import_distill_io_json(Importer): new_uuid = datastore.add_watch(url=d['uri'].strip(), tag=",".join(d.get('tags', [])), extras=extras, - write_to_disk_now=False) + save_immediately=False) if new_uuid: # Straight into the queue. @@ -204,7 +204,7 @@ class import_xlsx_wachete(Importer): new_uuid = datastore.add_watch(url=data['url'].strip(), extras=extras, tag=data.get('folder'), - write_to_disk_now=False) + save_immediately=False) if new_uuid: # Straight into the queue. self.new_uuids.append(new_uuid) @@ -287,7 +287,7 @@ class import_xlsx_custom(Importer): new_uuid = datastore.add_watch(url=url, extras=extras, tag=tags, - write_to_disk_now=False) + save_immediately=False) if new_uuid: # Straight into the queue. 
self.new_uuids.append(new_uuid) diff --git a/changedetectionio/blueprint/ui/__init__.py b/changedetectionio/blueprint/ui/__init__.py index 7dd5483e..296533ba 100644 --- a/changedetectionio/blueprint/ui/__init__.py +++ b/changedetectionio/blueprint/ui/__init__.py @@ -24,6 +24,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): datastore.data['watching'][uuid]['paused'] = True + datastore.mark_watch_dirty(uuid) if emit_flash: flash(gettext("{} watches paused").format(len(uuids))) @@ -31,6 +32,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): datastore.data['watching'][uuid.strip()]['paused'] = False + datastore.mark_watch_dirty(uuid) if emit_flash: flash(gettext("{} watches unpaused").format(len(uuids))) @@ -45,6 +47,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): datastore.data['watching'][uuid]['notification_muted'] = True + datastore.mark_watch_dirty(uuid) if emit_flash: flash(gettext("{} watches muted").format(len(uuids))) @@ -52,6 +55,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): datastore.data['watching'][uuid]['notification_muted'] = False + datastore.mark_watch_dirty(uuid) if emit_flash: flash(gettext("{} watches un-muted").format(len(uuids))) @@ -67,6 +71,7 @@ def _handle_operations(op, uuids, datastore, worker_handler, update_q, queuedWat for uuid in uuids: if datastore.data['watching'].get(uuid): datastore.data['watching'][uuid]["last_error"] = False + datastore.mark_watch_dirty(uuid) if emit_flash: flash(gettext("{} watches errors cleared").format(len(uuids))) diff --git a/changedetectionio/store.py b/changedetectionio/store.py deleted file mode 100644 index 
296faf78..00000000 --- a/changedetectionio/store.py +++ /dev/null @@ -1,1146 +0,0 @@ -import gc -import shutil - -from changedetectionio.strtobool import strtobool - -from changedetectionio.validate_url import is_safe_valid_url - -from flask import ( - flash -) -from flask_babel import gettext - -from .blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT -from .html_tools import TRANSLATE_WHITESPACE_TABLE -from .model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH -from copy import deepcopy, copy -from os import path, unlink -from threading import Lock -import json -import os -import re -import secrets -import sys -import threading -import time -import uuid as uuid_builder -from loguru import logger -from blinker import signal - -# Try to import orjson for faster JSON serialization -try: - import orjson - HAS_ORJSON = True -except ImportError: - HAS_ORJSON = False - -from .processors import get_custom_watch_obj_for_processor -from .processors.restock_diff import Restock - -# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification -BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)' - -dictfilt = lambda x, y: dict([ (i,x[i]) for i in x if i in set(y) ]) - -# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods? 
-# Open a github issue if you know something :) -# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change -class ChangeDetectionStore: - lock = Lock() - # For general updates/writes that can wait a few seconds - needs_write = False - datastore_path = None - # For when we edit, we should write to disk - needs_write_urgent = False - - __version_check = True - save_data_thread = None - - def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"): - # Should only be active for docker - # logging.basicConfig(filename='/dev/stdout', level=logging.INFO) - self.datastore_path = datastore_path - self.needs_write = False - self.start_time = time.time() - self.stop_thread = False - self.save_version_copy_json_db(version_tag) - self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag) - - def save_version_copy_json_db(self, version_tag): - import re - - version_text = re.sub(r'\D+', '-', version_tag) - db_path = os.path.join(self.datastore_path, "url-watches.json") - db_path_version_backup = os.path.join(self.datastore_path, f"url-watches-{version_text}.json") - - if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path): - from shutil import copyfile - logger.info(f"Backing up JSON DB due to new version to '{db_path_version_backup}'.") - copyfile(db_path, db_path_version_backup) - - - def reload_state(self, datastore_path, include_default_watches, version_tag): - logger.info(f"Datastore path is '{datastore_path}'") - - self.__data = App.model() - self.json_store_path = os.path.join(self.datastore_path, "url-watches.json") - # Base definition for all watchers - # deepcopy part of #569 - not sure why its needed exactly - self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={})) - - if path.isfile('changedetectionio/source.txt'): - with open('changedetectionio/source.txt') as f: - # Should be set 
in Dockerfile to look for /source.txt , this will give us the git commit # - # So when someone gives us a backup file to examine, we know exactly what code they were running. - self.__data['build_sha'] = f.read() - - try: - if HAS_ORJSON: - # orjson.loads() expects UTF-8 encoded bytes #3611 - with open(self.json_store_path, 'rb') as json_file: - from_disk = orjson.loads(json_file.read()) - else: - with open(self.json_store_path, encoding='utf-8') as json_file: - from_disk = json.load(json_file) - - if not from_disk: - # No FileNotFound exception was thrown but somehow the JSON was empty - abort for safety. - logger.critical(f"JSON DB existed but was empty on load - empty JSON file? '{self.json_store_path}' Aborting") - raise Exception('JSON DB existed but was empty on load - Aborting') - - # @todo isnt there a way todo this dict.update recursively? - # Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore. - if 'watching' in from_disk: - self.__data['watching'].update(from_disk['watching']) - - if 'app_guid' in from_disk: - self.__data['app_guid'] = from_disk['app_guid'] - - if 'settings' in from_disk: - if 'headers' in from_disk['settings']: - self.__data['settings']['headers'].update(from_disk['settings']['headers']) - - if 'requests' in from_disk['settings']: - self.__data['settings']['requests'].update(from_disk['settings']['requests']) - - if 'application' in from_disk['settings']: - self.__data['settings']['application'].update(from_disk['settings']['application']) - - # from_disk no longer needed - free memory immediately - del from_disk - gc.collect() - - # Convert each existing watch back to the Watch.model object - for uuid, watch in self.__data['watching'].items(): - self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch) - logger.info(f"Watching: {uuid} {watch['url']}") - - # And for Tags also, should be Restock type because it has extra settings - for uuid, tag in 
self.__data['settings']['application']['tags'].items(): - self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff') - logger.info(f"Tag: {uuid} {tag['title']}") - - # First time ran, Create the datastore. - except (FileNotFoundError): - if include_default_watches: - logger.critical(f"No JSON DB found at {self.json_store_path}, creating JSON store at {self.datastore_path}") - self.add_watch(url='https://news.ycombinator.com/', - tag='Tech news', - extras={'fetch_backend': 'html_requests'}) - - self.add_watch(url='https://changedetection.io/CHANGELOG.txt', - tag='changedetection.io', - extras={'fetch_backend': 'html_requests'}) - - updates_available = self.get_updates_available() - self.__data['settings']['application']['schema_version'] = updates_available.pop() - - else: - # Bump the update version by running updates - self.run_updates() - - self.__data['version_tag'] = version_tag - - # Just to test that proxies.json if it exists, doesnt throw a parsing error on startup - test_list = self.proxy_list - - # Helper to remove password protection - password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock") - if path.isfile(password_reset_lockfile): - self.__data['settings']['application']['password'] = False - unlink(password_reset_lockfile) - - if not 'app_guid' in self.__data: - if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ: - self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4()) - else: - self.__data['app_guid'] = str(uuid_builder.uuid4()) - - # Generate the URL access token for RSS feeds - if not self.__data['settings']['application'].get('rss_access_token'): - secret = secrets.token_hex(16) - self.__data['settings']['application']['rss_access_token'] = secret - - # Generate the API access token - if not self.__data['settings']['application'].get('api_access_token'): - secret = secrets.token_hex(16) - 
self.__data['settings']['application']['api_access_token'] = secret - - self.needs_write = True - - # Finally start the thread that will manage periodic data saves to JSON - # Only start if thread is not already running (reload_state might be called multiple times) - if not self.save_data_thread or not self.save_data_thread.is_alive(): - self.save_data_thread = threading.Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver") - self.save_data_thread.start() - - def rehydrate_entity(self, uuid, entity, processor_override=None): - """Set the dict back to the dict Watch object""" - entity['uuid'] = uuid - - if processor_override: - watch_class = get_custom_watch_obj_for_processor(processor_override) - entity['processor']=processor_override - else: - watch_class = get_custom_watch_obj_for_processor(entity.get('processor')) - - if entity.get('uuid') != 'text_json_diff': - logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}") - - entity = watch_class(datastore_path=self.datastore_path, default=entity) - return entity - - def set_last_viewed(self, uuid, timestamp): - logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}") - self.data['watching'][uuid].update({'last_viewed': int(timestamp)}) - self.needs_write = True - - watch_check_update = signal('watch_check_update') - if watch_check_update: - watch_check_update.send(watch_uuid=uuid) - - def remove_password(self): - self.__data['settings']['application']['password'] = False - self.needs_write = True - - def update_watch(self, uuid, update_obj): - - # It's possible that the watch could be deleted before update - if not self.__data['watching'].get(uuid): - return - - with self.lock: - - # In python 3.9 we have the |= dict operator, but that still will lose data on nested structures... 
- for dict_key, d in self.generic_definition.items(): - if isinstance(d, dict): - if update_obj is not None and dict_key in update_obj: - self.__data['watching'][uuid][dict_key].update(update_obj[dict_key]) - del (update_obj[dict_key]) - - self.__data['watching'][uuid].update(update_obj) - self.needs_write = True - - @property - def threshold_seconds(self): - seconds = 0 - for m, n in Watch.mtable.items(): - x = self.__data['settings']['requests']['time_between_check'].get(m) - if x: - seconds += x * n - return seconds - - @property - def unread_changes_count(self): - unread_changes_count = 0 - for uuid, watch in self.__data['watching'].items(): - if watch.history_n >= 2 and watch.viewed == False: - unread_changes_count += 1 - - return unread_changes_count - - @property - def data(self): - # Re #152, Return env base_url if not overriden - # Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services - # like 'Join', so it's always best to atleast set something obvious so that they are not broken. - - active_base_url = BASE_URL_NOT_SET_TEXT - if self.__data['settings']['application'].get('base_url'): - active_base_url = self.__data['settings']['application'].get('base_url') - elif os.getenv('BASE_URL'): - active_base_url = os.getenv('BASE_URL') - - # I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable - # even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. 
or something - d = self.__data - d['settings']['application']['active_base_url'] = active_base_url.strip('" ') - return d - - from pathlib import Path - - def delete_path(self, path: Path): - import shutil - """Delete a file or directory tree, including the path itself.""" - if not path.exists(): - return - if path.is_file() or path.is_symlink(): - path.unlink(missing_ok=True) # deletes a file or symlink - else: - shutil.rmtree(path, ignore_errors=True) # deletes dir *and* its contents - - # Delete a single watch by UUID - def delete(self, uuid): - import pathlib - - with self.lock: - if uuid == 'all': - self.__data['watching'] = {} - time.sleep(1) # Mainly used for testing to allow all items to flush before running next test - for uuid in self.data['watching']: - path = pathlib.Path( - os.path.join(self.datastore_path, uuid)) - if os.path.exists(path): - self.delete(uuid) - - else: - path = pathlib.Path(os.path.join(self.datastore_path, uuid)) - if os.path.exists(path): - self.delete_path(path) - - del self.data['watching'][uuid] - - self.needs_write_urgent = True - watch_delete_signal = signal('watch_deleted') - if watch_delete_signal: - watch_delete_signal.send(watch_uuid=uuid) - - # Clone a watch by UUID - def clone(self, uuid): - url = self.data['watching'][uuid].get('url') - extras = deepcopy(self.data['watching'][uuid]) - new_uuid = self.add_watch(url=url, extras=extras) - watch = self.data['watching'][new_uuid] - return new_uuid - - def url_exists(self, url): - - # Probably their should be dict... 
- for watch in self.data['watching'].values(): - if watch['url'].lower() == url.lower(): - return True - - return False - - # Remove a watchs data but keep the entry (URL etc) - def clear_watch_history(self, uuid): - self.__data['watching'][uuid].clear_watch() - self.needs_write_urgent = True - - def add_watch(self, url, tag='', extras=None, tag_uuids=None, write_to_disk_now=True): - - if extras is None: - extras = {} - - # Incase these are copied across, assume it's a reference and deepcopy() - apply_extras = deepcopy(extras) - apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags') - - # Was it a share link? try to fetch the data - if (url.startswith("https://changedetection.io/share/")): - import requests - - try: - r = requests.request(method="GET", - url=url, - # So we know to return the JSON instead of the human-friendly "help" page - headers={'App-Guid': self.__data['app_guid']}, - timeout=5.0) # 5 second timeout to prevent blocking - res = r.json() - - # List of permissible attributes we accept from the wild internet - for k in [ - 'body', - 'browser_steps', - 'css_filter', - 'extract_text', - 'headers', - 'ignore_text', - 'include_filters', - 'method', - 'paused', - 'previous_md5', - 'processor', - 'subtractive_selectors', - 'tag', - 'tags', - 'text_should_not_be_present', - 'title', - 'trigger_text', - 'url', - 'use_page_title_in_list', - 'webdriver_js_execute_code', - ]: - if res.get(k): - if k != 'css_filter': - apply_extras[k] = res[k] - else: - # We renamed the field and made it a list - apply_extras['include_filters'] = [res['css_filter']] - - except Exception as e: - logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}") - flash(gettext("Error fetching metadata for {}").format(url), 'error') - return False - - if not is_safe_valid_url(url): - flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error') - - return None - - if tag and type(tag) == str: - # Then it's probably a 
string of the actual tag by name, split and add it - for t in tag.split(','): - # for each stripped tag, add tag as UUID - for a_t in t.split(','): - tag_uuid = self.add_tag(a_t) - apply_extras['tags'].append(tag_uuid) - - # Or if UUIDs given directly - if tag_uuids: - for t in tag_uuids: - apply_extras['tags'] = list(set(apply_extras['tags'] + [t.strip()])) - - # Make any uuids unique - if apply_extras.get('tags'): - apply_extras['tags'] = list(set(apply_extras.get('tags'))) - - # If the processor also has its own Watch implementation - watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor')) - new_watch = watch_class(datastore_path=self.datastore_path, url=url) - - new_uuid = new_watch.get('uuid') - - logger.debug(f"Adding URL '{url}' - {new_uuid}") - - for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']: - if k in apply_extras: - del apply_extras[k] - - if not apply_extras.get('date_created'): - apply_extras['date_created'] = int(time.time()) - - new_watch.update(apply_extras) - new_watch.ensure_data_dir_exists() - self.__data['watching'][new_uuid] = new_watch - - if write_to_disk_now: - self.sync_to_json() - - logger.debug(f"Added '{url}'") - - return new_uuid - - def visualselector_data_is_ready(self, watch_uuid): - output_path = os.path.join(self.datastore_path, watch_uuid) - screenshot_filename = os.path.join(output_path, "last-screenshot.png") - elements_index_filename = os.path.join(output_path, "elements.deflate") - if path.isfile(screenshot_filename) and path.isfile(elements_index_filename) : - return True - - return False - - def sync_to_json(self): - logger.info("Saving JSON..") - try: - data = deepcopy(self.__data) - except RuntimeError as e: - # Try again in 15 seconds - time.sleep(1) - logger.error(f"! Data changed when writing to JSON, trying again.. 
{str(e)}") - self.sync_to_json() - return - else: - try: - # Re #286 - First write to a temp file, then confirm it looks OK and rename it - # This is a fairly basic strategy to deal with the case that the file is corrupted, - # system was out of memory, out of RAM etc - if HAS_ORJSON: - # Use orjson for faster serialization - # orjson.dumps() always returns UTF-8 encoded bytes #3611 - with open(self.json_store_path+".tmp", 'wb') as json_file: - json_file.write(orjson.dumps(data, option=orjson.OPT_INDENT_2)) - else: - # Fallback to standard json module - with open(self.json_store_path+".tmp", 'w', encoding='utf-8') as json_file: - json.dump(data, json_file, indent=2, ensure_ascii=False) - os.replace(self.json_store_path+".tmp", self.json_store_path) - except Exception as e: - logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}") - - self.needs_write = False - self.needs_write_urgent = False - - # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON - # by just running periodically in one thread, according to python, dict updates are threadsafe. - def save_datastore(self): - - while True: - if self.stop_thread: - # Suppressing "Logging error in Loguru Handler #0" during CICD. - # Not a meaningful difference for a real use-case just for CICD. - # the side effect is a "Shutting down datastore thread" message - # at the end of each test. - # But still more looking better. 
- import sys - logger.remove() - logger.add(sys.stderr) - - logger.info(f"Shutting down datastore '{self.datastore_path}' thread") - return - - if self.needs_write or self.needs_write_urgent: - self.sync_to_json() - - # Once per minute is enough, more and it can cause high CPU usage - # better here is to use something like self.app.config.exit.wait(1), but we cant get to 'app' from here - for i in range(120): - time.sleep(0.5) - if self.stop_thread or self.needs_write_urgent: - break - - # Go through the datastore path and remove any snapshots that are not mentioned in the index - # This usually is not used, but can be handy. - def remove_unused_snapshots(self): - logger.info("Removing snapshots from datastore that are not in the index..") - - index=[] - for uuid in self.data['watching']: - for id in self.data['watching'][uuid].history: - index.append(self.data['watching'][uuid].history[str(id)]) - - import pathlib - - # Only in the sub-directories - for uuid in self.data['watching']: - for item in pathlib.Path(self.datastore_path).rglob(uuid+"/*.txt"): - if not str(item) in index: - logger.info(f"Removing {item}") - unlink(item) - - @property - def proxy_list(self): - proxy_list = {} - proxy_list_file = os.path.join(self.datastore_path, 'proxies.json') - - # Load from external config file - if path.isfile(proxy_list_file): - if HAS_ORJSON: - # orjson.loads() expects UTF-8 encoded bytes #3611 - with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f: - proxy_list = orjson.loads(f.read()) - else: - with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f: - proxy_list = json.load(f) - - # Mapping from UI config if available - extras = self.data['settings']['requests'].get('extra_proxies') - if extras: - i=0 - for proxy in extras: - i += 0 - if proxy.get('proxy_name') and proxy.get('proxy_url'): - k = "ui-" + str(i) + proxy.get('proxy_name') - proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')} 
- - if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')): - proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''} - - return proxy_list if len(proxy_list) else None - - - def get_preferred_proxy_for_watch(self, uuid): - """ - Returns the preferred proxy by ID key - :param uuid: UUID - :return: proxy "key" id - """ - - if self.proxy_list is None: - return None - - # If it's a valid one - watch = self.data['watching'].get(uuid) - - if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch.get('proxy') == "no-proxy": - return None - - if watch.get('proxy') and watch.get('proxy') in list(self.proxy_list.keys()): - return watch.get('proxy') - - # not valid (including None), try the system one - else: - system_proxy_id = self.data['settings']['requests'].get('proxy') - # Is not None and exists - if self.proxy_list.get(system_proxy_id): - return system_proxy_id - - - # Fallback - Did not resolve anything, or doesnt exist, use the first available - if system_proxy_id is None or not self.proxy_list.get(system_proxy_id): - first_default = list(self.proxy_list)[0] - return first_default - - return None - - @property - def has_extra_headers_file(self): - filepath = os.path.join(self.datastore_path, 'headers.txt') - return os.path.isfile(filepath) - - def get_all_base_headers(self): - headers = {} - # Global app settings - headers.update(self.data['settings'].get('headers', {})) - - return headers - - def get_all_headers_in_textfile_for_watch(self, uuid): - from .model.App import parse_headers_from_text_file - headers = {} - - # Global in /datastore/headers.txt - filepath = os.path.join(self.datastore_path, 'headers.txt') - try: - if os.path.isfile(filepath): - headers.update(parse_headers_from_text_file(filepath)) - except Exception as e: - logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") - - watch = self.data['watching'].get(uuid) - if watch: - - # In /datastore/xyz-xyz/headers.txt - filepath = 
os.path.join(watch.watch_data_dir, 'headers.txt') - try: - if os.path.isfile(filepath): - headers.update(parse_headers_from_text_file(filepath)) - except Exception as e: - logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") - - # In /datastore/tag-name.txt - tags = self.get_all_tags_for_watch(uuid=uuid) - for tag_uuid, tag in tags.items(): - fname = "headers-"+re.sub(r'[\W_]', '', tag.get('title')).lower().strip() + ".txt" - filepath = os.path.join(self.datastore_path, fname) - try: - if os.path.isfile(filepath): - headers.update(parse_headers_from_text_file(filepath)) - except Exception as e: - logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") - - return headers - - def get_tag_overrides_for_watch(self, uuid, attr): - tags = self.get_all_tags_for_watch(uuid=uuid) - ret = [] - - if tags: - for tag_uuid, tag in tags.items(): - if attr in tag and tag[attr]: - ret=[*ret, *tag[attr]] - - return ret - - def add_tag(self, title): - # If name exists, return that - n = title.strip().lower() - logger.debug(f">>> Adding new tag - '{n}'") - if not n: - return False - - for uuid, tag in self.__data['settings']['application'].get('tags', {}).items(): - if n == tag.get('title', '').lower().strip(): - logger.warning(f"Tag '{title}' already exists, skipping creation.") - return uuid - - # Eventually almost everything todo with a watch will apply as a Tag - # So we use the same model as a Watch - with self.lock: - from .model import Tag - new_tag = Tag.model(datastore_path=self.datastore_path, default={ - 'title': title.strip(), - 'date_created': int(time.time()) - }) - - new_uuid = new_tag.get('uuid') - - self.__data['settings']['application']['tags'][new_uuid] = new_tag - - return new_uuid - - def get_all_tags_for_watch(self, uuid): - """This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet""" - watch = self.data['watching'].get(uuid) - - # Should return a dict of full tag info linked by UUID - if 
watch: - return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags', [])) - - return {} - - @property - def extra_browsers(self): - res = [] - p = list(filter( - lambda s: (s.get('browser_name') and s.get('browser_connection_url')), - self.__data['settings']['requests'].get('extra_browsers', []))) - if p: - for i in p: - res.append(("extra_browser_"+i['browser_name'], i['browser_name'])) - - return res - - def tag_exists_by_name(self, tag_name): - # Check if any tag dictionary has a 'title' attribute matching the provided tag_name - tags = self.__data['settings']['application']['tags'].values() - return next((v for v in tags if v.get('title', '').lower() == tag_name.lower()), - None) - - def any_watches_have_processor_by_name(self, processor_name): - for watch in self.data['watching'].values(): - if watch.get('processor') == processor_name: - return True - return False - - def search_watches_for_url(self, query, tag_limit=None, partial=False): - """Search watches by URL, title, or error messages - - Args: - query (str): Search term to match against watch URLs, titles, and error messages - tag_limit (str, optional): Optional tag name to limit search results - partial: (bool, optional): sub-string matching - - Returns: - list: List of UUIDs of watches that match the search criteria - """ - matching_uuids = [] - query = query.lower().strip() - tag = self.tag_exists_by_name(tag_limit) if tag_limit else False - - for uuid, watch in self.data['watching'].items(): - # Filter by tag if requested - if tag_limit: - if not tag.get('uuid') in watch.get('tags', []): - continue - - # Search in URL, title, or error messages - if partial: - if ((watch.get('title') and query in watch.get('title').lower()) or - query in watch.get('url', '').lower() or - (watch.get('last_error') and query in watch.get('last_error').lower())): - matching_uuids.append(uuid) - else: - if ((watch.get('title') and query == watch.get('title').lower()) or - query == watch.get('url', 
'').lower() or - (watch.get('last_error') and query == watch.get('last_error').lower())): - matching_uuids.append(uuid) - - return matching_uuids - - def get_unique_notification_tokens_available(self): - # Ask each type of watch if they have any extra notification token to add to the validation - extra_notification_tokens = {} - watch_processors_checked = set() - - for watch_uuid, watch in self.__data['watching'].items(): - processor = watch.get('processor') - if processor not in watch_processors_checked: - extra_notification_tokens.update(watch.extra_notification_token_values()) - watch_processors_checked.add(processor) - - return extra_notification_tokens - - def get_unique_notification_token_placeholders_available(self): - # The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice - extra_notification_tokens = [] - watch_processors_checked = set() - - for watch_uuid, watch in self.__data['watching'].items(): - processor = watch.get('processor') - if processor not in watch_processors_checked: - extra_notification_tokens+=watch.extra_notification_token_placeholder_info() - watch_processors_checked.add(processor) - - return extra_notification_tokens - - - def get_updates_available(self): - import inspect - updates_available = [] - for i, o in inspect.getmembers(self, predicate=inspect.ismethod): - m = re.search(r'update_(\d+)$', i) - if m: - updates_available.append(int(m.group(1))) - updates_available.sort() - - return updates_available - - def add_notification_url(self, notification_url): - - logger.debug(f">>> Adding new notification_url - '{notification_url}'") - - notification_urls = self.data['settings']['application'].get('notification_urls', []) - - if notification_url in notification_urls: - return notification_url - - with self.lock: - notification_urls = self.__data['settings']['application'].get('notification_urls', []) - - if notification_url in notification_urls: - return 
notification_url - - # Append and update the datastore - notification_urls.append(notification_url) - self.__data['settings']['application']['notification_urls'] = notification_urls - self.needs_write = True - - return notification_url - - # Run all updates - # IMPORTANT - Each update could be run even when they have a new install and the schema is correct - # So therefor - each `update_n` should be very careful about checking if it needs to actually run - # Probably we should bump the current update schema version with each tag release version? - def run_updates(self): - import shutil - updates_available = self.get_updates_available() - for update_n in updates_available: - if update_n > self.__data['settings']['application']['schema_version']: - logger.critical(f"Applying update_{update_n}") - # Wont exist on fresh installs - if os.path.exists(self.json_store_path): - i = 0 - while True: - i+=1 - dest = os.path.join(self.datastore_path, f"url-watches-before-{update_n}-{i}.json") - if not os.path.exists(dest): - logger.debug(f"Copying url-watches.json DB to '{dest}' backup.") - shutil.copyfile(self.json_store_path, dest) - break - else: - logger.warning(f"Backup of url-watches.json '{dest}', DB already exists, trying {i+1}.. 
") - - try: - update_method = getattr(self, f"update_{update_n}")() - except Exception as e: - logger.error(f"Error while trying update_{update_n}") - logger.error(e) - # Don't run any more updates - return - else: - # Bump the version, important - self.__data['settings']['application']['schema_version'] = update_n - - # Convert minutes to seconds on settings and each watch - def update_1(self): - if self.data['settings']['requests'].get('minutes_between_check'): - self.data['settings']['requests']['time_between_check']['minutes'] = self.data['settings']['requests']['minutes_between_check'] - # Remove the default 'hours' that is set from the model - self.data['settings']['requests']['time_between_check']['hours'] = None - - for uuid, watch in self.data['watching'].items(): - if 'minutes_between_check' in watch: - # Only upgrade individual watch time if it was set - if watch.get('minutes_between_check', False): - self.data['watching'][uuid]['time_between_check']['minutes'] = watch['minutes_between_check'] - - # Move the history list to a flat text file index - # Better than SQLite because this list is only appended to, and works across NAS / NFS type setups - def update_2(self): - # @todo test running this on a newly updated one (when this already ran) - for uuid, watch in self.data['watching'].items(): - history = [] - - if watch.get('history', False): - for d, p in watch['history'].items(): - d = int(d) # Used to be keyed as str, we'll fix this now too - history.append("{},{}\n".format(d,p)) - - if len(history): - target_path = os.path.join(self.datastore_path, uuid) - if os.path.exists(target_path): - with open(os.path.join(target_path, "history.txt"), "w") as f: - f.writelines(history) - else: - logger.warning(f"Datastore history directory {target_path} does not exist, skipping history import.") - - # No longer needed, dynamically pulled from the disk when needed. 
- # But we should set it back to a empty dict so we don't break if this schema runs on an earlier version. - # In the distant future we can remove this entirely - self.data['watching'][uuid]['history'] = {} - - # We incorrectly stored last_changed when there was not a change, and then confused the output list table - def update_3(self): - # see https://github.com/dgtlmoon/changedetection.io/pull/835 - return - - # `last_changed` not needed, we pull that information from the history.txt index - def update_4(self): - for uuid, watch in self.data['watching'].items(): - try: - # Remove it from the struct - del(watch['last_changed']) - except: - continue - return - - def update_5(self): - # If the watch notification body, title look the same as the global one, unset it, so the watch defaults back to using the main settings - # In other words - the watch notification_title and notification_body are not needed if they are the same as the default one - current_system_body = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE) - current_system_title = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE) - for uuid, watch in self.data['watching'].items(): - try: - watch_body = watch.get('notification_body', '') - if watch_body and watch_body.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_body: - # Looks the same as the default one, so unset it - watch['notification_body'] = None - - watch_title = watch.get('notification_title', '') - if watch_title and watch_title.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_title: - # Looks the same as the default one, so unset it - watch['notification_title'] = None - except Exception as e: - continue - return - - - # We incorrectly used common header overrides that should only apply to Requests - # These are now handled in content_fetcher::html_requests and shouldnt be passed to Playwright/Selenium - def update_7(self): - # These were 
hard-coded in early versions - for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']: - if self.data['settings']['headers'].get(v): - del self.data['settings']['headers'][v] - - # Convert filters to a list of filters css_filter -> include_filters - def update_8(self): - for uuid, watch in self.data['watching'].items(): - try: - existing_filter = watch.get('css_filter', '') - if existing_filter: - watch['include_filters'] = [existing_filter] - except: - continue - return - - # Convert old static notification tokens to jinja2 tokens - def update_9(self): - # Each watch - import re - # only { } not {{ or }} - r = r'(? '{key_exists_as_value}'") - self.data['settings']['application']['notification_format'] = key_exists_as_value - - for uuid, watch in self.data['watching'].items(): - n_format = self.data['watching'][uuid].get('notification_format') - key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None) - if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text" - logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") - self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever - - for uuid, tag in self.data['settings']['application']['tags'].items(): - n_format = self.data['settings']['application']['tags'][uuid].get('notification_format') - key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None) - if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text" - logger.success( - f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") - self.data['settings']['application']['tags'][uuid][ - 'notification_format'] = key_exists_as_value # should be 'text' or whatever - - from .notification import valid_notification_formats - formats = 
deepcopy(valid_notification_formats) - re_run(formats) - # And in previous versions, it was "text" instead of Plain text, Markdown instead of "Markdown to HTML" - formats['text'] = 'Text' - formats['markdown'] = 'Markdown' - re_run(formats) - - - # RSS types should be inline with the same names as notification types - def update_24(self): - rss_format = self.data['settings']['application'].get('rss_content_format') - if not rss_format or 'text' in rss_format: - # might have been 'plaintext, 'plain text' or something - self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT - elif 'html' in rss_format: - self.data['settings']['application']['rss_content_format'] = 'htmlcolor' - else: - # safe fallback to text - self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT - - # Different processors now hold their own history.txt - def update_25(self): - for uuid, watch in self.data['watching'].items(): - processor = self.data['watching'][uuid].get('processor') - if processor != 'text_json_diff': - old_history_txt = os.path.join(self.datastore_path, "history.txt") - target_history_name = f"history-{processor}.txt" - if os.path.isfile(old_history_txt) and not os.path.isfile(target_history_name): - new_history_txt = os.path.join(self.datastore_path, target_history_name) - logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...") - shutil.move(old_history_txt, new_history_txt) diff --git a/changedetectionio/store/__init__.py b/changedetectionio/store/__init__.py new file mode 100644 index 00000000..0353432d --- /dev/null +++ b/changedetectionio/store/__init__.py @@ -0,0 +1,973 @@ +import shutil + +from changedetectionio.strtobool import strtobool + +from changedetectionio.validate_url import is_safe_valid_url + +from flask import ( + flash +) +from flask_babel import gettext + +from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT +from ..html_tools import TRANSLATE_WHITESPACE_TABLE +from 
..model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH +from copy import deepcopy, copy +from os import path, unlink +from threading import Lock +import json +import os +import re +import secrets +import sys +import threading +import time +import uuid as uuid_builder +from loguru import logger +from blinker import signal + +# Try to import orjson for faster JSON serialization +try: + import orjson + + HAS_ORJSON = True +except ImportError: + HAS_ORJSON = False + +from ..processors import get_custom_watch_obj_for_processor +from ..processors.restock_diff import Restock + +# Import the base class and helpers +from .file_saving_datastore import FileSavingDataStore, load_all_watches, save_watch_atomic, save_json_atomic +from .updates import DatastoreUpdatesMixin +from .legacy_loader import has_legacy_datastore + +# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification +BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)' + +dictfilt = lambda x, y: dict([(i, x[i]) for i in x if i in set(y)]) + + +# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods? 
+# Open a github issue if you know something :) +# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change +class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore): + __version_check = True + + def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"): + # Initialize parent class + super().__init__() + + # Should only be active for docker + # logging.basicConfig(filename='/dev/stdout', level=logging.INFO) + self.datastore_path = datastore_path + self.needs_write = False + self.start_time = time.time() + self.stop_thread = False + self.save_version_copy_json_db(version_tag) + self.reload_state(datastore_path=datastore_path, include_default_watches=include_default_watches, version_tag=version_tag) + + def save_version_copy_json_db(self, version_tag): + """ + Create version-tagged backup of changedetection.json. + + This is called on version upgrades to preserve a backup in case + the new version has issues. + """ + import re + + version_text = re.sub(r'\D+', '-', version_tag) + db_path = os.path.join(self.datastore_path, "changedetection.json") + db_path_version_backup = os.path.join(self.datastore_path, f"changedetection-{version_text}.json") + + if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path): + from shutil import copyfile + logger.info(f"Backing up changedetection.json due to new version to '{db_path_version_backup}'.") + copyfile(db_path, db_path_version_backup) + + def _load_settings(self): + """ + Load settings from storage. 
+ + File backend implementation: reads from changedetection.json + + Returns: + dict: Settings data loaded from storage + """ + changedetection_json = os.path.join(self.datastore_path, "changedetection.json") + + logger.info(f"Loading settings from {changedetection_json}") + + if HAS_ORJSON: + with open(changedetection_json, 'rb') as f: + return orjson.loads(f.read()) + else: + with open(changedetection_json, 'r', encoding='utf-8') as f: + return json.load(f) + + def _apply_settings(self, settings_data): + """ + Apply loaded settings data to internal data structure. + + Args: + settings_data: Dictionary loaded from changedetection.json + """ + # Apply top-level fields + if 'app_guid' in settings_data: + self.__data['app_guid'] = settings_data['app_guid'] + if 'build_sha' in settings_data: + self.__data['build_sha'] = settings_data['build_sha'] + if 'version_tag' in settings_data: + self.__data['version_tag'] = settings_data['version_tag'] + + # Apply settings sections + if 'settings' in settings_data: + if 'headers' in settings_data['settings']: + self.__data['settings']['headers'].update(settings_data['settings']['headers']) + if 'requests' in settings_data['settings']: + self.__data['settings']['requests'].update(settings_data['settings']['requests']) + if 'application' in settings_data['settings']: + self.__data['settings']['application'].update(settings_data['settings']['application']) + + def _rehydrate_tags(self): + """Rehydrate tag entities from stored data.""" + for uuid, tag in self.__data['settings']['application']['tags'].items(): + self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity( + uuid, tag, processor_override='restock_diff' + ) + logger.info(f"Tag: {uuid} {tag['title']}") + + + def _load_state(self): + """ + Load complete datastore state from storage. + + Orchestrates loading of settings and watches using polymorphic methods. 
+ """ + # Load settings + settings_data = self._load_settings() + self._apply_settings(settings_data) + + # Load watches (polymorphic - parent class method) + self._load_watches() + + # Rehydrate tags + self._rehydrate_tags() + + def reload_state(self, datastore_path, include_default_watches, version_tag): + """ + Load datastore from storage or create new one. + + Supports two scenarios: + 1. NEW format: changedetection.json exists → load and run updates if needed + 2. EMPTY: No changedetection.json → create new OR trigger migration from legacy + + Note: Legacy url-watches.json migration happens in update_26, not here. + """ + logger.info(f"Datastore path is '{datastore_path}'") + + # Initialize data structure + self.__data = App.model() + self.json_store_path = os.path.join(self.datastore_path, "changedetection.json") + + # Base definition for all watchers (deepcopy part of #569) + self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={})) + + # Load build SHA if available (Docker deployments) + if path.isfile('changedetectionio/source.txt'): + with open('changedetectionio/source.txt') as f: + self.__data['build_sha'] = f.read() + + # Check if datastore already exists + changedetection_json = os.path.join(self.datastore_path, "changedetection.json") + + if os.path.exists(changedetection_json): + # Load existing datastore (changedetection.json + watch.json files) + logger.info("Loading existing datastore") + try: + self._load_state() + except Exception as e: + logger.critical(f"Failed to load datastore: {e}") + raise + + # Run schema updates if needed + # Pass current schema version from loaded datastore (defaults to 0 if not set) + current_schema = self.__data['settings']['application'].get('schema_version', 0) + self.run_updates(current_schema_version=current_schema) + + else: + # No datastore yet - check if this is a fresh install or legacy migration + # Generate app_guid FIRST (required for all operations) + if "pytest" in 
sys.modules or "PYTEST_CURRENT_TEST" in os.environ: + self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4()) + else: + self.__data['app_guid'] = str(uuid_builder.uuid4()) + + # Generate RSS access token + self.__data['settings']['application']['rss_access_token'] = secrets.token_hex(16) + + # Generate API access token + self.__data['settings']['application']['api_access_token'] = secrets.token_hex(16) + + # Check if legacy datastore exists (url-watches.json) + if has_legacy_datastore(self.datastore_path): + # Legacy datastore detected - trigger migration + logger.critical(f"Legacy datastore detected at {self.datastore_path}/url-watches.json") + logger.critical("Migration will be triggered via update_26") + + # Load the legacy datastore to get its schema_version + from .legacy_loader import load_legacy_format + legacy_path = os.path.join(self.datastore_path, "url-watches.json") + legacy_data = load_legacy_format(legacy_path) + + if not legacy_data: + raise Exception("Failed to load legacy datastore from url-watches.json") + + # Get the schema version from legacy datastore (defaults to 0 if not present) + legacy_schema_version = legacy_data.get('settings', {}).get('application', {}).get('schema_version', 0) + logger.info(f"Legacy datastore schema version: {legacy_schema_version}") + + # Set our schema version to match the legacy one + self.__data['settings']['application']['schema_version'] = legacy_schema_version + + # update_26 will load the legacy data again and migrate to new format + # Only run updates AFTER the legacy schema version (e.g., if legacy is at 25, only run 26+) + self.run_updates(current_schema_version=legacy_schema_version) + + else: + # Fresh install - create new datastore + logger.critical(f"No datastore found, creating new datastore at {self.datastore_path}") + + # Set schema version to latest (no updates needed) + updates_available = self.get_updates_available() + self.__data['settings']['application']['schema_version'] = 
updates_available.pop() if updates_available else 26 + + # Add default watches if requested + if include_default_watches: + self.add_watch( + url='https://news.ycombinator.com/', + tag='Tech news', + extras={'fetch_backend': 'html_requests'} + ) + self.add_watch( + url='https://changedetection.io/CHANGELOG.txt', + tag='changedetection.io', + extras={'fetch_backend': 'html_requests'} + ) + + # Create changedetection.json immediately + try: + self._save_settings() + logger.info("Created changedetection.json for new datastore") + except Exception as e: + logger.error(f"Failed to create initial changedetection.json: {e}") + + # Set version tag + self.__data['version_tag'] = version_tag + + # Validate proxies.json if it exists + _ = self.proxy_list # Just to test parsing + + # Ensure app_guid exists (for datastores loaded from existing files) + if 'app_guid' not in self.__data: + if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ: + self.__data['app_guid'] = "test-" + str(uuid_builder.uuid4()) + else: + self.__data['app_guid'] = str(uuid_builder.uuid4()) + self.mark_settings_dirty() + + # Ensure RSS access token exists + if not self.__data['settings']['application'].get('rss_access_token'): + secret = secrets.token_hex(16) + self.__data['settings']['application']['rss_access_token'] = secret + self.mark_settings_dirty() + + # Ensure API access token exists + if not self.__data['settings']['application'].get('api_access_token'): + secret = secrets.token_hex(16) + self.__data['settings']['application']['api_access_token'] = secret + self.mark_settings_dirty() + + # Handle password reset lockfile + password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock") + if path.isfile(password_reset_lockfile): + self.remove_password() + unlink(password_reset_lockfile) + + # Start the background save thread + self.start_save_thread() + + def rehydrate_entity(self, uuid, entity, processor_override=None): + """Set the dict back to the dict Watch 
object""" + entity['uuid'] = uuid + + if processor_override: + watch_class = get_custom_watch_obj_for_processor(processor_override) + entity['processor'] = processor_override + else: + watch_class = get_custom_watch_obj_for_processor(entity.get('processor')) + + if entity.get('uuid') != 'text_json_diff': + logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}") + + entity = watch_class(datastore_path=self.datastore_path, default=entity) + return entity + + # ============================================================================ + # FileSavingDataStore Abstract Method Implementations + # ============================================================================ + + def _watch_exists(self, uuid): + """Check if watch exists in datastore.""" + return uuid in self.__data['watching'] + + def _get_watch_dict(self, uuid): + """Get watch as dictionary.""" + return dict(self.__data['watching'][uuid]) + + def _build_settings_data(self): + """ + Build settings data structure for saving. + + Returns: + dict: Settings data ready for serialization + """ + return { + 'note': 'Settings file - watches are stored in individual {uuid}/watch.json files', + 'app_guid': self.__data['app_guid'], + 'settings': self.__data['settings'], + 'build_sha': self.__data.get('build_sha'), + 'version_tag': self.__data.get('version_tag') + } + + def _save_settings(self): + """ + Save settings to storage. + + File backend implementation: saves to changedetection.json + Implementation of abstract method from FileSavingDataStore. + Uses the generic save_json_atomic helper. + + Raises: + OSError: If disk is full or other I/O error + """ + settings_data = self._build_settings_data() + changedetection_json = os.path.join(self.datastore_path, "changedetection.json") + save_json_atomic(changedetection_json, settings_data, label="settings", max_size_mb=10) + + def _load_watches(self): + """ + Load all watches from storage. 
+ + File backend implementation: reads individual watch.json files + Implementation of abstract method from FileSavingDataStore. + Delegates to helper function and stores results in internal data structure. + """ + watching, watch_hashes = load_all_watches( + self.datastore_path, + self.rehydrate_entity, + self._compute_hash + ) + + # Store loaded data + self.__data['watching'] = watching + self._watch_hashes = watch_hashes + + def _delete_watch(self, uuid): + """ + Delete a watch from storage. + + File backend implementation: deletes entire {uuid}/ directory recursively. + Implementation of abstract method from FileSavingDataStore. + + Args: + uuid: Watch UUID to delete + """ + watch_dir = os.path.join(self.datastore_path, uuid) + if os.path.exists(watch_dir): + shutil.rmtree(watch_dir) + logger.info(f"Deleted watch directory: {watch_dir}") + + # ============================================================================ + # Watch Management Methods + # ============================================================================ + + def set_last_viewed(self, uuid, timestamp): + logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}") + self.data['watching'][uuid].update({'last_viewed': int(timestamp)}) + self.mark_watch_dirty(uuid) + + watch_check_update = signal('watch_check_update') + if watch_check_update: + watch_check_update.send(watch_uuid=uuid) + + def remove_password(self): + self.__data['settings']['application']['password'] = False + self.mark_settings_dirty() + + def update_watch(self, uuid, update_obj): + + # It's possible that the watch could be deleted before update + if not self.__data['watching'].get(uuid): + return + + with self.lock: + + # In python 3.9 we have the |= dict operator, but that still will lose data on nested structures... 
+ for dict_key, d in self.generic_definition.items(): + if isinstance(d, dict): + if update_obj is not None and dict_key in update_obj: + self.__data['watching'][uuid][dict_key].update(update_obj[dict_key]) + del (update_obj[dict_key]) + + self.__data['watching'][uuid].update(update_obj) + + self.mark_watch_dirty(uuid) + + @property + def threshold_seconds(self): + seconds = 0 + for m, n in Watch.mtable.items(): + x = self.__data['settings']['requests']['time_between_check'].get(m) + if x: + seconds += x * n + return seconds + + @property + def unread_changes_count(self): + unread_changes_count = 0 + for uuid, watch in self.__data['watching'].items(): + if watch.history_n >= 2 and watch.viewed == False: + unread_changes_count += 1 + + return unread_changes_count + + @property + def data(self): + # Re #152, Return env base_url if not overriden + # Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services + # like 'Join', so it's always best to atleast set something obvious so that they are not broken. + + active_base_url = BASE_URL_NOT_SET_TEXT + if self.__data['settings']['application'].get('base_url'): + active_base_url = self.__data['settings']['application'].get('base_url') + elif os.getenv('BASE_URL'): + active_base_url = os.getenv('BASE_URL') + + # I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable + # even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. or something + d = self.__data + d['settings']['application']['active_base_url'] = active_base_url.strip('" ') + return d + + # Delete a single watch by UUID + def delete(self, uuid): + """ + Delete a watch by UUID. + + Uses abstracted storage method for backend-agnostic deletion. + Supports 'all' to delete all watches (mainly for testing). 
+ + Args: + uuid: Watch UUID to delete, or 'all' to delete all watches + """ + with self.lock: + if uuid == 'all': + # Delete all watches - capture UUIDs first before modifying dict + all_uuids = list(self.__data['watching'].keys()) + + for watch_uuid in all_uuids: + # Delete from storage using polymorphic method + try: + self._delete_watch(watch_uuid) + except Exception as e: + logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}") + + # Clean up tracking data + self._watch_hashes.pop(watch_uuid, None) + self._dirty_watches.discard(watch_uuid) + + # Send delete signal + watch_delete_signal = signal('watch_deleted') + if watch_delete_signal: + watch_delete_signal.send(watch_uuid=watch_uuid) + + # Clear the dict + self.__data['watching'] = {} + + # Mainly used for testing to allow all items to flush before running next test + time.sleep(1) + + else: + # Delete single watch from storage using polymorphic method + try: + self._delete_watch(uuid) + except Exception as e: + logger.error(f"Failed to delete watch {uuid} from storage: {e}") + + # Remove from watching dict + del self.data['watching'][uuid] + + # Clean up tracking data + self._watch_hashes.pop(uuid, None) + self._dirty_watches.discard(uuid) + + # Send delete signal + watch_delete_signal = signal('watch_deleted') + if watch_delete_signal: + watch_delete_signal.send(watch_uuid=uuid) + + self.needs_write_urgent = True + + # Clone a watch by UUID + def clone(self, uuid): + url = self.data['watching'][uuid].get('url') + extras = deepcopy(self.data['watching'][uuid]) + new_uuid = self.add_watch(url=url, extras=extras) + watch = self.data['watching'][new_uuid] + return new_uuid + + def url_exists(self, url): + + # Probably their should be dict... 
+ for watch in self.data['watching'].values(): + if watch['url'].lower() == url.lower(): + return True + + return False + + # Remove a watchs data but keep the entry (URL etc) + def clear_watch_history(self, uuid): + self.__data['watching'][uuid].clear_watch() + self.needs_write_urgent = True + + def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True): + + if extras is None: + extras = {} + + # Incase these are copied across, assume it's a reference and deepcopy() + apply_extras = deepcopy(extras) + apply_extras['tags'] = [] if not apply_extras.get('tags') else apply_extras.get('tags') + + # Was it a share link? try to fetch the data + if (url.startswith("https://changedetection.io/share/")): + import requests + + try: + r = requests.request(method="GET", + url=url, + # So we know to return the JSON instead of the human-friendly "help" page + headers={'App-Guid': self.__data['app_guid']}, + timeout=5.0) # 5 second timeout to prevent blocking + res = r.json() + + # List of permissible attributes we accept from the wild internet + for k in [ + 'body', + 'browser_steps', + 'css_filter', + 'extract_text', + 'headers', + 'ignore_text', + 'include_filters', + 'method', + 'paused', + 'previous_md5', + 'processor', + 'subtractive_selectors', + 'tag', + 'tags', + 'text_should_not_be_present', + 'title', + 'trigger_text', + 'url', + 'use_page_title_in_list', + 'webdriver_js_execute_code', + ]: + if res.get(k): + if k != 'css_filter': + apply_extras[k] = res[k] + else: + # We renamed the field and made it a list + apply_extras['include_filters'] = [res['css_filter']] + + except Exception as e: + logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}") + flash(gettext("Error fetching metadata for {}").format(url), 'error') + return False + + if not is_safe_valid_url(url): + flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error') + + return None + + if tag and type(tag) == str: + # Then it's probably a 
string of the actual tag by name, split and add it + for t in tag.split(','): + # for each stripped tag, add tag as UUID + for a_t in t.split(','): + tag_uuid = self.add_tag(a_t) + apply_extras['tags'].append(tag_uuid) + + # Or if UUIDs given directly + if tag_uuids: + for t in tag_uuids: + apply_extras['tags'] = list(set(apply_extras['tags'] + [t.strip()])) + + # Make any uuids unique + if apply_extras.get('tags'): + apply_extras['tags'] = list(set(apply_extras.get('tags'))) + + # If the processor also has its own Watch implementation + watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor')) + new_watch = watch_class(datastore_path=self.datastore_path, url=url) + + new_uuid = new_watch.get('uuid') + + logger.debug(f"Adding URL '{url}' - {new_uuid}") + + for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']: + if k in apply_extras: + del apply_extras[k] + + if not apply_extras.get('date_created'): + apply_extras['date_created'] = int(time.time()) + + new_watch.update(apply_extras) + new_watch.ensure_data_dir_exists() + self.__data['watching'][new_uuid] = new_watch + + if save_immediately: + # Save immediately using polymorphic method + try: + self.save_watch(new_uuid, force=True) + logger.debug(f"Saved new watch {new_uuid}") + except Exception as e: + logger.error(f"Failed to save new watch {new_uuid}: {e}") + # Mark dirty for retry + self.mark_watch_dirty(new_uuid) + else: + self.mark_watch_dirty(new_uuid) + + logger.debug(f"Added '{url}'") + + return new_uuid + + def _watch_resource_exists(self, watch_uuid, resource_name): + """ + Check if a watch-related resource exists. + + File backend implementation: checks if file exists in watch directory. 
+ + Args: + watch_uuid: Watch UUID + resource_name: Name of resource (e.g., "last-screenshot.png") + + Returns: + bool: True if resource exists + """ + resource_path = os.path.join(self.datastore_path, watch_uuid, resource_name) + return path.isfile(resource_path) + + def visualselector_data_is_ready(self, watch_uuid): + """ + Check if visual selector data (screenshot + elements) is ready. + + Returns: + bool: True if both screenshot and elements data exist + """ + has_screenshot = self._watch_resource_exists(watch_uuid, "last-screenshot.png") + has_elements = self._watch_resource_exists(watch_uuid, "elements.deflate") + return has_screenshot and has_elements + + # Old sync_to_json and save_datastore methods removed - now handled by FileSavingDataStore parent class + + # Go through the datastore path and remove any snapshots that are not mentioned in the index + # This usually is not used, but can be handy. + def remove_unused_snapshots(self): + logger.info("Removing snapshots from datastore that are not in the index..") + + index = [] + for uuid in self.data['watching']: + for id in self.data['watching'][uuid].history: + index.append(self.data['watching'][uuid].history[str(id)]) + + import pathlib + + # Only in the sub-directories + for uuid in self.data['watching']: + for item in pathlib.Path(self.datastore_path).rglob(uuid + "/*.txt"): + if not str(item) in index: + logger.info(f"Removing {item}") + unlink(item) + + @property + def proxy_list(self): + proxy_list = {} + proxy_list_file = os.path.join(self.datastore_path, 'proxies.json') + + # Load from external config file + if path.isfile(proxy_list_file): + if HAS_ORJSON: + # orjson.loads() expects UTF-8 encoded bytes #3611 + with open(os.path.join(self.datastore_path, "proxies.json"), 'rb') as f: + proxy_list = orjson.loads(f.read()) + else: + with open(os.path.join(self.datastore_path, "proxies.json"), encoding='utf-8') as f: + proxy_list = json.load(f) + + # Mapping from UI config if available + extras = 
self.data['settings']['requests'].get('extra_proxies')
+        if extras:
+            i = 0
+            for proxy in extras:
+                i += 1
+                if proxy.get('proxy_name') and proxy.get('proxy_url'):
+                    k = "ui-" + str(i) + proxy.get('proxy_name')
+                    proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')}
+
+        if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')):
+            proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''}
+
+        return proxy_list if len(proxy_list) else None
+
+    def get_preferred_proxy_for_watch(self, uuid):
+        """
+        Returns the preferred proxy by ID key
+        :param uuid: UUID
+        :return: proxy "key" id
+        """
+
+        if self.proxy_list is None:
+            return None
+
+        # If it's a valid one
+        watch = self.data['watching'].get(uuid)
+
+        if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch.get('proxy') == "no-proxy":
+            return None
+
+        if watch.get('proxy') and watch.get('proxy') in list(self.proxy_list.keys()):
+            return watch.get('proxy')
+
+        # not valid (including None), try the system one
+        else:
+            system_proxy_id = self.data['settings']['requests'].get('proxy')
+            # Is not None and exists
+            if self.proxy_list.get(system_proxy_id):
+                return system_proxy_id
+
+            # Fallback - Did not resolve anything, or doesnt exist, use the first available
+            if system_proxy_id is None or not self.proxy_list.get(system_proxy_id):
+                first_default = list(self.proxy_list)[0]
+                return first_default
+
+        return None
+
+    @property
+    def has_extra_headers_file(self):
+        filepath = os.path.join(self.datastore_path, 'headers.txt')
+        return os.path.isfile(filepath)
+
+    def get_all_base_headers(self):
+        headers = {}
+        # Global app settings
+        headers.update(self.data['settings'].get('headers', {}))
+
+        return headers
+
+    def get_all_headers_in_textfile_for_watch(self, uuid):
+        from ..model.App import parse_headers_from_text_file
+        headers = {}
+
+        # Global in /datastore/headers.txt
+        filepath = os.path.join(self.datastore_path, 'headers.txt')
+        try:
+            if 
os.path.isfile(filepath): + headers.update(parse_headers_from_text_file(filepath)) + except Exception as e: + logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") + + watch = self.data['watching'].get(uuid) + if watch: + + # In /datastore/xyz-xyz/headers.txt + filepath = os.path.join(watch.watch_data_dir, 'headers.txt') + try: + if os.path.isfile(filepath): + headers.update(parse_headers_from_text_file(filepath)) + except Exception as e: + logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") + + # In /datastore/tag-name.txt + tags = self.get_all_tags_for_watch(uuid=uuid) + for tag_uuid, tag in tags.items(): + fname = "headers-" + re.sub(r'[\W_]', '', tag.get('title')).lower().strip() + ".txt" + filepath = os.path.join(self.datastore_path, fname) + try: + if os.path.isfile(filepath): + headers.update(parse_headers_from_text_file(filepath)) + except Exception as e: + logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}") + + return headers + + def get_tag_overrides_for_watch(self, uuid, attr): + tags = self.get_all_tags_for_watch(uuid=uuid) + ret = [] + + if tags: + for tag_uuid, tag in tags.items(): + if attr in tag and tag[attr]: + ret = [*ret, *tag[attr]] + + return ret + + def add_tag(self, title): + # If name exists, return that + n = title.strip().lower() + logger.debug(f">>> Adding new tag - '{n}'") + if not n: + return False + + for uuid, tag in self.__data['settings']['application'].get('tags', {}).items(): + if n == tag.get('title', '').lower().strip(): + logger.warning(f"Tag '{title}' already exists, skipping creation.") + return uuid + + # Eventually almost everything todo with a watch will apply as a Tag + # So we use the same model as a Watch + with self.lock: + from ..model import Tag + new_tag = Tag.model(datastore_path=self.datastore_path, default={ + 'title': title.strip(), + 'date_created': int(time.time()) + }) + + new_uuid = new_tag.get('uuid') + + self.__data['settings']['application']['tags'][new_uuid] = 
new_tag + + self.mark_settings_dirty() + return new_uuid + + def get_all_tags_for_watch(self, uuid): + """This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet""" + watch = self.data['watching'].get(uuid) + + # Should return a dict of full tag info linked by UUID + if watch: + return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags', [])) + + return {} + + @property + def extra_browsers(self): + res = [] + p = list(filter( + lambda s: (s.get('browser_name') and s.get('browser_connection_url')), + self.__data['settings']['requests'].get('extra_browsers', []))) + if p: + for i in p: + res.append(("extra_browser_" + i['browser_name'], i['browser_name'])) + + return res + + def tag_exists_by_name(self, tag_name): + # Check if any tag dictionary has a 'title' attribute matching the provided tag_name + tags = self.__data['settings']['application']['tags'].values() + return next((v for v in tags if v.get('title', '').lower() == tag_name.lower()), + None) + + def any_watches_have_processor_by_name(self, processor_name): + for watch in self.data['watching'].values(): + if watch.get('processor') == processor_name: + return True + return False + + def search_watches_for_url(self, query, tag_limit=None, partial=False): + """Search watches by URL, title, or error messages + + Args: + query (str): Search term to match against watch URLs, titles, and error messages + tag_limit (str, optional): Optional tag name to limit search results + partial: (bool, optional): sub-string matching + + Returns: + list: List of UUIDs of watches that match the search criteria + """ + matching_uuids = [] + query = query.lower().strip() + tag = self.tag_exists_by_name(tag_limit) if tag_limit else False + + for uuid, watch in self.data['watching'].items(): + # Filter by tag if requested + if tag_limit: + if not tag.get('uuid') in watch.get('tags', []): + continue + + # Search in URL, title, or error messages + if partial: + if 
((watch.get('title') and query in watch.get('title').lower()) or + query in watch.get('url', '').lower() or + (watch.get('last_error') and query in watch.get('last_error').lower())): + matching_uuids.append(uuid) + else: + if ((watch.get('title') and query == watch.get('title').lower()) or + query == watch.get('url', '').lower() or + (watch.get('last_error') and query == watch.get('last_error').lower())): + matching_uuids.append(uuid) + + return matching_uuids + + def get_unique_notification_tokens_available(self): + # Ask each type of watch if they have any extra notification token to add to the validation + extra_notification_tokens = {} + watch_processors_checked = set() + + for watch_uuid, watch in self.__data['watching'].items(): + processor = watch.get('processor') + if processor not in watch_processors_checked: + extra_notification_tokens.update(watch.extra_notification_token_values()) + watch_processors_checked.add(processor) + + return extra_notification_tokens + + def get_unique_notification_token_placeholders_available(self): + # The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice + extra_notification_tokens = [] + watch_processors_checked = set() + + for watch_uuid, watch in self.__data['watching'].items(): + processor = watch.get('processor') + if processor not in watch_processors_checked: + extra_notification_tokens += watch.extra_notification_token_placeholder_info() + watch_processors_checked.add(processor) + + return extra_notification_tokens + + def add_notification_url(self, notification_url): + + logger.debug(f">>> Adding new notification_url - '{notification_url}'") + + notification_urls = self.data['settings']['application'].get('notification_urls', []) + + if notification_url in notification_urls: + return notification_url + + with self.lock: + notification_urls = self.__data['settings']['application'].get('notification_urls', []) + + if notification_url in 
notification_urls: + return notification_url + + # Append and update the datastore + notification_urls.append(notification_url) + self.__data['settings']['application']['notification_urls'] = notification_urls + + self.mark_settings_dirty() + return notification_url + + # Schema update methods moved to store/updates.py (DatastoreUpdatesMixin) + # This includes: get_updates_available(), run_updates(), and update_1() through update_26() diff --git a/changedetectionio/store/base.py b/changedetectionio/store/base.py new file mode 100644 index 00000000..c0c9e47c --- /dev/null +++ b/changedetectionio/store/base.py @@ -0,0 +1,100 @@ +""" +Base classes for the datastore. + +This module defines the abstract interfaces that all datastore implementations must follow. +""" + +from abc import ABC, abstractmethod +from threading import Lock +from loguru import logger + + +class DataStore(ABC): + """ + Abstract base class for all datastore implementations. + + Defines the core interface that all datastores must implement for: + - Loading and saving data + - Managing watches + - Handling settings + - Providing data access + """ + + lock = Lock() + datastore_path = None + + @abstractmethod + def reload_state(self, datastore_path, include_default_watches, version_tag): + """ + Load data from persistent storage. + + Args: + datastore_path: Path to the datastore directory + include_default_watches: Whether to create default watches if none exist + version_tag: Application version string + """ + pass + + @abstractmethod + def add_watch(self, url, **kwargs): + """ + Add a new watch. + + Args: + url: URL to watch + **kwargs: Additional watch parameters + + Returns: + UUID of the created watch + """ + pass + + @abstractmethod + def update_watch(self, uuid, update_obj): + """ + Update an existing watch. + + Args: + uuid: Watch UUID + update_obj: Dictionary of fields to update + """ + pass + + @abstractmethod + def delete(self, uuid): + """ + Delete a watch. 
+ + Args: + uuid: Watch UUID to delete + """ + pass + + @property + @abstractmethod + def data(self): + """ + Access to the underlying data structure. + + Returns: + Dictionary containing all datastore data + """ + pass + + @abstractmethod + def force_save_all(self): + """ + Force immediate synchronous save of all data to storage. + + This is the abstract method for forcing a complete save. + Different backends implement this differently: + - File backend: Mark all watches/settings dirty, then save + - Redis backend: SAVE command or pipeline flush + - SQL backend: COMMIT transaction + + Used by: + - Backup creation (ensure everything is saved before backup) + - Shutdown (ensure all changes are persisted) + - Manual save operations + """ + pass diff --git a/changedetectionio/store/file_saving_datastore.py b/changedetectionio/store/file_saving_datastore.py new file mode 100644 index 00000000..d45c8c48 --- /dev/null +++ b/changedetectionio/store/file_saving_datastore.py @@ -0,0 +1,897 @@ +""" +File-based datastore with individual watch persistence and dirty tracking. 
+ +This module provides the FileSavingDataStore abstract class that implements: +- Individual watch.json file persistence +- Hash-based change detection (only save what changed) +- Periodic audit scan (catches unmarked changes) +- Background save thread with batched parallel saves +- Atomic file writes safe for NFS/NAS +""" + +import glob +import hashlib +import json +import os +import tempfile +import time +from concurrent.futures import ThreadPoolExecutor, as_completed +from distutils.util import strtobool +from threading import Thread +from loguru import logger + +from .base import DataStore + +# Try to import orjson for faster JSON serialization +try: + import orjson + HAS_ORJSON = True +except ImportError: + HAS_ORJSON = False + +# Fsync configuration: Force file data to disk for crash safety +# Default False to match legacy behavior (write-and-rename without fsync) +# Set to True for mission-critical deployments requiring crash consistency +FORCE_FSYNC_DATA_IS_CRITICAL = bool(strtobool(os.getenv('FORCE_FSYNC_DATA_IS_CRITICAL', 'False'))) + +# Save interval configuration: How often the background thread saves dirty items +# Default 10 seconds - increase for less frequent saves, decrease for more frequent +DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS = int(os.getenv('DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS', '10')) + +# Rolling audit configuration: Scans a fraction of watches each cycle +# Default: Run audit every 10s, split into 5 shards +# Full audit completes every 50s (10s × 5 shards) +# With 56k watches: 56k / 5 = ~11k watches per cycle (~60ms vs 316ms for all) +# Handles dynamic watch count - recalculates shard boundaries each cycle +DATASTORE_AUDIT_INTERVAL_SECONDS = int(os.getenv('DATASTORE_AUDIT_INTERVAL_SECONDS', '10')) +DATASTORE_AUDIT_SHARDS = int(os.getenv('DATASTORE_AUDIT_SHARDS', '5')) + + +# ============================================================================ +# Helper Functions for Atomic File Operations +# 
============================================================================ + +def save_json_atomic(file_path, data_dict, label="file", max_size_mb=10): + """ + Save JSON data to disk using atomic write pattern. + + Generic helper for saving any JSON data (settings, watches, etc.) with: + - Atomic write (temp file + rename) + - Directory fsync for crash consistency (only for new files) + - Size validation + - Proper error handling + + Args: + file_path: Full path to target JSON file + data_dict: Dictionary to serialize + label: Human-readable label for error messages (e.g., "watch", "settings") + max_size_mb: Maximum allowed file size in MB + + Raises: + ValueError: If serialized data exceeds max_size_mb + OSError: If disk is full (ENOSPC) or other I/O error + """ + # Check if file already exists (before we start writing) + # Directory fsync only needed for NEW files to persist the filename + file_exists = os.path.exists(file_path) + + # Ensure parent directory exists + parent_dir = os.path.dirname(file_path) + os.makedirs(parent_dir, exist_ok=True) + + # Create temp file in same directory (required for NFS atomicity) + fd, temp_path = tempfile.mkstemp( + suffix='.tmp', + prefix='json-', + dir=parent_dir, + text=False + ) + + fd_closed = False + try: + # Serialize data + t0 = time.time() + if HAS_ORJSON: + data = orjson.dumps(data_dict, option=orjson.OPT_INDENT_2) + else: + data = json.dumps(data_dict, indent=2, ensure_ascii=False).encode('utf-8') + serialize_ms = (time.time() - t0) * 1000 + + # Safety check: validate size + MAX_SIZE = max_size_mb * 1024 * 1024 + data_size = len(data) + if data_size > MAX_SIZE: + raise ValueError( + f"{label.capitalize()} data is unexpectedly large: {data_size / 1024 / 1024:.2f}MB " + f"(max: {max_size_mb}MB). This indicates a bug or data corruption." 
+ ) + + # Write to temp file + t1 = time.time() + os.write(fd, data) + write_ms = (time.time() - t1) * 1000 + + # Optional fsync: Force file data to disk for crash safety + # Only if FORCE_FSYNC_DATA_IS_CRITICAL=True (default: False, matches legacy behavior) + t2 = time.time() + if FORCE_FSYNC_DATA_IS_CRITICAL: + os.fsync(fd) + file_fsync_ms = (time.time() - t2) * 1000 + + os.close(fd) + fd_closed = True + + # Atomic rename + t3 = time.time() + os.replace(temp_path, file_path) + rename_ms = (time.time() - t3) * 1000 + + # Sync directory to ensure filename metadata is durable + # OPTIMIZATION: Only needed for NEW files. Existing files already have + # directory entry persisted, so we only need file fsync for data durability. + dir_fsync_ms = 0 + if not file_exists: + try: + dir_fd = os.open(parent_dir, os.O_RDONLY) + try: + t4 = time.time() + os.fsync(dir_fd) + dir_fsync_ms = (time.time() - t4) * 1000 + finally: + os.close(dir_fd) + except (OSError, AttributeError): + # Windows doesn't support fsync on directories + pass + + # Log timing breakdown for slow saves +# total_ms = serialize_ms + write_ms + file_fsync_ms + rename_ms + dir_fsync_ms +# if total_ms: # Log if save took more than 10ms +# file_status = "new" if not file_exists else "update" +# logger.trace( +# f"Save timing breakdown ({total_ms:.1f}ms total, {file_status}): " +# f"serialize={serialize_ms:.1f}ms, write={write_ms:.1f}ms, " +# f"file_fsync={file_fsync_ms:.1f}ms, rename={rename_ms:.1f}ms, " +# f"dir_fsync={dir_fsync_ms:.1f}ms, using_orjson={HAS_ORJSON}" +# ) + + except OSError as e: + # Cleanup temp file + if not fd_closed: + try: + os.close(fd) + except: + pass + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except: + pass + + # Provide helpful error messages + if e.errno == 28: # ENOSPC + raise OSError(f"Disk full: Cannot save {label}") from e + elif e.errno == 122: # EDQUOT + raise OSError(f"Disk quota exceeded: Cannot save {label}") from e + else: + raise OSError(f"I/O error 
saving {label}: {e}") from e + + except Exception as e: + # Cleanup temp file + if not fd_closed: + try: + os.close(fd) + except: + pass + if os.path.exists(temp_path): + try: + os.unlink(temp_path) + except: + pass + raise e + + +def save_watch_atomic(watch_dir, uuid, watch_dict): + """ + Save a watch to disk using atomic write pattern. + + Convenience wrapper around save_json_atomic for watches. + + Args: + watch_dir: Directory for this watch (e.g., /datastore/{uuid}) + uuid: Watch UUID (for logging) + watch_dict: Dictionary representation of the watch + + Raises: + ValueError: If serialized data exceeds 10MB (indicates bug or corruption) + OSError: If disk is full (ENOSPC) or other I/O error + """ + watch_json = os.path.join(watch_dir, "watch.json") + save_json_atomic(watch_json, watch_dict, label=f"watch {uuid}", max_size_mb=10) + + +def load_watch_from_file(watch_json, uuid, rehydrate_entity_func): + """ + Load a watch from its JSON file. + + Args: + watch_json: Path to the watch.json file + uuid: Watch UUID + rehydrate_entity_func: Function to convert dict to Watch object + + Returns: + Tuple of (Watch object, raw_data_dict) or (None, None) if failed + The raw_data_dict is needed to compute the hash before rehydration + """ + try: + # Check file size before reading + file_size = os.path.getsize(watch_json) + MAX_WATCH_SIZE = 10 * 1024 * 1024 # 10MB + if file_size > MAX_WATCH_SIZE: + logger.critical( + f"CORRUPTED WATCH DATA: Watch {uuid} file is unexpectedly large: " + f"{file_size / 1024 / 1024:.2f}MB (max: {MAX_WATCH_SIZE / 1024 / 1024}MB). " + f"File: {watch_json}. This indicates a bug or data corruption. " + f"Watch will be skipped." 
+ ) + return None, None + + if HAS_ORJSON: + with open(watch_json, 'rb') as f: + watch_data = orjson.loads(f.read()) + else: + with open(watch_json, 'r', encoding='utf-8') as f: + watch_data = json.load(f) + + if watch_data.get('time_schedule_limit'): + del watch_data['time_schedule_limit'] + if watch_data.get('time_between_check'): + del watch_data['time_between_check'] + + # Return both the raw data and the rehydrated watch + # Raw data is needed to compute hash before rehydration changes anything + watch_obj = rehydrate_entity_func(uuid, watch_data) + return watch_obj, watch_data + + except json.JSONDecodeError as e: + logger.critical( + f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. " + f"File: {watch_json}. Error: {e}. " + f"Watch will be skipped and may need manual recovery from backup." + ) + return None, None + except ValueError as e: + # orjson raises ValueError for invalid JSON + if "invalid json" in str(e).lower() or HAS_ORJSON: + logger.critical( + f"CORRUPTED WATCH DATA: Failed to parse JSON for watch {uuid}. " + f"File: {watch_json}. Error: {e}. " + f"Watch will be skipped and may need manual recovery from backup." + ) + return None, None + # Re-raise if it's not a JSON parsing error + raise + except FileNotFoundError: + logger.error(f"Watch file not found: {watch_json} for watch {uuid}") + return None, None + except Exception as e: + logger.error(f"Failed to load watch {uuid} from {watch_json}: {e}") + return None, None + + +def load_all_watches(datastore_path, rehydrate_entity_func, compute_hash_func): + """ + Load all watches from individual watch.json files. + + SYNCHRONOUS loading: Blocks until all watches are loaded. + This ensures data consistency - web server won't accept requests + until all watches are available. Progress logged every 100 watches. 
+ + Args: + datastore_path: Path to the datastore directory + rehydrate_entity_func: Function to convert dict to Watch object + compute_hash_func: Function to compute hash from raw watch dict + + Returns: + Tuple of (watching_dict, hashes_dict) + - watching_dict: uuid -> Watch object + - hashes_dict: uuid -> hash string (computed from raw data) + """ + start_time = time.time() + logger.info("Loading watches from individual watch.json files...") + + watching = {} + watch_hashes = {} + + if not os.path.exists(datastore_path): + return watching, watch_hashes + + # Find all watch.json files using glob (faster than manual directory traversal) + glob_start = time.time() + watch_files = glob.glob(os.path.join(datastore_path, "*", "watch.json")) + glob_time = time.time() - glob_start + + total = len(watch_files) + logger.debug(f"Found {total} watch.json files in {glob_time:.3f}s") + + loaded = 0 + failed = 0 + + for watch_json in watch_files: + # Extract UUID from path: /datastore/{uuid}/watch.json + uuid_dir = os.path.basename(os.path.dirname(watch_json)) + watch, raw_data = load_watch_from_file(watch_json, uuid_dir, rehydrate_entity_func) + if watch and raw_data: + watching[uuid_dir] = watch + # Compute hash from raw data BEFORE rehydration to match saved hash + watch_hashes[uuid_dir] = compute_hash_func(raw_data) + loaded += 1 + + if loaded % 100 == 0: + logger.info(f"Loaded {loaded}/{total} watches...") + else: + # load_watch_from_file already logged the specific error + failed += 1 + + elapsed = time.time() - start_time + + if failed > 0: + logger.critical( + f"LOAD COMPLETE: {loaded} watches loaded successfully, " + f"{failed} watches FAILED to load (corrupted or invalid) " + f"in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)" + ) + else: + logger.info(f"Loaded {loaded} watches from disk in {elapsed:.2f}s ({loaded/elapsed:.0f} watches/sec)") + + return watching, watch_hashes + + +# ============================================================================ +# 
FileSavingDataStore Class +# ============================================================================ + +class FileSavingDataStore(DataStore): + """ + Abstract datastore that provides file persistence with change tracking. + + Features: + - Individual watch.json files (one per watch) + - Dirty tracking: Only saves items that have changed + - Hash-based change detection: Prevents unnecessary writes + - Background save thread: Non-blocking persistence + - Two-tier urgency: Standard (60s) and urgent (immediate) saves + + Subclasses must implement: + - rehydrate_entity(): Convert dict to Watch object + - Access to internal __data structure for watch management + """ + + needs_write = False + needs_write_urgent = False + stop_thread = False + + # Change tracking + _dirty_watches = set() # Watch UUIDs that need saving + _dirty_settings = False # Settings changed + _watch_hashes = {} # UUID -> SHA256 hash for change detection + + # Health monitoring + _last_save_time = 0 # Timestamp of last successful save + _last_audit_time = 0 # Timestamp of last audit scan + _save_cycle_count = 0 # Number of save cycles completed + _total_saves = 0 # Total watches saved (lifetime) + _save_errors = 0 # Total save errors (lifetime) + _audit_count = 0 # Number of audit scans completed + _audit_found_changes = 0 # Total unmarked changes found by audits + _audit_shard_index = 0 # Current shard being audited (rolling audit) + + def __init__(self): + super().__init__() + self.save_data_thread = None + self._last_save_time = time.time() + self._last_audit_time = time.time() + + + def mark_watch_dirty(self, uuid): + """ + Mark a watch as needing save. + + Args: + uuid: Watch UUID + """ + with self.lock: + self._dirty_watches.add(uuid) + dirty_count = len(self._dirty_watches) + + # Backpressure detection - warn if dirty set grows too large + if dirty_count > 1000: + logger.critical( + f"BACKPRESSURE WARNING: {dirty_count} watches pending save! 
" + f"Save thread may not be keeping up with write rate. " + f"This could indicate disk I/O bottleneck or save thread failure." + ) + elif dirty_count > 500: + logger.warning( + f"Dirty watch count high: {dirty_count} watches pending save. " + f"Monitoring for potential backpressure." + ) + + self.needs_write = True + + def mark_settings_dirty(self): + """Mark settings as needing save.""" + with self.lock: + self._dirty_settings = True + self.needs_write = True + + def _compute_hash(self, watch_dict): + """ + Compute SHA256 hash of watch for change detection. + + Args: + watch_dict: Dictionary representation of watch + + Returns: + Hex string of SHA256 hash + """ + # Use orjson for deterministic serialization if available + if HAS_ORJSON: + json_bytes = orjson.dumps(watch_dict, option=orjson.OPT_SORT_KEYS) + else: + json_str = json.dumps(watch_dict, sort_keys=True, ensure_ascii=False) + json_bytes = json_str.encode('utf-8') + + return hashlib.sha256(json_bytes).hexdigest() + + def save_watch(self, uuid, force=False, watch_dict=None, current_hash=None): + """ + Save a single watch if it has changed (polymorphic method). 
+ + Args: + uuid: Watch UUID + force: If True, skip hash check and save anyway + watch_dict: Pre-computed watch dictionary (optimization) + current_hash: Pre-computed hash (optimization) + + Returns: + True if saved, False if skipped (unchanged) + """ + if not self._watch_exists(uuid): + logger.warning(f"Cannot save watch {uuid} - does not exist") + return False + + # Get watch dict if not provided + if watch_dict is None: + watch_dict = self._get_watch_dict(uuid) + + # Compute hash if not provided + if current_hash is None: + current_hash = self._compute_hash(watch_dict) + + # Skip save if unchanged (unless forced) + if not force and current_hash == self._watch_hashes.get(uuid): + return False + + try: + self._save_watch(uuid, watch_dict) + self._watch_hashes[uuid] = current_hash + logger.debug(f"Saved watch {uuid}") + return True + except Exception as e: + logger.error(f"Failed to save watch {uuid}: {e}") + raise + + def _save_watch(self, uuid, watch_dict): + """ + Save a single watch to storage (polymorphic). + + Backend-specific implementation. Subclasses override for different storage: + - File backend: Writes to {uuid}/watch.json + - Redis backend: SET watch:{uuid} + - SQL backend: UPDATE watches WHERE uuid=? + + Args: + uuid: Watch UUID + watch_dict: Dictionary representation of watch + """ + # Default file implementation + watch_dir = os.path.join(self.datastore_path, uuid) + save_watch_atomic(watch_dir, uuid, watch_dict) + + def _save_settings(self): + """ + Save settings to storage (polymorphic). + + Subclasses must implement for their backend. + - File: changedetection.json + - Redis: SET settings + - SQL: UPDATE settings table + """ + raise NotImplementedError("Subclass must implement _save_settings") + + def _load_watches(self): + """ + Load all watches from storage (polymorphic). + + Subclasses must implement for their backend. 
+ - File: Read individual watch.json files + - Redis: SCAN watch:* keys + - SQL: SELECT * FROM watches + """ + raise NotImplementedError("Subclass must implement _load_watches") + + def _delete_watch(self, uuid): + """ + Delete a watch from storage (polymorphic). + + Subclasses must implement for their backend. + - File: Delete {uuid}/ directory recursively + - Redis: DEL watch:{uuid} + - SQL: DELETE FROM watches WHERE uuid=? + + Args: + uuid: Watch UUID to delete + """ + raise NotImplementedError("Subclass must implement _delete_watch") + + def _save_dirty_items(self): + """ + Save dirty watches and settings. + + This is the core optimization: instead of saving the entire datastore, + we only save watches that were marked dirty and settings if changed. + """ + start_time = time.time() + + # Capture dirty sets under lock + with self.lock: + dirty_watches = list(self._dirty_watches) + dirty_settings = self._dirty_settings + self._dirty_watches.clear() + self._dirty_settings = False + + if not dirty_watches and not dirty_settings: + return + + logger.trace(f"Saving {len(dirty_watches)} dirty watches, settings_dirty={dirty_settings}") + + # Save each dirty watch using the polymorphic save method + saved_count = 0 + error_count = 0 + skipped_unchanged = 0 + + # Process in batches of 50, using thread pool for parallel saves + BATCH_SIZE = 50 + MAX_WORKERS = 20 # Number of parallel save threads + + def save_single_watch(uuid): + """Helper function for thread pool execution.""" + try: + # Check if watch still exists (might have been deleted) + if not self._watch_exists(uuid): + # Watch was deleted, remove hash + self._watch_hashes.pop(uuid, None) + return {'status': 'deleted', 'uuid': uuid} + + # Pre-check hash to avoid unnecessary save_watch() calls + watch_dict = self._get_watch_dict(uuid) + current_hash = self._compute_hash(watch_dict) + + if current_hash == self._watch_hashes.get(uuid): + # Watch hasn't actually changed, skip + return {'status': 'unchanged', 'uuid': 
uuid} + + # Pass pre-computed values to avoid redundant serialization/hashing + if self.save_watch(uuid, force=True, watch_dict=watch_dict, current_hash=current_hash): + return {'status': 'saved', 'uuid': uuid} + else: + return {'status': 'skipped', 'uuid': uuid} + except Exception as e: + logger.error(f"Error saving watch {uuid}: {e}") + return {'status': 'error', 'uuid': uuid, 'error': e} + + # Process dirty watches in batches + for batch_start in range(0, len(dirty_watches), BATCH_SIZE): + batch = dirty_watches[batch_start:batch_start + BATCH_SIZE] + batch_num = (batch_start // BATCH_SIZE) + 1 + total_batches = (len(dirty_watches) + BATCH_SIZE - 1) // BATCH_SIZE + + if len(dirty_watches) > BATCH_SIZE: + logger.trace(f"Save batch {batch_num}/{total_batches} ({len(batch)} watches)") + + # Use thread pool to save watches in parallel + with ThreadPoolExecutor(max_workers=MAX_WORKERS) as executor: + # Submit all save tasks + future_to_uuid = {executor.submit(save_single_watch, uuid): uuid for uuid in batch} + + # Collect results as they complete + for future in as_completed(future_to_uuid): + result = future.result() + status = result['status'] + + if status == 'saved': + saved_count += 1 + elif status == 'unchanged': + skipped_unchanged += 1 + elif status == 'error': + error_count += 1 + # Re-mark for retry + with self.lock: + self._dirty_watches.add(result['uuid']) + # 'deleted' and 'skipped' don't need special handling + + # Save settings if changed + if dirty_settings: + try: + self._save_settings() + logger.debug("Saved settings") + except Exception as e: + logger.error(f"Failed to save settings: {e}") + error_count += 1 + with self.lock: + self._dirty_settings = True + + # Update metrics + elapsed = time.time() - start_time + self._save_cycle_count += 1 + self._total_saves += saved_count + self._save_errors += error_count + self._last_save_time = time.time() + + # Log performance metrics + if saved_count > 0: + avg_time_per_watch = (elapsed / saved_count) * 
1000 # milliseconds + skipped_msg = f", {skipped_unchanged} unchanged" if skipped_unchanged > 0 else "" + parallel_msg = f" [parallel: {MAX_WORKERS} workers]" if saved_count > 1 else "" + logger.info( + f"Successfully saved {saved_count} watches in {elapsed:.2f}s " + f"(avg {avg_time_per_watch:.1f}ms per watch{skipped_msg}){parallel_msg}. " + f"Total: {self._total_saves} saves, {self._save_errors} errors (lifetime)" + ) + elif skipped_unchanged > 0: + logger.debug(f"Save cycle: {skipped_unchanged} watches verified unchanged (hash match), nothing saved") + + if error_count > 0: + logger.error(f"Save cycle completed with {error_count} errors") + + self.needs_write = False + self.needs_write_urgent = False + + def _watch_exists(self, uuid): + """ + Check if watch exists. Subclass must implement. + + Args: + uuid: Watch UUID + + Returns: + bool + """ + raise NotImplementedError("Subclass must implement _watch_exists") + + def _get_watch_dict(self, uuid): + """ + Get watch as dictionary. Subclass must implement. + + Args: + uuid: Watch UUID + + Returns: + Dictionary representation of watch + """ + raise NotImplementedError("Subclass must implement _get_watch_dict") + + def _audit_all_watches(self): + """ + Rolling audit: Scans a fraction of watches to detect unmarked changes. + + Instead of scanning ALL watches at once, this scans 1/N shards per cycle. + The shard rotates each cycle, completing a full audit every N cycles. + + Handles dynamic watch count - recalculates shard boundaries each cycle, + so newly added watches will be audited in subsequent cycles. 
+ + Benefits: + - Lower CPU per cycle (56k / 5 = ~11k watches vs all 56k) + - More frequent audits overall (every 50s vs every 10s) + - Spreads load evenly across time + """ + audit_start = time.time() + + # Get list of all watch UUIDs (read-only, no lock needed) + try: + all_uuids = list(self.data['watching'].keys()) + except (KeyError, AttributeError, RuntimeError): + # Data structure not ready or being modified + return + + if not all_uuids: + return + + total_watches = len(all_uuids) + + # Calculate this cycle's shard boundaries + # Example: 56,278 watches / 5 shards = 11,255 watches per shard + # Shard 0: [0:11255], Shard 1: [11255:22510], etc. + shard_size = (total_watches + DATASTORE_AUDIT_SHARDS - 1) // DATASTORE_AUDIT_SHARDS + start_idx = self._audit_shard_index * shard_size + end_idx = min(start_idx + shard_size, total_watches) + + # Handle wrap-around (shouldn't happen normally, but defensive) + if start_idx >= total_watches: + self._audit_shard_index = 0 + start_idx = 0 + end_idx = min(shard_size, total_watches) + + # Audit only this shard's watches + shard_uuids = all_uuids[start_idx:end_idx] + + changes_found = 0 + errors = 0 + + for uuid in shard_uuids: + try: + # Get current watch dict and compute hash + watch_dict = self._get_watch_dict(uuid) + current_hash = self._compute_hash(watch_dict) + stored_hash = self._watch_hashes.get(uuid) + + # If hash changed and not already marked dirty, mark it + if current_hash != stored_hash: + with self.lock: + if uuid not in self._dirty_watches: + self._dirty_watches.add(uuid) + changes_found += 1 + logger.warning( + f"Audit detected unmarked change in watch {uuid[:8]}... 
" + f"(hash changed but not marked dirty)" + ) + self.needs_write = True + except Exception as e: + errors += 1 + logger.trace(f"Audit error for watch {uuid[:8]}...: {e}") + + audit_elapsed = (time.time() - audit_start) * 1000 # milliseconds + + # Advance to next shard (wrap around after last shard) + self._audit_shard_index = (self._audit_shard_index + 1) % DATASTORE_AUDIT_SHARDS + + # Update metrics + self._audit_count += 1 + self._audit_found_changes += changes_found + self._last_audit_time = time.time() + + if changes_found > 0: + logger.warning( + f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS} found {changes_found} " + f"unmarked changes in {len(shard_uuids)}/{total_watches} watches ({audit_elapsed:.1f}ms)" + ) + else: + logger.trace( + f"Audit shard {self._audit_shard_index}/{DATASTORE_AUDIT_SHARDS}: " + f"{len(shard_uuids)}/{total_watches} watches checked, 0 changes ({audit_elapsed:.1f}ms)" + ) + + def save_datastore(self): + """ + Background thread that periodically saves dirty items and audits watches. + + Runs two independent cycles: + 1. Save dirty items every DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS (default 10s) + 2. Rolling audit: every DATASTORE_AUDIT_INTERVAL_SECONDS (default 10s) + - Scans 1/DATASTORE_AUDIT_SHARDS watches per cycle (default 1/5) + - Full audit completes every 50s (10s × 5 shards) + - Automatically handles new/deleted watches + + Uses 0.5s sleep intervals for responsiveness to urgent saves. 
+ """ + while True: + if self.stop_thread: + # Graceful shutdown: flush any remaining dirty items before stopping + if self.needs_write or self._dirty_watches or self._dirty_settings: + logger.warning("Datastore save thread stopping - flushing remaining dirty items...") + try: + self._save_dirty_items() + logger.info("Graceful shutdown complete - all data saved") + except Exception as e: + logger.critical(f"FAILED to save dirty items during shutdown: {e}") + else: + logger.info("Datastore save thread stopping - no dirty items") + return + + # Check if it's time to run audit scan (every N seconds) + if time.time() - self._last_audit_time >= DATASTORE_AUDIT_INTERVAL_SECONDS: + try: + self._audit_all_watches() + except Exception as e: + logger.error(f"Error in audit cycle: {e}") + + # Save dirty items if needed + if self.needs_write or self.needs_write_urgent: + try: + self._save_dirty_items() + except Exception as e: + logger.error(f"Error in save cycle: {e}") + + # Timer with early break for urgent saves + # Each iteration is 0.5 seconds, so iterations = DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2 + for i in range(DATASTORE_SCAN_DIRTY_SAVE_INTERVAL_SECONDS * 2): + time.sleep(0.5) + if self.stop_thread or self.needs_write_urgent: + break + + def start_save_thread(self): + """Start the background save thread.""" + if not self.save_data_thread or not self.save_data_thread.is_alive(): + self.save_data_thread = Thread(target=self.save_datastore, daemon=True, name="DatastoreSaver") + self.save_data_thread.start() + logger.info("Datastore save thread started") + + def force_save_all(self): + """ + Force immediate synchronous save of all changes to storage. + + File backend implementation of the abstract force_save_all() method. + Marks all watches and settings as dirty, then saves immediately. 
+ + Used by: + - Backup creation (ensure everything is saved before backup) + - Shutdown (ensure all changes are persisted) + - Manual save operations + """ + logger.info("Force saving all data to storage...") + + # Mark everything as dirty to ensure complete save + for uuid in self.data['watching'].keys(): + self.mark_watch_dirty(uuid) + self.mark_settings_dirty() + + # Save immediately (synchronous) + self._save_dirty_items() + + logger.success("All data saved to storage") + + def get_health_status(self): + """ + Get datastore health status for monitoring. + + Returns: + dict with health metrics and status + """ + now = time.time() + time_since_last_save = now - self._last_save_time + + with self.lock: + dirty_count = len(self._dirty_watches) + + is_thread_alive = self.save_data_thread and self.save_data_thread.is_alive() + + # Determine health status + if not is_thread_alive: + status = "CRITICAL" + message = "Save thread is DEAD" + elif time_since_last_save > 300: # 5 minutes + status = "WARNING" + message = f"No save activity for {time_since_last_save:.0f}s" + elif dirty_count > 1000: + status = "WARNING" + message = f"High backpressure: {dirty_count} watches pending" + elif self._save_errors > 0 and (self._save_errors / max(self._total_saves, 1)) > 0.01: + status = "WARNING" + message = f"High error rate: {self._save_errors} errors" + else: + status = "HEALTHY" + message = "Operating normally" + + return { + "status": status, + "message": message, + "thread_alive": is_thread_alive, + "dirty_watches": dirty_count, + "dirty_settings": self._dirty_settings, + "last_save_seconds_ago": int(time_since_last_save), + "save_cycles": self._save_cycle_count, + "total_saves": self._total_saves, + "total_errors": self._save_errors, + "error_rate_percent": round((self._save_errors / max(self._total_saves, 1)) * 100, 2) + } diff --git a/changedetectionio/store/legacy_loader.py b/changedetectionio/store/legacy_loader.py new file mode 100644 index 00000000..fe21549b --- 
/dev/null +++ b/changedetectionio/store/legacy_loader.py @@ -0,0 +1,66 @@ +""" +Legacy format loader for url-watches.json. + +Provides functions to detect and load from the legacy monolithic JSON format. +Used during migration (update_26) to transition to individual watch.json files. +""" + +import os +import json +from loguru import logger + +# Try to import orjson for faster JSON serialization +try: + import orjson + HAS_ORJSON = True +except ImportError: + HAS_ORJSON = False + + +def has_legacy_datastore(datastore_path): + """ + Check if a legacy url-watches.json file exists. + + This is used by update_26 to determine if migration is needed. + + Args: + datastore_path: Path to datastore directory + + Returns: + bool: True if url-watches.json exists + """ + url_watches_json = os.path.join(datastore_path, "url-watches.json") + return os.path.exists(url_watches_json) + + +def load_legacy_format(json_store_path): + """ + Load datastore from legacy url-watches.json format. + + Args: + json_store_path: Full path to url-watches.json file + + Returns: + dict: Loaded datastore data with 'watching', 'settings', etc. + None: If file doesn't exist or loading failed + """ + logger.info(f"Loading from legacy format: {json_store_path}") + + if not os.path.isfile(json_store_path): + logger.warning(f"Legacy file not found: {json_store_path}") + return None + + try: + if HAS_ORJSON: + with open(json_store_path, 'rb') as f: + data = orjson.loads(f.read()) + else: + with open(json_store_path, 'r', encoding='utf-8') as f: + data = json.load(f) + + logger.info(f"Loaded {len(data.get('watching', {}))} watches from legacy format") + return data + + except Exception as e: + logger.error(f"Failed to load legacy format: {e}") + return None diff --git a/changedetectionio/store/updates.py b/changedetectionio/store/updates.py new file mode 100644 index 00000000..68d4fc2f --- /dev/null +++ b/changedetectionio/store/updates.py @@ -0,0 +1,690 @@ +""" +Schema update migrations for the datastore. 
+ +This module contains all schema version upgrade methods (update_1 through update_N). +These are mixed into ChangeDetectionStore to keep the main store file focused. + +IMPORTANT: Each update could be run even when they have a new install and the schema is correct. +Therefore - each `update_n` should be very careful about checking if it needs to actually run. +""" + +import os +import re +import shutil +import tarfile +import time +from loguru import logger +from copy import deepcopy + +from ..html_tools import TRANSLATE_WHITESPACE_TABLE +from ..processors.restock_diff import Restock +from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT +from ..model import USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH +from .file_saving_datastore import save_watch_atomic + + +def create_backup_tarball(datastore_path, update_number): + """ + Create a tarball backup of the entire datastore structure before running an update. + + Includes: + - All {uuid}/watch.json files + - changedetection.json (settings, if it exists) + - url-watches.json (legacy format, if it exists) + - Directory structure preserved + + Args: + datastore_path: Path to datastore directory + update_number: Update number being applied + + Returns: + str: Path to created tarball, or None if backup failed + + Restoration: + To restore from a backup: + cd /path/to/datastore + tar -xzf before-update-N-timestamp.tar.gz + This will restore all watch.json files and settings to their pre-update state. 
+ """ + timestamp = int(time.time()) + backup_filename = f"before-update-{update_number}-{timestamp}.tar.gz" + backup_path = os.path.join(datastore_path, backup_filename) + + try: + logger.info(f"Creating backup tarball: {backup_filename}") + + with tarfile.open(backup_path, "w:gz") as tar: + # Backup changedetection.json if it exists (new format) + changedetection_json = os.path.join(datastore_path, "changedetection.json") + if os.path.isfile(changedetection_json): + tar.add(changedetection_json, arcname="changedetection.json") + logger.debug("Added changedetection.json to backup") + + # Backup url-watches.json if it exists (legacy format) + url_watches_json = os.path.join(datastore_path, "url-watches.json") + if os.path.isfile(url_watches_json): + tar.add(url_watches_json, arcname="url-watches.json") + logger.debug("Added url-watches.json to backup") + + # Backup all watch directories with their watch.json files + # This preserves the UUID directory structure + watch_count = 0 + for entry in os.listdir(datastore_path): + entry_path = os.path.join(datastore_path, entry) + + # Skip if not a directory + if not os.path.isdir(entry_path): + continue + + # Skip hidden directories and backup directories + if entry.startswith('.') or entry.startswith('before-update-'): + continue + + # Check if this directory has a watch.json (indicates it's a watch UUID directory) + watch_json = os.path.join(entry_path, "watch.json") + if os.path.isfile(watch_json): + # Add the watch.json file preserving directory structure + tar.add(watch_json, arcname=f"{entry}/watch.json") + watch_count += 1 + + if watch_count % 100 == 0: + logger.debug(f"Backed up {watch_count} watch.json files...") + + logger.success(f"Backup created: {backup_filename} ({watch_count} watches)") + return backup_path + + except Exception as e: + logger.error(f"Failed to create backup tarball: {e}") + # Try to clean up partial backup + if os.path.exists(backup_path): + try: + os.unlink(backup_path) + except: + pass + 
return None + + +class DatastoreUpdatesMixin: + """ + Mixin class containing all schema update methods. + + This class is inherited by ChangeDetectionStore to provide schema migration functionality. + Each update_N method upgrades the schema from version N-1 to version N. + """ + + def get_updates_available(self): + """ + Discover all available update methods. + + Returns: + list: Sorted list of update version numbers (e.g., [1, 2, 3, ..., 26]) + """ + import inspect + updates_available = [] + for i, o in inspect.getmembers(self, predicate=inspect.ismethod): + m = re.search(r'update_(\d+)$', i) + if m: + updates_available.append(int(m.group(1))) + updates_available.sort() + + return updates_available + + def run_updates(self, current_schema_version=None): + """ + Run all pending schema updates sequentially. + + Args: + current_schema_version: Optional current schema version. If provided, only run updates + greater than this version. If None, uses the schema version from + the datastore. If no schema version exists in datastore and it appears + to be a fresh install, sets to latest update number (no updates needed). + + IMPORTANT: Each update could be run even when they have a new install and the schema is correct. + Therefore - each `update_n` should be very careful about checking if it needs to actually run. + + Process: + 1. Get list of available updates + 2. For each update > current schema version: + - Create backup of datastore + - Run update method + - Update schema version + - Mark settings and watches dirty + 3. If any update fails, stop processing + 4. 
Save all changes immediately + """ + updates_available = self.get_updates_available() + + # Determine current schema version + if current_schema_version is None: + # Check if schema_version exists in datastore + current_schema_version = self.data['settings']['application'].get('schema_version') + + if current_schema_version is None: + # No schema version found - could be a fresh install or very old datastore + # If this is a fresh/new config with no watches, assume it's up-to-date + # and set to latest update number (no updates needed) + if len(self.data['watching']) == 0: + # Get the highest update number from available update methods + latest_update = updates_available[-1] if updates_available else 0 + logger.info(f"No schema version found and no watches exist - assuming fresh install, setting schema_version to {latest_update}") + self.data['settings']['application']['schema_version'] = latest_update + self.mark_settings_dirty() + return # No updates needed for fresh install + else: + # Has watches but no schema version - likely old datastore, run all updates + logger.warning("No schema version found but watches exist - running all updates from version 0") + current_schema_version = 0 + + logger.info(f"Current schema version: {current_schema_version}") + + updates_ran = [] + + for update_n in updates_available: + if update_n > current_schema_version: + logger.critical(f"Applying update_{update_n}") + + # Create tarball backup of entire datastore structure + # This includes all watch.json files, settings, and preserves directory structure + backup_path = create_backup_tarball(self.datastore_path, update_n) + if backup_path: + logger.info(f"Backup created at: {backup_path}") + else: + logger.warning("Backup creation failed, but continuing with update") + + try: + update_method = getattr(self, f"update_{update_n}")() + except Exception as e: + logger.error(f"Error while trying update_{update_n}") + logger.error(e) + # Don't run any more updates + return + else: + # 
Bump the version, important + self.data['settings']['application']['schema_version'] = update_n + self.mark_settings_dirty() + + # CRITICAL: Mark all watches as dirty so changes are persisted + # Most updates modify watches, and in the new individual watch.json structure, + # we need to ensure those changes are saved + logger.info(f"Marking all {len(self.data['watching'])} watches as dirty after update_{update_n} (so that it saves them to disk)") + for uuid in self.data['watching'].keys(): + self.mark_watch_dirty(uuid) + + # Save changes immediately after each update (more resilient than batching) + logger.critical(f"Saving all changes after update_{update_n}") + try: + self._save_dirty_items() + logger.success(f"Update {update_n} changes saved successfully") + except Exception as e: + logger.error(f"Failed to save update_{update_n} changes: {e}") + # Don't raise - update already ran, but changes might not be persisted + # The update will try to run again on next startup + + # Track which updates ran + updates_ran.append(update_n) + + # ============================================================================ + # Individual Update Methods + # ============================================================================ + + def update_1(self): + """Convert minutes to seconds on settings and each watch.""" + if self.data['settings']['requests'].get('minutes_between_check'): + self.data['settings']['requests']['time_between_check']['minutes'] = self.data['settings']['requests']['minutes_between_check'] + # Remove the default 'hours' that is set from the model + self.data['settings']['requests']['time_between_check']['hours'] = None + + for uuid, watch in self.data['watching'].items(): + if 'minutes_between_check' in watch: + # Only upgrade individual watch time if it was set + if watch.get('minutes_between_check', False): + self.data['watching'][uuid]['time_between_check']['minutes'] = watch['minutes_between_check'] + + def update_2(self): + """ + Move the history 
list to a flat text file index. + Better than SQLite because this list is only appended to, and works across NAS / NFS type setups. + """ + # @todo test running this on a newly updated one (when this already ran) + for uuid, watch in self.data['watching'].items(): + history = [] + + if watch.get('history', False): + for d, p in watch['history'].items(): + d = int(d) # Used to be keyed as str, we'll fix this now too + history.append("{},{}\n".format(d, p)) + + if len(history): + target_path = os.path.join(self.datastore_path, uuid) + if os.path.exists(target_path): + with open(os.path.join(target_path, "history.txt"), "w") as f: + f.writelines(history) + else: + logger.warning(f"Datastore history directory {target_path} does not exist, skipping history import.") + + # No longer needed, dynamically pulled from the disk when needed. + # But we should set it back to an empty dict so we don't break if this schema runs on an earlier version. + # In the distant future we can remove this entirely + self.data['watching'][uuid]['history'] = {} + + def update_3(self): + """We incorrectly stored last_changed when there was not a change, and then confused the output list table.""" + # see https://github.com/dgtlmoon/changedetection.io/pull/835 + return + + def update_4(self): + """`last_changed` not needed, we pull that information from the history.txt index.""" + for uuid, watch in self.data['watching'].items(): + try: + # Remove it from the struct + del(watch['last_changed']) + except: + continue + return + + def update_5(self): + """ + If the watch notification body, title look the same as the global one, unset it, so the watch defaults back to using the main settings. + In other words - the watch notification_title and notification_body are not needed if they are the same as the default one.
+ """ + current_system_body = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE) + current_system_title = self.data['settings']['application']['notification_body'].translate(TRANSLATE_WHITESPACE_TABLE) + for uuid, watch in self.data['watching'].items(): + try: + watch_body = watch.get('notification_body', '') + if watch_body and watch_body.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_body: + # Looks the same as the default one, so unset it + watch['notification_body'] = None + + watch_title = watch.get('notification_title', '') + if watch_title and watch_title.translate(TRANSLATE_WHITESPACE_TABLE) == current_system_title: + # Looks the same as the default one, so unset it + watch['notification_title'] = None + except Exception as e: + continue + return + + def update_7(self): + """ + We incorrectly used common header overrides that should only apply to Requests. + These are now handled in content_fetcher::html_requests and shouldnt be passed to Playwright/Selenium. + """ + # These were hard-coded in early versions + for v in ['User-Agent', 'Accept', 'Accept-Encoding', 'Accept-Language']: + if self.data['settings']['headers'].get(v): + del self.data['settings']['headers'][v] + + def update_8(self): + """Convert filters to a list of filters css_filter -> include_filters.""" + for uuid, watch in self.data['watching'].items(): + try: + existing_filter = watch.get('css_filter', '') + if existing_filter: + watch['include_filters'] = [existing_filter] + except: + continue + return + + def update_9(self): + """Convert old static notification tokens to jinja2 tokens.""" + # Each watch + # only { } not {{ or }} + r = r'(? 
'{key_exists_as_value}'") + self.data['settings']['application']['notification_format'] = key_exists_as_value + + for uuid, watch in self.data['watching'].items(): + n_format = self.data['watching'][uuid].get('notification_format') + key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None) + if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text" + logger.success(f"['watching'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") + self.data['watching'][uuid]['notification_format'] = key_exists_as_value # should be 'text' or whatever + + for uuid, tag in self.data['settings']['application']['tags'].items(): + n_format = self.data['settings']['application']['tags'][uuid].get('notification_format') + key_exists_as_value = next((k for k, v in formats.items() if v == n_format), None) + if key_exists_as_value and key_exists_as_value != USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH: # key of "Plain text" + logger.success( + f"['settings']['application']['tags'][{uuid}]['notification_format'] '{n_format}' -> '{key_exists_as_value}'") + self.data['settings']['application']['tags'][uuid][ + 'notification_format'] = key_exists_as_value # should be 'text' or whatever + + from ..notification import valid_notification_formats + formats = deepcopy(valid_notification_formats) + re_run(formats) + # And in previous versions, it was "text" instead of Plain text, Markdown instead of "Markdown to HTML" + formats['text'] = 'Text' + formats['markdown'] = 'Markdown' + re_run(formats) + + def update_24(self): + """RSS types should be inline with the same names as notification types.""" + rss_format = self.data['settings']['application'].get('rss_content_format') + if not rss_format or 'text' in rss_format: + # might have been 'plaintext, 'plain text' or something + self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT + elif 'html' in 
rss_format: + self.data['settings']['application']['rss_content_format'] = 'htmlcolor' + else: + # safe fallback to text + self.data['settings']['application']['rss_content_format'] = RSS_CONTENT_FORMAT_DEFAULT + + def update_25(self): + """Different processors now hold their own history.txt.""" + for uuid, watch in self.data['watching'].items(): + processor = self.data['watching'][uuid].get('processor') + if processor != 'text_json_diff': + old_history_txt = os.path.join(self.datastore_path, uuid, "history.txt") + target_history_name = f"history-{processor}.txt" + if os.path.isfile(old_history_txt) and not os.path.isfile(os.path.join(self.datastore_path, uuid, target_history_name)): + new_history_txt = os.path.join(self.datastore_path, uuid, target_history_name) + logger.debug(f"Renaming history index {old_history_txt} to {new_history_txt}...") + shutil.move(old_history_txt, new_history_txt) + + def update_26(self): + """ + Migration: Individual watch persistence (COPY-based, safe rollback). + + Loads legacy url-watches.json format and migrates to: + - {uuid}/watch.json (per watch) + - changedetection.json (settings only) + + IMPORTANT: + - A tarball backup (before-update-26-timestamp.tar.gz) is created before migration + - url-watches.json is LEFT INTACT for rollback safety + - Users can roll back by simply downgrading to the previous version + - Or restore from tarball: tar -xzf before-update-26-*.tar.gz + + This is a dedicated migration release - users upgrade at their own pace.
+ """ + logger.critical("=" * 80) + logger.critical("Running migration: Individual watch persistence (update_26)") + logger.critical("COPY-based migration: url-watches.json will remain intact for rollback") + logger.critical("=" * 80) + + # Check if already migrated + changedetection_json = os.path.join(self.datastore_path, "changedetection.json") + if os.path.exists(changedetection_json): + logger.info("Migration already completed (changedetection.json exists), skipping") + return + + # Check if we need to load legacy data + from .legacy_loader import has_legacy_datastore, load_legacy_format + + if not has_legacy_datastore(self.datastore_path): + logger.info("No legacy datastore found, nothing to migrate") + return + + # Load legacy data from url-watches.json + logger.critical("Loading legacy datastore from url-watches.json...") + legacy_path = os.path.join(self.datastore_path, "url-watches.json") + legacy_data = load_legacy_format(legacy_path) + + if not legacy_data: + raise Exception("Failed to load legacy datastore from url-watches.json") + + # Populate settings from legacy data + logger.info("Populating settings from legacy data...") + if 'settings' in legacy_data: + self.data['settings'] = legacy_data['settings'] + if 'app_guid' in legacy_data: + self.data['app_guid'] = legacy_data['app_guid'] + if 'build_sha' in legacy_data: + self.data['build_sha'] = legacy_data['build_sha'] + if 'version_tag' in legacy_data: + self.data['version_tag'] = legacy_data['version_tag'] + + # Rehydrate watches from legacy data + logger.info("Rehydrating watches from legacy data...") + self.data['watching'] = {} + for uuid, watch_data in legacy_data.get('watching', {}).items(): + try: + self.data['watching'][uuid] = self.rehydrate_entity(uuid, watch_data) + except Exception as e: + logger.error(f"Failed to rehydrate watch {uuid}: {e}") + raise Exception(f"Migration failed: Could not rehydrate watch {uuid}. 
Error: {e}") + + watch_count = len(self.data['watching']) + logger.success(f"Loaded {watch_count} watches from legacy format") + + # Phase 1: Save all watches to individual files + logger.critical(f"Phase 1/4: Saving {watch_count} watches to individual watch.json files...") + + saved_count = 0 + for uuid, watch in self.data['watching'].items(): + try: + watch_dict = dict(watch) + watch_dir = os.path.join(self.datastore_path, uuid) + save_watch_atomic(watch_dir, uuid, watch_dict) + # Initialize hash + self._watch_hashes[uuid] = self._compute_hash(watch_dict) + saved_count += 1 + + if saved_count % 100 == 0: + logger.info(f" Progress: {saved_count}/{watch_count} watches saved...") + + except Exception as e: + logger.error(f"Failed to save watch {uuid}: {e}") + raise Exception( + f"Migration failed: Could not save watch {uuid}. " + f"url-watches.json remains intact, safe to retry. Error: {e}" + ) + + logger.critical(f"Phase 1 complete: Saved {saved_count} watches") + + # Phase 2: Verify all files exist + logger.critical("Phase 2/4: Verifying all watch.json files were created...") + + missing = [] + for uuid in self.data['watching'].keys(): + watch_json = os.path.join(self.datastore_path, uuid, "watch.json") + if not os.path.isfile(watch_json): + missing.append(uuid) + + if missing: + raise Exception( + f"Migration failed: {len(missing)} watch files missing: {missing[:5]}... " + f"url-watches.json remains intact, safe to retry." + ) + + logger.critical(f"Phase 2 complete: Verified {watch_count} watch files") + + # Phase 3: Create new settings file + logger.critical("Phase 3/4: Creating changedetection.json...") + + try: + self._save_settings() + except Exception as e: + logger.error(f"Failed to create changedetection.json: {e}") + raise Exception( + f"Migration failed: Could not create changedetection.json. " + f"url-watches.json remains intact, safe to retry. 
Error: {e}" + ) + + # Phase 4: Verify settings file exists + logger.critical("Phase 4/4: Verifying changedetection.json exists...") + + if not os.path.isfile(changedetection_json): + raise Exception( + "Migration failed: changedetection.json not found after save. " + "url-watches.json remains intact, safe to retry." + ) + + logger.critical("Phase 4 complete: Verified changedetection.json exists") + + # Success! Now reload from new format + logger.critical("Reloading datastore from new format...") + self._load_state() + logger.success("Datastore reloaded from new format successfully") + + logger.critical("=" * 80) + logger.critical("MIGRATION COMPLETED SUCCESSFULLY!") + logger.critical("=" * 80) + logger.info("") + logger.info("New format:") + logger.info(f" - {watch_count} individual watch.json files created") + logger.info(f" - changedetection.json created (settings only)") + logger.info("") + logger.info("Rollback safety:") + logger.info(" - url-watches.json preserved for rollback") + logger.info(" - To rollback: downgrade to previous version and restart") + logger.info(" - No manual file operations needed") + logger.info("") + logger.info("Optional cleanup (after testing new version):") + logger.info(f" - rm {os.path.join(self.datastore_path, 'url-watches.json')}") + logger.info("") + + # Schema version will be updated by run_updates() diff --git a/changedetectionio/tests/test_backup.py b/changedetectionio/tests/test_backup.py index e7bbc7a1..52652019 100644 --- a/changedetectionio/tests/test_backup.py +++ b/changedetectionio/tests/test_backup.py @@ -53,11 +53,21 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path): backup = ZipFile(io.BytesIO(res.data)) l = backup.namelist() - uuid4hex = re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I) - newlist = list(filter(uuid4hex.match, l)) # Read Note below + # Check for UUID-based txt files (history and snapshot) + uuid4hex_txt = 
re.compile('^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}.*txt', re.I) + txt_files = list(filter(uuid4hex_txt.match, l)) # Should be two txt files in the archive (history and the snapshot) - assert len(newlist) == 2 + assert len(txt_files) == 2 + + # Check for watch.json files (new format) + uuid4hex_json = re.compile(r'^[a-f0-9]{8}-?[a-f0-9]{4}-?4[a-f0-9]{3}-?[89ab][a-f0-9]{3}-?[a-f0-9]{12}/watch\.json$', re.I) + json_files = list(filter(uuid4hex_json.match, l)) + # Should be one watch.json file in the archive (the imported watch) + assert len(json_files) == 1, f"Expected 1 watch.json file, found {len(json_files)}: {json_files}" + + # Check for changedetection.json (settings file) + assert 'changedetection.json' in l, "changedetection.json should be in backup" # Get the latest one res = client.get( diff --git a/changedetectionio/tests/test_history_consistency.py b/changedetectionio/tests/test_history_consistency.py index 5ce8f549..3a344102 100644 --- a/changedetectionio/tests/test_history_consistency.py +++ b/changedetectionio/tests/test_history_consistency.py @@ -59,11 +59,29 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore # Wait for the sync DB save to happen time.sleep(2) - json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json') + # Check which format is being used + datastore_path = live_server.app.config['DATASTORE'].datastore_path + changedetection_json = os.path.join(datastore_path, 'changedetection.json') + url_watches_json = os.path.join(datastore_path, 'url-watches.json') - json_obj = None - with open(json_db_file, 'r', encoding='utf-8') as f: - json_obj = json.load(f) + json_obj = {'watching': {}} + + if os.path.exists(changedetection_json): + # New format: individual watch.json files + logger.info("Testing with new format (changedetection.json + individual watch.json)") + + # Load each watch.json file + for uuid in 
live_server.app.config['DATASTORE'].data['watching'].keys(): + watch_json_file = os.path.join(datastore_path, uuid, 'watch.json') + assert os.path.isfile(watch_json_file), f"watch.json should exist at {watch_json_file}" + + with open(watch_json_file, 'r', encoding='utf-8') as f: + json_obj['watching'][uuid] = json.load(f) + else: + # Legacy format: url-watches.json + logger.info("Testing with legacy format (url-watches.json)") + with open(url_watches_json, 'r', encoding='utf-8') as f: + json_obj = json.load(f) # assert the right amount of watches was found in the JSON assert len(json_obj['watching']) == len(workers), "Correct number of watches was found in the JSON" @@ -88,7 +106,7 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore # Find the snapshot one for fname in files_in_watch_dir: - if fname != 'history.txt' and 'html' not in fname: + if fname != 'history.txt' and fname != 'watch.json' and 'html' not in fname: if strtobool(os.getenv("TEST_WITH_BROTLI")): assert fname.endswith('.br'), "Forced TEST_WITH_BROTLI then it should be a .br filename" @@ -105,11 +123,23 @@ def test_consistent_history(client, live_server, measure_memory_usage, datastore assert json_obj['watching'][w]['title'], "Watch should have a title set" assert contents.startswith(watch_title + "x"), f"Snapshot contents in file {fname} should start with '{watch_title}x', got '{contents}'" - assert len(files_in_watch_dir) == 3, "Should be just three files in the dir, html.br snapshot, history.txt and the extracted text snapshot" + # With new format, we also have watch.json, so 4 files total + if os.path.exists(changedetection_json): + assert len(files_in_watch_dir) == 4, "Should be four files in the dir with new format: watch.json, html.br snapshot, history.txt and the extracted text snapshot" + else: + assert len(files_in_watch_dir) == 3, "Should be just three files in the dir with legacy format: html.br snapshot, history.txt and the extracted text snapshot" - 
json_db_file = os.path.join(live_server.app.config['DATASTORE'].datastore_path, 'url-watches.json') - with open(json_db_file, 'r', encoding='utf-8') as f: - assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved" + # Check that 'default' Watch vars aren't accidentally being saved + if os.path.exists(changedetection_json): + # New format: check all individual watch.json files + for uuid in json_obj['watching'].keys(): + watch_json_file = os.path.join(datastore_path, uuid, 'watch.json') + with open(watch_json_file, 'r', encoding='utf-8') as f: + assert '"default"' not in f.read(), f"'default' probably shouldnt be here in {watch_json_file}, it came from when the 'default' Watch vars were accidently being saved" + else: + # Legacy format: check url-watches.json + with open(url_watches_json, 'r', encoding='utf-8') as f: + assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved" def test_check_text_history_view(client, live_server, measure_memory_usage, datastore_path): diff --git a/changedetectionio/tests/test_request.py b/changedetectionio/tests/test_request.py index 71305c18..782b04aa 100644 --- a/changedetectionio/tests/test_request.py +++ b/changedetectionio/tests/test_request.py @@ -142,10 +142,14 @@ def test_body_in_request(client, live_server, measure_memory_usage, datastore_pa client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) wait_for_all_checks(client) watches_with_body = 0 - with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f: - app_struct = json.load(f) - for uuid in app_struct['watching']: - if app_struct['watching'][uuid]['body']==body_value: + + # Read individual watch.json files + for uuid in client.application.config.get('DATASTORE').data['watching'].keys(): + watch_json_file = os.path.join(datastore_path, uuid, 'watch.json') + 
assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}" + with open(watch_json_file, 'r', encoding='utf-8') as f: + watch_data = json.load(f) + if watch_data.get('body') == body_value: watches_with_body += 1 # Should be only one with body set @@ -225,10 +229,14 @@ def test_method_in_request(client, live_server, measure_memory_usage, datastore_ wait_for_all_checks(client) watches_with_method = 0 - with open(os.path.join(datastore_path, 'url-watches.json'), encoding='utf-8') as f: - app_struct = json.load(f) - for uuid in app_struct['watching']: - if app_struct['watching'][uuid]['method'] == 'PATCH': + + # Read individual watch.json files + for uuid in client.application.config.get('DATASTORE').data['watching'].keys(): + watch_json_file = os.path.join(datastore_path, uuid, 'watch.json') + assert os.path.exists(watch_json_file), f"watch.json should exist at {watch_json_file}" + with open(watch_json_file, 'r', encoding='utf-8') as f: + watch_data = json.load(f) + if watch_data.get('method') == 'PATCH': watches_with_method += 1 # Should be only one with method set to PATCH