Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-10-31 06:37:41 +00:00)

Compare commits: `browser-no...abstracted` (3 commits)

| SHA1 |
|---|
| 80434fa16a |
| db10422415 |
| 380e571ded |

**MANIFEST.in**

```diff
@@ -5,6 +5,7 @@ recursive-include changedetectionio/content_fetchers *
 recursive-include changedetectionio/model *
 recursive-include changedetectionio/processors *
 recursive-include changedetectionio/static *
+recursive-include changedetectionio/storage *
 recursive-include changedetectionio/templates *
 recursive-include changedetectionio/tests *
 prune changedetectionio/static/package-lock.json
```

**changedetectionio/model/Watch.py**

```diff
@@ -299,34 +299,17 @@ class model(watch_base):
     # Save some text file to the appropriate path and bump the history
     # result_obj from fetch_site_status.run()
     def save_history_text(self, contents, timestamp, snapshot_id):
-        import brotli
+        from changedetectionio.store import ChangeDetectionStore
+        from changedetectionio.storage.filesystem_storage import FileSystemStorage
 
         logger.trace(f"{self.get('uuid')} - Updating history.txt with timestamp {timestamp}")
 
-        self.ensure_data_dir_exists()
-
-        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
-        skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
-
-        if not skip_brotli and len(contents) > threshold:
-            snapshot_fname = f"{snapshot_id}.txt.br"
-            dest = os.path.join(self.watch_data_dir, snapshot_fname)
-            if not os.path.exists(dest):
-                with open(dest, 'wb') as f:
-                    f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
-        else:
-            snapshot_fname = f"{snapshot_id}.txt"
-            dest = os.path.join(self.watch_data_dir, snapshot_fname)
-            if not os.path.exists(dest):
-                with open(dest, 'wb') as f:
-                    f.write(contents.encode('utf-8'))
-
-        # Append to index
-        # @todo check last char was \n
-        index_fname = os.path.join(self.watch_data_dir, "history.txt")
-        with open(index_fname, 'a') as f:
-            f.write("{},{}\n".format(timestamp, snapshot_fname))
-            f.close()
+        # Get storage from singleton store or create a filesystem storage as default
+        store = ChangeDetectionStore.instance if hasattr(ChangeDetectionStore, 'instance') else None
+        storage = store.storage if store and hasattr(store, 'storage') else FileSystemStorage(self.__datastore_path)
+
+        # Use the storage backend to save the history text
+        snapshot_fname = storage.save_history_text(self.get('uuid'), contents, timestamp, snapshot_id)
 
         self.__newest_history_key = timestamp
         self.__history_n += 1
```

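The fallback in this hunk relies on `ChangeDetectionStore` exposing a class-level `instance` attribute that points at the live store. A minimal sketch of that pattern, assuming the attribute is published when the store is constructed (the real wiring in `changedetectionio/store.py` is not shown in this excerpt):

```python
# Illustrative sketch only: the real ChangeDetectionStore lives in
# changedetectionio/store.py, which this excerpt does not show.
class ChangeDetectionStore:
    instance = None  # class-level singleton handle, probed via hasattr() above

    def __init__(self, storage):
        self.storage = storage                # the active StorageBase backend
        ChangeDetectionStore.instance = self  # publish the singleton
```
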
**changedetectionio/storage/README.md** (new file, 61 lines)

# Storage Backends for changedetection.io

This module provides different storage backends for changedetection.io, allowing you to store data in various systems:

- **FileSystemStorage**: The default storage backend that stores data on the local filesystem.
- **MongoDBStorage**: Stores data in a MongoDB database.
- **S3Storage**: Stores data in an Amazon S3 bucket.

## Usage

The storage backend is automatically selected based on the datastore path provided when initializing the application (a sketch of this dispatch follows the list):

- For filesystem storage (default): `/datastore`
- For MongoDB storage: `mongodb://username:password@host:port/database`
- For S3 storage: `s3://bucket-name/optional-prefix`

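A minimal sketch of how this scheme-based selection could work; the actual dispatch lives in `storage_factory.py` (added by this changeset but not shown in this excerpt), and the function name `create_storage` here is illustrative:

```python
# Hypothetical factory sketch; the real logic is in storage_factory.py.
from urllib.parse import urlparse

def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"):
    scheme = urlparse(datastore_path).scheme
    if scheme == 'mongodb':
        from changedetectionio.storage.mongodb_storage import MongoDBStorage
        return MongoDBStorage(datastore_path, include_default_watches, version_tag)
    if scheme == 's3':
        from changedetectionio.storage.s3_storage import S3Storage
        return S3Storage(datastore_path, include_default_watches, version_tag)
    # A plain directory path has no URI scheme, so fall back to the filesystem default
    from changedetectionio.storage.filesystem_storage import FileSystemStorage
    return FileSystemStorage(datastore_path, include_default_watches, version_tag)
```
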
## Configuration

### Filesystem Storage

The default storage backend. Simply specify a directory path:

```
changedetection.io -d /path/to/datastore
```

### MongoDB Storage

To use MongoDB storage, specify a MongoDB connection URI:

```
changedetection.io -d mongodb://username:password@host:port/database
```

Make sure to install the required dependencies:

```
pip install -r requirements-storage.txt
```

### Amazon S3 Storage

To use S3 storage, specify an S3 URI:

```
changedetection.io -d s3://bucket-name/optional-prefix
```

Make sure to:
1. Install the required dependencies: `pip install -r requirements-storage.txt`
2. Configure AWS credentials using environment variables or IAM roles:
   - Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
   - Or use an IAM role when running on AWS EC2/ECS/EKS

## Custom Storage Backends

You can create custom storage backends (a minimal sketch follows this list) by:

1. Subclassing the `StorageBase` abstract class in `storage_base.py`
2. Implementing all required methods
3. Adding your backend to the `storage_factory.py` file
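
A toy illustration of steps 1 and 2, assuming `StorageBase` declares the same methods the bundled backends implement (its exact abstract interface is in `storage_base.py`, which is not shown in this excerpt):

```python
# Illustrative only: method names mirror the bundled backends, but the exact
# abstract interface is defined in storage_base.py (not shown here).
from changedetectionio.storage.storage_base import StorageBase

class InMemoryStorage(StorageBase):
    """Toy backend that keeps everything in process memory."""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        self.app_data = None
        self.snapshots = {}  # (watch_uuid, str(timestamp)) -> text
        self.history = {}    # watch_uuid -> {str(timestamp): snapshot_id}

    def load_data(self):
        return self.app_data

    def save_data(self, data):
        self.app_data = data

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        self.snapshots[(watch_uuid, str(timestamp))] = contents
        self.history.setdefault(watch_uuid, {})[str(timestamp)] = snapshot_id
        return snapshot_id

    def get_history(self, watch_uuid):
        return dict(self.history.get(watch_uuid, {}))

    def get_history_snapshot(self, watch_uuid, timestamp):
        return self.snapshots.get((watch_uuid, str(timestamp)))
```

Step 3 would then map a new datastore path format to this class inside `storage_factory.py`.
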
**changedetectionio/storage/__init__.py** (new file, 1 line)

```python
# This module contains storage backend implementations
```

**changedetectionio/storage/filesystem_storage.py** (new file, 449 lines)

```python
import os
import shutil
import json
import brotli
import zlib
import pathlib
from loguru import logger
from os import path

from .storage_base import StorageBase

class FileSystemStorage(StorageBase):
    """File system storage backend"""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the file system storage backend

        Args:
            datastore_path (str): Path to the datastore
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        self.datastore_path = datastore_path
        self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
        logger.info(f"Datastore path is '{self.json_store_path}'")

    def load_data(self):
        """Load data from the file system

        Returns:
            dict: The loaded data
        """
        if not path.isfile(self.json_store_path):
            return None

        with open(self.json_store_path) as json_file:
            return json.load(json_file)

    def save_data(self, data):
        """Save data to the file system

        Args:
            data (dict): The data to save
        """
        try:
            # Re #286 - First write to a temp file, then confirm it looks OK and rename it
            # This is a fairly basic strategy to deal with the case that the file is corrupted,
            # system was out of memory, out of RAM etc
            with open(self.json_store_path + ".tmp", 'w') as json_file:
                json.dump(data, json_file, indent=4)
            os.replace(self.json_store_path + ".tmp", self.json_store_path)
        except Exception as e:
            logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
            raise e

    def get_watch_dir(self, watch_uuid):
        """Get the directory for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The watch directory
        """
        return os.path.join(self.datastore_path, watch_uuid)

    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.isdir(watch_dir):
            logger.debug(f"> Creating data dir {watch_dir}")
            os.makedirs(watch_dir, exist_ok=True)

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to the file system

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot filename
        """
        self.ensure_data_dir_exists(watch_uuid)

        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
        skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')

        watch_dir = self.get_watch_dir(watch_uuid)

        if not skip_brotli and len(contents) > threshold:
            snapshot_fname = f"{snapshot_id}.txt.br"
            dest = os.path.join(watch_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
        else:
            snapshot_fname = f"{snapshot_id}.txt"
            dest = os.path.join(watch_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(contents.encode('utf-8'))

        # Append to index
        index_fname = os.path.join(watch_dir, "history.txt")
        with open(index_fname, 'a') as f:
            f.write("{},{}\n".format(timestamp, snapshot_fname))

        return snapshot_fname

    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        tmp_history = {}

        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.isdir(watch_dir):
            return tmp_history

        # Read the history file as a dict
        fname = os.path.join(watch_dir, "history.txt")
        if os.path.isfile(fname):
            logger.debug(f"Reading watch history index for {watch_uuid}")
            with open(fname, "r") as f:
                for i in f.readlines():
                    if ',' in i:
                        # Split on the first comma only, so a filename containing commas stays intact
                        k, v = i.strip().split(',', 1)

                        # The index history could contain a relative path, so we need to make the fullpath
                        # so that python can read it
                        if '/' not in v and '\\' not in v:
                            v = os.path.join(watch_dir, v)
                        else:
                            # It's possible that they moved the datadir on older versions
                            # So the snapshot exists but is in a different path
                            snapshot_fname = v.split('/')[-1]
                            proposed_new_path = os.path.join(watch_dir, snapshot_fname)
                            if not os.path.exists(v) and os.path.exists(proposed_new_path):
                                v = proposed_new_path

                        tmp_history[k] = v

        return tmp_history

    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from the file system

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        history = self.get_history(watch_uuid)
        if timestamp not in history:
            return None

        filepath = history[timestamp]

        # See if a brotli version exists and switch to that
        if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
            filepath = f"{filepath}.br"

        # OR in the backup case that the .br does not exist, but the plain one does
        if filepath.endswith('.br') and not os.path.isfile(filepath):
            if os.path.isfile(filepath.replace('.br', '')):
                filepath = filepath.replace('.br', '')

        if filepath.endswith('.br'):
            # Brotli doesn't have a file header to detect it, so we rely on the filename
            # https://www.rfc-editor.org/rfc/rfc7932
            with open(filepath, 'rb') as f:
                return brotli.decompress(f.read()).decode('utf-8')

        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        if as_error:
            target_path = os.path.join(watch_dir, "last-error-screenshot.png")
        else:
            target_path = os.path.join(watch_dir, "last-screenshot.png")

        with open(target_path, 'wb') as f:
            f.write(screenshot)

    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            str or None: The screenshot path or None if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        if is_error:
            fname = os.path.join(watch_dir, "last-error-screenshot.png")
        else:
            fname = os.path.join(watch_dir, "last-screenshot.png")

        if os.path.isfile(fname):
            return fname

        return None

    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        target_path = os.path.join(watch_dir, "last-error.txt")
        with open(target_path, 'w', encoding='utf-8') as f:
            f.write(contents)

    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        fname = os.path.join(watch_dir, "last-error.txt")

        if os.path.isfile(fname):
            with open(fname, 'r') as f:
                return f.read()

        return False

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        if as_error:
            target_path = os.path.join(watch_dir, "elements-error.deflate")
        else:
            target_path = os.path.join(watch_dir, "elements.deflate")

        with open(target_path, 'wb') as f:
            f.write(zlib.compress(json.dumps(data).encode()))

    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        # Named target_path to avoid shadowing the `from os import path` import
        if is_error:
            target_path = os.path.join(watch_dir, "elements-error.deflate")
        else:
            target_path = os.path.join(watch_dir, "elements.deflate")

        if not os.path.isfile(target_path):
            return None

        with open(target_path, 'rb') as f:
            return json.loads(zlib.decompress(f.read()).decode('utf-8'))

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(watch_dir, snapshot_fname)

        with open(filepath, 'wb') as f:
            contents = contents.encode('utf-8') if isinstance(contents, str) else contents
            try:
                f.write(brotli.compress(contents))
            except Exception as e:
                logger.warning(f"{watch_uuid} - Unable to compress snapshot, saving as raw data to {filepath}")
                logger.warning(e)
                f.write(contents)

        # Prune old snapshots - keep only the newest 2
        self._prune_last_fetched_html_snapshots(watch_uuid)

    def _prune_last_fetched_html_snapshots(self, watch_uuid):
        """Prune old HTML snapshots

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        history = self.get_history(watch_uuid)

        dates = list(history.keys())
        dates.reverse()

        for index, timestamp in enumerate(dates):
            snapshot_fname = f"{timestamp}.html.br"
            filepath = os.path.join(watch_dir, snapshot_fname)

            # Keep only the first 2
            if index > 1 and os.path.isfile(filepath):
                os.remove(filepath)

    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(watch_dir, snapshot_fname)

        if os.path.isfile(filepath):
            with open(filepath, 'rb') as f:
                return brotli.decompress(f.read()).decode('utf-8')

        return False

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        filepath = os.path.join(watch_dir, 'last-fetched.br')
        with open(filepath, 'wb') as f:
            f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))

    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        filepath = os.path.join(watch_dir, 'last-fetched.br')

        if not os.path.isfile(filepath):
            # If a previous attempt doesn't yet exist, just use the latest snapshot instead
            history = self.get_history(watch_uuid)
            dates = list(history.keys())

            if len(dates):
                return self.get_history_snapshot(watch_uuid, dates[-1])
            else:
                return ''

        with open(filepath, 'rb') as f:
            return brotli.decompress(f.read()).decode('utf-8')

    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        screenshot_filename = os.path.join(watch_dir, "last-screenshot.png")
        elements_index_filename = os.path.join(watch_dir, "elements.deflate")

        return path.isfile(screenshot_filename) and path.isfile(elements_index_filename)

    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.exists(watch_dir):
            return

        # Delete all files but keep the directory
        for item in pathlib.Path(watch_dir).glob("*.*"):
            os.unlink(item)

    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if os.path.exists(watch_dir):
            shutil.rmtree(watch_dir)
```

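A quick round-trip sketch against the filesystem backend as defined above (the temp directory, UUID, and snapshot ID are made up for illustration):

```python
import tempfile
import time
import uuid

from changedetectionio.storage.filesystem_storage import FileSystemStorage

datastore = tempfile.mkdtemp()          # throwaway datastore directory
storage = FileSystemStorage(datastore)

watch_uuid = str(uuid.uuid4())
ts = str(int(time.time()))

# Short payloads stay under SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD (default 1024
# bytes) and land in a plain .txt file; longer ones become brotli .txt.br files
fname = storage.save_history_text(watch_uuid, "hello world", ts, snapshot_id="abc123")
print(fname)                                         # -> abc123.txt

print(storage.get_history(watch_uuid))               # {ts: '/tmp/.../abc123.txt'}
print(storage.get_history_snapshot(watch_uuid, ts))  # -> hello world
```
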
**changedetectionio/storage/mongodb_storage.py** (new file, 466 lines)

```python
import os
from copy import deepcopy

import brotli
import zlib
import json
import time
from loguru import logger
from pymongo import MongoClient
from urllib.parse import urlparse
import base64

from .storage_base import StorageBase

class MongoDBStorage(StorageBase):
    """MongoDB storage backend"""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the MongoDB storage backend

        Args:
            datastore_path (str): MongoDB connection URI
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        # Parse MongoDB URI from datastore_path
        parsed_uri = urlparse(datastore_path)
        self.db_name = parsed_uri.path.lstrip('/')
        if not self.db_name:
            self.db_name = 'changedetection'

        # Connect to MongoDB
        self.client = MongoClient(datastore_path)
        self.db = self.client[self.db_name]

        # Collections
        self.app_collection = self.db['app']
        self.watches_collection = self.db['watches']
        self.snapshots_collection = self.db['snapshots']
        self.history_collection = self.db['history']
        self.error_collection = self.db['errors']
        self.xpath_collection = self.db['xpath']
        self.html_collection = self.db['html']

        logger.info(f"MongoDB storage initialized, connected to {datastore_path}")

    def load_data(self):
        """Load data from MongoDB

        Returns:
            dict: The loaded data
        """
        app_data = self.app_collection.find_one({'_id': 'app_data'})
        if not app_data:
            return None

        # Remove MongoDB _id field
        if '_id' in app_data:
            del app_data['_id']

        return app_data

    def save_data(self, data):
        """Save data to MongoDB

        Args:
            data (dict): The data to save
        """
        try:
            # Create a copy to modify
            data_copy = deepcopy(data)

            # Set _id for app data
            data_copy['_id'] = 'app_data'

            # Insert or update app data
            self.app_collection.replace_one({'_id': 'app_data'}, data_copy, upsert=True)

            # Also store watches separately for more granular access
            # This provides a safety net in case of corrupted app_data
            watches = data.get('watching', {})
            for uuid, watch in watches.items():
                if isinstance(watch, dict):  # Handle case where watch is a Watch object
                    watch_copy = deepcopy(dict(watch))
                else:
                    watch_copy = deepcopy(watch)
                watch_copy['_id'] = uuid
                self.watches_collection.replace_one({'_id': uuid}, watch_copy, upsert=True)

            # Also store tags separately
            if 'settings' in data and 'application' in data['settings'] and 'tags' in data['settings']['application']:
                tags = data['settings']['application']['tags']
                for uuid, tag in tags.items():
                    if isinstance(tag, dict):  # Handle case where tag is a Tag object
                        tag_copy = deepcopy(dict(tag))
                    else:
                        tag_copy = deepcopy(tag)
                    tag_copy['_id'] = uuid
                    self.db['tags'].replace_one({'_id': uuid}, tag_copy, upsert=True)

        except Exception as e:
            logger.error(f"Error writing to MongoDB: {str(e)}")
            raise e

    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # MongoDB doesn't need directories, this is a no-op
        pass

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to MongoDB

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot ID
        """
        # Compress the contents
        compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

        # Store the snapshot
        snapshot_data = {
            '_id': f"{watch_uuid}:{timestamp}",
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'snapshot_id': snapshot_id,
            'contents': base64.b64encode(compressed_contents).decode('ascii'),
            'compressed': True
        }

        self.snapshots_collection.replace_one({'_id': snapshot_data['_id']}, snapshot_data, upsert=True)

        # Update history index
        history_entry = {
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'snapshot_id': snapshot_id
        }

        self.history_collection.replace_one(
            {'watch_uuid': watch_uuid, 'timestamp': timestamp},
            history_entry,
            upsert=True
        )

        return snapshot_id

    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        history = {}

        # Query history entries for this watch
        entries = self.history_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', 1)

        for entry in entries:
            history[str(entry['timestamp'])] = entry['snapshot_id']

        return history

    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from MongoDB

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        # Query for the snapshot
        snapshot = self.snapshots_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})

        if not snapshot:
            return None

        if snapshot.get('compressed', False):
            # Decompress the contents
            compressed_data = base64.b64decode(snapshot['contents'])
            return brotli.decompress(compressed_data).decode('utf-8')
        else:
            return snapshot['contents']

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        collection_name = 'error_screenshots' if as_error else 'screenshots'
        collection = self.db[collection_name]

        # Encode the screenshot as base64
        encoded_screenshot = base64.b64encode(screenshot).decode('ascii')

        screenshot_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'screenshot': encoded_screenshot,
            'timestamp': int(time.time())
        }

        collection.replace_one({'_id': watch_uuid}, screenshot_data, upsert=True)

    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            bytes or None: The screenshot data or None if not available
        """
        collection_name = 'error_screenshots' if is_error else 'screenshots'
        collection = self.db[collection_name]

        screenshot_data = collection.find_one({'_id': watch_uuid})
        if not screenshot_data:
            return None

        # Decode the screenshot from base64
        return base64.b64decode(screenshot_data['screenshot'])

    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        error_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'error_text': contents,
            'timestamp': int(time.time())
        }

        self.error_collection.replace_one({'_id': watch_uuid}, error_data, upsert=True)

    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        error_data = self.error_collection.find_one({'_id': watch_uuid})
        if not error_data:
            return False

        return error_data['error_text']

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        # Compress the data
        compressed_data = zlib.compress(json.dumps(data).encode())

        _id = f"{watch_uuid}:error" if as_error else watch_uuid

        xpath_data = {
            '_id': _id,
            'watch_uuid': watch_uuid,
            'is_error': as_error,
            'data': base64.b64encode(compressed_data).decode('ascii'),
            'timestamp': int(time.time())
        }

        self.xpath_collection.replace_one({'_id': _id}, xpath_data, upsert=True)

    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        _id = f"{watch_uuid}:error" if is_error else watch_uuid

        xpath_data = self.xpath_collection.find_one({'_id': _id})
        if not xpath_data:
            return None

        # Decompress the data
        compressed_data = base64.b64decode(xpath_data['data'])
        return json.loads(zlib.decompress(compressed_data).decode('utf-8'))

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        # Compress the contents
        contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
        try:
            compressed_contents = brotli.compress(contents_bytes)
        except Exception as e:
            logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
            compressed_contents = contents_bytes

        html_data = {
            '_id': f"{watch_uuid}:{timestamp}",
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'html': base64.b64encode(compressed_contents).decode('ascii'),
            'compressed': True
        }

        self.html_collection.replace_one({'_id': html_data['_id']}, html_data, upsert=True)

        # Prune old snapshots - keep only the newest 2
        self._prune_last_fetched_html_snapshots(watch_uuid)

    def _prune_last_fetched_html_snapshots(self, watch_uuid):
        """Prune old HTML snapshots

        Args:
            watch_uuid (str): Watch UUID
        """
        # Get all HTML snapshots for this watch, sorted by timestamp descending
        html_snapshots = list(
            self.html_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', -1)
        )

        # Keep only the first 2
        if len(html_snapshots) > 2:
            for snapshot in html_snapshots[2:]:
                self.html_collection.delete_one({'_id': snapshot['_id']})

    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        html_data = self.html_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})

        if not html_data:
            return False

        if html_data.get('compressed', False):
            # Decompress the contents
            compressed_data = base64.b64decode(html_data['html'])
            return brotli.decompress(compressed_data).decode('utf-8')
        else:
            return html_data['html']

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        # Compress the contents
        compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

        last_fetched_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'contents': base64.b64encode(compressed_contents).decode('ascii'),
            'timestamp': int(time.time())
        }

        self.db['last_fetched'].replace_one({'_id': watch_uuid}, last_fetched_data, upsert=True)

    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        last_fetched_data = self.db['last_fetched'].find_one({'_id': watch_uuid})

        if not last_fetched_data:
            # If a previous attempt doesn't yet exist, just use the latest snapshot instead
            history = self.get_history(watch_uuid)
            dates = list(history.keys())

            if len(dates):
                return self.get_history_snapshot(watch_uuid, dates[-1])
            else:
                return ''

        # Decompress the contents
        compressed_data = base64.b64decode(last_fetched_data['contents'])
        return brotli.decompress(compressed_data).decode('utf-8')

    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        # Check if screenshot and xpath data exist
        screenshot = self.db['screenshots'].find_one({'_id': watch_uuid})
        xpath_data = self.xpath_collection.find_one({'_id': watch_uuid})

        return screenshot is not None and xpath_data is not None

    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # Delete all snapshots and history for this watch
        self.snapshots_collection.delete_many({'watch_uuid': watch_uuid})
        self.history_collection.delete_many({'watch_uuid': watch_uuid})
        self.html_collection.delete_many({'watch_uuid': watch_uuid})
        self.db['last_fetched'].delete_many({'watch_uuid': watch_uuid})
        self.xpath_collection.delete_many({'watch_uuid': watch_uuid})
        self.db['screenshots'].delete_many({'watch_uuid': watch_uuid})
        self.error_collection.delete_many({'watch_uuid': watch_uuid})

    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # Clear all history data
        self.clear_watch_history(watch_uuid)

        # Also delete error screenshots
        self.db['error_screenshots'].delete_many({'watch_uuid': watch_uuid})
```

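The same round trip against the MongoDB backend, assuming a local `mongod` is reachable (the URI and UUID are illustrative). Note the design choice visible throughout this file: compressed payloads are stored as base64 strings rather than BSON binary, which keeps the documents easy to inspect at the cost of roughly 33% size overhead:

```python
import time

from changedetectionio.storage.mongodb_storage import MongoDBStorage

# Database name is taken from the URI path ('changedetection' if omitted)
storage = MongoDBStorage("mongodb://localhost:27017/changedetection")

ts = int(time.time())
storage.save_history_text("watch-uuid-1", "hello world", ts, snapshot_id="abc123")

print(storage.get_history("watch-uuid-1"))               # {str(ts): 'abc123'}
print(storage.get_history_snapshot("watch-uuid-1", ts))  # -> hello world
```
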
**changedetectionio/storage/s3_storage.py** (new file, 525 lines; the excerpt below is truncated)

| import os | ||||
| import io | ||||
| import json | ||||
| import brotli | ||||
| import zlib | ||||
| import time | ||||
| from loguru import logger | ||||
| import boto3 | ||||
| from urllib.parse import urlparse | ||||
| import base64 | ||||
|  | ||||
| from .storage_base import StorageBase | ||||
|  | ||||
| class S3Storage(StorageBase): | ||||
|     """Amazon S3 storage backend""" | ||||
|      | ||||
|     def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"): | ||||
|         """Initialize the S3 storage backend | ||||
|          | ||||
|         Args: | ||||
|             datastore_path (str): S3 URI (s3://bucket-name/optional-prefix) | ||||
|             include_default_watches (bool): Whether to include default watches | ||||
|             version_tag (str): Version tag | ||||
|         """ | ||||
|         # Parse S3 URI | ||||
|         parsed_uri = urlparse(datastore_path) | ||||
|         self.bucket_name = parsed_uri.netloc | ||||
|         self.prefix = parsed_uri.path.lstrip('/') | ||||
|          | ||||
|         if self.prefix and not self.prefix.endswith('/'): | ||||
|             self.prefix += '/' | ||||
|              | ||||
|         # Initialize S3 client | ||||
|         # Uses AWS credentials from environment variables or IAM role | ||||
|         self.s3 = boto3.client('s3') | ||||
|          | ||||
|         logger.info(f"S3 storage initialized, using bucket '{self.bucket_name}' with prefix '{self.prefix}'") | ||||
|      | ||||
|     def _get_key(self, path): | ||||
|         """Get the S3 key for a path | ||||
|          | ||||
|         Args: | ||||
|             path (str): Path relative to the prefix | ||||
|              | ||||
|         Returns: | ||||
|             str: The full S3 key | ||||
|         """ | ||||
|         return f"{self.prefix}{path}" | ||||
|      | ||||
|     def load_data(self): | ||||
|         """Load data from S3 | ||||
|          | ||||
|         Returns: | ||||
|             dict: The loaded data | ||||
|         """ | ||||
|         key = self._get_key("app-data.json") | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             return json.loads(response['Body'].read().decode('utf-8')) | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return None | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error loading data from S3: {str(e)}") | ||||
|             raise e | ||||
|      | ||||
|     def save_data(self, data): | ||||
|         """Save data to S3 | ||||
|          | ||||
|         Args: | ||||
|             data (dict): The data to save | ||||
|         """ | ||||
|         try: | ||||
|             key = self._get_key("app-data.json") | ||||
|             self.s3.put_object( | ||||
|                 Bucket=self.bucket_name, | ||||
|                 Key=key, | ||||
|                 Body=json.dumps(data, indent=4), | ||||
|                 ContentType='application/json' | ||||
|             ) | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error saving data to S3: {str(e)}") | ||||
|             raise e | ||||
|      | ||||
|     def ensure_data_dir_exists(self, watch_uuid): | ||||
|         """Ensure the data directory exists for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         # S3 doesn't need directories, this is a no-op | ||||
|         pass | ||||
|      | ||||
|     def _get_watch_prefix(self, watch_uuid): | ||||
|         """Get the S3 prefix for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             str: The watch prefix | ||||
|         """ | ||||
|         return self._get_key(f"watches/{watch_uuid}/") | ||||
|      | ||||
|     def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id): | ||||
|         """Save history text to S3 | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Contents to save | ||||
|             timestamp (int): Timestamp | ||||
|             snapshot_id (str): Snapshot ID | ||||
|              | ||||
|         Returns: | ||||
|             str: Snapshot ID | ||||
|         """ | ||||
|         # Determine if we should compress | ||||
|         threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024)) | ||||
|         skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't') | ||||
|          | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         # Save the snapshot | ||||
|         if not skip_brotli and len(contents) > threshold: | ||||
|             snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt.br" | ||||
|             snapshot_fname = f"{snapshot_id}.txt.br" | ||||
|             compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT) | ||||
|              | ||||
|             self.s3.put_object( | ||||
|                 Bucket=self.bucket_name, | ||||
|                 Key=snapshot_key, | ||||
|                 Body=compressed_contents | ||||
|             ) | ||||
|         else: | ||||
|             snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt" | ||||
|             snapshot_fname = f"{snapshot_id}.txt" | ||||
|              | ||||
|             self.s3.put_object( | ||||
|                 Bucket=self.bucket_name, | ||||
|                 Key=snapshot_key, | ||||
|                 Body=contents.encode('utf-8') | ||||
|             ) | ||||
|          | ||||
|         # Update history index | ||||
|         history_key = f"{watch_prefix}history.txt" | ||||
|          | ||||
|         # Try to get existing history first | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key) | ||||
|             history_content = response['Body'].read().decode('utf-8') | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             history_content = "" | ||||
|          | ||||
|         # Append new entry | ||||
|         history_content += f"{timestamp},{snapshot_fname}\n" | ||||
|          | ||||
|         # Save updated history | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=history_key, | ||||
|             Body=history_content | ||||
|         ) | ||||
|          | ||||
|         return snapshot_fname | ||||
|      | ||||
|     def get_history(self, watch_uuid): | ||||
|         """Get history for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             dict: The history with timestamp keys and snapshot IDs as values | ||||
|         """ | ||||
|         tmp_history = {} | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         history_key = f"{watch_prefix}history.txt" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key) | ||||
|             history_content = response['Body'].read().decode('utf-8') | ||||
|              | ||||
|             for line in history_content.splitlines(): | ||||
|                 if ',' in line: | ||||
|                     k, v = line.strip().split(',', 2) | ||||
|                     tmp_history[k] = f"{watch_prefix}snapshots/{v}" | ||||
|              | ||||
|             return tmp_history | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return {} | ||||
|      | ||||
|     def get_history_snapshot(self, watch_uuid, timestamp): | ||||
|         """Get a history snapshot from S3 | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|              | ||||
|         Returns: | ||||
|             str: The snapshot content | ||||
|         """ | ||||
|         history = self.get_history(watch_uuid) | ||||
|         # History keys are read back from text as strings, so normalise the lookup key | ||||
|         timestamp = str(timestamp) | ||||
|         if timestamp not in history: | ||||
|             return None | ||||
|              | ||||
|         key = history[timestamp] | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             content = response['Body'].read() | ||||
|              | ||||
|             if key.endswith('.br'): | ||||
|                 # Decompress brotli | ||||
|                 return brotli.decompress(content).decode('utf-8') | ||||
|             else: | ||||
|                 return content.decode('utf-8') | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error reading snapshot from S3: {str(e)}") | ||||
|             return None | ||||
|      | ||||
|     def save_screenshot(self, watch_uuid, screenshot, as_error=False): | ||||
|         """Save a screenshot for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             screenshot (bytes): Screenshot data | ||||
|             as_error (bool): Whether this is an error screenshot | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         if as_error: | ||||
|             key = f"{watch_prefix}last-error-screenshot.png" | ||||
|         else: | ||||
|             key = f"{watch_prefix}last-screenshot.png" | ||||
|          | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=key, | ||||
|             Body=screenshot, | ||||
|             ContentType='image/png' | ||||
|         ) | ||||
|      | ||||
|     def get_screenshot(self, watch_uuid, is_error=False): | ||||
|         """Get a screenshot for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             is_error (bool): Whether to get the error screenshot | ||||
|              | ||||
|         Returns: | ||||
|             bytes or None: The screenshot data or None if not available | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         if is_error: | ||||
|             key = f"{watch_prefix}last-error-screenshot.png" | ||||
|         else: | ||||
|             key = f"{watch_prefix}last-screenshot.png" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             return response['Body'].read() | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return None | ||||
|      | ||||
|     def save_error_text(self, watch_uuid, contents): | ||||
|         """Save error text for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Error contents | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}last-error.txt" | ||||
|          | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=key, | ||||
|             Body=contents.encode('utf-8') | ||||
|         ) | ||||
|      | ||||
|     def get_error_text(self, watch_uuid): | ||||
|         """Get error text for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             str or False: The error text or False if not available | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}last-error.txt" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             return response['Body'].read().decode('utf-8') | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return False | ||||
|      | ||||
|     def save_xpath_data(self, watch_uuid, data, as_error=False): | ||||
|         """Save XPath data for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             data (dict): XPath data | ||||
|             as_error (bool): Whether this is error data | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         if as_error: | ||||
|             key = f"{watch_prefix}elements-error.deflate" | ||||
|         else: | ||||
|             key = f"{watch_prefix}elements.deflate" | ||||
|          | ||||
|         compressed_data = zlib.compress(json.dumps(data).encode()) | ||||
|          | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=key, | ||||
|             Body=compressed_data | ||||
|         ) | ||||
|      | ||||
|     def get_xpath_data(self, watch_uuid, is_error=False): | ||||
|         """Get XPath data for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             is_error (bool): Whether to get error data | ||||
|              | ||||
|         Returns: | ||||
|             dict or None: The XPath data or None if not available | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         if is_error: | ||||
|             key = f"{watch_prefix}elements-error.deflate" | ||||
|         else: | ||||
|             key = f"{watch_prefix}elements.deflate" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             compressed_data = response['Body'].read() | ||||
|             return json.loads(zlib.decompress(compressed_data).decode('utf-8')) | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return None | ||||
|      | ||||
|     def save_last_fetched_html(self, watch_uuid, timestamp, contents): | ||||
|         """Save last fetched HTML for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|             contents (str): HTML contents | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}html/{timestamp}.html.br" | ||||
|          | ||||
|         contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents | ||||
|         try: | ||||
|             compressed_contents = brotli.compress(contents_bytes) | ||||
|         except Exception as e: | ||||
|             logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}") | ||||
|             compressed_contents = contents_bytes | ||||
|          | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=key, | ||||
|             Body=compressed_contents | ||||
|         ) | ||||
|          | ||||
|         # Prune old snapshots - keep only the newest 2 | ||||
|         self._prune_last_fetched_html_snapshots(watch_uuid) | ||||
|      | ||||
|     def _prune_last_fetched_html_snapshots(self, watch_uuid): | ||||
|         """Prune old HTML snapshots | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         html_prefix = f"{watch_prefix}html/" | ||||
|          | ||||
|         # List all HTML snapshots | ||||
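|         # list_objects_v2 returns at most 1000 keys per request; no paginator is | ||||
|         # needed here since pruning keeps the snapshot count tiny | ||||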
|         response = self.s3.list_objects_v2( | ||||
|             Bucket=self.bucket_name, | ||||
|             Prefix=html_prefix | ||||
|         ) | ||||
|          | ||||
|         if 'Contents' not in response: | ||||
|             return | ||||
|              | ||||
|         # Sort by timestamp (extract from key) | ||||
|         html_files = sorted( | ||||
|             response['Contents'], | ||||
|             key=lambda x: int(x['Key'].split('/')[-1].split('.')[0]), | ||||
|             reverse=True | ||||
|         ) | ||||
|          | ||||
|         # Delete all but the newest 2 | ||||
|         if len(html_files) > 2: | ||||
|             for file in html_files[2:]: | ||||
|                 self.s3.delete_object( | ||||
|                     Bucket=self.bucket_name, | ||||
|                     Key=file['Key'] | ||||
|                 ) | ||||
|      | ||||
|     def get_fetched_html(self, watch_uuid, timestamp): | ||||
|         """Get fetched HTML for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|              | ||||
|         Returns: | ||||
|             str or False: The HTML or False if not available | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}html/{timestamp}.html.br" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             compressed_data = response['Body'].read() | ||||
|             try: | ||||
|                 return brotli.decompress(compressed_data).decode('utf-8') | ||||
|             except brotli.error: | ||||
|                 # save_last_fetched_html falls back to raw bytes when compression fails | ||||
|                 return compressed_data.decode('utf-8') | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             return False | ||||
|      | ||||
|     def save_last_text_fetched_before_filters(self, watch_uuid, contents): | ||||
|         """Save the last text fetched before filters | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Text contents | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}last-fetched.br" | ||||
|          | ||||
|         compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT) | ||||
|          | ||||
|         self.s3.put_object( | ||||
|             Bucket=self.bucket_name, | ||||
|             Key=key, | ||||
|             Body=compressed_contents | ||||
|         ) | ||||
|      | ||||
|     def get_last_fetched_text_before_filters(self, watch_uuid): | ||||
|         """Get the last text fetched before filters | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             str: The text | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         key = f"{watch_prefix}last-fetched.br" | ||||
|          | ||||
|         try: | ||||
|             response = self.s3.get_object(Bucket=self.bucket_name, Key=key) | ||||
|             compressed_data = response['Body'].read() | ||||
|             return brotli.decompress(compressed_data).decode('utf-8') | ||||
|         except self.s3.exceptions.NoSuchKey: | ||||
|             # If a previous attempt doesn't exist yet, fall back to the most recent history snapshot instead | ||||
|             history = self.get_history(watch_uuid) | ||||
|             dates = list(history.keys()) | ||||
|              | ||||
|             if len(dates): | ||||
|                 return self.get_history_snapshot(watch_uuid, dates[-1]) | ||||
|             else: | ||||
|                 return '' | ||||
|      | ||||
|     def visualselector_data_is_ready(self, watch_uuid): | ||||
|         """Check if visual selector data is ready | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             bool: Whether visual selector data is ready | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|         screenshot_key = f"{watch_prefix}last-screenshot.png" | ||||
|         elements_key = f"{watch_prefix}elements.deflate" | ||||
|          | ||||
|         try: | ||||
|             # Just check if both files exist | ||||
|             self.s3.head_object(Bucket=self.bucket_name, Key=screenshot_key) | ||||
|             self.s3.head_object(Bucket=self.bucket_name, Key=elements_key) | ||||
|             return True | ||||
|         except self.s3.exceptions.ClientError: | ||||
|             return False | ||||
|      | ||||
|     def clear_watch_history(self, watch_uuid): | ||||
|         """Clear history for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         watch_prefix = self._get_watch_prefix(watch_uuid) | ||||
|          | ||||
|         # List all objects with this watch's prefix | ||||
|         paginator = self.s3.get_paginator('list_objects_v2') | ||||
|         pages = paginator.paginate( | ||||
|             Bucket=self.bucket_name, | ||||
|             Prefix=watch_prefix | ||||
|         ) | ||||
|          | ||||
|         # Delete all objects in batches | ||||
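|         # delete_objects accepts at most 1000 keys per call, which matches the | ||||
|         # paginator's default page size, so each page fits in a single call | ||||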
|         for page in pages: | ||||
|             if 'Contents' not in page: | ||||
|                 continue | ||||
|                  | ||||
|             delete_keys = {'Objects': [{'Key': obj['Key']} for obj in page['Contents']]} | ||||
|             self.s3.delete_objects( | ||||
|                 Bucket=self.bucket_name, | ||||
|                 Delete=delete_keys | ||||
|             ) | ||||
|      | ||||
|     def delete_watch(self, watch_uuid): | ||||
|         """Delete a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         # Same implementation as clear_watch_history for S3 | ||||
|         self.clear_watch_history(watch_uuid) | ||||
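
Taken together, the backend can be exercised directly. A minimal sketch, assuming the s3://bucket/prefix URI form that storage_factory.py (below) passes straight through, and that boto3 finds credentials via its usual environment/credential chain; the bucket name and UUID here are illustrative:

    from changedetectionio.storage.s3_storage import S3Storage

    # Hypothetical bucket/prefix; the factory hands the URI through unchanged
    storage = S3Storage('s3://my-changedetection-bucket/datastore', include_default_watches=False)

    # Write one snapshot, then read the index and the content back
    storage.save_history_text('watch-uuid-1234', 'Some page text', 1700000000, 'snap-abc')
    print(storage.get_history('watch-uuid-1234'))
    print(storage.get_history_snapshot('watch-uuid-1234', 1700000000))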
changedetectionio/storage/storage_base.py (new file, 230 lines)
							| @@ -0,0 +1,230 @@ | ||||
| from abc import ABC, abstractmethod | ||||
| import json | ||||
| from loguru import logger | ||||
|  | ||||
| class StorageBase(ABC): | ||||
|     """Abstract base class for storage backends""" | ||||
|      | ||||
|     @abstractmethod | ||||
|     def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"): | ||||
|         """Initialize the storage backend | ||||
|          | ||||
|         Args: | ||||
|             datastore_path (str): Path to the datastore | ||||
|             include_default_watches (bool): Whether to include default watches | ||||
|             version_tag (str): Version tag | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def load_data(self): | ||||
|         """Load data from the storage backend | ||||
|          | ||||
|         Returns: | ||||
|             dict: The loaded data | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_data(self, data): | ||||
|         """Save data to the storage backend | ||||
|          | ||||
|         Args: | ||||
|             data (dict): The data to save | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id): | ||||
|         """Save history text to the storage backend | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Contents to save | ||||
|             timestamp (int): Timestamp | ||||
|             snapshot_id (str): Snapshot ID | ||||
|              | ||||
|         Returns: | ||||
|             str: Snapshot filename or ID | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_history_snapshot(self, watch_uuid, timestamp): | ||||
|         """Get a history snapshot from the storage backend | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|              | ||||
|         Returns: | ||||
|             str or None: The snapshot content, or None if not available | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_history(self, watch_uuid): | ||||
|         """Get history for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             dict: The history with timestamp keys and snapshot locations (path or object key) as values | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_screenshot(self, watch_uuid, screenshot, as_error=False): | ||||
|         """Save a screenshot for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             screenshot (bytes): Screenshot data | ||||
|             as_error (bool): Whether this is an error screenshot | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_screenshot(self, watch_uuid, is_error=False): | ||||
|         """Get a screenshot for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             is_error (bool): Whether to get the error screenshot | ||||
|              | ||||
|         Returns: | ||||
|             str, bytes or None: The screenshot path or raw image data (backend-dependent), or None if not available | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_error_text(self, watch_uuid, contents): | ||||
|         """Save error text for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Error contents | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_error_text(self, watch_uuid): | ||||
|         """Get error text for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             str or False: The error text or False if not available | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_xpath_data(self, watch_uuid, data, as_error=False): | ||||
|         """Save XPath data for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             data (dict): XPath data | ||||
|             as_error (bool): Whether this is error data | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_xpath_data(self, watch_uuid, is_error=False): | ||||
|         """Get XPath data for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             is_error (bool): Whether to get error data | ||||
|              | ||||
|         Returns: | ||||
|             dict or None: The XPath data or None if not available | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_last_fetched_html(self, watch_uuid, timestamp, contents): | ||||
|         """Save last fetched HTML for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|             contents (str): HTML contents | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_fetched_html(self, watch_uuid, timestamp): | ||||
|         """Get fetched HTML for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             timestamp (int): Timestamp | ||||
|              | ||||
|         Returns: | ||||
|             str or False: The HTML or False if not available | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def save_last_text_fetched_before_filters(self, watch_uuid, contents): | ||||
|         """Save the last text fetched before filters | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|             contents (str): Text contents | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def get_last_fetched_text_before_filters(self, watch_uuid): | ||||
|         """Get the last text fetched before filters | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             str: The text | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def ensure_data_dir_exists(self, watch_uuid): | ||||
|         """Ensure the data directory exists for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def visualselector_data_is_ready(self, watch_uuid): | ||||
|         """Check if visual selector data is ready | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|              | ||||
|         Returns: | ||||
|             bool: Whether visual selector data is ready | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def clear_watch_history(self, watch_uuid): | ||||
|         """Clear history for a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         pass | ||||
|      | ||||
|     @abstractmethod | ||||
|     def delete_watch(self, watch_uuid): | ||||
|         """Delete a watch | ||||
|          | ||||
|         Args: | ||||
|             watch_uuid (str): Watch UUID | ||||
|         """ | ||||
|         pass | ||||
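
Because every method above is declared with @abstractmethod, an incomplete backend fails at construction time rather than at first use, which keeps backend gaps obvious. A small illustrative sketch (the class name is hypothetical):

    from changedetectionio.storage.storage_base import StorageBase

    class HalfBakedStorage(StorageBase):
        # Only two of the abstract methods are overridden here
        def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
            self.datastore_path = datastore_path

        def load_data(self):
            return {}

    try:
        HalfBakedStorage('/tmp/datastore')
    except TypeError as e:
        # Python lists what is still missing, e.g. "Can't instantiate abstract
        # class HalfBakedStorage with abstract methods clear_watch_history, ..."
        print(e)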
changedetectionio/storage/storage_factory.py (new file, 33 lines)
							| @@ -0,0 +1,33 @@ | ||||
| import re | ||||
| from loguru import logger | ||||
| from urllib.parse import urlparse | ||||
|  | ||||
| from .storage_base import StorageBase | ||||
| from .filesystem_storage import FileSystemStorage | ||||
| from .mongodb_storage import MongoDBStorage | ||||
| from .s3_storage import S3Storage | ||||
|  | ||||
| def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"): | ||||
|     """Create a storage backend based on the datastore path | ||||
|      | ||||
|     Args: | ||||
|         datastore_path (str): Path to the datastore | ||||
|         include_default_watches (bool): Whether to include default watches | ||||
|         version_tag (str): Version tag | ||||
|          | ||||
|     Returns: | ||||
|         StorageBase: The storage backend | ||||
|     """ | ||||
|     # Check if it's a MongoDB URI | ||||
|     if datastore_path.startswith('mongodb://') or datastore_path.startswith('mongodb+srv://'): | ||||
|         logger.info(f"Using MongoDB storage backend with URI {datastore_path}") | ||||
|         return MongoDBStorage(datastore_path, include_default_watches, version_tag) | ||||
|      | ||||
|     # Check if it's an S3 URI | ||||
|     if datastore_path.startswith('s3://'): | ||||
|         logger.info(f"Using S3 storage backend with URI {datastore_path}") | ||||
|         return S3Storage(datastore_path, include_default_watches, version_tag) | ||||
|      | ||||
|     # Default to filesystem | ||||
|     logger.info(f"Using filesystem storage backend with path {datastore_path}") | ||||
|     return FileSystemStorage(datastore_path, include_default_watches, version_tag) | ||||
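
The scheme prefix alone selects the backend, so callers only vary the datastore path. For example (URIs are illustrative):

    from changedetectionio.storage.storage_factory import create_storage

    fs_store    = create_storage('/datastore')                                 # FileSystemStorage
    mongo_store = create_storage('mongodb://localhost:27017/changedetection')  # MongoDBStorage
    s3_store    = create_storage('s3://my-bucket/changedetection')             # S3Storage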
changedetectionio/store.py
| @@ -20,6 +20,7 @@ from loguru import logger | ||||
|  | ||||
| from .processors import get_custom_watch_obj_for_processor | ||||
| from .processors.restock_diff import Restock | ||||
| from .storage.storage_factory import create_storage | ||||
|  | ||||
| # Because the server will run as a daemon and wont know the URL for notification links when firing off a notification | ||||
| BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)' | ||||
| @@ -38,20 +39,28 @@ class ChangeDetectionStore: | ||||
|     needs_write_urgent = False | ||||
|  | ||||
|     __version_check = True | ||||
|      | ||||
|     # Singleton instance for access from Watch class methods | ||||
|     instance = None | ||||
|  | ||||
|     def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"): | ||||
|         # Should only be active for docker | ||||
|         # logging.basicConfig(filename='/dev/stdout', level=logging.INFO) | ||||
|         self.__data = App.model() | ||||
|         self.datastore_path = datastore_path | ||||
|         self.json_store_path = "{}/url-watches.json".format(self.datastore_path) | ||||
|         logger.info(f"Datastore path is '{self.json_store_path}'") | ||||
|          | ||||
|         # Create the appropriate storage backend based on the datastore path | ||||
|         self.storage = create_storage(datastore_path, include_default_watches, version_tag) | ||||
|          | ||||
|         self.needs_write = False | ||||
|         self.start_time = time.time() | ||||
|         self.stop_thread = False | ||||
|         # Base definition for all watchers | ||||
|         # deepcopy part of #569 - not sure why it's needed exactly | ||||
|         self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={})) | ||||
|         self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={})) | ||||
|          | ||||
|         # Set singleton instance | ||||
|         ChangeDetectionStore.instance = self | ||||
|  | ||||
|         if path.isfile('changedetectionio/source.txt'): | ||||
|             with open('changedetectionio/source.txt') as f: | ||||
| @@ -60,10 +69,9 @@ class ChangeDetectionStore: | ||||
|                 self.__data['build_sha'] = f.read() | ||||
|  | ||||
|         try: | ||||
|             # @todo retest with ", encoding='utf-8'" | ||||
|             with open(self.json_store_path) as json_file: | ||||
|                 from_disk = json.load(json_file) | ||||
|  | ||||
|             # Load data from storage | ||||
|             from_disk = self.storage.load_data() | ||||
|             if from_disk: | ||||
|                 # @todo isnt there a way todo this dict.update recursively? | ||||
|                 # Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore. | ||||
|                 if 'watching' in from_disk: | ||||
| @@ -91,22 +99,24 @@ class ChangeDetectionStore: | ||||
|                 for uuid, tag in self.__data['settings']['application']['tags'].items(): | ||||
|                     self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff') | ||||
|                     logger.info(f"Tag: {uuid} {tag['title']}") | ||||
|             else: | ||||
|                 # First time ran, Create the datastore. | ||||
|                 if include_default_watches: | ||||
|                     logger.critical(f"No data store found, creating new store") | ||||
|                     self.add_watch(url='https://news.ycombinator.com/', | ||||
|                                   tag='Tech news', | ||||
|                                   extras={'fetch_backend': 'html_requests'}) | ||||
|  | ||||
|         # First time ran, Create the datastore. | ||||
|         except (FileNotFoundError): | ||||
|             if include_default_watches: | ||||
|                 logger.critical(f"No JSON DB found at {self.json_store_path}, creating JSON store at {self.datastore_path}") | ||||
|                 self.add_watch(url='https://news.ycombinator.com/', | ||||
|                                tag='Tech news', | ||||
|                                extras={'fetch_backend': 'html_requests'}) | ||||
|                     self.add_watch(url='https://changedetection.io/CHANGELOG.txt', | ||||
|                                   tag='changedetection.io', | ||||
|                                   extras={'fetch_backend': 'html_requests'}) | ||||
|  | ||||
|                 self.add_watch(url='https://changedetection.io/CHANGELOG.txt', | ||||
|                                tag='changedetection.io', | ||||
|                                extras={'fetch_backend': 'html_requests'}) | ||||
|  | ||||
|             updates_available = self.get_updates_available() | ||||
|             self.__data['settings']['application']['schema_version'] = updates_available.pop() | ||||
|                 updates_available = self.get_updates_available() | ||||
|                 self.__data['settings']['application']['schema_version'] = updates_available.pop() | ||||
|  | ||||
|         except Exception as e: | ||||
|             logger.error(f"Error loading data from storage: {str(e)}") | ||||
|             raise e | ||||
|         else: | ||||
|             # Bump the update version by running updates | ||||
|             self.run_updates() | ||||
| @@ -227,23 +237,15 @@ class ChangeDetectionStore: | ||||
|  | ||||
|     # Delete a single watch by UUID | ||||
|     def delete(self, uuid): | ||||
|         import pathlib | ||||
|         import shutil | ||||
|  | ||||
|         with self.lock: | ||||
|             if uuid == 'all': | ||||
|                 # Delete all watches | ||||
|                 for watch_uuid in list(self.data['watching'].keys()): | ||||
|                     self.storage.delete_watch(watch_uuid) | ||||
|                 self.__data['watching'] = {} | ||||
|  | ||||
|                 # GitHub #30 also delete history records | ||||
|                 for uuid in self.data['watching']: | ||||
|                     path = pathlib.Path(os.path.join(self.datastore_path, uuid)) | ||||
|                     if os.path.exists(path): | ||||
|                         shutil.rmtree(path) | ||||
|  | ||||
|             else: | ||||
|                 path = pathlib.Path(os.path.join(self.datastore_path, uuid)) | ||||
|                 if os.path.exists(path): | ||||
|                     shutil.rmtree(path) | ||||
|                 # Delete a single watch | ||||
|                 self.storage.delete_watch(uuid) | ||||
|                 del self.data['watching'][uuid] | ||||
|  | ||||
|         self.needs_write_urgent = True | ||||
| @@ -266,6 +268,7 @@ class ChangeDetectionStore: | ||||
|  | ||||
|     # Remove a watch's data but keep the entry (URL etc) | ||||
|     def clear_watch_history(self, uuid): | ||||
|         self.storage.clear_watch_history(uuid) | ||||
|         self.__data['watching'][uuid].clear_watch() | ||||
|         self.needs_write_urgent = True | ||||
|  | ||||
| @@ -372,43 +375,30 @@ class ChangeDetectionStore: | ||||
|         return new_uuid | ||||
|  | ||||
|     def visualselector_data_is_ready(self, watch_uuid): | ||||
|         output_path = "{}/{}".format(self.datastore_path, watch_uuid) | ||||
|         screenshot_filename = "{}/last-screenshot.png".format(output_path) | ||||
|         elements_index_filename = "{}/elements.deflate".format(output_path) | ||||
|         if path.isfile(screenshot_filename) and  path.isfile(elements_index_filename) : | ||||
|             return True | ||||
|  | ||||
|         return False | ||||
|         return self.storage.visualselector_data_is_ready(watch_uuid) | ||||
|  | ||||
|     def sync_to_json(self): | ||||
|         logger.info("Saving JSON..") | ||||
|         logger.info("Saving data to storage backend...") | ||||
|         try: | ||||
|             data = deepcopy(self.__data) | ||||
|         except RuntimeError as e: | ||||
|             # Try again in 15 seconds | ||||
|             time.sleep(15) | ||||
|             logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}") | ||||
|             logger.error(f"! Data changed when writing to storage, trying again.. {str(e)}") | ||||
|             self.sync_to_json() | ||||
|             return | ||||
|         else: | ||||
|  | ||||
|             try: | ||||
|                 # Re #286  - First write to a temp file, then confirm it looks OK and rename it | ||||
|                 # This is a fairly basic strategy to deal with the case that the file is corrupted, | ||||
|                 # system was out of memory, out of RAM etc | ||||
|                 with open(self.json_store_path+".tmp", 'w') as json_file: | ||||
|                     json.dump(data, json_file, indent=4) | ||||
|                 os.replace(self.json_store_path+".tmp", self.json_store_path) | ||||
|                 self.storage.save_data(data) | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}") | ||||
|                 logger.error(f"Error writing to storage backend: {str(e)}") | ||||
|  | ||||
|             self.needs_write = False | ||||
|             self.needs_write_urgent = False | ||||
|  | ||||
|     # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON | ||||
|     # Thread runner, this helps with thread/write issues when there are many operations that want to update the data | ||||
|     # by just running periodically in one thread, according to python, dict updates are threadsafe. | ||||
|     def save_datastore(self): | ||||
|  | ||||
|         while True: | ||||
|             if self.stop_thread: | ||||
|                 # Suppressing "Logging error in Loguru Handler #0" during CICD. | ||||
|   | ||||
requirements.txt
| @@ -100,3 +100,6 @@ referencing==0.35.1 | ||||
|  | ||||
| # Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !) | ||||
| tzdata | ||||
|  | ||||
| pymongo>=4.3.3 | ||||
| boto3>=1.26.0 | ||||
|   | ||||