Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-10-30 14:17:40 +00:00)

Compare commits: openapi-me...abstracted (3 commits)

| Author | SHA1 | Date |
|---|---|---|
|  | 80434fa16a |  |
|  | db10422415 |  |
|  | 380e571ded |  |

@@ -5,6 +5,7 @@ recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/model *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/static *
recursive-include changedetectionio/storage *
recursive-include changedetectionio/templates *
recursive-include changedetectionio/tests *
prune changedetectionio/static/package-lock.json

@@ -299,34 +299,17 @@ class model(watch_base):
    # Save some text file to the appropriate path and bump the history
    # result_obj from fetch_site_status.run()
    def save_history_text(self, contents, timestamp, snapshot_id):
        import brotli
        from changedetectionio.store import ChangeDetectionStore
        from changedetectionio.storage.filesystem_storage import FileSystemStorage

        logger.trace(f"{self.get('uuid')} - Updating history.txt with timestamp {timestamp}")

        self.ensure_data_dir_exists()

        threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
        skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))

        if not skip_brotli and len(contents) > threshold:
            snapshot_fname = f"{snapshot_id}.txt.br"
            dest = os.path.join(self.watch_data_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
        else:
            snapshot_fname = f"{snapshot_id}.txt"
            dest = os.path.join(self.watch_data_dir, snapshot_fname)
            if not os.path.exists(dest):
                with open(dest, 'wb') as f:
                    f.write(contents.encode('utf-8'))

        # Append to index
        # @todo check last char was \n
        index_fname = os.path.join(self.watch_data_dir, "history.txt")
        with open(index_fname, 'a') as f:
            f.write("{},{}\n".format(timestamp, snapshot_fname))
            f.close()
        # Get storage from singleton store or create a filesystem storage as default
        store = ChangeDetectionStore.instance if hasattr(ChangeDetectionStore, 'instance') else None
        storage = store.storage if store and hasattr(store, 'storage') else FileSystemStorage(self.__datastore_path)

        # Use the storage backend to save the history text
        snapshot_fname = storage.save_history_text(self.get('uuid'), contents, timestamp, snapshot_id)

        self.__newest_history_key = timestamp
        self.__history_n += 1

changedetectionio/storage/README.md (new file, 61 lines)
@@ -0,0 +1,61 @@
# Storage Backends for changedetection.io

This module provides different storage backends for changedetection.io, allowing you to store data in various systems:

- **FileSystemStorage**: The default storage backend that stores data on the local filesystem.
- **MongoDBStorage**: Stores data in a MongoDB database.
- **S3Storage**: Stores data in an Amazon S3 bucket.

## Usage

The storage backend is automatically selected based on the datastore path provided when initializing the application:

- For filesystem storage (default): `/datastore`
- For MongoDB storage: `mongodb://username:password@host:port/database`
- For S3 storage: `s3://bucket-name/optional-prefix`

## Configuration

### Filesystem Storage

The default storage backend. Simply specify a directory path:

```
changedetection.io -d /path/to/datastore
```

### MongoDB Storage

To use MongoDB storage, specify a MongoDB connection URI:

```
changedetection.io -d mongodb://username:password@host:port/database
```

Make sure to install the required dependencies:

```
pip install -r requirements-storage.txt
```

### Amazon S3 Storage

To use S3 storage, specify an S3 URI:

```
changedetection.io -d s3://bucket-name/optional-prefix
```

Make sure to:
1. Install the required dependencies: `pip install -r requirements-storage.txt`
2. Configure AWS credentials using environment variables or IAM roles:
   - Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
   - Or use an IAM role when running on AWS EC2/ECS/EKS

## Custom Storage Backends

You can create custom storage backends by:

1. Subclassing the `StorageBase` abstract class in `storage_base.py`
2. Implementing all required methods
3. Adding your backend to the `storage_factory.py` file
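
The three steps above can be illustrated with a deliberately minimal, hypothetical in-memory backend. This is only a sketch of the subclassing pattern the README describes: the class name `InMemoryStorage` and its dict-based storage are invented for illustration, and a real backend would persist its data and also be registered in `storage_factory.py`.

```python
# Hypothetical example only: an in-memory backend that satisfies the StorageBase contract.
from changedetectionio.storage.storage_base import StorageBase


class InMemoryStorage(StorageBase):
    """Keeps everything in dicts - useful only to illustrate the interface."""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        self.app_data = None
        self.watches = {}  # watch_uuid -> per-watch dict, see _watch()

    def _watch(self, watch_uuid):
        # Lazily create the per-watch record
        return self.watches.setdefault(watch_uuid, {
            'history': {}, 'snapshots': {}, 'html': {},
            'screenshots': {}, 'xpath': {}, 'error': False, 'last_fetched': ''})

    def load_data(self):
        return self.app_data

    def save_data(self, data):
        self.app_data = data

    def ensure_data_dir_exists(self, watch_uuid):
        self._watch(watch_uuid)  # nothing to create on disk, just make the entry

    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        w = self._watch(watch_uuid)
        w['snapshots'][str(timestamp)] = contents
        w['history'][str(timestamp)] = snapshot_id
        return snapshot_id

    def get_history(self, watch_uuid):
        return dict(self._watch(watch_uuid)['history'])

    def get_history_snapshot(self, watch_uuid, timestamp):
        return self._watch(watch_uuid)['snapshots'].get(str(timestamp))

    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        self._watch(watch_uuid)['screenshots']['error' if as_error else 'last'] = screenshot

    def get_screenshot(self, watch_uuid, is_error=False):
        return self._watch(watch_uuid)['screenshots'].get('error' if is_error else 'last')

    def save_error_text(self, watch_uuid, contents):
        self._watch(watch_uuid)['error'] = contents

    def get_error_text(self, watch_uuid):
        return self._watch(watch_uuid)['error']

    def save_xpath_data(self, watch_uuid, data, as_error=False):
        self._watch(watch_uuid)['xpath']['error' if as_error else 'data'] = data

    def get_xpath_data(self, watch_uuid, is_error=False):
        return self._watch(watch_uuid)['xpath'].get('error' if is_error else 'data')

    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        self._watch(watch_uuid)['html'][str(timestamp)] = contents

    def get_fetched_html(self, watch_uuid, timestamp):
        return self._watch(watch_uuid)['html'].get(str(timestamp), False)

    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        self._watch(watch_uuid)['last_fetched'] = contents

    def get_last_fetched_text_before_filters(self, watch_uuid):
        return self._watch(watch_uuid)['last_fetched']

    def visualselector_data_is_ready(self, watch_uuid):
        w = self._watch(watch_uuid)
        return 'last' in w['screenshots'] and 'data' in w['xpath']

    def clear_watch_history(self, watch_uuid):
        self.watches.pop(watch_uuid, None)
        self._watch(watch_uuid)  # recreate an empty record

    def delete_watch(self, watch_uuid):
        self.watches.pop(watch_uuid, None)
```
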
changedetectionio/storage/__init__.py (new file, 1 line)
@@ -0,0 +1 @@
# This module contains storage backend implementations

changedetectionio/storage/filesystem_storage.py (new file, 449 lines)
@@ -0,0 +1,449 @@
|
||||
import os
|
||||
import shutil
|
||||
import json
|
||||
import brotli
|
||||
import zlib
|
||||
import pathlib
|
||||
from loguru import logger
|
||||
from os import path
|
||||
|
||||
from .storage_base import StorageBase
|
||||
|
||||
class FileSystemStorage(StorageBase):
|
||||
"""File system storage backend"""
|
||||
|
||||
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
|
||||
"""Initialize the file system storage backend
|
||||
|
||||
Args:
|
||||
datastore_path (str): Path to the datastore
|
||||
include_default_watches (bool): Whether to include default watches
|
||||
version_tag (str): Version tag
|
||||
"""
|
||||
self.datastore_path = datastore_path
|
||||
self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
|
||||
logger.info(f"Datastore path is '{self.json_store_path}'")
|
||||
|
||||
def load_data(self):
|
||||
"""Load data from the file system
|
||||
|
||||
Returns:
|
||||
dict: The loaded data
|
||||
"""
|
||||
if not path.isfile(self.json_store_path):
|
||||
return None
|
||||
|
||||
with open(self.json_store_path) as json_file:
|
||||
return json.load(json_file)
|
||||
|
||||
def save_data(self, data):
|
||||
"""Save data to the file system
|
||||
|
||||
Args:
|
||||
data (dict): The data to save
|
||||
"""
|
||||
try:
|
||||
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
|
||||
# This is a fairly basic strategy to deal with the case that the file is corrupted,
|
||||
# system was out of memory, out of RAM etc
|
||||
with open(self.json_store_path+".tmp", 'w') as json_file:
|
||||
json.dump(data, json_file, indent=4)
|
||||
os.replace(self.json_store_path+".tmp", self.json_store_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
|
||||
raise e
|
||||
|
||||
def get_watch_dir(self, watch_uuid):
|
||||
"""Get the directory for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str: The watch directory
|
||||
"""
|
||||
return os.path.join(self.datastore_path, watch_uuid)
|
||||
|
||||
def ensure_data_dir_exists(self, watch_uuid):
|
||||
"""Ensure the data directory exists for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
if not os.path.isdir(watch_dir):
|
||||
logger.debug(f"> Creating data dir {watch_dir}")
|
||||
os.makedirs(watch_dir, exist_ok=True)
|
||||
|
||||
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
|
||||
"""Save history text to the file system
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Contents to save
|
||||
timestamp (int): Timestamp
|
||||
snapshot_id (str): Snapshot ID
|
||||
|
||||
Returns:
|
||||
str: Snapshot filename
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
|
||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')
|
||||
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
if not skip_brotli and len(contents) > threshold:
|
||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||
dest = os.path.join(watch_dir, snapshot_fname)
|
||||
if not os.path.exists(dest):
|
||||
with open(dest, 'wb') as f:
|
||||
f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
|
||||
else:
|
||||
snapshot_fname = f"{snapshot_id}.txt"
|
||||
dest = os.path.join(watch_dir, snapshot_fname)
|
||||
if not os.path.exists(dest):
|
||||
with open(dest, 'wb') as f:
|
||||
f.write(contents.encode('utf-8'))
|
||||
|
||||
# Append to index
|
||||
index_fname = os.path.join(watch_dir, "history.txt")
|
||||
with open(index_fname, 'a') as f:
|
||||
f.write("{},{}\n".format(timestamp, snapshot_fname))
|
||||
|
||||
return snapshot_fname
|
||||
|
||||
def get_history(self, watch_uuid):
|
||||
"""Get history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
dict: The history with timestamp keys and snapshot IDs as values
|
||||
"""
|
||||
tmp_history = {}
|
||||
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
if not os.path.isdir(watch_dir):
|
||||
return tmp_history
|
||||
|
||||
# Read the history file as a dict
|
||||
fname = os.path.join(watch_dir, "history.txt")
|
||||
if os.path.isfile(fname):
|
||||
logger.debug(f"Reading watch history index for {watch_uuid}")
|
||||
with open(fname, "r") as f:
|
||||
for i in f.readlines():
|
||||
if ',' in i:
|
||||
k, v = i.strip().split(',', 2)
|
||||
|
||||
# The index history could contain a relative path, so we need to make the fullpath
|
||||
# so that python can read it
|
||||
if not '/' in v and not '\'' in v:
|
||||
v = os.path.join(watch_dir, v)
|
||||
else:
|
||||
# It's possible that they moved the datadir on older versions
|
||||
# So the snapshot exists but is in a different path
|
||||
snapshot_fname = v.split('/')[-1]
|
||||
proposed_new_path = os.path.join(watch_dir, snapshot_fname)
|
||||
if not os.path.exists(v) and os.path.exists(proposed_new_path):
|
||||
v = proposed_new_path
|
||||
|
||||
tmp_history[k] = v
|
||||
|
||||
return tmp_history
|
||||
|
||||
def get_history_snapshot(self, watch_uuid, timestamp):
|
||||
"""Get a history snapshot from the file system
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str: The snapshot content
|
||||
"""
|
||||
history = self.get_history(watch_uuid)
|
||||
if not timestamp in history:
|
||||
return None
|
||||
|
||||
filepath = history[timestamp]
|
||||
|
||||
# See if a brotli versions exists and switch to that
|
||||
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
|
||||
filepath = f"{filepath}.br"
|
||||
|
||||
# OR in the backup case that the .br does not exist, but the plain one does
|
||||
if filepath.endswith('.br') and not os.path.isfile(filepath):
|
||||
if os.path.isfile(filepath.replace('.br', '')):
|
||||
filepath = filepath.replace('.br', '')
|
||||
|
||||
if filepath.endswith('.br'):
|
||||
# Brotli doesnt have a fileheader to detect it, so we rely on filename
|
||||
# https://www.rfc-editor.org/rfc/rfc7932
|
||||
with open(filepath, 'rb') as f:
|
||||
return(brotli.decompress(f.read()).decode('utf-8'))
|
||||
|
||||
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
return f.read()
|
||||
|
||||
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
|
||||
"""Save a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
screenshot (bytes): Screenshot data
|
||||
as_error (bool): Whether this is an error screenshot
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
if as_error:
|
||||
target_path = os.path.join(watch_dir, "last-error-screenshot.png")
|
||||
else:
|
||||
target_path = os.path.join(watch_dir, "last-screenshot.png")
|
||||
|
||||
with open(target_path, 'wb') as f:
|
||||
f.write(screenshot)
|
||||
|
||||
def get_screenshot(self, watch_uuid, is_error=False):
|
||||
"""Get a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get the error screenshot
|
||||
|
||||
Returns:
|
||||
str or None: The screenshot path or None if not available
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
if is_error:
|
||||
fname = os.path.join(watch_dir, "last-error-screenshot.png")
|
||||
else:
|
||||
fname = os.path.join(watch_dir, "last-screenshot.png")
|
||||
|
||||
if os.path.isfile(fname):
|
||||
return fname
|
||||
|
||||
return None
|
||||
|
||||
def save_error_text(self, watch_uuid, contents):
|
||||
"""Save error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Error contents
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
target_path = os.path.join(watch_dir, "last-error.txt")
|
||||
with open(target_path, 'w', encoding='utf-8') as f:
|
||||
f.write(contents)
|
||||
|
||||
def get_error_text(self, watch_uuid):
|
||||
"""Get error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str or False: The error text or False if not available
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
fname = os.path.join(watch_dir, "last-error.txt")
|
||||
|
||||
if os.path.isfile(fname):
|
||||
with open(fname, 'r') as f:
|
||||
return f.read()
|
||||
|
||||
return False
|
||||
|
||||
def save_xpath_data(self, watch_uuid, data, as_error=False):
|
||||
"""Save XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
data (dict): XPath data
|
||||
as_error (bool): Whether this is error data
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
if as_error:
|
||||
target_path = os.path.join(watch_dir, "elements-error.deflate")
|
||||
else:
|
||||
target_path = os.path.join(watch_dir, "elements.deflate")
|
||||
|
||||
with open(target_path, 'wb') as f:
|
||||
f.write(zlib.compress(json.dumps(data).encode()))
|
||||
|
||||
def get_xpath_data(self, watch_uuid, is_error=False):
|
||||
"""Get XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get error data
|
||||
|
||||
Returns:
|
||||
dict or None: The XPath data or None if not available
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
if is_error:
|
||||
path = os.path.join(watch_dir, "elements-error.deflate")
|
||||
else:
|
||||
path = os.path.join(watch_dir, "elements.deflate")
|
||||
|
||||
if not os.path.isfile(path):
|
||||
return None
|
||||
|
||||
with open(path, 'rb') as f:
|
||||
return json.loads(zlib.decompress(f.read()).decode('utf-8'))
|
||||
|
||||
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
|
||||
"""Save last fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
contents (str): HTML contents
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
snapshot_fname = f"{timestamp}.html.br"
|
||||
filepath = os.path.join(watch_dir, snapshot_fname)
|
||||
|
||||
with open(filepath, 'wb') as f:
|
||||
contents = contents.encode('utf-8') if isinstance(contents, str) else contents
|
||||
try:
|
||||
f.write(brotli.compress(contents))
|
||||
except Exception as e:
|
||||
logger.warning(f"{watch_uuid} - Unable to compress snapshot, saving as raw data to {filepath}")
|
||||
logger.warning(e)
|
||||
f.write(contents)
|
||||
|
||||
# Prune old snapshots - keep only the newest 2
|
||||
self._prune_last_fetched_html_snapshots(watch_uuid)
|
||||
|
||||
def _prune_last_fetched_html_snapshots(self, watch_uuid):
|
||||
"""Prune old HTML snapshots
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
history = self.get_history(watch_uuid)
|
||||
|
||||
dates = list(history.keys())
|
||||
dates.reverse()
|
||||
|
||||
for index, timestamp in enumerate(dates):
|
||||
snapshot_fname = f"{timestamp}.html.br"
|
||||
filepath = os.path.join(watch_dir, snapshot_fname)
|
||||
|
||||
# Keep only the first 2
|
||||
if index > 1 and os.path.isfile(filepath):
|
||||
os.remove(filepath)
|
||||
|
||||
def get_fetched_html(self, watch_uuid, timestamp):
|
||||
"""Get fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str or False: The HTML or False if not available
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
snapshot_fname = f"{timestamp}.html.br"
|
||||
filepath = os.path.join(watch_dir, snapshot_fname)
|
||||
|
||||
if os.path.isfile(filepath):
|
||||
with open(filepath, 'rb') as f:
|
||||
return brotli.decompress(f.read()).decode('utf-8')
|
||||
|
||||
return False
|
||||
|
||||
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
|
||||
"""Save the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Text contents
|
||||
"""
|
||||
self.ensure_data_dir_exists(watch_uuid)
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
|
||||
filepath = os.path.join(watch_dir, 'last-fetched.br')
|
||||
with open(filepath, 'wb') as f:
|
||||
f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
|
||||
|
||||
def get_last_fetched_text_before_filters(self, watch_uuid):
|
||||
"""Get the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str: The text
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
filepath = os.path.join(watch_dir, 'last-fetched.br')
|
||||
|
||||
if not os.path.isfile(filepath):
|
||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||
history = self.get_history(watch_uuid)
|
||||
dates = list(history.keys())
|
||||
|
||||
if len(dates):
|
||||
return self.get_history_snapshot(watch_uuid, dates[-1])
|
||||
else:
|
||||
return ''
|
||||
|
||||
with open(filepath, 'rb') as f:
|
||||
return brotli.decompress(f.read()).decode('utf-8')
|
||||
|
||||
def visualselector_data_is_ready(self, watch_uuid):
|
||||
"""Check if visual selector data is ready
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
bool: Whether visual selector data is ready
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
screenshot_filename = os.path.join(watch_dir, "last-screenshot.png")
|
||||
elements_index_filename = os.path.join(watch_dir, "elements.deflate")
|
||||
|
||||
return path.isfile(screenshot_filename) and path.isfile(elements_index_filename)
|
||||
|
||||
def clear_watch_history(self, watch_uuid):
|
||||
"""Clear history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
if not os.path.exists(watch_dir):
|
||||
return
|
||||
|
||||
# Delete all files but keep the directory
|
||||
for item in pathlib.Path(watch_dir).glob("*.*"):
|
||||
os.unlink(item)
|
||||
|
||||
def delete_watch(self, watch_uuid):
|
||||
"""Delete a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_dir = self.get_watch_dir(watch_uuid)
|
||||
if os.path.exists(watch_dir):
|
||||
shutil.rmtree(watch_dir)
|
||||
changedetectionio/storage/mongodb_storage.py (new file, 466 lines)
@@ -0,0 +1,466 @@
|
||||
import os
|
||||
from copy import deepcopy
|
||||
|
||||
import brotli
|
||||
import zlib
|
||||
import json
|
||||
import time
|
||||
from loguru import logger
|
||||
from pymongo import MongoClient
|
||||
from urllib.parse import urlparse
|
||||
import base64
|
||||
|
||||
from .storage_base import StorageBase
|
||||
|
||||
class MongoDBStorage(StorageBase):
|
||||
"""MongoDB storage backend"""
|
||||
|
||||
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
|
||||
"""Initialize the MongoDB storage backend
|
||||
|
||||
Args:
|
||||
datastore_path (str): MongoDB connection URI
|
||||
include_default_watches (bool): Whether to include default watches
|
||||
version_tag (str): Version tag
|
||||
"""
|
||||
# Parse MongoDB URI from datastore_path
|
||||
parsed_uri = urlparse(datastore_path)
|
||||
self.db_name = parsed_uri.path.lstrip('/')
|
||||
if not self.db_name:
|
||||
self.db_name = 'changedetection'
|
||||
|
||||
# Connect to MongoDB
|
||||
self.client = MongoClient(datastore_path)
|
||||
self.db = self.client[self.db_name]
|
||||
|
||||
# Collections
|
||||
self.app_collection = self.db['app']
|
||||
self.watches_collection = self.db['watches']
|
||||
self.snapshots_collection = self.db['snapshots']
|
||||
self.history_collection = self.db['history']
|
||||
self.error_collection = self.db['errors']
|
||||
self.xpath_collection = self.db['xpath']
|
||||
self.html_collection = self.db['html']
|
||||
|
||||
logger.info(f"MongoDB storage initialized, connected to {datastore_path}")
|
||||
|
||||
def load_data(self):
|
||||
"""Load data from MongoDB
|
||||
|
||||
Returns:
|
||||
dict: The loaded data
|
||||
"""
|
||||
app_data = self.app_collection.find_one({'_id': 'app_data'})
|
||||
if not app_data:
|
||||
return None
|
||||
|
||||
# Remove MongoDB _id field
|
||||
if '_id' in app_data:
|
||||
del app_data['_id']
|
||||
|
||||
return app_data
|
||||
|
||||
def save_data(self, data):
|
||||
"""Save data to MongoDB
|
||||
|
||||
Args:
|
||||
data (dict): The data to save
|
||||
"""
|
||||
try:
|
||||
# Create a copy to modify
|
||||
data_copy = deepcopy(data)
|
||||
|
||||
# Set _id for app data
|
||||
data_copy['_id'] = 'app_data'
|
||||
|
||||
# Insert or update app data
|
||||
self.app_collection.replace_one({'_id': 'app_data'}, data_copy, upsert=True)
|
||||
|
||||
# Also store watches separately for more granular access
|
||||
# This provides a safety net in case of corrupted app_data
|
||||
watches = data.get('watching', {})
|
||||
for uuid, watch in watches.items():
|
||||
if isinstance(watch, dict): # Handle case where watch is a Watch object
|
||||
watch_copy = deepcopy(dict(watch))
|
||||
else:
|
||||
watch_copy = deepcopy(watch)
|
||||
watch_copy['_id'] = uuid
|
||||
self.watches_collection.replace_one({'_id': uuid}, watch_copy, upsert=True)
|
||||
|
||||
# Also store tags separately
|
||||
if 'settings' in data and 'application' in data['settings'] and 'tags' in data['settings']['application']:
|
||||
tags = data['settings']['application']['tags']
|
||||
for uuid, tag in tags.items():
|
||||
if isinstance(tag, dict): # Handle case where tag is a Tag object
|
||||
tag_copy = deepcopy(dict(tag))
|
||||
else:
|
||||
tag_copy = deepcopy(tag)
|
||||
tag_copy['_id'] = uuid
|
||||
self.db['tags'].replace_one({'_id': uuid}, tag_copy, upsert=True)
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Error writing to MongoDB: {str(e)}")
|
||||
raise e
|
||||
|
||||
def ensure_data_dir_exists(self, watch_uuid):
|
||||
"""Ensure the data directory exists for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# MongoDB doesn't need directories, this is a no-op
|
||||
pass
|
||||
|
||||
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
|
||||
"""Save history text to MongoDB
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Contents to save
|
||||
timestamp (int): Timestamp
|
||||
snapshot_id (str): Snapshot ID
|
||||
|
||||
Returns:
|
||||
str: Snapshot ID
|
||||
"""
|
||||
# Compress the contents
|
||||
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
|
||||
|
||||
# Store the snapshot
|
||||
snapshot_data = {
|
||||
'_id': f"{watch_uuid}:{timestamp}",
|
||||
'watch_uuid': watch_uuid,
|
||||
'timestamp': timestamp,
|
||||
'snapshot_id': snapshot_id,
|
||||
'contents': base64.b64encode(compressed_contents).decode('ascii'),
|
||||
'compressed': True
|
||||
}
|
||||
|
||||
self.snapshots_collection.replace_one({'_id': snapshot_data['_id']}, snapshot_data, upsert=True)
|
||||
|
||||
# Update history index
|
||||
history_entry = {
|
||||
'watch_uuid': watch_uuid,
|
||||
'timestamp': timestamp,
|
||||
'snapshot_id': snapshot_id
|
||||
}
|
||||
|
||||
self.history_collection.replace_one(
|
||||
{'watch_uuid': watch_uuid, 'timestamp': timestamp},
|
||||
history_entry,
|
||||
upsert=True
|
||||
)
|
||||
|
||||
return snapshot_id
|
||||
|
||||
def get_history(self, watch_uuid):
|
||||
"""Get history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
dict: The history with timestamp keys and snapshot IDs as values
|
||||
"""
|
||||
history = {}
|
||||
|
||||
# Query history entries for this watch
|
||||
entries = self.history_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', 1)
|
||||
|
||||
for entry in entries:
|
||||
history[str(entry['timestamp'])] = entry['snapshot_id']
|
||||
|
||||
return history
|
||||
|
||||
def get_history_snapshot(self, watch_uuid, timestamp):
|
||||
"""Get a history snapshot from MongoDB
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str: The snapshot content
|
||||
"""
|
||||
# Query for the snapshot
|
||||
snapshot = self.snapshots_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})
|
||||
|
||||
if not snapshot:
|
||||
return None
|
||||
|
||||
if snapshot.get('compressed', False):
|
||||
# Decompress the contents
|
||||
compressed_data = base64.b64decode(snapshot['contents'])
|
||||
return brotli.decompress(compressed_data).decode('utf-8')
|
||||
else:
|
||||
return snapshot['contents']
|
||||
|
||||
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
|
||||
"""Save a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
screenshot (bytes): Screenshot data
|
||||
as_error (bool): Whether this is an error screenshot
|
||||
"""
|
||||
collection_name = 'error_screenshots' if as_error else 'screenshots'
|
||||
collection = self.db[collection_name]
|
||||
|
||||
# Encode the screenshot as base64
|
||||
encoded_screenshot = base64.b64encode(screenshot).decode('ascii')
|
||||
|
||||
screenshot_data = {
|
||||
'_id': watch_uuid,
|
||||
'watch_uuid': watch_uuid,
|
||||
'screenshot': encoded_screenshot,
|
||||
'timestamp': int(time.time())
|
||||
}
|
||||
|
||||
collection.replace_one({'_id': watch_uuid}, screenshot_data, upsert=True)
|
||||
|
||||
def get_screenshot(self, watch_uuid, is_error=False):
|
||||
"""Get a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get the error screenshot
|
||||
|
||||
Returns:
|
||||
bytes or None: The screenshot data or None if not available
|
||||
"""
|
||||
collection_name = 'error_screenshots' if is_error else 'screenshots'
|
||||
collection = self.db[collection_name]
|
||||
|
||||
screenshot_data = collection.find_one({'_id': watch_uuid})
|
||||
if not screenshot_data:
|
||||
return None
|
||||
|
||||
# Decode the screenshot from base64
|
||||
return base64.b64decode(screenshot_data['screenshot'])
|
||||
|
||||
def save_error_text(self, watch_uuid, contents):
|
||||
"""Save error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Error contents
|
||||
"""
|
||||
error_data = {
|
||||
'_id': watch_uuid,
|
||||
'watch_uuid': watch_uuid,
|
||||
'error_text': contents,
|
||||
'timestamp': int(time.time())
|
||||
}
|
||||
|
||||
self.error_collection.replace_one({'_id': watch_uuid}, error_data, upsert=True)
|
||||
|
||||
def get_error_text(self, watch_uuid):
|
||||
"""Get error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str or False: The error text or False if not available
|
||||
"""
|
||||
error_data = self.error_collection.find_one({'_id': watch_uuid})
|
||||
if not error_data:
|
||||
return False
|
||||
|
||||
return error_data['error_text']
|
||||
|
||||
def save_xpath_data(self, watch_uuid, data, as_error=False):
|
||||
"""Save XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
data (dict): XPath data
|
||||
as_error (bool): Whether this is error data
|
||||
"""
|
||||
# Compress the data
|
||||
compressed_data = zlib.compress(json.dumps(data).encode())
|
||||
|
||||
_id = f"{watch_uuid}:error" if as_error else watch_uuid
|
||||
|
||||
xpath_data = {
|
||||
'_id': _id,
|
||||
'watch_uuid': watch_uuid,
|
||||
'is_error': as_error,
|
||||
'data': base64.b64encode(compressed_data).decode('ascii'),
|
||||
'timestamp': int(time.time())
|
||||
}
|
||||
|
||||
self.xpath_collection.replace_one({'_id': _id}, xpath_data, upsert=True)
|
||||
|
||||
def get_xpath_data(self, watch_uuid, is_error=False):
|
||||
"""Get XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get error data
|
||||
|
||||
Returns:
|
||||
dict or None: The XPath data or None if not available
|
||||
"""
|
||||
_id = f"{watch_uuid}:error" if is_error else watch_uuid
|
||||
|
||||
xpath_data = self.xpath_collection.find_one({'_id': _id})
|
||||
if not xpath_data:
|
||||
return None
|
||||
|
||||
# Decompress the data
|
||||
compressed_data = base64.b64decode(xpath_data['data'])
|
||||
return json.loads(zlib.decompress(compressed_data).decode('utf-8'))
|
||||
|
||||
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
|
||||
"""Save last fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
contents (str): HTML contents
|
||||
"""
|
||||
# Compress the contents
|
||||
contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
|
||||
try:
|
||||
compressed_contents = brotli.compress(contents_bytes)
|
||||
except Exception as e:
|
||||
logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
|
||||
compressed_contents = contents_bytes
|
||||
|
||||
html_data = {
|
||||
'_id': f"{watch_uuid}:{timestamp}",
|
||||
'watch_uuid': watch_uuid,
|
||||
'timestamp': timestamp,
|
||||
'html': base64.b64encode(compressed_contents).decode('ascii'),
|
||||
'compressed': True
|
||||
}
|
||||
|
||||
self.html_collection.replace_one({'_id': html_data['_id']}, html_data, upsert=True)
|
||||
|
||||
# Prune old snapshots - keep only the newest 2
|
||||
self._prune_last_fetched_html_snapshots(watch_uuid)
|
||||
|
||||
def _prune_last_fetched_html_snapshots(self, watch_uuid):
|
||||
"""Prune old HTML snapshots
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# Get all HTML snapshots for this watch, sorted by timestamp descending
|
||||
html_snapshots = list(
|
||||
self.html_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', -1)
|
||||
)
|
||||
|
||||
# Keep only the first 2
|
||||
if len(html_snapshots) > 2:
|
||||
for snapshot in html_snapshots[2:]:
|
||||
self.html_collection.delete_one({'_id': snapshot['_id']})
|
||||
|
||||
def get_fetched_html(self, watch_uuid, timestamp):
|
||||
"""Get fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str or False: The HTML or False if not available
|
||||
"""
|
||||
html_data = self.html_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})
|
||||
|
||||
if not html_data:
|
||||
return False
|
||||
|
||||
if html_data.get('compressed', False):
|
||||
# Decompress the contents
|
||||
compressed_data = base64.b64decode(html_data['html'])
|
||||
return brotli.decompress(compressed_data).decode('utf-8')
|
||||
else:
|
||||
return html_data['html']
|
||||
|
||||
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
|
||||
"""Save the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Text contents
|
||||
"""
|
||||
# Compress the contents
|
||||
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
|
||||
|
||||
last_fetched_data = {
|
||||
'_id': watch_uuid,
|
||||
'watch_uuid': watch_uuid,
|
||||
'contents': base64.b64encode(compressed_contents).decode('ascii'),
|
||||
'timestamp': int(time.time())
|
||||
}
|
||||
|
||||
self.db['last_fetched'].replace_one({'_id': watch_uuid}, last_fetched_data, upsert=True)
|
||||
|
||||
def get_last_fetched_text_before_filters(self, watch_uuid):
|
||||
"""Get the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str: The text
|
||||
"""
|
||||
last_fetched_data = self.db['last_fetched'].find_one({'_id': watch_uuid})
|
||||
|
||||
if not last_fetched_data:
|
||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||
history = self.get_history(watch_uuid)
|
||||
dates = list(history.keys())
|
||||
|
||||
if len(dates):
|
||||
return self.get_history_snapshot(watch_uuid, dates[-1])
|
||||
else:
|
||||
return ''
|
||||
|
||||
# Decompress the contents
|
||||
compressed_data = base64.b64decode(last_fetched_data['contents'])
|
||||
return brotli.decompress(compressed_data).decode('utf-8')
|
||||
|
||||
def visualselector_data_is_ready(self, watch_uuid):
|
||||
"""Check if visual selector data is ready
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
bool: Whether visual selector data is ready
|
||||
"""
|
||||
# Check if screenshot and xpath data exist
|
||||
screenshot = self.db['screenshots'].find_one({'_id': watch_uuid})
|
||||
xpath_data = self.xpath_collection.find_one({'_id': watch_uuid})
|
||||
|
||||
return screenshot is not None and xpath_data is not None
|
||||
|
||||
def clear_watch_history(self, watch_uuid):
|
||||
"""Clear history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# Delete all snapshots and history for this watch
|
||||
self.snapshots_collection.delete_many({'watch_uuid': watch_uuid})
|
||||
self.history_collection.delete_many({'watch_uuid': watch_uuid})
|
||||
self.html_collection.delete_many({'watch_uuid': watch_uuid})
|
||||
self.db['last_fetched'].delete_many({'watch_uuid': watch_uuid})
|
||||
self.xpath_collection.delete_many({'watch_uuid': watch_uuid})
|
||||
self.db['screenshots'].delete_many({'watch_uuid': watch_uuid})
|
||||
self.error_collection.delete_many({'watch_uuid': watch_uuid})
|
||||
|
||||
def delete_watch(self, watch_uuid):
|
||||
"""Delete a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# Clear all history data
|
||||
self.clear_watch_history(watch_uuid)
|
||||
|
||||
# Also delete error screenshots
|
||||
self.db['error_screenshots'].delete_many({'watch_uuid': watch_uuid})
|
||||
changedetectionio/storage/s3_storage.py (new file, 525 lines)
@@ -0,0 +1,525 @@
|
||||
import os
|
||||
import io
|
||||
import json
|
||||
import brotli
|
||||
import zlib
|
||||
import time
|
||||
from loguru import logger
|
||||
import boto3
|
||||
from urllib.parse import urlparse
|
||||
import base64
|
||||
|
||||
from .storage_base import StorageBase
|
||||
|
||||
class S3Storage(StorageBase):
|
||||
"""Amazon S3 storage backend"""
|
||||
|
||||
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
|
||||
"""Initialize the S3 storage backend
|
||||
|
||||
Args:
|
||||
datastore_path (str): S3 URI (s3://bucket-name/optional-prefix)
|
||||
include_default_watches (bool): Whether to include default watches
|
||||
version_tag (str): Version tag
|
||||
"""
|
||||
# Parse S3 URI
|
||||
parsed_uri = urlparse(datastore_path)
|
||||
self.bucket_name = parsed_uri.netloc
|
||||
self.prefix = parsed_uri.path.lstrip('/')
|
||||
|
||||
if self.prefix and not self.prefix.endswith('/'):
|
||||
self.prefix += '/'
|
||||
|
||||
# Initialize S3 client
|
||||
# Uses AWS credentials from environment variables or IAM role
|
||||
self.s3 = boto3.client('s3')
|
||||
|
||||
logger.info(f"S3 storage initialized, using bucket '{self.bucket_name}' with prefix '{self.prefix}'")
|
||||
|
||||
def _get_key(self, path):
|
||||
"""Get the S3 key for a path
|
||||
|
||||
Args:
|
||||
path (str): Path relative to the prefix
|
||||
|
||||
Returns:
|
||||
str: The full S3 key
|
||||
"""
|
||||
return f"{self.prefix}{path}"
|
||||
|
||||
def load_data(self):
|
||||
"""Load data from S3
|
||||
|
||||
Returns:
|
||||
dict: The loaded data
|
||||
"""
|
||||
key = self._get_key("app-data.json")
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
return json.loads(response['Body'].read().decode('utf-8'))
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return None
|
||||
except Exception as e:
|
||||
logger.error(f"Error loading data from S3: {str(e)}")
|
||||
raise e
|
||||
|
||||
def save_data(self, data):
|
||||
"""Save data to S3
|
||||
|
||||
Args:
|
||||
data (dict): The data to save
|
||||
"""
|
||||
try:
|
||||
key = self._get_key("app-data.json")
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=json.dumps(data, indent=4),
|
||||
ContentType='application/json'
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Error saving data to S3: {str(e)}")
|
||||
raise e
|
||||
|
||||
def ensure_data_dir_exists(self, watch_uuid):
|
||||
"""Ensure the data directory exists for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# S3 doesn't need directories, this is a no-op
|
||||
pass
|
||||
|
||||
def _get_watch_prefix(self, watch_uuid):
|
||||
"""Get the S3 prefix for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str: The watch prefix
|
||||
"""
|
||||
return self._get_key(f"watches/{watch_uuid}/")
|
||||
|
||||
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
|
||||
"""Save history text to S3
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Contents to save
|
||||
timestamp (int): Timestamp
|
||||
snapshot_id (str): Snapshot ID
|
||||
|
||||
Returns:
|
||||
str: Snapshot ID
|
||||
"""
|
||||
# Determine if we should compress
|
||||
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')
|
||||
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
# Save the snapshot
|
||||
if not skip_brotli and len(contents) > threshold:
|
||||
snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt.br"
|
||||
snapshot_fname = f"{snapshot_id}.txt.br"
|
||||
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=snapshot_key,
|
||||
Body=compressed_contents
|
||||
)
|
||||
else:
|
||||
snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt"
|
||||
snapshot_fname = f"{snapshot_id}.txt"
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=snapshot_key,
|
||||
Body=contents.encode('utf-8')
|
||||
)
|
||||
|
||||
# Update history index
|
||||
history_key = f"{watch_prefix}history.txt"
|
||||
|
||||
# Try to get existing history first
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
|
||||
history_content = response['Body'].read().decode('utf-8')
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
history_content = ""
|
||||
|
||||
# Append new entry
|
||||
history_content += f"{timestamp},{snapshot_fname}\n"
|
||||
|
||||
# Save updated history
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=history_key,
|
||||
Body=history_content
|
||||
)
|
||||
|
||||
return snapshot_fname
|
||||
|
||||
def get_history(self, watch_uuid):
|
||||
"""Get history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
dict: The history with timestamp keys and snapshot IDs as values
|
||||
"""
|
||||
tmp_history = {}
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
history_key = f"{watch_prefix}history.txt"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
|
||||
history_content = response['Body'].read().decode('utf-8')
|
||||
|
||||
for line in history_content.splitlines():
|
||||
if ',' in line:
|
||||
k, v = line.strip().split(',', 2)
|
||||
tmp_history[k] = f"{watch_prefix}snapshots/{v}"
|
||||
|
||||
return tmp_history
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return {}
|
||||
|
||||
def get_history_snapshot(self, watch_uuid, timestamp):
|
||||
"""Get a history snapshot from S3
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str: The snapshot content
|
||||
"""
|
||||
history = self.get_history(watch_uuid)
|
||||
if not timestamp in history:
|
||||
return None
|
||||
|
||||
key = history[timestamp]
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
content = response['Body'].read()
|
||||
|
||||
if key.endswith('.br'):
|
||||
# Decompress brotli
|
||||
return brotli.decompress(content).decode('utf-8')
|
||||
else:
|
||||
return content.decode('utf-8')
|
||||
except Exception as e:
|
||||
logger.error(f"Error reading snapshot from S3: {str(e)}")
|
||||
return None
|
||||
|
||||
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
|
||||
"""Save a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
screenshot (bytes): Screenshot data
|
||||
as_error (bool): Whether this is an error screenshot
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
if as_error:
|
||||
key = f"{watch_prefix}last-error-screenshot.png"
|
||||
else:
|
||||
key = f"{watch_prefix}last-screenshot.png"
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=screenshot,
|
||||
ContentType='image/png'
|
||||
)
|
||||
|
||||
def get_screenshot(self, watch_uuid, is_error=False):
|
||||
"""Get a screenshot for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get the error screenshot
|
||||
|
||||
Returns:
|
||||
bytes or None: The screenshot data or None if not available
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
if is_error:
|
||||
key = f"{watch_prefix}last-error-screenshot.png"
|
||||
else:
|
||||
key = f"{watch_prefix}last-screenshot.png"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
return response['Body'].read()
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return None
|
||||
|
||||
def save_error_text(self, watch_uuid, contents):
|
||||
"""Save error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Error contents
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}last-error.txt"
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=contents.encode('utf-8')
|
||||
)
|
||||
|
||||
def get_error_text(self, watch_uuid):
|
||||
"""Get error text for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str or False: The error text or False if not available
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}last-error.txt"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
return response['Body'].read().decode('utf-8')
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return False
|
||||
|
||||
def save_xpath_data(self, watch_uuid, data, as_error=False):
|
||||
"""Save XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
data (dict): XPath data
|
||||
as_error (bool): Whether this is error data
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
if as_error:
|
||||
key = f"{watch_prefix}elements-error.deflate"
|
||||
else:
|
||||
key = f"{watch_prefix}elements.deflate"
|
||||
|
||||
compressed_data = zlib.compress(json.dumps(data).encode())
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=compressed_data
|
||||
)
|
||||
|
||||
def get_xpath_data(self, watch_uuid, is_error=False):
|
||||
"""Get XPath data for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
is_error (bool): Whether to get error data
|
||||
|
||||
Returns:
|
||||
dict or None: The XPath data or None if not available
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
if is_error:
|
||||
key = f"{watch_prefix}elements-error.deflate"
|
||||
else:
|
||||
key = f"{watch_prefix}elements.deflate"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
compressed_data = response['Body'].read()
|
||||
return json.loads(zlib.decompress(compressed_data).decode('utf-8'))
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return None
|
||||
|
||||
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
|
||||
"""Save last fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
contents (str): HTML contents
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}html/{timestamp}.html.br"
|
||||
|
||||
contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
|
||||
try:
|
||||
compressed_contents = brotli.compress(contents_bytes)
|
||||
except Exception as e:
|
||||
logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
|
||||
compressed_contents = contents_bytes
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=compressed_contents
|
||||
)
|
||||
|
||||
# Prune old snapshots - keep only the newest 2
|
||||
self._prune_last_fetched_html_snapshots(watch_uuid)
|
||||
|
||||
def _prune_last_fetched_html_snapshots(self, watch_uuid):
|
||||
"""Prune old HTML snapshots
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
html_prefix = f"{watch_prefix}html/"
|
||||
|
||||
# List all HTML snapshots
|
||||
response = self.s3.list_objects_v2(
|
||||
Bucket=self.bucket_name,
|
||||
Prefix=html_prefix
|
||||
)
|
||||
|
||||
if 'Contents' not in response:
|
||||
return
|
||||
|
||||
# Sort by timestamp (extract from key)
|
||||
html_files = sorted(
|
||||
response['Contents'],
|
||||
key=lambda x: int(x['Key'].split('/')[-1].split('.')[0]),
|
||||
reverse=True
|
||||
)
|
||||
|
||||
# Delete all but the newest 2
|
||||
if len(html_files) > 2:
|
||||
for file in html_files[2:]:
|
||||
self.s3.delete_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=file['Key']
|
||||
)
|
||||
|
||||
def get_fetched_html(self, watch_uuid, timestamp):
|
||||
"""Get fetched HTML for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
timestamp (int): Timestamp
|
||||
|
||||
Returns:
|
||||
str or False: The HTML or False if not available
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}html/{timestamp}.html.br"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
compressed_data = response['Body'].read()
|
||||
return brotli.decompress(compressed_data).decode('utf-8')
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
return False
|
||||
|
||||
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
|
||||
"""Save the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Text contents
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}last-fetched.br"
|
||||
|
||||
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
|
||||
|
||||
self.s3.put_object(
|
||||
Bucket=self.bucket_name,
|
||||
Key=key,
|
||||
Body=compressed_contents
|
||||
)
|
||||
|
||||
def get_last_fetched_text_before_filters(self, watch_uuid):
|
||||
"""Get the last text fetched before filters
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
str: The text
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
key = f"{watch_prefix}last-fetched.br"
|
||||
|
||||
try:
|
||||
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
|
||||
compressed_data = response['Body'].read()
|
||||
return brotli.decompress(compressed_data).decode('utf-8')
|
||||
except self.s3.exceptions.NoSuchKey:
|
||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||
history = self.get_history(watch_uuid)
|
||||
dates = list(history.keys())
|
||||
|
||||
if len(dates):
|
||||
return self.get_history_snapshot(watch_uuid, dates[-1])
|
||||
else:
|
||||
return ''
|
||||
|
||||
def visualselector_data_is_ready(self, watch_uuid):
|
||||
"""Check if visual selector data is ready
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
|
||||
Returns:
|
||||
bool: Whether visual selector data is ready
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
screenshot_key = f"{watch_prefix}last-screenshot.png"
|
||||
elements_key = f"{watch_prefix}elements.deflate"
|
||||
|
||||
try:
|
||||
# Just check if both files exist
|
||||
self.s3.head_object(Bucket=self.bucket_name, Key=screenshot_key)
|
||||
self.s3.head_object(Bucket=self.bucket_name, Key=elements_key)
|
||||
return True
|
||||
except self.s3.exceptions.ClientError:
|
||||
return False
|
||||
|
||||
def clear_watch_history(self, watch_uuid):
|
||||
"""Clear history for a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
watch_prefix = self._get_watch_prefix(watch_uuid)
|
||||
|
||||
# List all objects with this watch's prefix
|
||||
paginator = self.s3.get_paginator('list_objects_v2')
|
||||
pages = paginator.paginate(
|
||||
Bucket=self.bucket_name,
|
||||
Prefix=watch_prefix
|
||||
)
|
||||
|
||||
# Delete all objects in batches
|
||||
for page in pages:
|
||||
if 'Contents' not in page:
|
||||
continue
|
||||
|
||||
delete_keys = {'Objects': [{'Key': obj['Key']} for obj in page['Contents']]}
|
||||
self.s3.delete_objects(
|
||||
Bucket=self.bucket_name,
|
||||
Delete=delete_keys
|
||||
)
|
||||
|
||||
def delete_watch(self, watch_uuid):
|
||||
"""Delete a watch
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
"""
|
||||
# Same implementation as clear_watch_history for S3
|
||||
self.clear_watch_history(watch_uuid)
|
||||
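
Taken together, the key construction in this backend implies an object layout under the configured bucket and prefix roughly like the following. This is a summary derived from the code above; the bucket name, prefix, and identifiers are the placeholder examples used throughout this change.

```
s3://bucket-name/optional-prefix/app-data.json
s3://bucket-name/optional-prefix/watches/<watch-uuid>/history.txt
s3://bucket-name/optional-prefix/watches/<watch-uuid>/snapshots/<snapshot-id>.txt.br   (plain .txt when brotli is skipped)
s3://bucket-name/optional-prefix/watches/<watch-uuid>/html/<timestamp>.html.br
s3://bucket-name/optional-prefix/watches/<watch-uuid>/last-fetched.br
s3://bucket-name/optional-prefix/watches/<watch-uuid>/last-screenshot.png
s3://bucket-name/optional-prefix/watches/<watch-uuid>/last-error-screenshot.png
s3://bucket-name/optional-prefix/watches/<watch-uuid>/last-error.txt
s3://bucket-name/optional-prefix/watches/<watch-uuid>/elements.deflate
s3://bucket-name/optional-prefix/watches/<watch-uuid>/elements-error.deflate
```
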
230
changedetectionio/storage/storage_base.py
Normal file
230
changedetectionio/storage/storage_base.py
Normal file
@@ -0,0 +1,230 @@
|
||||
from abc import ABC, abstractmethod
|
||||
import json
|
||||
from loguru import logger
|
||||
|
||||
class StorageBase(ABC):
|
||||
"""Abstract base class for storage backends"""
|
||||
|
||||
@abstractmethod
|
||||
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
|
||||
"""Initialize the storage backend
|
||||
|
||||
Args:
|
||||
datastore_path (str): Path to the datastore
|
||||
include_default_watches (bool): Whether to include default watches
|
||||
version_tag (str): Version tag
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def load_data(self):
|
||||
"""Load data from the storage backend
|
||||
|
||||
Returns:
|
||||
dict: The loaded data
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def save_data(self, data):
|
||||
"""Save data to the storage backend
|
||||
|
||||
Args:
|
||||
data (dict): The data to save
|
||||
"""
|
||||
pass
|
||||
|
||||
@abstractmethod
|
||||
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
|
||||
"""Save history text to the storage backend
|
||||
|
||||
Args:
|
||||
watch_uuid (str): Watch UUID
|
||||
contents (str): Contents to save
|
||||
timestamp (int): Timestamp
|
||||
snapshot_id (str): Snapshot ID
|
||||
|
||||
Returns:
|
||||
str: Snapshot filename or ID
|
||||
"""
|
||||
pass
|
||||
|
||||
    @abstractmethod
    def get_history_snapshot(self, watch_uuid, timestamp):
        """Get a history snapshot from the storage backend

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str: The snapshot content
        """
        pass

    @abstractmethod
    def get_history(self, watch_uuid):
        """Get history for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            dict: The history with timestamp keys and snapshot IDs as values
        """
        pass

    @abstractmethod
    def save_screenshot(self, watch_uuid, screenshot, as_error=False):
        """Save a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            screenshot (bytes): Screenshot data
            as_error (bool): Whether this is an error screenshot
        """
        pass

    @abstractmethod
    def get_screenshot(self, watch_uuid, is_error=False):
        """Get a screenshot for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get the error screenshot

        Returns:
            str or None: The screenshot path or None if not available
        """
        pass

    @abstractmethod
    def save_error_text(self, watch_uuid, contents):
        """Save error text for a watch

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Error contents
        """
        pass

    @abstractmethod
    def get_error_text(self, watch_uuid):
        """Get error text for a watch

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str or False: The error text or False if not available
        """
        pass

    @abstractmethod
    def save_xpath_data(self, watch_uuid, data, as_error=False):
        """Save XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            data (dict): XPath data
            as_error (bool): Whether this is error data
        """
        pass

    @abstractmethod
    def get_xpath_data(self, watch_uuid, is_error=False):
        """Get XPath data for a watch

        Args:
            watch_uuid (str): Watch UUID
            is_error (bool): Whether to get error data

        Returns:
            dict or None: The XPath data or None if not available
        """
        pass

    @abstractmethod
    def save_last_fetched_html(self, watch_uuid, timestamp, contents):
        """Save last fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp
            contents (str): HTML contents
        """
        pass

    @abstractmethod
    def get_fetched_html(self, watch_uuid, timestamp):
        """Get fetched HTML for a watch

        Args:
            watch_uuid (str): Watch UUID
            timestamp (int): Timestamp

        Returns:
            str or False: The HTML or False if not available
        """
        pass

    @abstractmethod
    def save_last_text_fetched_before_filters(self, watch_uuid, contents):
        """Save the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID
            contents (str): Text contents
        """
        pass

    @abstractmethod
    def get_last_fetched_text_before_filters(self, watch_uuid):
        """Get the last text fetched before filters

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            str: The text
        """
        pass

    @abstractmethod
    def ensure_data_dir_exists(self, watch_uuid):
        """Ensure the data directory exists for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass

    @abstractmethod
    def visualselector_data_is_ready(self, watch_uuid):
        """Check if visual selector data is ready

        Args:
            watch_uuid (str): Watch UUID

        Returns:
            bool: Whether visual selector data is ready
        """
        pass

    @abstractmethod
    def clear_watch_history(self, watch_uuid):
        """Clear history for a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass

    @abstractmethod
    def delete_watch(self, watch_uuid):
        """Delete a watch

        Args:
            watch_uuid (str): Watch UUID
        """
        pass
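As an illustration only (not part of this commit), a toy backend shows the shape of the contract above. The `InMemoryStorage` name is hypothetical, and the `save_history_text` signature is assumed from how the Watch model calls the storage backend elsewhere in this diff:

```python
# Hypothetical, for illustration only -- not part of this commit.
from changedetectionio.storage.storage_base import StorageBase

class InMemoryStorage(StorageBase):
    """Toy backend that keeps snapshots in dicts (handy for unit tests)."""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        self._snapshots = {}   # (watch_uuid, snapshot_id) -> text
        self._history = {}     # watch_uuid -> {timestamp: snapshot_id}

    # Signature assumed from the Watch model's call site in this diff.
    def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
        self._snapshots[(watch_uuid, snapshot_id)] = contents
        self._history.setdefault(watch_uuid, {})[timestamp] = snapshot_id
        return snapshot_id

    def get_history(self, watch_uuid):
        return dict(self._history.get(watch_uuid, {}))

    def get_history_snapshot(self, watch_uuid, timestamp):
        snapshot_id = self._history.get(watch_uuid, {}).get(timestamp)
        return self._snapshots.get((watch_uuid, snapshot_id))

    # The remaining abstract methods (screenshots, xpath data, error text, etc.)
    # would each need a concrete implementation before this class is instantiable.
```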
33
changedetectionio/storage/storage_factory.py
Normal file
@@ -0,0 +1,33 @@
import re
from loguru import logger
from urllib.parse import urlparse

from .storage_base import StorageBase
from .filesystem_storage import FileSystemStorage
from .mongodb_storage import MongoDBStorage
from .s3_storage import S3Storage

def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"):
    """Create a storage backend based on the datastore path

    Args:
        datastore_path (str): Path to the datastore
        include_default_watches (bool): Whether to include default watches
        version_tag (str): Version tag

    Returns:
        StorageBase: The storage backend
    """
    # Check if it's a MongoDB URI
    if datastore_path.startswith('mongodb://') or datastore_path.startswith('mongodb+srv://'):
        logger.info(f"Using MongoDB storage backend with URI {datastore_path}")
        return MongoDBStorage(datastore_path, include_default_watches, version_tag)

    # Check if it's an S3 URI
    if datastore_path.startswith('s3://'):
        logger.info(f"Using S3 storage backend with URI {datastore_path}")
        return S3Storage(datastore_path, include_default_watches, version_tag)

    # Default to filesystem
    logger.info(f"Using filesystem storage backend with path {datastore_path}")
    return FileSystemStorage(datastore_path, include_default_watches, version_tag)
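Selection is purely prefix-based on the datastore path, so the factory can also be exercised directly. A quick sketch; the MongoDB and S3 URIs are placeholders, not values shipped with this change:

```python
# Example only; the URIs below are placeholders. The MongoDB/S3 backends also
# need pymongo/boto3 installed and a reachable service to actually work.
from changedetectionio.storage.storage_factory import create_storage

storage = create_storage("/datastore")                              # -> FileSystemStorage
# storage = create_storage("mongodb://localhost:27017/cdio")        # -> MongoDBStorage
# storage = create_storage("s3://example-bucket/changedetection")   # -> S3Storage
```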
@@ -20,6 +20,7 @@ from loguru import logger

from .processors import get_custom_watch_obj_for_processor
from .processors.restock_diff import Restock
from .storage.storage_factory import create_storage

# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'

@@ -38,20 +39,28 @@ class ChangeDetectionStore:
    needs_write_urgent = False

    __version_check = True

    # Singleton instance for access from Watch class methods
    instance = None

    def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
        # Should only be active for docker
        # logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
        self.__data = App.model()
        self.datastore_path = datastore_path
        self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
        logger.info(f"Datastore path is '{self.json_store_path}'")

        # Create the appropriate storage backend based on the datastore path
        self.storage = create_storage(datastore_path, include_default_watches, version_tag)

        self.needs_write = False
        self.start_time = time.time()
        self.stop_thread = False
        # Base definition for all watchers
        # deepcopy part of #569 - not sure why its needed exactly
        self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={}))
        self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))

        # Set singleton instance
        ChangeDetectionStore.instance = self

        if path.isfile('changedetectionio/source.txt'):
            with open('changedetectionio/source.txt') as f:

@@ -60,10 +69,9 @@ class ChangeDetectionStore:
                self.__data['build_sha'] = f.read()

        try:
            # @todo retest with ", encoding='utf-8'"
            with open(self.json_store_path) as json_file:
                from_disk = json.load(json_file)

            # Load data from storage
            from_disk = self.storage.load_data()
            if from_disk:
                # @todo isnt there a way todo this dict.update recursively?
                # Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
                if 'watching' in from_disk:

@@ -91,22 +99,24 @@ class ChangeDetectionStore:
                for uuid, tag in self.__data['settings']['application']['tags'].items():
                    self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
                    logger.info(f"Tag: {uuid} {tag['title']}")
            else:
                # First time ran, Create the datastore.
                if include_default_watches:
                    logger.critical(f"No data store found, creating new store")
                    self.add_watch(url='https://news.ycombinator.com/',
                                   tag='Tech news',
                                   extras={'fetch_backend': 'html_requests'})

        # First time ran, Create the datastore.
        except (FileNotFoundError):
            if include_default_watches:
                logger.critical(f"No JSON DB found at {self.json_store_path}, creating JSON store at {self.datastore_path}")
                self.add_watch(url='https://news.ycombinator.com/',
                               tag='Tech news',
                               extras={'fetch_backend': 'html_requests'})
                self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
                               tag='changedetection.io',
                               extras={'fetch_backend': 'html_requests'})

                    self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
                                   tag='changedetection.io',
                                   extras={'fetch_backend': 'html_requests'})

            updates_available = self.get_updates_available()
            self.__data['settings']['application']['schema_version'] = updates_available.pop()
            updates_available = self.get_updates_available()
            self.__data['settings']['application']['schema_version'] = updates_available.pop()

        except Exception as e:
            logger.error(f"Error loading data from storage: {str(e)}")
            raise e
        else:
            # Bump the update version by running updates
            self.run_updates()

@@ -227,23 +237,15 @@ class ChangeDetectionStore:

    # Delete a single watch by UUID
    def delete(self, uuid):
        import pathlib
        import shutil

        with self.lock:
            if uuid == 'all':
                # Delete all watches
                for watch_uuid in list(self.data['watching'].keys()):
                    self.storage.delete_watch(watch_uuid)
                self.__data['watching'] = {}

                # GitHub #30 also delete history records
                for uuid in self.data['watching']:
                    path = pathlib.Path(os.path.join(self.datastore_path, uuid))
                    if os.path.exists(path):
                        shutil.rmtree(path)

            else:
                path = pathlib.Path(os.path.join(self.datastore_path, uuid))
                if os.path.exists(path):
                    shutil.rmtree(path)
                # Delete a single watch
                self.storage.delete_watch(uuid)
                del self.data['watching'][uuid]

            self.needs_write_urgent = True
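With the inline pathlib/shutil cleanup removed from delete(), the per-watch directory removal presumably moves into FileSystemStorage.delete_watch(). A sketch of that idea, based on the lines deleted above; the standalone function name is illustrative, not from the commit:

```python
import os
import pathlib
import shutil

def delete_watch_from_filesystem(datastore_path, watch_uuid):
    """Roughly what a filesystem delete_watch() has to do: remove the watch's
    whole data directory (snapshots, screenshots, history index)."""
    watch_dir = pathlib.Path(os.path.join(datastore_path, watch_uuid))
    if watch_dir.exists():
        shutil.rmtree(watch_dir)
```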
@@ -266,6 +268,7 @@

    # Remove a watchs data but keep the entry (URL etc)
    def clear_watch_history(self, uuid):
        self.storage.clear_watch_history(uuid)
        self.__data['watching'][uuid].clear_watch()
        self.needs_write_urgent = True

@@ -372,43 +375,30 @@ class ChangeDetectionStore:
        return new_uuid

    def visualselector_data_is_ready(self, watch_uuid):
        output_path = "{}/{}".format(self.datastore_path, watch_uuid)
        screenshot_filename = "{}/last-screenshot.png".format(output_path)
        elements_index_filename = "{}/elements.deflate".format(output_path)
        if path.isfile(screenshot_filename) and path.isfile(elements_index_filename) :
            return True

        return False
        return self.storage.visualselector_data_is_ready(watch_uuid)

    def sync_to_json(self):
        logger.info("Saving JSON..")
        logger.info("Saving data to storage backend...")
        try:
            data = deepcopy(self.__data)
        except RuntimeError as e:
            # Try again in 15 seconds
            time.sleep(15)
            logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
            logger.error(f"! Data changed when writing to storage, trying again.. {str(e)}")
            self.sync_to_json()
            return
        else:

            try:
                # Re #286 - First write to a temp file, then confirm it looks OK and rename it
                # This is a fairly basic strategy to deal with the case that the file is corrupted,
                # system was out of memory, out of RAM etc
                with open(self.json_store_path+".tmp", 'w') as json_file:
                    json.dump(data, json_file, indent=4)
                os.replace(self.json_store_path+".tmp", self.json_store_path)
                self.storage.save_data(data)
            except Exception as e:
                logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
                logger.error(f"Error writing to storage backend: {str(e)}")

            self.needs_write = False
            self.needs_write_urgent = False

    # Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON
    # Thread runner, this helps with thread/write issues when there are many operations that want to update the data
    # by just running periodically in one thread, according to python, dict updates are threadsafe.
    def save_datastore(self):

        while True:
            if self.stop_thread:
                # Suppressing "Logging error in Loguru Handler #0" during CICD.
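The Re #286 safeguard (write to a temp file, then rename into place) drops out of sync_to_json() here, so presumably the filesystem backend's save_data() keeps that behaviour. A sketch of that write path, assuming save_data() receives the full settings/watch dict; the standalone function name is illustrative:

```python
import json
import os

def save_data_atomically(json_store_path, data):
    """Illustrative write path for a filesystem save_data(): temp file first,
    then an atomic swap, so a crash mid-write never corrupts url-watches.json."""
    tmp_path = json_store_path + ".tmp"
    with open(tmp_path, 'w') as json_file:
        json.dump(data, json_file, indent=4)
    os.replace(tmp_path, json_store_path)  # atomic on the same filesystem
```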
@@ -100,3 +100,6 @@ referencing==0.35.1

# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata

pymongo>=4.3.3
boto3>=1.26.0