Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-11-04 00:27:48 +00:00)

Compare commits: 0.50.26...abstracted
3 commits
| Author | SHA1 | Date |
|---|---|---|
|  | 80434fa16a |  |
|  | db10422415 |  |
|  | 380e571ded |  |
MANIFEST.in

```diff
@@ -5,6 +5,7 @@ recursive-include changedetectionio/content_fetchers *
 recursive-include changedetectionio/model *
 recursive-include changedetectionio/processors *
 recursive-include changedetectionio/static *
+recursive-include changedetectionio/storage *
 recursive-include changedetectionio/templates *
 recursive-include changedetectionio/tests *
 prune changedetectionio/static/package-lock.json
```
changedetectionio/model/Watch.py

```diff
@@ -299,34 +299,17 @@ class model(watch_base):
     # Save some text file to the appropriate path and bump the history
     # result_obj from fetch_site_status.run()
     def save_history_text(self, contents, timestamp, snapshot_id):
-        import brotli
+        from changedetectionio.store import ChangeDetectionStore
+        from changedetectionio.storage.filesystem_storage import FileSystemStorage
 
         logger.trace(f"{self.get('uuid')} - Updating history.txt with timestamp {timestamp}")
 
-        self.ensure_data_dir_exists()
-
-        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
-        skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
-
-        if not skip_brotli and len(contents) > threshold:
-            snapshot_fname = f"{snapshot_id}.txt.br"
-            dest = os.path.join(self.watch_data_dir, snapshot_fname)
-            if not os.path.exists(dest):
-                with open(dest, 'wb') as f:
-                    f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
-        else:
-            snapshot_fname = f"{snapshot_id}.txt"
-            dest = os.path.join(self.watch_data_dir, snapshot_fname)
-            if not os.path.exists(dest):
-                with open(dest, 'wb') as f:
-                    f.write(contents.encode('utf-8'))
-
-        # Append to index
-        # @todo check last char was \n
-        index_fname = os.path.join(self.watch_data_dir, "history.txt")
-        with open(index_fname, 'a') as f:
-            f.write("{},{}\n".format(timestamp, snapshot_fname))
-            f.close()
+        # Get storage from singleton store or create a filesystem storage as default
+        store = ChangeDetectionStore.instance if hasattr(ChangeDetectionStore, 'instance') else None
+        storage = store.storage if store and hasattr(store, 'storage') else FileSystemStorage(self.__datastore_path)
+
+        # Use the storage backend to save the history text
+        snapshot_fname = storage.save_history_text(self.get('uuid'), contents, timestamp, snapshot_id)
 
         self.__newest_history_key = timestamp
         self.__history_n += 1
```
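The replacement body resolves its backend through a store singleton. A minimal sketch of the pattern the new code probes for with `hasattr()` follows; the `instance` attribute and `storage` field are assumptions inferred from this diff, since `ChangeDetectionStore` itself is not part of the changeset:

```python
# Hypothetical sketch of the singleton lookup assumed by the new save_history_text().
# ChangeDetectionStore.instance and store.storage are inferred from the hasattr()
# probes in the diff; the real class is not shown in this changeset.
from changedetectionio.storage.filesystem_storage import FileSystemStorage

class ChangeDetectionStore:
    instance = None  # class-level slot holding the running store

    def __init__(self, datastore_path):
        self.storage = FileSystemStorage(datastore_path)  # or any StorageBase backend
        ChangeDetectionStore.instance = self              # register the singleton

# A watch can then reach the active backend, falling back to plain filesystem storage:
store = ChangeDetectionStore.instance if hasattr(ChangeDetectionStore, 'instance') else None
storage = store.storage if store and hasattr(store, 'storage') else FileSystemStorage('/datastore')
```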
changedetectionio/storage/README.md (new file, 61 lines)

# Storage Backends for changedetection.io

This module provides different storage backends for changedetection.io, allowing you to store data in various systems:

- **FileSystemStorage**: The default storage backend that stores data on the local filesystem.
- **MongoDBStorage**: Stores data in a MongoDB database.
- **S3Storage**: Stores data in an Amazon S3 bucket.

## Usage

The storage backend is automatically selected based on the datastore path provided when initializing the application:

- For filesystem storage (default): `/datastore`
- For MongoDB storage: `mongodb://username:password@host:port/database`
- For S3 storage: `s3://bucket-name/optional-prefix`
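The selection rule above presumably lives in `storage_factory.py`, which this README references later but which is not included in this diff; a plausible sketch of the scheme-based dispatch, with the function name and signature being assumptions:

```python
# Hypothetical sketch of the URI-scheme dispatch described above; storage_factory.py
# is not shown in this diff, so the function name and signature are assumptions.
from urllib.parse import urlparse

def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"):
    scheme = urlparse(datastore_path).scheme
    if scheme == 'mongodb':
        from .mongodb_storage import MongoDBStorage
        return MongoDBStorage(datastore_path, include_default_watches, version_tag)
    if scheme == 's3':
        from .s3_storage import S3Storage
        return S3Storage(datastore_path, include_default_watches, version_tag)
    # A bare directory path like /datastore has no URI scheme: use the filesystem default
    from .filesystem_storage import FileSystemStorage
    return FileSystemStorage(datastore_path, include_default_watches, version_tag)
```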
## Configuration

### Filesystem Storage

The default storage backend. Simply specify a directory path:

```
changedetection.io -d /path/to/datastore
```

### MongoDB Storage

To use MongoDB storage, specify a MongoDB connection URI:

```
changedetection.io -d mongodb://username:password@host:port/database
```

Make sure to install the required dependencies:

```
pip install -r requirements-storage.txt
```

### Amazon S3 Storage

To use S3 storage, specify an S3 URI:

```
changedetection.io -d s3://bucket-name/optional-prefix
```

Make sure to:

1. Install the required dependencies: `pip install -r requirements-storage.txt`
2. Configure AWS credentials using environment variables or IAM roles:
   - Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
   - Or use an IAM role when running on AWS EC2/ECS/EKS

## Custom Storage Backends

You can create custom storage backends by:

1. Subclassing the `StorageBase` abstract class in `storage_base.py`
2. Implementing all required methods
3. Adding your backend to the `storage_factory.py` file
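To make those three steps concrete, a toy in-memory backend might start like this. `storage_base.py` is not included in this diff, so the exact abstract method list is an assumption based on the methods the backends below implement; a real subclass must implement all of them:

```python
# Toy in-memory backend illustrating the subclassing steps from the README.
# StorageBase's abstract method list is an assumption inferred from the
# backends shown in this diff; only a few methods are sketched here.
from .storage_base import StorageBase

class MemoryStorage(StorageBase):
    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        self.app_data = None
        self.snapshots = {}   # (watch_uuid, timestamp) -> text
        self.history = {}     # watch_uuid -> {timestamp: snapshot_id}

    def load_data(self):
        return self.app_data

    def save_data(self, data):
        self.app_data = data

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        self.snapshots[(watch_uuid, timestamp)] = contents
        self.history.setdefault(watch_uuid, {})[str(timestamp)] = snapshot_id
        return snapshot_id

    def get_history(self, watch_uuid):
        return dict(self.history.get(watch_uuid, {}))

    def get_history_snapshot(self, watch_uuid, timestamp):
        return self.snapshots.get((watch_uuid, timestamp))

    # ...remaining StorageBase methods (screenshots, xpath data, etc.) omitted
```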
changedetectionio/storage/__init__.py (new file, 1 line)

```python
# This module contains storage backend implementations
```
changedetectionio/storage/filesystem_storage.py (new file, 449 lines)

```python
import os
import shutil
import json
import brotli
import zlib
import pathlib
from loguru import logger
from os import path

from .storage_base import StorageBase

class FileSystemStorage(StorageBase):
    """File system storage backend"""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the file system storage backend

        Args:
            datastore_path (str): Path to the datastore
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        self.datastore_path = datastore_path
        self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
        logger.info(f"Datastore path is '{self.json_store_path}'")

    def load_data(self):
        """Load data from the file system

        Returns:
            dict: The loaded data
        """
        if not path.isfile(self.json_store_path):
            return None

        with open(self.json_store_path) as json_file:
            return json.load(json_file)

    def save_data(self, data):
        """Save data to the file system

        Args:
            data (dict): The data to save
        """
        try:
            # Re #286 - First write to a temp file, then confirm it looks OK and rename it
            # This is a fairly basic strategy to deal with the case that the file is corrupted,
            # system was out of memory, out of RAM etc
            with open(self.json_store_path+".tmp", 'w') as json_file:
                json.dump(data, json_file, indent=4)
            os.replace(self.json_store_path+".tmp", self.json_store_path)
        except Exception as e:
            logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
            raise e

    def get_watch_dir(self, watch_uuid):
        """Get the directory for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The watch directory
        """
        return os.path.join(self.datastore_path, watch_uuid)

    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.isdir(watch_dir):
            logger.debug(f"> Creating data dir {watch_dir}")
            os.makedirs(watch_dir, exist_ok=True)

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to the file system

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot filename
        """
        self.ensure_data_dir_exists(watch_uuid)

        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
        skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')

        watch_dir = self.get_watch_dir(watch_uuid)

        if not skip_brotli and len(contents) > threshold:
            snapshot_fname = f"{snapshot_id}.txt.br"
            dest = os.path.join(watch_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
        else:
            snapshot_fname = f"{snapshot_id}.txt"
            dest = os.path.join(watch_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(contents.encode('utf-8'))

        # Append to index
        index_fname = os.path.join(watch_dir, "history.txt")
        with open(index_fname, 'a') as f:
            f.write("{},{}\n".format(timestamp, snapshot_fname))

        return snapshot_fname

    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        tmp_history = {}

        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.isdir(watch_dir):
            return tmp_history

        # Read the history file as a dict
        fname = os.path.join(watch_dir, "history.txt")
        if os.path.isfile(fname):
            logger.debug(f"Reading watch history index for {watch_uuid}")
            with open(fname, "r") as f:
                for i in f.readlines():
                    if ',' in i:
                        k, v = i.strip().split(',', 2)

                        # The index history could contain a relative path, so we need to make the fullpath
                        # so that python can read it
                        if not '/' in v and not '\'' in v:
                            v = os.path.join(watch_dir, v)
                        else:
                            # It's possible that they moved the datadir on older versions
                            # So the snapshot exists but is in a different path
                            snapshot_fname = v.split('/')[-1]
                            proposed_new_path = os.path.join(watch_dir, snapshot_fname)
                            if not os.path.exists(v) and os.path.exists(proposed_new_path):
                                v = proposed_new_path

                        tmp_history[k] = v

        return tmp_history

    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from the file system

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        history = self.get_history(watch_uuid)
        if not timestamp in history:
            return None

        filepath = history[timestamp]

        # See if a brotli versions exists and switch to that
        if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
            filepath = f"{filepath}.br"

        # OR in the backup case that the .br does not exist, but the plain one does
        if filepath.endswith('.br') and not os.path.isfile(filepath):
            if os.path.isfile(filepath.replace('.br', '')):
                filepath = filepath.replace('.br', '')

        if filepath.endswith('.br'):
            # Brotli doesnt have a fileheader to detect it, so we rely on filename
            # https://www.rfc-editor.org/rfc/rfc7932
            with open(filepath, 'rb') as f:
                return(brotli.decompress(f.read()).decode('utf-8'))

        with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
            return f.read()

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        if as_error:
            target_path = os.path.join(watch_dir, "last-error-screenshot.png")
        else:
            target_path = os.path.join(watch_dir, "last-screenshot.png")

        with open(target_path, 'wb') as f:
            f.write(screenshot)

    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            str or None: The screenshot path or None if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        if is_error:
            fname = os.path.join(watch_dir, "last-error-screenshot.png")
        else:
            fname = os.path.join(watch_dir, "last-screenshot.png")

        if os.path.isfile(fname):
            return fname

        return None

    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        target_path = os.path.join(watch_dir, "last-error.txt")
        with open(target_path, 'w', encoding='utf-8') as f:
            f.write(contents)

    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        fname = os.path.join(watch_dir, "last-error.txt")

        if os.path.isfile(fname):
            with open(fname, 'r') as f:
                return f.read()

        return False

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        if as_error:
            target_path = os.path.join(watch_dir, "elements-error.deflate")
        else:
            target_path = os.path.join(watch_dir, "elements.deflate")

        with open(target_path, 'wb') as f:
            f.write(zlib.compress(json.dumps(data).encode()))

    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        if is_error:
            path = os.path.join(watch_dir, "elements-error.deflate")
        else:
            path = os.path.join(watch_dir, "elements.deflate")

        if not os.path.isfile(path):
            return None

        with open(path, 'rb') as f:
            return json.loads(zlib.decompress(f.read()).decode('utf-8'))

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(watch_dir, snapshot_fname)

        with open(filepath, 'wb') as f:
            contents = contents.encode('utf-8') if isinstance(contents, str) else contents
            try:
                f.write(brotli.compress(contents))
            except Exception as e:
                logger.warning(f"{watch_uuid} - Unable to compress snapshot, saving as raw data to {filepath}")
                logger.warning(e)
                f.write(contents)

        # Prune old snapshots - keep only the newest 2
        self._prune_last_fetched_html_snapshots(watch_uuid)

    def _prune_last_fetched_html_snapshots(self, watch_uuid):
        """Prune old HTML snapshots

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        history = self.get_history(watch_uuid)

        dates = list(history.keys())
        dates.reverse()

        for index, timestamp in enumerate(dates):
            snapshot_fname = f"{timestamp}.html.br"
            filepath = os.path.join(watch_dir, snapshot_fname)

            # Keep only the first 2
            if index > 1 and os.path.isfile(filepath):
                os.remove(filepath)

    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        watch_dir = self.get_watch_dir(watch_uuid)

        snapshot_fname = f"{timestamp}.html.br"
        filepath = os.path.join(watch_dir, snapshot_fname)

        if os.path.isfile(filepath):
            with open(filepath, 'rb') as f:
                return brotli.decompress(f.read()).decode('utf-8')

        return False

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        self.ensure_data_dir_exists(watch_uuid)
        watch_dir = self.get_watch_dir(watch_uuid)

        filepath = os.path.join(watch_dir, 'last-fetched.br')
        with open(filepath, 'wb') as f:
            f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))

    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        filepath = os.path.join(watch_dir, 'last-fetched.br')

        if not os.path.isfile(filepath):
            # If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
            history = self.get_history(watch_uuid)
            dates = list(history.keys())

            if len(dates):
                return self.get_history_snapshot(watch_uuid, dates[-1])
            else:
                return ''

        with open(filepath, 'rb') as f:
            return brotli.decompress(f.read()).decode('utf-8')

    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        screenshot_filename = os.path.join(watch_dir, "last-screenshot.png")
        elements_index_filename = os.path.join(watch_dir, "elements.deflate")

        return path.isfile(screenshot_filename) and path.isfile(elements_index_filename)

    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if not os.path.exists(watch_dir):
            return

        # Delete all files but keep the directory
        for item in pathlib.Path(watch_dir).glob("*.*"):
            os.unlink(item)

    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_dir = self.get_watch_dir(watch_uuid)
        if os.path.exists(watch_dir):
            shutil.rmtree(watch_dir)
```
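A quick round-trip against this backend, using only the methods shown above and a writable scratch directory:

```python
# Round-trip sketch: save a snapshot, then read it back through the history index.
import tempfile
import time
from changedetectionio.storage.filesystem_storage import FileSystemStorage

storage = FileSystemStorage(tempfile.mkdtemp())
uuid = 'example-watch-uuid'
ts = str(int(time.time()))

# Below the 1024-byte brotli threshold, so this lands in a plain <ts>.txt file
fname = storage.save_history_text(uuid, "hello world", ts, snapshot_id=ts)
history = storage.get_history(uuid)            # {'<ts>': '/.../<ts>.txt'}
print(storage.get_history_snapshot(uuid, ts))  # -> "hello world"
```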
changedetectionio/storage/mongodb_storage.py (new file, 466 lines)

```python
import os
from copy import deepcopy

import brotli
import zlib
import json
import time
from loguru import logger
from pymongo import MongoClient
from urllib.parse import urlparse
import base64

from .storage_base import StorageBase

class MongoDBStorage(StorageBase):
    """MongoDB storage backend"""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the MongoDB storage backend

        Args:
            datastore_path (str): MongoDB connection URI
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        # Parse MongoDB URI from datastore_path
        parsed_uri = urlparse(datastore_path)
        self.db_name = parsed_uri.path.lstrip('/')
        if not self.db_name:
            self.db_name = 'changedetection'

        # Connect to MongoDB
        self.client = MongoClient(datastore_path)
        self.db = self.client[self.db_name]

        # Collections
        self.app_collection = self.db['app']
        self.watches_collection = self.db['watches']
        self.snapshots_collection = self.db['snapshots']
        self.history_collection = self.db['history']
        self.error_collection = self.db['errors']
        self.xpath_collection = self.db['xpath']
        self.html_collection = self.db['html']

        logger.info(f"MongoDB storage initialized, connected to {datastore_path}")

    def load_data(self):
        """Load data from MongoDB

        Returns:
            dict: The loaded data
        """
        app_data = self.app_collection.find_one({'_id': 'app_data'})
        if not app_data:
            return None

        # Remove MongoDB _id field
        if '_id' in app_data:
            del app_data['_id']

        return app_data

    def save_data(self, data):
        """Save data to MongoDB

        Args:
            data (dict): The data to save
        """
        try:
            # Create a copy to modify
            data_copy = deepcopy(data)

            # Set _id for app data
            data_copy['_id'] = 'app_data'

            # Insert or update app data
            self.app_collection.replace_one({'_id': 'app_data'}, data_copy, upsert=True)

            # Also store watches separately for more granular access
            # This provides a safety net in case of corrupted app_data
            watches = data.get('watching', {})
            for uuid, watch in watches.items():
                if isinstance(watch, dict):  # Handle case where watch is a Watch object
                    watch_copy = deepcopy(dict(watch))
                else:
                    watch_copy = deepcopy(watch)
                watch_copy['_id'] = uuid
                self.watches_collection.replace_one({'_id': uuid}, watch_copy, upsert=True)

            # Also store tags separately
            if 'settings' in data and 'application' in data['settings'] and 'tags' in data['settings']['application']:
                tags = data['settings']['application']['tags']
                for uuid, tag in tags.items():
                    if isinstance(tag, dict):  # Handle case where tag is a Tag object
                        tag_copy = deepcopy(dict(tag))
                    else:
                        tag_copy = deepcopy(tag)
                    tag_copy['_id'] = uuid
                    self.db['tags'].replace_one({'_id': uuid}, tag_copy, upsert=True)

        except Exception as e:
            logger.error(f"Error writing to MongoDB: {str(e)}")
            raise e

    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # MongoDB doesn't need directories, this is a no-op
        pass

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to MongoDB

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot ID
        """
        # Compress the contents
        compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

        # Store the snapshot
        snapshot_data = {
            '_id': f"{watch_uuid}:{timestamp}",
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'snapshot_id': snapshot_id,
            'contents': base64.b64encode(compressed_contents).decode('ascii'),
            'compressed': True
        }

        self.snapshots_collection.replace_one({'_id': snapshot_data['_id']}, snapshot_data, upsert=True)

        # Update history index
        history_entry = {
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'snapshot_id': snapshot_id
        }

        self.history_collection.replace_one(
            {'watch_uuid': watch_uuid, 'timestamp': timestamp},
            history_entry,
            upsert=True
        )

        return snapshot_id

    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        history = {}

        # Query history entries for this watch
        entries = self.history_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', 1)

        for entry in entries:
            history[str(entry['timestamp'])] = entry['snapshot_id']

        return history

    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from MongoDB

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        # Query for the snapshot
        snapshot = self.snapshots_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})

        if not snapshot:
            return None

        if snapshot.get('compressed', False):
            # Decompress the contents
            compressed_data = base64.b64decode(snapshot['contents'])
            return brotli.decompress(compressed_data).decode('utf-8')
        else:
            return snapshot['contents']

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        collection_name = 'error_screenshots' if as_error else 'screenshots'
        collection = self.db[collection_name]

        # Encode the screenshot as base64
        encoded_screenshot = base64.b64encode(screenshot).decode('ascii')

        screenshot_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'screenshot': encoded_screenshot,
            'timestamp': int(time.time())
        }

        collection.replace_one({'_id': watch_uuid}, screenshot_data, upsert=True)

    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            bytes or None: The screenshot data or None if not available
        """
        collection_name = 'error_screenshots' if is_error else 'screenshots'
        collection = self.db[collection_name]

        screenshot_data = collection.find_one({'_id': watch_uuid})
        if not screenshot_data:
            return None

        # Decode the screenshot from base64
        return base64.b64decode(screenshot_data['screenshot'])

    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        error_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'error_text': contents,
            'timestamp': int(time.time())
        }

        self.error_collection.replace_one({'_id': watch_uuid}, error_data, upsert=True)

    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        error_data = self.error_collection.find_one({'_id': watch_uuid})
        if not error_data:
            return False

        return error_data['error_text']

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        # Compress the data
        compressed_data = zlib.compress(json.dumps(data).encode())

        _id = f"{watch_uuid}:error" if as_error else watch_uuid

        xpath_data = {
            '_id': _id,
            'watch_uuid': watch_uuid,
            'is_error': as_error,
            'data': base64.b64encode(compressed_data).decode('ascii'),
            'timestamp': int(time.time())
        }

        self.xpath_collection.replace_one({'_id': _id}, xpath_data, upsert=True)

    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        _id = f"{watch_uuid}:error" if is_error else watch_uuid

        xpath_data = self.xpath_collection.find_one({'_id': _id})
        if not xpath_data:
            return None

        # Decompress the data
        compressed_data = base64.b64decode(xpath_data['data'])
        return json.loads(zlib.decompress(compressed_data).decode('utf-8'))

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        # Compress the contents
        contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
        try:
            compressed_contents = brotli.compress(contents_bytes)
        except Exception as e:
            logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
            compressed_contents = contents_bytes

        html_data = {
            '_id': f"{watch_uuid}:{timestamp}",
            'watch_uuid': watch_uuid,
            'timestamp': timestamp,
            'html': base64.b64encode(compressed_contents).decode('ascii'),
            'compressed': True
        }

        self.html_collection.replace_one({'_id': html_data['_id']}, html_data, upsert=True)

        # Prune old snapshots - keep only the newest 2
        self._prune_last_fetched_html_snapshots(watch_uuid)

    def _prune_last_fetched_html_snapshots(self, watch_uuid):
        """Prune old HTML snapshots

        Args:
            watch_uuid (str): Watch UUID
        """
        # Get all HTML snapshots for this watch, sorted by timestamp descending
        html_snapshots = list(
            self.html_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', -1)
        )

        # Keep only the first 2
        if len(html_snapshots) > 2:
            for snapshot in html_snapshots[2:]:
                self.html_collection.delete_one({'_id': snapshot['_id']})

    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        html_data = self.html_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})

        if not html_data:
            return False

        if html_data.get('compressed', False):
            # Decompress the contents
            compressed_data = base64.b64decode(html_data['html'])
            return brotli.decompress(compressed_data).decode('utf-8')
        else:
            return html_data['html']

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        # Compress the contents
        compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

        last_fetched_data = {
            '_id': watch_uuid,
            'watch_uuid': watch_uuid,
            'contents': base64.b64encode(compressed_contents).decode('ascii'),
            'timestamp': int(time.time())
        }

        self.db['last_fetched'].replace_one({'_id': watch_uuid}, last_fetched_data, upsert=True)

    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        last_fetched_data = self.db['last_fetched'].find_one({'_id': watch_uuid})

        if not last_fetched_data:
            # If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
            history = self.get_history(watch_uuid)
            dates = list(history.keys())

            if len(dates):
                return self.get_history_snapshot(watch_uuid, dates[-1])
            else:
                return ''

        # Decompress the contents
        compressed_data = base64.b64decode(last_fetched_data['contents'])
        return brotli.decompress(compressed_data).decode('utf-8')

    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        # Check if screenshot and xpath data exist
        screenshot = self.db['screenshots'].find_one({'_id': watch_uuid})
        xpath_data = self.xpath_collection.find_one({'_id': watch_uuid})

        return screenshot is not None and xpath_data is not None

    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # Delete all snapshots and history for this watch
        self.snapshots_collection.delete_many({'watch_uuid': watch_uuid})
        self.history_collection.delete_many({'watch_uuid': watch_uuid})
        self.html_collection.delete_many({'watch_uuid': watch_uuid})
        self.db['last_fetched'].delete_many({'watch_uuid': watch_uuid})
        self.xpath_collection.delete_many({'watch_uuid': watch_uuid})
        self.db['screenshots'].delete_many({'watch_uuid': watch_uuid})
        self.error_collection.delete_many({'watch_uuid': watch_uuid})

    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # Clear all history data
        self.clear_watch_history(watch_uuid)

        # Also delete error screenshots
        self.db['error_screenshots'].delete_many({'watch_uuid': watch_uuid})
```
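`get_history()` and the pruning helper filter and sort on `watch_uuid`/`timestamp`, so at any realistic history size these collections would want indexes. The diff itself never creates any; a hedged operational sketch (index choices are an assumption, not part of the changeset):

```python
# Optional index setup for the collections queried by watch_uuid/timestamp.
# Not part of the diff above; these are assumed-sensible choices for the
# find()/sort() patterns in MongoDBStorage.
from pymongo import MongoClient, ASCENDING

client = MongoClient('mongodb://localhost:27017/changedetection')
db = client['changedetection']
db['history'].create_index([('watch_uuid', ASCENDING), ('timestamp', ASCENDING)])
db['html'].create_index([('watch_uuid', ASCENDING), ('timestamp', ASCENDING)])
db['snapshots'].create_index('watch_uuid')
```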
							
								
								
									
										525
									
								
								changedetectionio/storage/s3_storage.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										525
									
								
								changedetectionio/storage/s3_storage.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,525 @@
 | 
			
		||||
import os
 | 
			
		||||
import io
 | 
			
		||||
import json
 | 
			
		||||
import brotli
 | 
			
		||||
import zlib
 | 
			
		||||
import time
 | 
			
		||||
from loguru import logger
 | 
			
		||||
import boto3
 | 
			
		||||
from urllib.parse import urlparse
 | 
			
		||||
import base64
 | 
			
		||||
 | 
			
		||||
from .storage_base import StorageBase
 | 
			
		||||
 | 
			
		||||
class S3Storage(StorageBase):
    """Amazon S3 storage backend"""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the S3 storage backend

        Args:
            datastore_path (str): S3 URI (s3://bucket-name/optional-prefix)
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        # Parse S3 URI
        parsed_uri = urlparse(datastore_path)
        self.bucket_name = parsed_uri.netloc
        self.prefix = parsed_uri.path.lstrip('/')

        if self.prefix and not self.prefix.endswith('/'):
            self.prefix += '/'

        # Initialize S3 client
        # Uses AWS credentials from environment variables or IAM role
        self.s3 = boto3.client('s3')

        logger.info(f"S3 storage initialized, using bucket '{self.bucket_name}' with prefix '{self.prefix}'")

    def _get_key(self, path):
        """Get the S3 key for a path

        Args:
            path (str): Path relative to the prefix

        Returns:
            str: The full S3 key
        """
        return f"{self.prefix}{path}"

    def load_data(self):
        """Load data from S3

        Returns:
            dict: The loaded data
        """
        key = self._get_key("app-data.json")

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            return json.loads(response['Body'].read().decode('utf-8'))
        except self.s3.exceptions.NoSuchKey:
            return None
        except Exception as e:
            logger.error(f"Error loading data from S3: {str(e)}")
            raise e

    def save_data(self, data):
        """Save data to S3

        Args:
            data (dict): The data to save
        """
        try:
            key = self._get_key("app-data.json")
            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=key,
                Body=json.dumps(data, indent=4),
                ContentType='application/json'
            )
        except Exception as e:
            logger.error(f"Error saving data to S3: {str(e)}")
            raise e

    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # S3 doesn't need directories, this is a no-op
        pass

    def _get_watch_prefix(self, watch_uuid):
        """Get the S3 prefix for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The watch prefix
        """
        return self._get_key(f"watches/{watch_uuid}/")

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to S3

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot filename
        """
        # Determine if we should compress
        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
        skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')

        watch_prefix = self._get_watch_prefix(watch_uuid)

        # Save the snapshot
        if not skip_brotli and len(contents) > threshold:
            snapshot_fname = f"{snapshot_id}.txt.br"
            snapshot_key = f"{watch_prefix}snapshots/{snapshot_fname}"
            compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=snapshot_key,
                Body=compressed_contents
            )
        else:
            snapshot_fname = f"{snapshot_id}.txt"
            snapshot_key = f"{watch_prefix}snapshots/{snapshot_fname}"

            self.s3.put_object(
                Bucket=self.bucket_name,
                Key=snapshot_key,
                Body=contents.encode('utf-8')
            )

        # Update history index
        history_key = f"{watch_prefix}history.txt"

        # Try to get existing history first
        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
            history_content = response['Body'].read().decode('utf-8')
        except self.s3.exceptions.NoSuchKey:
            history_content = ""

        # Append new entry
        history_content += f"{timestamp},{snapshot_fname}\n"

        # Save updated history
        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=history_key,
            Body=history_content
        )

        return snapshot_fname

    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        tmp_history = {}
        watch_prefix = self._get_watch_prefix(watch_uuid)
        history_key = f"{watch_prefix}history.txt"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
            history_content = response['Body'].read().decode('utf-8')

            for line in history_content.splitlines():
                if ',' in line:
                    # Each line is "<timestamp>,<snapshot filename>" - split on the
                    # first comma only so filenames containing commas stay intact
                    k, v = line.strip().split(',', 1)
                    tmp_history[k] = f"{watch_prefix}snapshots/{v}"

            return tmp_history
        except self.s3.exceptions.NoSuchKey:
            return {}

    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from S3

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        history = self.get_history(watch_uuid)
        if timestamp not in history:
            return None

        key = history[timestamp]

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            content = response['Body'].read()

            if key.endswith('.br'):
                # Decompress brotli
                return brotli.decompress(content).decode('utf-8')
            else:
                return content.decode('utf-8')
        except Exception as e:
            logger.error(f"Error reading snapshot from S3: {str(e)}")
            return None

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)

        if as_error:
            key = f"{watch_prefix}last-error-screenshot.png"
        else:
            key = f"{watch_prefix}last-screenshot.png"

        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=screenshot,
            ContentType='image/png'
        )

    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            bytes or None: The screenshot data or None if not available
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)

        if is_error:
            key = f"{watch_prefix}last-error-screenshot.png"
        else:
            key = f"{watch_prefix}last-screenshot.png"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            return response['Body'].read()
        except self.s3.exceptions.NoSuchKey:
            return None

    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}last-error.txt"

        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=contents.encode('utf-8')
        )

    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}last-error.txt"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            return response['Body'].read().decode('utf-8')
        except self.s3.exceptions.NoSuchKey:
            return False

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)

        if as_error:
            key = f"{watch_prefix}elements-error.deflate"
        else:
            key = f"{watch_prefix}elements.deflate"

        compressed_data = zlib.compress(json.dumps(data).encode())

        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=compressed_data
        )

    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)

        if is_error:
            key = f"{watch_prefix}elements-error.deflate"
        else:
            key = f"{watch_prefix}elements.deflate"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            compressed_data = response['Body'].read()
            return json.loads(zlib.decompress(compressed_data).decode('utf-8'))
        except self.s3.exceptions.NoSuchKey:
            return None

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}html/{timestamp}.html.br"

        contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
        try:
            compressed_contents = brotli.compress(contents_bytes)
        except Exception as e:
            logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
            compressed_contents = contents_bytes

        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=compressed_contents
        )

        # Prune old snapshots - keep only the newest 2
        self._prune_last_fetched_html_snapshots(watch_uuid)

    def _prune_last_fetched_html_snapshots(self, watch_uuid):
        """Prune old HTML snapshots

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        html_prefix = f"{watch_prefix}html/"

        # List all HTML snapshots
        response = self.s3.list_objects_v2(
            Bucket=self.bucket_name,
            Prefix=html_prefix
        )

        if 'Contents' not in response:
            return

        # Sort by timestamp (extract from key)
        html_files = sorted(
            response['Contents'],
            key=lambda x: int(x['Key'].split('/')[-1].split('.')[0]),
            reverse=True
        )

        # Delete all but the newest 2
        if len(html_files) > 2:
            for file in html_files[2:]:
                self.s3.delete_object(
                    Bucket=self.bucket_name,
                    Key=file['Key']
                )

    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}html/{timestamp}.html.br"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            compressed_data = response['Body'].read()
            return brotli.decompress(compressed_data).decode('utf-8')
        except self.s3.exceptions.NoSuchKey:
            return False

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}last-fetched.br"

        compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)

        self.s3.put_object(
            Bucket=self.bucket_name,
            Key=key,
            Body=compressed_contents
        )

    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        key = f"{watch_prefix}last-fetched.br"

        try:
            response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
            compressed_data = response['Body'].read()
            return brotli.decompress(compressed_data).decode('utf-8')
        except self.s3.exceptions.NoSuchKey:
            # If a previous attempt doesn't yet exist, just snarf the previous snapshot instead
            history = self.get_history(watch_uuid)
            dates = list(history.keys())

            if len(dates):
                return self.get_history_snapshot(watch_uuid, dates[-1])
            else:
                return ''

    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)
        screenshot_key = f"{watch_prefix}last-screenshot.png"
        elements_key = f"{watch_prefix}elements.deflate"

        try:
            # Just check if both files exist
            self.s3.head_object(Bucket=self.bucket_name, Key=screenshot_key)
            self.s3.head_object(Bucket=self.bucket_name, Key=elements_key)
            return True
        except self.s3.exceptions.ClientError:
            return False

    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        watch_prefix = self._get_watch_prefix(watch_uuid)

        # List all objects with this watch's prefix
        paginator = self.s3.get_paginator('list_objects_v2')
        pages = paginator.paginate(
            Bucket=self.bucket_name,
            Prefix=watch_prefix
        )

        # Delete all objects in batches
        for page in pages:
            if 'Contents' not in page:
                continue

            delete_keys = {'Objects': [{'Key': obj['Key']} for obj in page['Contents']]}
            self.s3.delete_objects(
                Bucket=self.bucket_name,
                Delete=delete_keys
            )

    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        # Same implementation as clear_watch_history for S3
        self.clear_watch_history(watch_uuid)
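A hedged usage sketch for the S3 backend (not part of the diff), assuming AWS credentials are available in the environment and a bucket named example-bucket already exists. The key layout follows _get_watch_prefix() above, i.e. <prefix>watches/<uuid>/snapshots/<snapshot_id>.txt[.br]:

from changedetectionio.storage.s3_storage import S3Storage

storage = S3Storage("s3://example-bucket/changedetection")
# Short text stays below the brotli threshold, so this lands as a plain .txt key
fname = storage.save_history_text(
    watch_uuid="0a1b2c3d",
    contents="Some page text",
    timestamp=1700000000,
    snapshot_id="abc123",
)
# get_history() returns {timestamp-as-string: full S3 key of the snapshot}
history = storage.get_history("0a1b2c3d")
print(fname, history)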

changedetectionio/storage/storage_base.py (new file, 230 lines)
@@ -0,0 +1,230 @@
from abc import ABC, abstractmethod

class StorageBase(ABC):
    """Abstract base class for storage backends"""

    @abstractmethod
    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        """Initialize the storage backend

        Args:
            datastore_path (str): Path to the datastore
            include_default_watches (bool): Whether to include default watches
            version_tag (str): Version tag
        """
        pass

    @abstractmethod
    def load_data(self):
        """Load data from the storage backend

        Returns:
            dict: The loaded data
        """
        pass

    @abstractmethod
    def save_data(self, data):
        """Save data to the storage backend

        Args:
            data (dict): The data to save
        """
        pass

    @abstractmethod
    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        """Save history text to the storage backend

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Contents to save
            timestamp (int): Timestamp
            snapshot_id (str): Snapshot ID

        Returns:
            str: Snapshot filename or ID
        """
        pass

    @abstractmethod
    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from the storage backend

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        pass

    @abstractmethod
    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        pass

    @abstractmethod
    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        pass

    @abstractmethod
    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            str or None: The screenshot path or None if not available
        """
        pass

    @abstractmethod
    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        pass

    @abstractmethod
    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        pass

    @abstractmethod
    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        pass

    @abstractmethod
    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        pass

    @abstractmethod
    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        pass

    @abstractmethod
    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        pass

    @abstractmethod
    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        pass

    @abstractmethod
    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        pass

    @abstractmethod
    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass

    @abstractmethod
    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        pass

    @abstractmethod
    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass

    @abstractmethod
    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass
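Because StorageBase is a plain ABC, the contract a new backend must satisfy can be inspected directly; a quick sketch (not part of the diff), assuming the package imports as laid out above:

from changedetectionio.storage.storage_base import StorageBase

# Every abstract method a subclass has to implement before it can be instantiated
print(sorted(StorageBase.__abstractmethods__))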

changedetectionio/storage/storage_factory.py (new file, 33 lines)
@@ -0,0 +1,33 @@
from loguru import logger

from .filesystem_storage import FileSystemStorage
from .mongodb_storage import MongoDBStorage
from .s3_storage import S3Storage

def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"):
    """Create a storage backend based on the datastore path

    Args:
        datastore_path (str): Path to the datastore
        include_default_watches (bool): Whether to include default watches
        version_tag (str): Version tag

    Returns:
        StorageBase: The storage backend
    """
    # Check if it's a MongoDB URI
    if datastore_path.startswith('mongodb://') or datastore_path.startswith('mongodb+srv://'):
        logger.info(f"Using MongoDB storage backend with URI {datastore_path}")
        return MongoDBStorage(datastore_path, include_default_watches, version_tag)

    # Check if it's an S3 URI
    if datastore_path.startswith('s3://'):
        logger.info(f"Using S3 storage backend with URI {datastore_path}")
        return S3Storage(datastore_path, include_default_watches, version_tag)

    # Default to filesystem
    logger.info(f"Using filesystem storage backend with path {datastore_path}")
    return FileSystemStorage(datastore_path, include_default_watches, version_tag)
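A short sketch of the dispatch (not part of the diff): the factory keys purely off the URI scheme, so one entry point covers all three backends. The paths below are illustrative, and the MongoDB and S3 calls assume reachable services:

from changedetectionio.storage.storage_factory import create_storage

fs = create_storage("/datastore")                                     # FileSystemStorage
mongo = create_storage("mongodb://localhost:27017/changedetection")  # MongoDBStorage
s3 = create_storage("s3://example-bucket/changedetection")           # S3Storage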
@@ -20,6 +20,7 @@ from loguru import logger

from .processors import get_custom_watch_obj_for_processor
from .processors.restock_diff import Restock
from .storage.storage_factory import create_storage

# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
@@ -38,20 +39,28 @@ class ChangeDetectionStore:
    needs_write_urgent = False

    __version_check = True

    # Singleton instance for access from Watch class methods
    instance = None

    def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
        # Should only be active for docker
        # logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
        self.__data = App.model()
        self.datastore_path = datastore_path
        self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
        logger.info(f"Datastore path is '{self.json_store_path}'")

        # Create the appropriate storage backend based on the datastore path
        self.storage = create_storage(datastore_path, include_default_watches, version_tag)

        self.needs_write = False
        self.start_time = time.time()
        self.stop_thread = False
        # Base definition for all watchers
        # deepcopy part of #569 - not sure why its needed exactly
        self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={}))
        self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))

        # Set singleton instance
        ChangeDetectionStore.instance = self

        if path.isfile('changedetectionio/source.txt'):
            with open('changedetectionio/source.txt') as f:
@@ -60,10 +69,9 @@ class ChangeDetectionStore:
                self.__data['build_sha'] = f.read()

        try:
            # @todo retest with ", encoding='utf-8'"
            with open(self.json_store_path) as json_file:
                from_disk = json.load(json_file)

            # Load data from storage
            from_disk = self.storage.load_data()
            if from_disk:
                # @todo isnt there a way todo this dict.update recursively?
                # Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
                if 'watching' in from_disk:
@@ -91,22 +99,24 @@ class ChangeDetectionStore:
                for uuid, tag in self.__data['settings']['application']['tags'].items():
                    self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
                    logger.info(f"Tag: {uuid} {tag['title']}")
            else:
                # First time ran, Create the datastore.
                if include_default_watches:
                    logger.critical(f"No data store found, creating new store")
                    self.add_watch(url='https://news.ycombinator.com/',
                                  tag='Tech news',
                                  extras={'fetch_backend': 'html_requests'})

        # First time ran, Create the datastore.
        except (FileNotFoundError):
            if include_default_watches:
                logger.critical(f"No JSON DB found at {self.json_store_path}, creating JSON store at {self.datastore_path}")
                self.add_watch(url='https://news.ycombinator.com/',
                               tag='Tech news',
                               extras={'fetch_backend': 'html_requests'})
                    self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
                                  tag='changedetection.io',
                                  extras={'fetch_backend': 'html_requests'})

                self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
                               tag='changedetection.io',
                               extras={'fetch_backend': 'html_requests'})

            updates_available = self.get_updates_available()
            self.__data['settings']['application']['schema_version'] = updates_available.pop()
                updates_available = self.get_updates_available()
                self.__data['settings']['application']['schema_version'] = updates_available.pop()

        except Exception as e:
            logger.error(f"Error loading data from storage: {str(e)}")
            raise e
        else:
            # Bump the update version by running updates
            self.run_updates()
@@ -227,23 +237,15 @@ class ChangeDetectionStore:

    # Delete a single watch by UUID
    def delete(self, uuid):
        import pathlib
        import shutil

        with self.lock:
            if uuid == 'all':
                # Delete all watches
                for watch_uuid in list(self.data['watching'].keys()):
                    self.storage.delete_watch(watch_uuid)
                self.__data['watching'] = {}

                # GitHub #30 also delete history records
                for uuid in self.data['watching']:
                    path = pathlib.Path(os.path.join(self.datastore_path, uuid))
                    if os.path.exists(path):
                        shutil.rmtree(path)

            else:
                path = pathlib.Path(os.path.join(self.datastore_path, uuid))
                if os.path.exists(path):
                    shutil.rmtree(path)
                # Delete a single watch
                self.storage.delete_watch(uuid)
                del self.data['watching'][uuid]

        self.needs_write_urgent = True
@@ -266,6 +268,7 @@ class ChangeDetectionStore:

    # Remove a watchs data but keep the entry (URL etc)
    def clear_watch_history(self, uuid):
        self.storage.clear_watch_history(uuid)
        self.__data['watching'][uuid].clear_watch()
        self.needs_write_urgent = True

@@ -372,43 +375,30 @@ class ChangeDetectionStore:
        return new_uuid

    def visualselector_data_is_ready(self, watch_uuid):
        output_path = "{}/{}".format(self.datastore_path, watch_uuid)
        screenshot_filename = "{}/last-screenshot.png".format(output_path)
        elements_index_filename = "{}/elements.deflate".format(output_path)
        if path.isfile(screenshot_filename) and  path.isfile(elements_index_filename) :
            return True

        return False
        return self.storage.visualselector_data_is_ready(watch_uuid)

    def sync_to_json(self):
        logger.info("Saving JSON..")
        logger.info("Saving data to storage backend...")
        try:
            data = deepcopy(self.__data)
        except RuntimeError as e:
            # Try again in 15 seconds
            time.sleep(15)
            logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
            logger.error(f"! Data changed when writing to storage, trying again.. {str(e)}")
            self.sync_to_json()
            return
        else:

            try:
                # Re #286  - First write to a temp file, then confirm it looks OK and rename it
                # This is a fairly basic strategy to deal with the case that the file is corrupted,
                # system was out of memory, out of RAM etc
                with open(self.json_store_path+".tmp", 'w') as json_file:
                    json.dump(data, json_file, indent=4)
                os.replace(self.json_store_path+".tmp", self.json_store_path)
                self.storage.save_data(data)
            except Exception as e:
                logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
                logger.error(f"Error writing to storage backend: {str(e)}")

            self.needs_write = False
            self.needs_write_urgent = False

    # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON
    # Thread runner, this helps with thread/write issues when there are many operations that want to update the data
    # by just running periodically in one thread, according to python, dict updates are threadsafe.
    def save_datastore(self):

        while True:
            if self.stop_thread:
                # Suppressing "Logging error in Loguru Handler #0" during CICD.
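An illustrative aside on the RuntimeError branch in sync_to_json() above (not part of the diff): deepcopy() iterates the settings dict, and any other thread that resizes it mid-iteration raises the error being caught and retried. A minimal, single-threaded reproduction of the same failure mode:

d = {i: i for i in range(10)}
try:
    for k in d:          # deepcopy() walks the dict like this
        d[k + 100] = 0   # a concurrent writer resizing it has the same effect
except RuntimeError as e:
    print(e)             # "dictionary changed size during iteration"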
@@ -100,3 +100,6 @@ referencing==0.35.1

# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata

pymongo>=4.3.3
boto3>=1.26.0