Compare commits

...

3 Commits

| Author | SHA1 | Message | Date |
| --- | --- | --- | --- |
| dgtlmoon | 80434fa16a | Update MANIFEST.in | 2025-03-14 01:25:32 +01:00 |
| dgtlmoon | db10422415 | tweaked | 2025-03-14 00:33:27 +01:00 |
| dgtlmoon | 380e571ded | Try abstract the storage out | 2025-03-14 00:31:07 +01:00 |
11 changed files with 1819 additions and 77 deletions

View File: MANIFEST.in

@@ -5,6 +5,7 @@ recursive-include changedetectionio/content_fetchers *
recursive-include changedetectionio/model *
recursive-include changedetectionio/processors *
recursive-include changedetectionio/static *
recursive-include changedetectionio/storage *
recursive-include changedetectionio/templates *
recursive-include changedetectionio/tests *
prune changedetectionio/static/package-lock.json

View File: changedetectionio/model/Watch.py

@@ -299,34 +299,17 @@ class model(watch_base):
# Save some text file to the appropriate path and bump the history
# result_obj from fetch_site_status.run()
def save_history_text(self, contents, timestamp, snapshot_id):
import brotli
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.storage.filesystem_storage import FileSystemStorage
logger.trace(f"{self.get('uuid')} - Updating history.txt with timestamp {timestamp}")
self.ensure_data_dir_exists()
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
if not skip_brotli and len(contents) > threshold:
snapshot_fname = f"{snapshot_id}.txt.br"
dest = os.path.join(self.watch_data_dir, snapshot_fname)
if not os.path.exists(dest):
with open(dest, 'wb') as f:
f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
else:
snapshot_fname = f"{snapshot_id}.txt"
dest = os.path.join(self.watch_data_dir, snapshot_fname)
if not os.path.exists(dest):
with open(dest, 'wb') as f:
f.write(contents.encode('utf-8'))
# Append to index
# @todo check last char was \n
index_fname = os.path.join(self.watch_data_dir, "history.txt")
with open(index_fname, 'a') as f:
f.write("{},{}\n".format(timestamp, snapshot_fname))
f.close()
# Get storage from singleton store or create a filesystem storage as default
store = ChangeDetectionStore.instance if hasattr(ChangeDetectionStore, 'instance') else None
storage = store.storage if store and hasattr(store, 'storage') else FileSystemStorage(self.__datastore_path)
# Use the storage backend to save the history text
snapshot_fname = storage.save_history_text(self.get('uuid'), contents, timestamp, snapshot_id)
self.__newest_history_key = timestamp
self.__history_n += 1

View File: changedetectionio/storage/README.md

@@ -0,0 +1,61 @@
# Storage Backends for changedetection.io
This module provides different storage backends for changedetection.io, allowing you to store data in various systems:
- **FileSystemStorage**: The default storage backend that stores data on the local filesystem.
- **MongoDBStorage**: Stores data in a MongoDB database.
- **S3Storage**: Stores data in an Amazon S3 bucket.
## Usage
The storage backend is automatically selected based on the datastore path provided when initializing the application:
- For filesystem storage (default): `/datastore`
- For MongoDB storage: `mongodb://username:password@host:port/database`
- For S3 storage: `s3://bucket-name/optional-prefix`
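The same dispatch can be exercised from Python — a minimal sketch, assuming this module is importable as `changedetectionio.storage.storage_factory` (the URIs below are placeholders):
```
from changedetectionio.storage.storage_factory import create_storage

# Each URI style selects a different backend class
fs = create_storage('/datastore')                                              # FileSystemStorage
mongo = create_storage('mongodb://user:pass@localhost:27017/changedetection')  # MongoDBStorage
s3 = create_storage('s3://example-bucket/changedetection')                     # S3Storage
```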
## Configuration
### Filesystem Storage
The default storage backend. Simply specify a directory path:
```
changedetection.io -d /path/to/datastore
```
### MongoDB Storage
To use MongoDB storage, specify a MongoDB connection URI:
```
changedetection.io -d mongodb://username:password@host:port/database
```
Make sure to install the required dependencies:
```
pip install -r requirements-storage.txt
```
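Optionally, verify connectivity before pointing the application at the database — a sketch, with a placeholder URI:
```
from pymongo import MongoClient

client = MongoClient('mongodb://username:password@host:27017/database')
client.admin.command('ping')  # raises on connection or auth failure
```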
### Amazon S3 Storage
To use S3 storage, specify an S3 URI:
```
changedetection.io -d s3://bucket-name/optional-prefix
```
Make sure to:
1. Install the required dependencies: `pip install -r requirements-storage.txt`
2. Configure AWS credentials using environment variables or IAM roles:
- Set `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables
- Or use an IAM role when running on AWS EC2/ECS/EKS
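Optionally, confirm the credentials can reach the bucket before starting — a sketch, where `bucket-name` is a placeholder:
```
import boto3

s3 = boto3.client('s3')               # picks up env vars or the IAM role automatically
s3.head_bucket(Bucket='bucket-name')  # raises a botocore ClientError if unreachable
```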
## Custom Storage Backends
You can create custom storage backends by:
1. Subclassing the `StorageBase` abstract class in `storage_base.py`
2. Implementing all required methods
3. Adding your backend to the `storage_factory.py` file
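For example, a minimal skeleton for steps 1 and 2 — a sketch only; `InMemoryStorage` is hypothetical, and all remaining abstract methods of `StorageBase` must be implemented before the class can be instantiated:
```
from changedetectionio.storage.storage_base import StorageBase

class InMemoryStorage(StorageBase):
    """Hypothetical backend that keeps everything in process memory."""

    def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
        self.app_data = None

    def load_data(self):
        return self.app_data

    def save_data(self, data):
        self.app_data = data

    # ... implement the remaining StorageBase abstract methods here ...
```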

View File: changedetectionio/storage/__init__.py

@@ -0,0 +1 @@
# This module contains storage backend implementations

View File: changedetectionio/storage/filesystem_storage.py

@@ -0,0 +1,449 @@
import os
import shutil
import json
import brotli
import zlib
import pathlib
from loguru import logger
from os import path
from .storage_base import StorageBase
class FileSystemStorage(StorageBase):
"""File system storage backend"""
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
"""Initialize the file system storage backend
Args:
datastore_path (str): Path to the datastore
include_default_watches (bool): Whether to include default watches
version_tag (str): Version tag
"""
self.datastore_path = datastore_path
self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
logger.info(f"Datastore path is '{self.json_store_path}'")
def load_data(self):
"""Load data from the file system
Returns:
dict: The loaded data
"""
if not path.isfile(self.json_store_path):
return None
with open(self.json_store_path) as json_file:
return json.load(json_file)
def save_data(self, data):
"""Save data to the file system
Args:
data (dict): The data to save
"""
try:
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
# This is a fairly basic strategy to deal with the case that the file is corrupted,
# system was out of memory, out of RAM etc
with open(self.json_store_path+".tmp", 'w') as json_file:
json.dump(data, json_file, indent=4)
os.replace(self.json_store_path+".tmp", self.json_store_path)
except Exception as e:
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
raise e
def get_watch_dir(self, watch_uuid):
"""Get the directory for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str: The watch directory
"""
return os.path.join(self.datastore_path, watch_uuid)
def ensure_data_dir_exists(self, watch_uuid):
"""Ensure the data directory exists for a watch
Args:
watch_uuid (str): Watch UUID
"""
watch_dir = self.get_watch_dir(watch_uuid)
if not os.path.isdir(watch_dir):
logger.debug(f"> Creating data dir {watch_dir}")
os.makedirs(watch_dir, exist_ok=True)
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
"""Save history text to the file system
Args:
watch_uuid (str): Watch UUID
contents (str): Contents to save
timestamp (int): Timestamp
snapshot_id (str): Snapshot ID
Returns:
str: Snapshot filename
"""
self.ensure_data_dir_exists(watch_uuid)
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')
watch_dir = self.get_watch_dir(watch_uuid)
if not skip_brotli and len(contents) > threshold:
snapshot_fname = f"{snapshot_id}.txt.br"
dest = os.path.join(watch_dir, snapshot_fname)
if not os.path.exists(dest):
with open(dest, 'wb') as f:
f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
else:
snapshot_fname = f"{snapshot_id}.txt"
dest = os.path.join(watch_dir, snapshot_fname)
if not os.path.exists(dest):
with open(dest, 'wb') as f:
f.write(contents.encode('utf-8'))
# Append to index
index_fname = os.path.join(watch_dir, "history.txt")
with open(index_fname, 'a') as f:
f.write("{},{}\n".format(timestamp, snapshot_fname))
return snapshot_fname
def get_history(self, watch_uuid):
"""Get history for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
dict: The history with timestamp keys and snapshot IDs as values
"""
tmp_history = {}
watch_dir = self.get_watch_dir(watch_uuid)
if not os.path.isdir(watch_dir):
return tmp_history
# Read the history file as a dict
fname = os.path.join(watch_dir, "history.txt")
if os.path.isfile(fname):
logger.debug(f"Reading watch history index for {watch_uuid}")
with open(fname, "r") as f:
for i in f.readlines():
if ',' in i:
k, v = i.strip().split(',', 2)
# The index history could contain a relative path, so we need to build the full path
# so that Python can read it
if not '/' in v and not '\\' in v:
v = os.path.join(watch_dir, v)
else:
# It's possible that they moved the datadir on older versions
# So the snapshot exists but is in a different path
snapshot_fname = v.split('/')[-1]
proposed_new_path = os.path.join(watch_dir, snapshot_fname)
if not os.path.exists(v) and os.path.exists(proposed_new_path):
v = proposed_new_path
tmp_history[k] = v
return tmp_history
def get_history_snapshot(self, watch_uuid, timestamp):
"""Get a history snapshot from the file system
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str: The snapshot content
"""
history = self.get_history(watch_uuid)
if not timestamp in history:
return None
filepath = history[timestamp]
# See if a brotli version exists and switch to that
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
filepath = f"{filepath}.br"
# Or, in the backup case, the .br does not exist but the plain one does
if filepath.endswith('.br') and not os.path.isfile(filepath):
if os.path.isfile(filepath.replace('.br', '')):
filepath = filepath.replace('.br', '')
if filepath.endswith('.br'):
# Brotli doesn't have a file header to detect it, so we rely on the filename
# https://www.rfc-editor.org/rfc/rfc7932
with open(filepath, 'rb') as f:
return brotli.decompress(f.read()).decode('utf-8')
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
return f.read()
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
"""Save a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
screenshot (bytes): Screenshot data
as_error (bool): Whether this is an error screenshot
"""
self.ensure_data_dir_exists(watch_uuid)
watch_dir = self.get_watch_dir(watch_uuid)
if as_error:
target_path = os.path.join(watch_dir, "last-error-screenshot.png")
else:
target_path = os.path.join(watch_dir, "last-screenshot.png")
with open(target_path, 'wb') as f:
f.write(screenshot)
def get_screenshot(self, watch_uuid, is_error=False):
"""Get a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get the error screenshot
Returns:
str or None: The screenshot path or None if not available
"""
watch_dir = self.get_watch_dir(watch_uuid)
if is_error:
fname = os.path.join(watch_dir, "last-error-screenshot.png")
else:
fname = os.path.join(watch_dir, "last-screenshot.png")
if os.path.isfile(fname):
return fname
return None
def save_error_text(self, watch_uuid, contents):
"""Save error text for a watch
Args:
watch_uuid (str): Watch UUID
contents (str): Error contents
"""
self.ensure_data_dir_exists(watch_uuid)
watch_dir = self.get_watch_dir(watch_uuid)
target_path = os.path.join(watch_dir, "last-error.txt")
with open(target_path, 'w', encoding='utf-8') as f:
f.write(contents)
def get_error_text(self, watch_uuid):
"""Get error text for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str or False: The error text or False if not available
"""
watch_dir = self.get_watch_dir(watch_uuid)
fname = os.path.join(watch_dir, "last-error.txt")
if os.path.isfile(fname):
with open(fname, 'r') as f:
return f.read()
return False
def save_xpath_data(self, watch_uuid, data, as_error=False):
"""Save XPath data for a watch
Args:
watch_uuid (str): Watch UUID
data (dict): XPath data
as_error (bool): Whether this is error data
"""
self.ensure_data_dir_exists(watch_uuid)
watch_dir = self.get_watch_dir(watch_uuid)
if as_error:
target_path = os.path.join(watch_dir, "elements-error.deflate")
else:
target_path = os.path.join(watch_dir, "elements.deflate")
with open(target_path, 'wb') as f:
f.write(zlib.compress(json.dumps(data).encode()))
def get_xpath_data(self, watch_uuid, is_error=False):
"""Get XPath data for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get error data
Returns:
dict or None: The XPath data or None if not available
"""
watch_dir = self.get_watch_dir(watch_uuid)
if is_error:
path = os.path.join(watch_dir, "elements-error.deflate")
else:
path = os.path.join(watch_dir, "elements.deflate")
if not os.path.isfile(path):
return None
with open(path, 'rb') as f:
return json.loads(zlib.decompress(f.read()).decode('utf-8'))
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
"""Save last fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
contents (str): HTML contents
"""
self.ensure_data_dir_exists(watch_uuid)
watch_dir = self.get_watch_dir(watch_uuid)
snapshot_fname = f"{timestamp}.html.br"
filepath = os.path.join(watch_dir, snapshot_fname)
with open(filepath, 'wb') as f:
contents = contents.encode('utf-8') if isinstance(contents, str) else contents
try:
f.write(brotli.compress(contents))
except Exception as e:
logger.warning(f"{watch_uuid} - Unable to compress snapshot, saving as raw data to {filepath}")
logger.warning(e)
f.write(contents)
# Prune old snapshots - keep only the newest 2
self._prune_last_fetched_html_snapshots(watch_uuid)
def _prune_last_fetched_html_snapshots(self, watch_uuid):
"""Prune old HTML snapshots
Args:
watch_uuid (str): Watch UUID
"""
watch_dir = self.get_watch_dir(watch_uuid)
history = self.get_history(watch_uuid)
dates = list(history.keys())
dates.reverse()
for index, timestamp in enumerate(dates):
snapshot_fname = f"{timestamp}.html.br"
filepath = os.path.join(watch_dir, snapshot_fname)
# Keep only the first 2
if index > 1 and os.path.isfile(filepath):
os.remove(filepath)
def get_fetched_html(self, watch_uuid, timestamp):
"""Get fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str or False: The HTML or False if not available
"""
watch_dir = self.get_watch_dir(watch_uuid)
snapshot_fname = f"{timestamp}.html.br"
filepath = os.path.join(watch_dir, snapshot_fname)
if os.path.isfile(filepath):
with open(filepath, 'rb') as f:
return brotli.decompress(f.read()).decode('utf-8')
return False
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
"""Save the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
contents (str): Text contents
"""
self.ensure_data_dir_exists(watch_uuid)
watch_dir = self.get_watch_dir(watch_uuid)
filepath = os.path.join(watch_dir, 'last-fetched.br')
with open(filepath, 'wb') as f:
f.write(brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT))
def get_last_fetched_text_before_filters(self, watch_uuid):
"""Get the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
Returns:
str: The text
"""
watch_dir = self.get_watch_dir(watch_uuid)
filepath = os.path.join(watch_dir, 'last-fetched.br')
if not os.path.isfile(filepath):
# If a previous attempt doesn't yet exist, just snarf the previous snapshot instead
history = self.get_history(watch_uuid)
dates = list(history.keys())
if len(dates):
return self.get_history_snapshot(watch_uuid, dates[-1])
else:
return ''
with open(filepath, 'rb') as f:
return brotli.decompress(f.read()).decode('utf-8')
def visualselector_data_is_ready(self, watch_uuid):
"""Check if visual selector data is ready
Args:
watch_uuid (str): Watch UUID
Returns:
bool: Whether visual selector data is ready
"""
watch_dir = self.get_watch_dir(watch_uuid)
screenshot_filename = os.path.join(watch_dir, "last-screenshot.png")
elements_index_filename = os.path.join(watch_dir, "elements.deflate")
return path.isfile(screenshot_filename) and path.isfile(elements_index_filename)
def clear_watch_history(self, watch_uuid):
"""Clear history for a watch
Args:
watch_uuid (str): Watch UUID
"""
watch_dir = self.get_watch_dir(watch_uuid)
if not os.path.exists(watch_dir):
return
# Delete all files but keep the directory
for item in pathlib.Path(watch_dir).glob("*.*"):
os.unlink(item)
def delete_watch(self, watch_uuid):
"""Delete a watch
Args:
watch_uuid (str): Watch UUID
"""
watch_dir = self.get_watch_dir(watch_uuid)
if os.path.exists(watch_dir):
shutil.rmtree(watch_dir)
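A quick round-trip of this backend — a sketch, assuming the package is importable; `example-uuid` is a placeholder:
```
import tempfile, time
from changedetectionio.storage.filesystem_storage import FileSystemStorage

storage = FileSystemStorage(tempfile.mkdtemp())
ts = str(int(time.time()))
# Long enough to pass the brotli threshold, so a .txt.br snapshot is written
storage.save_history_text('example-uuid', 'hello world ' * 200, ts, 'snap-1')
print(storage.get_history('example-uuid'))                    # {'<ts>': '.../example-uuid/snap-1.txt.br'}
print(storage.get_history_snapshot('example-uuid', ts)[:11])  # 'hello world'
```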

View File: changedetectionio/storage/mongodb_storage.py

@@ -0,0 +1,466 @@
import os
from copy import deepcopy
import brotli
import zlib
import json
import time
from loguru import logger
from pymongo import MongoClient
from urllib.parse import urlparse
import base64
from .storage_base import StorageBase
class MongoDBStorage(StorageBase):
"""MongoDB storage backend"""
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
"""Initialize the MongoDB storage backend
Args:
datastore_path (str): MongoDB connection URI
include_default_watches (bool): Whether to include default watches
version_tag (str): Version tag
"""
# Parse MongoDB URI from datastore_path
parsed_uri = urlparse(datastore_path)
self.db_name = parsed_uri.path.lstrip('/')
if not self.db_name:
self.db_name = 'changedetection'
# Connect to MongoDB
self.client = MongoClient(datastore_path)
self.db = self.client[self.db_name]
# Collections
self.app_collection = self.db['app']
self.watches_collection = self.db['watches']
self.snapshots_collection = self.db['snapshots']
self.history_collection = self.db['history']
self.error_collection = self.db['errors']
self.xpath_collection = self.db['xpath']
self.html_collection = self.db['html']
logger.info(f"MongoDB storage initialized, connected to {datastore_path}")
def load_data(self):
"""Load data from MongoDB
Returns:
dict: The loaded data
"""
app_data = self.app_collection.find_one({'_id': 'app_data'})
if not app_data:
return None
# Remove MongoDB _id field
if '_id' in app_data:
del app_data['_id']
return app_data
def save_data(self, data):
"""Save data to MongoDB
Args:
data (dict): The data to save
"""
try:
# Create a copy to modify
data_copy = deepcopy(data)
# Set _id for app data
data_copy['_id'] = 'app_data'
# Insert or update app data
self.app_collection.replace_one({'_id': 'app_data'}, data_copy, upsert=True)
# Also store watches separately for more granular access
# This provides a safety net in case of corrupted app_data
watches = data.get('watching', {})
for uuid, watch in watches.items():
if isinstance(watch, dict): # Handle case where watch is a Watch object
watch_copy = deepcopy(dict(watch))
else:
watch_copy = deepcopy(watch)
watch_copy['_id'] = uuid
self.watches_collection.replace_one({'_id': uuid}, watch_copy, upsert=True)
# Also store tags separately
if 'settings' in data and 'application' in data['settings'] and 'tags' in data['settings']['application']:
tags = data['settings']['application']['tags']
for uuid, tag in tags.items():
if isinstance(tag, dict): # Handle case where tag is a Tag object
tag_copy = deepcopy(dict(tag))
else:
tag_copy = deepcopy(tag)
tag_copy['_id'] = uuid
self.db['tags'].replace_one({'_id': uuid}, tag_copy, upsert=True)
except Exception as e:
logger.error(f"Error writing to MongoDB: {str(e)}")
raise e
def ensure_data_dir_exists(self, watch_uuid):
"""Ensure the data directory exists for a watch
Args:
watch_uuid (str): Watch UUID
"""
# MongoDB doesn't need directories, this is a no-op
pass
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
"""Save history text to MongoDB
Args:
watch_uuid (str): Watch UUID
contents (str): Contents to save
timestamp (int): Timestamp
snapshot_id (str): Snapshot ID
Returns:
str: Snapshot ID
"""
# Compress the contents
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
# Store the snapshot
snapshot_data = {
'_id': f"{watch_uuid}:{timestamp}",
'watch_uuid': watch_uuid,
'timestamp': timestamp,
'snapshot_id': snapshot_id,
'contents': base64.b64encode(compressed_contents).decode('ascii'),
'compressed': True
}
self.snapshots_collection.replace_one({'_id': snapshot_data['_id']}, snapshot_data, upsert=True)
# Update history index
history_entry = {
'watch_uuid': watch_uuid,
'timestamp': timestamp,
'snapshot_id': snapshot_id
}
self.history_collection.replace_one(
{'watch_uuid': watch_uuid, 'timestamp': timestamp},
history_entry,
upsert=True
)
return snapshot_id
def get_history(self, watch_uuid):
"""Get history for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
dict: The history with timestamp keys and snapshot IDs as values
"""
history = {}
# Query history entries for this watch
entries = self.history_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', 1)
for entry in entries:
history[str(entry['timestamp'])] = entry['snapshot_id']
return history
def get_history_snapshot(self, watch_uuid, timestamp):
"""Get a history snapshot from MongoDB
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str: The snapshot content
"""
# Query for the snapshot
snapshot = self.snapshots_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})
if not snapshot:
return None
if snapshot.get('compressed', False):
# Decompress the contents
compressed_data = base64.b64decode(snapshot['contents'])
return brotli.decompress(compressed_data).decode('utf-8')
else:
return snapshot['contents']
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
"""Save a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
screenshot (bytes): Screenshot data
as_error (bool): Whether this is an error screenshot
"""
collection_name = 'error_screenshots' if as_error else 'screenshots'
collection = self.db[collection_name]
# Encode the screenshot as base64
encoded_screenshot = base64.b64encode(screenshot).decode('ascii')
screenshot_data = {
'_id': watch_uuid,
'watch_uuid': watch_uuid,
'screenshot': encoded_screenshot,
'timestamp': int(time.time())
}
collection.replace_one({'_id': watch_uuid}, screenshot_data, upsert=True)
def get_screenshot(self, watch_uuid, is_error=False):
"""Get a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get the error screenshot
Returns:
bytes or None: The screenshot data or None if not available
"""
collection_name = 'error_screenshots' if is_error else 'screenshots'
collection = self.db[collection_name]
screenshot_data = collection.find_one({'_id': watch_uuid})
if not screenshot_data:
return None
# Decode the screenshot from base64
return base64.b64decode(screenshot_data['screenshot'])
def save_error_text(self, watch_uuid, contents):
"""Save error text for a watch
Args:
watch_uuid (str): Watch UUID
contents (str): Error contents
"""
error_data = {
'_id': watch_uuid,
'watch_uuid': watch_uuid,
'error_text': contents,
'timestamp': int(time.time())
}
self.error_collection.replace_one({'_id': watch_uuid}, error_data, upsert=True)
def get_error_text(self, watch_uuid):
"""Get error text for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str or False: The error text or False if not available
"""
error_data = self.error_collection.find_one({'_id': watch_uuid})
if not error_data:
return False
return error_data['error_text']
def save_xpath_data(self, watch_uuid, data, as_error=False):
"""Save XPath data for a watch
Args:
watch_uuid (str): Watch UUID
data (dict): XPath data
as_error (bool): Whether this is error data
"""
# Compress the data
compressed_data = zlib.compress(json.dumps(data).encode())
_id = f"{watch_uuid}:error" if as_error else watch_uuid
xpath_data = {
'_id': _id,
'watch_uuid': watch_uuid,
'is_error': as_error,
'data': base64.b64encode(compressed_data).decode('ascii'),
'timestamp': int(time.time())
}
self.xpath_collection.replace_one({'_id': _id}, xpath_data, upsert=True)
def get_xpath_data(self, watch_uuid, is_error=False):
"""Get XPath data for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get error data
Returns:
dict or None: The XPath data or None if not available
"""
_id = f"{watch_uuid}:error" if is_error else watch_uuid
xpath_data = self.xpath_collection.find_one({'_id': _id})
if not xpath_data:
return None
# Decompress the data
compressed_data = base64.b64decode(xpath_data['data'])
return json.loads(zlib.decompress(compressed_data).decode('utf-8'))
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
"""Save last fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
contents (str): HTML contents
"""
# Compress the contents
contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
try:
compressed_contents = brotli.compress(contents_bytes)
except Exception as e:
logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
compressed_contents = contents_bytes
html_data = {
'_id': f"{watch_uuid}:{timestamp}",
'watch_uuid': watch_uuid,
'timestamp': timestamp,
'html': base64.b64encode(compressed_contents).decode('ascii'),
'compressed': True
}
self.html_collection.replace_one({'_id': html_data['_id']}, html_data, upsert=True)
# Prune old snapshots - keep only the newest 2
self._prune_last_fetched_html_snapshots(watch_uuid)
def _prune_last_fetched_html_snapshots(self, watch_uuid):
"""Prune old HTML snapshots
Args:
watch_uuid (str): Watch UUID
"""
# Get all HTML snapshots for this watch, sorted by timestamp descending
html_snapshots = list(
self.html_collection.find({'watch_uuid': watch_uuid}).sort('timestamp', -1)
)
# Keep only the first 2
if len(html_snapshots) > 2:
for snapshot in html_snapshots[2:]:
self.html_collection.delete_one({'_id': snapshot['_id']})
def get_fetched_html(self, watch_uuid, timestamp):
"""Get fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str or False: The HTML or False if not available
"""
html_data = self.html_collection.find_one({'_id': f"{watch_uuid}:{timestamp}"})
if not html_data:
return False
if html_data.get('compressed', False):
# Decompress the contents
compressed_data = base64.b64decode(html_data['html'])
return brotli.decompress(compressed_data).decode('utf-8')
else:
return html_data['html']
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
"""Save the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
contents (str): Text contents
"""
# Compress the contents
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
last_fetched_data = {
'_id': watch_uuid,
'watch_uuid': watch_uuid,
'contents': base64.b64encode(compressed_contents).decode('ascii'),
'timestamp': int(time.time())
}
self.db['last_fetched'].replace_one({'_id': watch_uuid}, last_fetched_data, upsert=True)
def get_last_fetched_text_before_filters(self, watch_uuid):
"""Get the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
Returns:
str: The text
"""
last_fetched_data = self.db['last_fetched'].find_one({'_id': watch_uuid})
if not last_fetched_data:
# If a previous attempt doesn't yet exist, just snarf the previous snapshot instead
history = self.get_history(watch_uuid)
dates = list(history.keys())
if len(dates):
return self.get_history_snapshot(watch_uuid, dates[-1])
else:
return ''
# Decompress the contents
compressed_data = base64.b64decode(last_fetched_data['contents'])
return brotli.decompress(compressed_data).decode('utf-8')
def visualselector_data_is_ready(self, watch_uuid):
"""Check if visual selector data is ready
Args:
watch_uuid (str): Watch UUID
Returns:
bool: Whether visual selector data is ready
"""
# Check if screenshot and xpath data exist
screenshot = self.db['screenshots'].find_one({'_id': watch_uuid})
xpath_data = self.xpath_collection.find_one({'_id': watch_uuid})
return screenshot is not None and xpath_data is not None
def clear_watch_history(self, watch_uuid):
"""Clear history for a watch
Args:
watch_uuid (str): Watch UUID
"""
# Delete all snapshots and history for this watch
self.snapshots_collection.delete_many({'watch_uuid': watch_uuid})
self.history_collection.delete_many({'watch_uuid': watch_uuid})
self.html_collection.delete_many({'watch_uuid': watch_uuid})
self.db['last_fetched'].delete_many({'watch_uuid': watch_uuid})
self.xpath_collection.delete_many({'watch_uuid': watch_uuid})
self.db['screenshots'].delete_many({'watch_uuid': watch_uuid})
self.error_collection.delete_many({'watch_uuid': watch_uuid})
def delete_watch(self, watch_uuid):
"""Delete a watch
Args:
watch_uuid (str): Watch UUID
"""
# Clear all history data
self.clear_watch_history(watch_uuid)
# Also delete error screenshots
self.db['error_screenshots'].delete_many({'watch_uuid': watch_uuid})
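The queries above filter on `watch_uuid` and sort on `timestamp`, so in practice indexes on those fields would likely help — a sketch, not part of this change, with a placeholder connection URI:
```
from pymongo import MongoClient, ASCENDING

db = MongoClient('mongodb://host:27017/')['changedetection']
db['history'].create_index([('watch_uuid', ASCENDING), ('timestamp', ASCENDING)])
db['html'].create_index([('watch_uuid', ASCENDING), ('timestamp', ASCENDING)])
db['snapshots'].create_index([('watch_uuid', ASCENDING)])
```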

View File: changedetectionio/storage/s3_storage.py

@@ -0,0 +1,525 @@
import os
import io
import json
import brotli
import zlib
import time
from loguru import logger
import boto3
from urllib.parse import urlparse
import base64
from .storage_base import StorageBase
class S3Storage(StorageBase):
"""Amazon S3 storage backend"""
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
"""Initialize the S3 storage backend
Args:
datastore_path (str): S3 URI (s3://bucket-name/optional-prefix)
include_default_watches (bool): Whether to include default watches
version_tag (str): Version tag
"""
# Parse S3 URI
parsed_uri = urlparse(datastore_path)
self.bucket_name = parsed_uri.netloc
self.prefix = parsed_uri.path.lstrip('/')
if self.prefix and not self.prefix.endswith('/'):
self.prefix += '/'
# Initialize S3 client
# Uses AWS credentials from environment variables or IAM role
self.s3 = boto3.client('s3')
logger.info(f"S3 storage initialized, using bucket '{self.bucket_name}' with prefix '{self.prefix}'")
def _get_key(self, path):
"""Get the S3 key for a path
Args:
path (str): Path relative to the prefix
Returns:
str: The full S3 key
"""
return f"{self.prefix}{path}"
def load_data(self):
"""Load data from S3
Returns:
dict: The loaded data
"""
key = self._get_key("app-data.json")
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
return json.loads(response['Body'].read().decode('utf-8'))
except self.s3.exceptions.NoSuchKey:
return None
except Exception as e:
logger.error(f"Error loading data from S3: {str(e)}")
raise e
def save_data(self, data):
"""Save data to S3
Args:
data (dict): The data to save
"""
try:
key = self._get_key("app-data.json")
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=json.dumps(data, indent=4),
ContentType='application/json'
)
except Exception as e:
logger.error(f"Error saving data to S3: {str(e)}")
raise e
def ensure_data_dir_exists(self, watch_uuid):
"""Ensure the data directory exists for a watch
Args:
watch_uuid (str): Watch UUID
"""
# S3 doesn't need directories, this is a no-op
pass
def _get_watch_prefix(self, watch_uuid):
"""Get the S3 prefix for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str: The watch prefix
"""
return self._get_key(f"watches/{watch_uuid}/")
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
"""Save history text to S3
Args:
watch_uuid (str): Watch UUID
contents (str): Contents to save
timestamp (int): Timestamp
snapshot_id (str): Snapshot ID
Returns:
str: Snapshot ID
"""
# Determine if we should compress
threshold = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
skip_brotli = os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False').lower() in ('true', '1', 't')
watch_prefix = self._get_watch_prefix(watch_uuid)
# Save the snapshot
if not skip_brotli and len(contents) > threshold:
snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt.br"
snapshot_fname = f"{snapshot_id}.txt.br"
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
self.s3.put_object(
Bucket=self.bucket_name,
Key=snapshot_key,
Body=compressed_contents
)
else:
snapshot_key = f"{watch_prefix}snapshots/{snapshot_id}.txt"
snapshot_fname = f"{snapshot_id}.txt"
self.s3.put_object(
Bucket=self.bucket_name,
Key=snapshot_key,
Body=contents.encode('utf-8')
)
# Update history index
history_key = f"{watch_prefix}history.txt"
# Try to get existing history first
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
history_content = response['Body'].read().decode('utf-8')
except self.s3.exceptions.NoSuchKey:
history_content = ""
# Append new entry
history_content += f"{timestamp},{snapshot_fname}\n"
# Save updated history
self.s3.put_object(
Bucket=self.bucket_name,
Key=history_key,
Body=history_content
)
return snapshot_fname
def get_history(self, watch_uuid):
"""Get history for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
dict: The history with timestamp keys and snapshot IDs as values
"""
tmp_history = {}
watch_prefix = self._get_watch_prefix(watch_uuid)
history_key = f"{watch_prefix}history.txt"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=history_key)
history_content = response['Body'].read().decode('utf-8')
for line in history_content.splitlines():
if ',' in line:
k, v = line.strip().split(',', 2)
tmp_history[k] = f"{watch_prefix}snapshots/{v}"
return tmp_history
except self.s3.exceptions.NoSuchKey:
return {}
def get_history_snapshot(self, watch_uuid, timestamp):
"""Get a history snapshot from S3
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str: The snapshot content
"""
history = self.get_history(watch_uuid)
if not timestamp in history:
return None
key = history[timestamp]
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
content = response['Body'].read()
if key.endswith('.br'):
# Decompress brotli
return brotli.decompress(content).decode('utf-8')
else:
return content.decode('utf-8')
except Exception as e:
logger.error(f"Error reading snapshot from S3: {str(e)}")
return None
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
"""Save a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
screenshot (bytes): Screenshot data
as_error (bool): Whether this is an error screenshot
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
if as_error:
key = f"{watch_prefix}last-error-screenshot.png"
else:
key = f"{watch_prefix}last-screenshot.png"
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=screenshot,
ContentType='image/png'
)
def get_screenshot(self, watch_uuid, is_error=False):
"""Get a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get the error screenshot
Returns:
bytes or None: The screenshot data or None if not available
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
if is_error:
key = f"{watch_prefix}last-error-screenshot.png"
else:
key = f"{watch_prefix}last-screenshot.png"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
return response['Body'].read()
except self.s3.exceptions.NoSuchKey:
return None
def save_error_text(self, watch_uuid, contents):
"""Save error text for a watch
Args:
watch_uuid (str): Watch UUID
contents (str): Error contents
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}last-error.txt"
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=contents.encode('utf-8')
)
def get_error_text(self, watch_uuid):
"""Get error text for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str or False: The error text or False if not available
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}last-error.txt"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
return response['Body'].read().decode('utf-8')
except self.s3.exceptions.NoSuchKey:
return False
def save_xpath_data(self, watch_uuid, data, as_error=False):
"""Save XPath data for a watch
Args:
watch_uuid (str): Watch UUID
data (dict): XPath data
as_error (bool): Whether this is error data
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
if as_error:
key = f"{watch_prefix}elements-error.deflate"
else:
key = f"{watch_prefix}elements.deflate"
compressed_data = zlib.compress(json.dumps(data).encode())
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=compressed_data
)
def get_xpath_data(self, watch_uuid, is_error=False):
"""Get XPath data for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get error data
Returns:
dict or None: The XPath data or None if not available
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
if is_error:
key = f"{watch_prefix}elements-error.deflate"
else:
key = f"{watch_prefix}elements.deflate"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
compressed_data = response['Body'].read()
return json.loads(zlib.decompress(compressed_data).decode('utf-8'))
except self.s3.exceptions.NoSuchKey:
return None
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
"""Save last fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
contents (str): HTML contents
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}html/{timestamp}.html.br"
contents_bytes = contents.encode('utf-8') if isinstance(contents, str) else contents
try:
compressed_contents = brotli.compress(contents_bytes)
except Exception as e:
logger.warning(f"{watch_uuid} - Unable to compress HTML snapshot: {str(e)}")
compressed_contents = contents_bytes
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=compressed_contents
)
# Prune old snapshots - keep only the newest 2
self._prune_last_fetched_html_snapshots(watch_uuid)
def _prune_last_fetched_html_snapshots(self, watch_uuid):
"""Prune old HTML snapshots
Args:
watch_uuid (str): Watch UUID
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
html_prefix = f"{watch_prefix}html/"
# List all HTML snapshots
response = self.s3.list_objects_v2(
Bucket=self.bucket_name,
Prefix=html_prefix
)
if 'Contents' not in response:
return
# Sort by timestamp (extract from key)
html_files = sorted(
response['Contents'],
key=lambda x: int(x['Key'].split('/')[-1].split('.')[0]),
reverse=True
)
# Delete all but the newest 2
if len(html_files) > 2:
for file in html_files[2:]:
self.s3.delete_object(
Bucket=self.bucket_name,
Key=file['Key']
)
def get_fetched_html(self, watch_uuid, timestamp):
"""Get fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str or False: The HTML or False if not available
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}html/{timestamp}.html.br"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
compressed_data = response['Body'].read()
return brotli.decompress(compressed_data).decode('utf-8')
except self.s3.exceptions.NoSuchKey:
return False
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
"""Save the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
contents (str): Text contents
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}last-fetched.br"
compressed_contents = brotli.compress(contents.encode('utf-8'), mode=brotli.MODE_TEXT)
self.s3.put_object(
Bucket=self.bucket_name,
Key=key,
Body=compressed_contents
)
def get_last_fetched_text_before_filters(self, watch_uuid):
"""Get the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
Returns:
str: The text
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
key = f"{watch_prefix}last-fetched.br"
try:
response = self.s3.get_object(Bucket=self.bucket_name, Key=key)
compressed_data = response['Body'].read()
return brotli.decompress(compressed_data).decode('utf-8')
except self.s3.exceptions.NoSuchKey:
# If a previous attempt doesn't yet exist, just snarf the previous snapshot instead
history = self.get_history(watch_uuid)
dates = list(history.keys())
if len(dates):
return self.get_history_snapshot(watch_uuid, dates[-1])
else:
return ''
def visualselector_data_is_ready(self, watch_uuid):
"""Check if visual selector data is ready
Args:
watch_uuid (str): Watch UUID
Returns:
bool: Whether visual selector data is ready
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
screenshot_key = f"{watch_prefix}last-screenshot.png"
elements_key = f"{watch_prefix}elements.deflate"
try:
# Just check if both files exist
self.s3.head_object(Bucket=self.bucket_name, Key=screenshot_key)
self.s3.head_object(Bucket=self.bucket_name, Key=elements_key)
return True
except self.s3.exceptions.ClientError:
return False
def clear_watch_history(self, watch_uuid):
"""Clear history for a watch
Args:
watch_uuid (str): Watch UUID
"""
watch_prefix = self._get_watch_prefix(watch_uuid)
# List all objects with this watch's prefix
paginator = self.s3.get_paginator('list_objects_v2')
pages = paginator.paginate(
Bucket=self.bucket_name,
Prefix=watch_prefix
)
# Delete all objects in batches
for page in pages:
if 'Contents' not in page:
continue
delete_keys = {'Objects': [{'Key': obj['Key']} for obj in page['Contents']]}
self.s3.delete_objects(
Bucket=self.bucket_name,
Delete=delete_keys
)
def delete_watch(self, watch_uuid):
"""Delete a watch
Args:
watch_uuid (str): Watch UUID
"""
# Same implementation as clear_watch_history for S3
self.clear_watch_history(watch_uuid)

View File: changedetectionio/storage/storage_base.py

@@ -0,0 +1,230 @@
from abc import ABC, abstractmethod
import json
from loguru import logger
class StorageBase(ABC):
"""Abstract base class for storage backends"""
@abstractmethod
def __init__(self, datastore_path, include_default_watches=True, version_tag="0.0.0"):
"""Initialize the storage backend
Args:
datastore_path (str): Path to the datastore
include_default_watches (bool): Whether to include default watches
version_tag (str): Version tag
"""
pass
@abstractmethod
def load_data(self):
"""Load data from the storage backend
Returns:
dict: The loaded data
"""
pass
@abstractmethod
def save_data(self, data):
"""Save data to the storage backend
Args:
data (dict): The data to save
"""
pass
@abstractmethod
def save_history_text(self, watch_uuid, contents, timestamp, snapshot_id):
"""Save history text to the storage backend
Args:
watch_uuid (str): Watch UUID
contents (str): Contents to save
timestamp (int): Timestamp
snapshot_id (str): Snapshot ID
Returns:
str: Snapshot filename or ID
"""
pass
@abstractmethod
def get_history_snapshot(self, watch_uuid, timestamp):
"""Get a history snapshot from the storage backend
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str: The snapshot content
"""
pass
@abstractmethod
def get_history(self, watch_uuid):
"""Get history for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
dict: The history with timestamp keys and snapshot IDs as values
"""
pass
@abstractmethod
def save_screenshot(self, watch_uuid, screenshot, as_error=False):
"""Save a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
screenshot (bytes): Screenshot data
as_error (bool): Whether this is an error screenshot
"""
pass
@abstractmethod
def get_screenshot(self, watch_uuid, is_error=False):
"""Get a screenshot for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get the error screenshot
Returns:
str or None: The screenshot path or None if not available
"""
pass
@abstractmethod
def save_error_text(self, watch_uuid, contents):
"""Save error text for a watch
Args:
watch_uuid (str): Watch UUID
contents (str): Error contents
"""
pass
@abstractmethod
def get_error_text(self, watch_uuid):
"""Get error text for a watch
Args:
watch_uuid (str): Watch UUID
Returns:
str or False: The error text or False if not available
"""
pass
@abstractmethod
def save_xpath_data(self, watch_uuid, data, as_error=False):
"""Save XPath data for a watch
Args:
watch_uuid (str): Watch UUID
data (dict): XPath data
as_error (bool): Whether this is error data
"""
pass
@abstractmethod
def get_xpath_data(self, watch_uuid, is_error=False):
"""Get XPath data for a watch
Args:
watch_uuid (str): Watch UUID
is_error (bool): Whether to get error data
Returns:
dict or None: The XPath data or None if not available
"""
pass
@abstractmethod
def save_last_fetched_html(self, watch_uuid, timestamp, contents):
"""Save last fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
contents (str): HTML contents
"""
pass
@abstractmethod
def get_fetched_html(self, watch_uuid, timestamp):
"""Get fetched HTML for a watch
Args:
watch_uuid (str): Watch UUID
timestamp (int): Timestamp
Returns:
str or False: The HTML or False if not available
"""
pass
@abstractmethod
def save_last_text_fetched_before_filters(self, watch_uuid, contents):
"""Save the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
contents (str): Text contents
"""
pass
@abstractmethod
def get_last_fetched_text_before_filters(self, watch_uuid):
"""Get the last text fetched before filters
Args:
watch_uuid (str): Watch UUID
Returns:
str: The text
"""
pass
@abstractmethod
def ensure_data_dir_exists(self, watch_uuid):
"""Ensure the data directory exists for a watch
Args:
watch_uuid (str): Watch UUID
"""
pass
@abstractmethod
def visualselector_data_is_ready(self, watch_uuid):
"""Check if visual selector data is ready
Args:
watch_uuid (str): Watch UUID
Returns:
bool: Whether visual selector data is ready
"""
pass
@abstractmethod
def clear_watch_history(self, watch_uuid):
"""Clear history for a watch
Args:
watch_uuid (str): Watch UUID
"""
pass
@abstractmethod
def delete_watch(self, watch_uuid):
"""Delete a watch
Args:
watch_uuid (str): Watch UUID
"""
pass

View File: changedetectionio/storage/storage_factory.py

@@ -0,0 +1,33 @@
import re
from loguru import logger
from urllib.parse import urlparse
from .storage_base import StorageBase
from .filesystem_storage import FileSystemStorage
from .mongodb_storage import MongoDBStorage
from .s3_storage import S3Storage
def create_storage(datastore_path, include_default_watches=True, version_tag="0.0.0"):
"""Create a storage backend based on the datastore path
Args:
datastore_path (str): Path to the datastore
include_default_watches (bool): Whether to include default watches
version_tag (str): Version tag
Returns:
StorageBase: The storage backend
"""
# Check if it's a MongoDB URI
if datastore_path.startswith('mongodb://') or datastore_path.startswith('mongodb+srv://'):
logger.info(f"Using MongoDB storage backend with URI {datastore_path}")
return MongoDBStorage(datastore_path, include_default_watches, version_tag)
# Check if it's an S3 URI
if datastore_path.startswith('s3://'):
logger.info(f"Using S3 storage backend with URI {datastore_path}")
return S3Storage(datastore_path, include_default_watches, version_tag)
# Default to filesystem
logger.info(f"Using filesystem storage backend with path {datastore_path}")
return FileSystemStorage(datastore_path, include_default_watches, version_tag)
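Per the README's third step for custom backends, a new scheme check would slot in before the filesystem fallback — a sketch; `redis://` and `RedisStorage` are illustrative, not part of this change:
```
# Inside create_storage(), before the filesystem default:
if datastore_path.startswith('redis://'):
    logger.info(f"Using Redis storage backend with URI {datastore_path}")
    return RedisStorage(datastore_path, include_default_watches, version_tag)
```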

View File: changedetectionio/store.py

@@ -20,6 +20,7 @@ from loguru import logger
from .processors import get_custom_watch_obj_for_processor
from .processors.restock_diff import Restock
from .storage.storage_factory import create_storage
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
@@ -38,20 +39,28 @@ class ChangeDetectionStore:
needs_write_urgent = False
__version_check = True
# Singleton instance for access from Watch class methods
instance = None
def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
# Should only be active for docker
# logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
self.__data = App.model()
self.datastore_path = datastore_path
self.json_store_path = "{}/url-watches.json".format(self.datastore_path)
logger.info(f"Datastore path is '{self.json_store_path}'")
# Create the appropriate storage backend based on the datastore path
self.storage = create_storage(datastore_path, include_default_watches, version_tag)
self.needs_write = False
self.start_time = time.time()
self.stop_thread = False
# Base definition for all watchers
# deepcopy part of #569 - not sure why it's needed exactly
self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={}))
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))
# Set singleton instance
ChangeDetectionStore.instance = self
if path.isfile('changedetectionio/source.txt'):
with open('changedetectionio/source.txt') as f:
@@ -60,10 +69,9 @@ class ChangeDetectionStore:
self.__data['build_sha'] = f.read()
try:
# @todo retest with ", encoding='utf-8'"
with open(self.json_store_path) as json_file:
from_disk = json.load(json_file)
# Load data from storage
from_disk = self.storage.load_data()
if from_disk:
# @todo isn't there a way to do this dict.update recursively?
# Problem here is if the one on the disk is missing a sub-struct, it wont be present anymore.
if 'watching' in from_disk:
@@ -91,22 +99,24 @@ class ChangeDetectionStore:
for uuid, tag in self.__data['settings']['application']['tags'].items():
self.__data['settings']['application']['tags'][uuid] = self.rehydrate_entity(uuid, tag, processor_override='restock_diff')
logger.info(f"Tag: {uuid} {tag['title']}")
else:
# First time ran, Create the datastore.
if include_default_watches:
logger.critical(f"No data store found, creating new store")
self.add_watch(url='https://news.ycombinator.com/',
tag='Tech news',
extras={'fetch_backend': 'html_requests'})
# First time ran, Create the datastore.
except (FileNotFoundError):
if include_default_watches:
logger.critical(f"No JSON DB found at {self.json_store_path}, creating JSON store at {self.datastore_path}")
self.add_watch(url='https://news.ycombinator.com/',
tag='Tech news',
extras={'fetch_backend': 'html_requests'})
self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
tag='changedetection.io',
extras={'fetch_backend': 'html_requests'})
self.add_watch(url='https://changedetection.io/CHANGELOG.txt',
tag='changedetection.io',
extras={'fetch_backend': 'html_requests'})
updates_available = self.get_updates_available()
self.__data['settings']['application']['schema_version'] = updates_available.pop()
updates_available = self.get_updates_available()
self.__data['settings']['application']['schema_version'] = updates_available.pop()
except Exception as e:
logger.error(f"Error loading data from storage: {str(e)}")
raise e
else:
# Bump the update version by running updates
self.run_updates()
@@ -227,23 +237,15 @@ class ChangeDetectionStore:
# Delete a single watch by UUID
def delete(self, uuid):
import pathlib
import shutil
with self.lock:
if uuid == 'all':
# Delete all watches
for watch_uuid in list(self.data['watching'].keys()):
self.storage.delete_watch(watch_uuid)
self.__data['watching'] = {}
# GitHub #30 also delete history records
for uuid in self.data['watching']:
path = pathlib.Path(os.path.join(self.datastore_path, uuid))
if os.path.exists(path):
shutil.rmtree(path)
else:
path = pathlib.Path(os.path.join(self.datastore_path, uuid))
if os.path.exists(path):
shutil.rmtree(path)
# Delete a single watch
self.storage.delete_watch(uuid)
del self.data['watching'][uuid]
self.needs_write_urgent = True
@@ -266,6 +268,7 @@ class ChangeDetectionStore:
# Remove a watch's data but keep the entry (URL etc)
def clear_watch_history(self, uuid):
self.storage.clear_watch_history(uuid)
self.__data['watching'][uuid].clear_watch()
self.needs_write_urgent = True
@@ -372,43 +375,30 @@ class ChangeDetectionStore:
return new_uuid
def visualselector_data_is_ready(self, watch_uuid):
output_path = "{}/{}".format(self.datastore_path, watch_uuid)
screenshot_filename = "{}/last-screenshot.png".format(output_path)
elements_index_filename = "{}/elements.deflate".format(output_path)
if path.isfile(screenshot_filename) and path.isfile(elements_index_filename) :
return True
return False
return self.storage.visualselector_data_is_ready(watch_uuid)
def sync_to_json(self):
logger.info("Saving JSON..")
logger.info("Saving data to storage backend...")
try:
data = deepcopy(self.__data)
except RuntimeError as e:
# Try again in 15 seconds
time.sleep(15)
logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
logger.error(f"! Data changed when writing to storage, trying again.. {str(e)}")
self.sync_to_json()
return
else:
try:
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
# This is a fairly basic strategy to deal with the case that the file is corrupted,
# system was out of memory, out of RAM etc
with open(self.json_store_path+".tmp", 'w') as json_file:
json.dump(data, json_file, indent=4)
os.replace(self.json_store_path+".tmp", self.json_store_path)
self.storage.save_data(data)
except Exception as e:
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
logger.error(f"Error writing to storage backend: {str(e)}")
self.needs_write = False
self.needs_write_urgent = False
# Thread runner, this helps with thread/write issues when there are many operations that want to update the JSON
# Thread runner, this helps with thread/write issues when there are many operations that want to update the data
# by just running periodically in one thread, according to python, dict updates are threadsafe.
def save_datastore(self):
while True:
if self.stop_thread:
# Suppressing "Logging error in Loguru Handler #0" during CICD.

View File: requirements.txt

@@ -100,3 +100,6 @@ referencing==0.35.1
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata
pymongo>=4.3.3
boto3>=1.26.0