import shutil
from changedetectionio.strtobool import strtobool
from changedetectionio.validate_url import is_safe_valid_url
from flask import (
    flash
)
from flask_babel import gettext

from ..model import App, Watch
from copy import deepcopy
from os import path, unlink
import json
import os
import re
import secrets
import sys
import time
import uuid as uuid_builder
from loguru import logger
from blinker import signal
from ..model.Tags import TagsDict

# Try to import orjson for faster JSON serialization
try:
    import orjson
    HAS_ORJSON = True
except ImportError:
    HAS_ORJSON = False

from ..processors import get_custom_watch_obj_for_processor

# Import the base class and helpers
from .file_saving_datastore import FileSavingDataStore, load_all_watches, load_all_tags, save_json_atomic
from .updates import DatastoreUpdatesMixin

# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'


def dictfilt(x, y):
    """Return a new dict holding only the entries of ``x`` whose key is listed in ``y``.

    Args:
        x: Mapping to filter; its iteration order is preserved in the result.
        y: Iterable of keys to keep. A falsy value (``None``, empty) keeps nothing.

    Returns:
        dict: ``{key: x[key]}`` for every key of ``x`` that also appears in ``y``.
    """
    # Build the lookup set once instead of once per key of ``x``.
    wanted = set(y or ())
    return {key: x[key] for key in x if key in wanted}


# Is there an existing library to ensure some data store (JSON etc) is in sync with CRUD methods?
# Open a github issue if you know something :)
# https://stackoverflow.com/questions/6190468/how-to-trigger-function-on-value-change
class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
    """File-backed datastore holding application settings, tags and watches.

    Settings live in ``changedetection.json`` inside ``datastore_path``; the
    in-memory state is kept in the private ``__data`` structure and exposed
    through the ``data`` property.
    """

    __version_check = True

    def __init__(self, datastore_path="/datastore", include_default_watches=True, version_tag="0.0.0"):
        """Back up the settings file for this version, then load or create the datastore.

        Args:
            datastore_path: Directory that holds changedetection.json and related files.
            include_default_watches: Forwarded to ``reload_state`` (used when no datastore exists yet).
            version_tag: Application version, used for the backup file name and forwarded to ``reload_state``.
        """
        # Initialize parent class
        super().__init__()

        # Should only be active for docker
        # logging.basicConfig(filename='/dev/stdout', level=logging.INFO)
        self.datastore_path = datastore_path
        self.start_time = time.time()
        self.save_version_copy_json_db(version_tag)
        self.reload_state(datastore_path=datastore_path,
                          include_default_watches=include_default_watches,
                          version_tag=version_tag)

    def save_version_copy_json_db(self, version_tag):
        """
        Create version-tagged backup of changedetection.json.

        This is called on version upgrades to preserve a backup in case
        the new version has issues. Nothing is copied when the backup for this
        version already exists or when there is no changedetection.json yet.

        Args:
            version_tag: Version string; every run of non-digits becomes "-" in the backup file name.
        """
        version_text = re.sub(r'\D+', '-', version_tag)
        db_path = os.path.join(self.datastore_path, "changedetection.json")
        db_path_version_backup = os.path.join(self.datastore_path, f"changedetection-{version_text}.json")

        if not os.path.isfile(db_path_version_backup) and os.path.isfile(db_path):
            logger.info(f"Backing up changedetection.json due to new version to '{db_path_version_backup}'.")
            shutil.copyfile(db_path, db_path_version_backup)

    def _load_settings(self, filename="changedetection.json"):
        """
        Load settings from storage.

        File backend implementation: reads from changedetection.json

        Args:
            filename: File name inside ``datastore_path`` to read.

        Returns:
            dict: Settings data loaded from storage
        """
        changedetection_json = os.path.join(self.datastore_path, filename)

        logger.info(f"Loading settings from {changedetection_json}")

        if HAS_ORJSON:
            # orjson.loads() takes the raw bytes
            with open(changedetection_json, 'rb') as f:
                return orjson.loads(f.read())
        else:
            with open(changedetection_json, 'r', encoding='utf-8') as f:
                return json.load(f)

    def _apply_settings(self, settings_data):
        """
        Apply loaded settings data to internal data structure.

        Args:
            settings_data: Dictionary loaded from changedetection.json
        """
        # Apply top-level fields
        for key in ('app_guid', 'build_sha', 'version_tag'):
            if key in settings_data:
                self.__data[key] = settings_data[key]

        # Apply settings sections (merged into the existing defaults, not replaced)
        if 'settings' in settings_data:
            for section in ('headers', 'requests', 'application'):
                if section in settings_data['settings']:
                    self.__data['settings'][section].update(settings_data['settings'][section])

        # Use our Tags dict with cleanup helpers etc
        # @todo Same for Watches
        existing_tags = settings_data.get('settings', {}).get('application', {}).get('tags') or {}
        self.__data['settings']['application']['tags'] = TagsDict(existing_tags, datastore_path=self.datastore_path)

        # More or less for the old format which had this data in the one url-watches.json
        # cant hurt to leave it here,
        if 'watching' in settings_data:
            self.__data['watching'].update(settings_data['watching'])

    def _rehydrate_tags(self):
        """Rehydrate tag entities from stored data into Tag objects with restock_diff processor."""
        from ..model import Tag

        for uuid, tag in self.__data['settings']['application']['tags'].items():
            # Force processor to restock_diff for override functionality (technical debt)
            tag['processor'] = 'restock_diff'

            # Re-assigning an existing key does not change the dict size, so this is safe while iterating
            self.__data['settings']['application']['tags'][uuid] = Tag.model(
                datastore_path=self.datastore_path,
                __datastore=self.__data,
                default=tag
            )
            # .get() so a tag without a title cannot abort loading just because of a log line
            logger.info(f"Tag: {uuid} {tag.get('title')}")

    def _rehydrate_watches(self):
        """Rehydrate watch entities from stored data (converts dicts to Watch objects)."""
        watch_count = len(self.__data.get('watching', {}))
        if watch_count == 0:
            return

        logger.info(f"Rehydrating {watch_count} watches...")
        watching_rehydrated = {}
        for uuid, watch_dict in self.__data.get('watching', {}).items():
            if isinstance(watch_dict, dict):
                watching_rehydrated[uuid] = self.rehydrate_entity(uuid, watch_dict)
            else:
                logger.error(f"Watch UUID {uuid} already rehydrated")

        self.__data['watching'] = watching_rehydrated
        logger.success(f"Rehydrated {watch_count} watches into Watch objects")

    def _load_state(self, main_settings_filename="changedetection.json"):
        """
        Load complete datastore state from storage.

        Orchestrates loading of settings, watches, and tags using polymorphic methods.

        Args:
            main_settings_filename: Settings file name inside ``datastore_path``.
        """
        # Load settings
        settings_data = self._load_settings(filename=main_settings_filename)
        self._apply_settings(settings_data)

        # Load watches, scan them from the disk
        self._load_watches()
        self._rehydrate_watches()

        # Load tags from individual tag.json files
        # These will override any tags in settings (migration path)
        self._load_tags()

        # Rehydrate any remaining tags from settings (legacy/fallback)
        self._rehydrate_tags()

    def reload_state(self, datastore_path, include_default_watches, version_tag):
        """
        Load datastore from storage or create new one.

        Supports three scenarios:
        1. NEW format: changedetection.json exists → load and run updates if needed
        2. LEGACY: only url-watches.json exists → load it, run updates, then reload
        3. EMPTY: neither file exists → create a fresh datastore

        Note: Legacy url-watches.json migration happens in update_26, not here.

        Args:
            datastore_path: Directory of the datastore; replaces ``self.datastore_path``.
            include_default_watches: Passed to ``init_fresh_install`` for an empty datastore.
            version_tag: Passed to ``init_fresh_install`` for an empty datastore.
        """
        logger.info(f"Datastore path is '{datastore_path}'")

        # CRITICAL: Update datastore_path (was using old path from __init__)
        self.datastore_path = datastore_path

        # Initialize data structure
        self.__data = App.model(datastore_path=datastore_path)
        self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")

        # Base definition for all watchers (deepcopy part of #569)
        self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, __datastore=self.__data, default={}))

        # Load build SHA if available (Docker deployments)
        if path.isfile('changedetectionio/source.txt'):
            with open('changedetectionio/source.txt') as f:
                self.__data['build_sha'] = f.read()

        # Check if datastore already exists
        changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
        changedetection_json_old_schema = os.path.join(self.datastore_path, "url-watches.json")

        if os.path.exists(changedetection_json):
            # Run schema updates if needed
            # Pass current schema version from loaded datastore (defaults to 0 if not set)
            # Load existing datastore (changedetection.json + watch.json files)
            logger.info("Loading existing datastore")
            self._load_state()
            current_schema = self.data['settings']['application'].get('schema_version', 0)
            self.run_updates(current_schema_version=current_schema)

        # Legacy datastore detected - trigger migration, even works if the schema is much before the migration step.
        elif os.path.exists(changedetection_json_old_schema):
            logger.critical(f"Legacy datastore detected at {changedetection_json_old_schema}, loading and running updates")
            self._load_state(main_settings_filename="url-watches.json")
            # update 26 will load the whole old config from disk to __data
            current_schema = self.__data['settings']['application'].get('schema_version', 0)
            self.run_updates(current_schema_version=current_schema)
            # Probably tags were also shifted to disk and many other changes, so best to reload here.
            self._load_state()

        else:
            # No datastore yet - check if this is a fresh install or legacy migration
            self.init_fresh_install(include_default_watches=include_default_watches,
                                    version_tag=version_tag)
            # Maybe they copied a bunch of watch subdirs across too
            self._load_state()

    @staticmethod
    def _new_app_guid():
        """Return a fresh app GUID string, prefixed with "test-" when running under pytest."""
        if "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ:
            return "test-" + str(uuid_builder.uuid4())
        return str(uuid_builder.uuid4())

    def init_fresh_install(self, include_default_watches, version_tag):
        """Populate a brand new datastore and write its first changedetection.json.

        Args:
            include_default_watches: When true, two example watches are added.
            version_tag: Stored as ``version_tag`` in the in-memory data.
        """
        # Generate app_guid FIRST (required for all operations)
        self.__data['app_guid'] = self._new_app_guid()

        # Generate RSS access token
        self.__data['settings']['application']['rss_access_token'] = secrets.token_hex(16)

        # Generate API access token
        self.__data['settings']['application']['api_access_token'] = secrets.token_hex(16)

        logger.warning(f"No datastore found, creating new datastore at {self.datastore_path}")

        # Set schema version to latest (no updates needed)
        latest_update_available = self.get_updates_available().pop()
        logger.info(f"Marking fresh install to schema version {latest_update_available}")
        self.__data['settings']['application']['schema_version'] = latest_update_available

        # Add default watches if requested
        if include_default_watches:
            self.add_watch(
                url='https://news.ycombinator.com/',
                tag='Tech news',
                extras={'fetch_backend': 'html_requests'}
            )
            self.add_watch(
                url='https://changedetection.io/CHANGELOG.txt',
                tag='changedetection.io',
                extras={'fetch_backend': 'html_requests'}
            )

        # Create changedetection.json immediately
        try:
            self._save_settings()
            logger.info("Created changedetection.json for new datastore")
        except Exception as e:
            # Best effort: the failure is logged and start-up continues
            logger.error(f"Failed to create initial changedetection.json: {e}")

        # Set version tag
        self.__data['version_tag'] = version_tag

        # Validate proxies.json if it exists
        _ = self.proxy_list  # Just to test parsing

        # Ensure app_guid exists (for datastores loaded from existing files)
        if 'app_guid' not in self.__data:
            self.__data['app_guid'] = self._new_app_guid()
            self.commit()

        # Ensure RSS access token exists
        if not self.__data['settings']['application'].get('rss_access_token'):
            secret = secrets.token_hex(16)
            self.__data['settings']['application']['rss_access_token'] = secret
            self.commit()

        # Ensure API access token exists
        if not self.__data['settings']['application'].get('api_access_token'):
            secret = secrets.token_hex(16)
            self.__data['settings']['application']['api_access_token'] = secret
            self.commit()

        # Handle password reset lockfile
        password_reset_lockfile = os.path.join(self.datastore_path, "removepassword.lock")
        if path.isfile(password_reset_lockfile):
            self.remove_password()
            unlink(password_reset_lockfile)

    def rehydrate_entity(self, uuid, entity, processor_override=None):
        """Set the dict back to the dict Watch object

        Args:
            uuid: UUID written into ``entity['uuid']``.
            entity: Plain dict describing the watch; it is modified in place before wrapping.
            processor_override: When given, selects the watch class and is stored as ``entity['processor']``.

        Returns:
            The watch object built by the class chosen for the processor.
        """
        entity['uuid'] = uuid

        if processor_override:
            watch_class = get_custom_watch_obj_for_processor(processor_override)
            entity['processor'] = processor_override
        else:
            watch_class = get_custom_watch_obj_for_processor(entity.get('processor'))

        if entity.get('processor') != 'text_json_diff':
            logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")

        entity = watch_class(datastore_path=self.datastore_path, __datastore=self.__data, default=entity)
        return entity

    # ============================================================================
    # FileSavingDataStore Abstract Method Implementations
    # ============================================================================

    def _watch_exists(self, uuid):
        """Check if watch exists in datastore."""
        return uuid in self.__data['watching']

    def _get_watch_dict(self, uuid):
        """Get watch as dictionary (a shallow copy of the stored watch)."""
        return dict(self.__data['watching'][uuid])

    def _build_settings_data(self):
        """
        Build settings data structure for saving.

        Tags are always written out empty here: they are stored separately as
        {uuid}/tag.json.

        Returns:
            dict: Settings data ready for serialization
        """
        # Deep copy settings to avoid modifying the original
        settings_copy = deepcopy(self.__data['settings'])

        # Is saved as {uuid}/tag.json
        settings_copy['application']['tags'] = {}

        return {
            'note': 'Settings file - watches are in {uuid}/watch.json, tags are in {uuid}/tag.json',
            'app_guid': self.__data.get('app_guid'),
            'settings': settings_copy,
            'build_sha': self.__data.get('build_sha'),
            'version_tag': self.__data.get('version_tag')
        }

    def _save_settings(self):
        """
        Save settings to storage.

        File backend implementation: saves to changedetection.json

        Implementation of abstract method from FileSavingDataStore.
        Uses the generic save_json_atomic helper.

        Raises:
            OSError: If disk is full or other I/O error
        """
        settings_data = self._build_settings_data()
        changedetection_json = os.path.join(self.datastore_path, "changedetection.json")
        save_json_atomic(changedetection_json, settings_data, label="settings")

    def _load_watches(self):
        """
        Load all watches from storage.

        File backend implementation: reads individual watch.json files
        Implementation of abstract method from FileSavingDataStore.
        Delegates to helper function and stores results in internal data structure.
        """
        # Store loaded data
        # @note this will also work for the old legacy format because self.__data['watching'] should already have them loaded by this point.
        self.__data['watching'].update(load_all_watches(
            self.datastore_path,
            self.rehydrate_entity
        ))
        logger.debug(f"Loaded {len(self.__data['watching'])} watches")

    def _load_tags(self):
        """
        Load all tags from storage.

        File backend implementation: reads individual tag.json files.
        Tags loaded from files override any tags in settings (migration path).
        """
        from ..model import Tag

        def rehydrate_tag(uuid, entity_dict):
            """Rehydrate tag as Tag object with forced restock_diff processor."""
            entity_dict['uuid'] = uuid
            entity_dict['processor'] = 'restock_diff'  # Force processor for override functionality

            return Tag.model(
                datastore_path=self.datastore_path,
                __datastore=self.__data,
                default=entity_dict
            )

        tags = load_all_tags(
            self.datastore_path,
            rehydrate_tag
        )

        # Override settings tags with loaded tags
        # This ensures tag.json files take precedence over settings
        if tags:
            self.__data['settings']['application']['tags'].update(tags)
            logger.info(f"Loaded {len(tags)} tags from individual tag.json files")

    def _delete_watch(self, uuid):
        """
        Delete a watch from storage.

        File backend implementation: deletes entire {uuid}/ directory recursively.
        Implementation of abstract method from FileSavingDataStore.

        Args:
            uuid: Watch UUID to delete
        """
        watch_dir = os.path.join(self.datastore_path, uuid)
        if os.path.exists(watch_dir):
            shutil.rmtree(watch_dir)
            logger.info(f"Deleted watch directory: {watch_dir}")

    # ============================================================================
    # Watch Management Methods
    # ============================================================================

    def set_last_viewed(self, uuid, timestamp):
        """Store ``timestamp`` (truncated to int) as the watch's ``last_viewed``, save it and signal the update."""
        logger.debug(f"Setting watch UUID: {uuid} last viewed to {int(timestamp)}")
        self.data['watching'][uuid].update({'last_viewed': int(timestamp)})
        self.data['watching'][uuid].commit()

        watch_check_update = signal('watch_check_update')
        if watch_check_update:
            watch_check_update.send(watch_uuid=uuid)

    def remove_password(self):
        """Clear the application password setting and save the settings."""
        self.__data['settings']['application']['password'] = False
        self.commit()

    def _remove_checksum_file(self, uuid, watch):
        """Delete last-checksum.txt from the watch's data dir; return True only when a file was removed.

        Failures to delete are logged as warnings and reported as False.
        """
        if not watch.data_dir:
            return False

        checksum_file = os.path.join(watch.data_dir, 'last-checksum.txt')
        if not os.path.isfile(checksum_file):
            return False

        try:
            os.remove(checksum_file)
        except OSError as e:
            logger.warning(f"Failed to delete checksum file for {uuid}: {e}")
            return False
        return True

    def clear_all_last_checksums(self):
        """
        Delete all last-checksum.txt files to force reprocessing of all watches.

        This should be called when global settings change, since watches inherit
        configuration and need to reprocess even if their individual watch dict
        hasn't been modified.

        Note: We delete the checksum file rather than setting was_edited=True because:
        - was_edited is not persisted across restarts
        - File deletion ensures reprocessing works across app restarts

        Returns:
            int: Number of checksum files deleted
        """
        deleted_count = 0
        for uuid, watch in self.__data['watching'].items():
            if self._remove_checksum_file(uuid, watch):
                deleted_count += 1
                logger.debug(f"Cleared checksum for watch {uuid}")

        logger.info(f"Cleared {deleted_count} checksum files to force reprocessing")
        return deleted_count

    def clear_checksums_for_tag(self, tag_uuid):
        """
        Delete last-checksum.txt files for all watches using a specific tag.

        This should be called when a tag configuration is edited, since watches
        inherit tag settings and need to reprocess.

        Args:
            tag_uuid: UUID of the tag that was modified

        Returns:
            int: Number of checksum files deleted
        """
        deleted_count = 0
        for uuid, watch in self.__data['watching'].items():
            if not (watch.get('tags') and tag_uuid in watch['tags']):
                continue
            if self._remove_checksum_file(uuid, watch):
                deleted_count += 1
                logger.debug(f"Cleared checksum for watch {uuid} (tag {tag_uuid})")

        logger.info(f"Cleared {deleted_count} checksum files for tag {tag_uuid}")
        return deleted_count

    def commit(self):
        """
        Save settings immediately to disk using atomic write.

        Uses atomic write pattern (temp file + rename) for crash safety.

        Fire-and-forget: Logs errors but does not raise exceptions.
        Settings data remains in memory even if save fails, so next commit will retry.
        """
        try:
            self._save_settings()
            logger.debug("Committed settings")
        except Exception as e:
            logger.error(f"Failed to commit settings: {e}")

    def update_watch(self, uuid, update_obj):
        """Merge ``update_obj`` into the watch and save the watch immediately.

        Keys whose default in ``generic_definition`` is a dict are merged into
        the existing nested dict rather than replacing it.

        Args:
            uuid: UUID of the watch; unknown UUIDs are ignored.
            update_obj: Mapping of changes, or None. NOTE: keys that were merged
                as nested dicts are removed from this mapping (it is modified in place).
        """
        # It's possible that the watch could be deleted before update
        if not self.__data['watching'].get(uuid):
            return

        with self.lock:
            # In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
            if update_obj is not None:
                for dict_key, d in self.generic_definition.items():
                    if isinstance(d, dict) and dict_key in update_obj:
                        self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
                        del update_obj[dict_key]

                # Guarded: update(None) would raise even though None is tolerated above
                self.__data['watching'][uuid].update(update_obj)

        # Immediate save
        self.__data['watching'][uuid].commit()

    @property
    def threshold_seconds(self):
        """Total of the global ``time_between_check`` setting in seconds, using the multipliers in ``Watch.mtable``."""
        seconds = 0
        for m, n in Watch.mtable.items():
            x = self.__data['settings']['requests']['time_between_check'].get(m)
            if x:
                seconds += x * n
        return seconds

    @property
    def unread_changes_count(self):
        """Number of watches with at least two history entries whose ``viewed`` equals False."""
        unread_changes_count = 0
        for uuid, watch in self.__data['watching'].items():
            # Deliberately `== False` (not `not ...`) so the comparison stays exactly as before
            if watch.history_n >= 2 and watch.viewed == False:  # noqa: E712
                unread_changes_count += 1

        return unread_changes_count

    @property
    def data(self):
        """The live internal data structure, with ``active_base_url`` refreshed on every access.

        The base URL comes from the application settings, else the BASE_URL
        environment variable, else a placeholder text.
        """
        # Re #152, Return env base_url if not overriden
        # Re #148 - Some people have just {{ base_url }} in the body or title, but this may break some notification services
        #           like 'Join', so it's always best to atleast set something obvious so that they are not broken.

        active_base_url = BASE_URL_NOT_SET_TEXT
        if self.__data['settings']['application'].get('base_url'):
            active_base_url = self.__data['settings']['application'].get('base_url')
        elif os.getenv('BASE_URL'):
            active_base_url = os.getenv('BASE_URL')

        # I looked at various ways todo the following, but in the end just copying the dict seemed simplest/most reliable
        # even given the memory tradeoff - if you know a better way.. maybe return d|self.__data.. or something
        d = self.__data  # NOTE: this is the same object, not a copy
        d['settings']['application']['active_base_url'] = active_base_url.strip('" ')
        return d

    # Delete a single watch by UUID
    def delete(self, uuid):
        """
        Delete a watch by UUID.

        Uses abstracted storage method for backend-agnostic deletion.
        Supports 'all' to delete all watches (mainly for testing).

        Args:
            uuid: Watch UUID to delete, or 'all' to delete all watches

        Raises:
            KeyError: If a single ``uuid`` is not in the watch list.
        """
        with self.lock:
            if uuid == 'all':
                # Delete all watches - capture UUIDs first before modifying dict
                all_uuids = list(self.__data['watching'].keys())

                for watch_uuid in all_uuids:
                    # Delete from storage using polymorphic method
                    try:
                        self._delete_watch(watch_uuid)
                    except Exception as e:
                        logger.error(f"Failed to delete watch {watch_uuid} from storage: {e}")

                    # Send delete signal
                    watch_delete_signal = signal('watch_deleted')
                    if watch_delete_signal:
                        watch_delete_signal.send(watch_uuid=watch_uuid)

                # Clear the dict
                self.__data['watching'] = {}

                # Mainly used for testing to allow all items to flush before running next test
                time.sleep(1)

            else:
                # Delete single watch from storage using polymorphic method
                try:
                    self._delete_watch(uuid)
                except Exception as e:
                    logger.error(f"Failed to delete watch {uuid} from storage: {e}")

                # Remove from watching dict
                del self.data['watching'][uuid]

                # Send delete signal
                watch_delete_signal = signal('watch_deleted')
                if watch_delete_signal:
                    watch_delete_signal.send(watch_uuid=uuid)

    # Clone a watch by UUID
    def clone(self, uuid):
        """Create a new watch from a copy of an existing one.

        Args:
            uuid: UUID of the watch to copy.

        Returns:
            The UUID of the new watch, or the falsy value returned by
            ``add_watch`` when the watch was not created.
        """
        url = self.data['watching'][uuid].get('url')
        # No need to deepcopy here - add_watch() will deepcopy extras anyway
        # Just pass a dict copy (with lock for thread safety)
        # NOTE: dict() is shallow copy but safe since add_watch() deepcopies it
        with self.lock:
            extras = dict(self.data['watching'][uuid])

        # add_watch() is called outside the lock because it takes the lock itself via add_tag()
        return self.add_watch(url=url, extras=extras)

    def url_exists(self, url):
        """Return True when any watch has this URL (case-insensitive comparison)."""
        # Probably their should be dict...
        wanted = url.lower()
        for watch in self.data['watching'].values():
            if watch['url'].lower() == wanted:
                return True

        return False

    # Remove a watchs data but keep the entry (URL etc)
    def clear_watch_history(self, uuid):
        """Call ``clear_watch()`` on the watch and save it; the watch entry itself is kept."""
        self.__data['watching'][uuid].clear_watch()
        self.__data['watching'][uuid].commit()

    def add_watch(self, url, tag='', extras=None, tag_uuids=None, save_immediately=True):
        """Create a new watch and register it in the datastore.

        Args:
            url: URL to watch. A ``https://changedetection.io/share/`` link is
                fetched and its permitted attributes are applied to the new watch.
            tag: Comma separated tag names; missing tags are created.
            extras: Extra watch attributes; deep-copied, never modified.
            tag_uuids: Tag UUIDs to attach directly.
            save_immediately: When true the new watch is committed right away.

        Returns:
            The new watch UUID; ``False`` when a share link could not be
            fetched; ``None`` when the URL is rejected or the watch limit is reached.
        """
        from flask import has_request_context

        if extras is None:
            extras = {}

        # Incase these are copied across, assume it's a reference and deepcopy()
        apply_extras = deepcopy(extras)
        apply_extras['tags'] = apply_extras.get('tags') or []

        # Was it a share link? try to fetch the data
        if url.startswith("https://changedetection.io/share/"):
            import requests

            try:
                r = requests.request(method="GET",
                                     url=url,
                                     # So we know to return the JSON instead of the human-friendly "help" page
                                     headers={'App-Guid': self.__data['app_guid']},
                                     timeout=5.0)  # 5 second timeout to prevent blocking
                res = r.json()

                # List of permissible attributes we accept from the wild internet
                for k in [
                    'body',
                    'browser_steps',
                    'css_filter',
                    'extract_text',
                    'headers',
                    'ignore_text',
                    'include_filters',
                    'method',
                    'paused',
                    'previous_md5',
                    'processor',
                    'subtractive_selectors',
                    'tag',
                    'tags',
                    'text_should_not_be_present',
                    'title',
                    'trigger_text',
                    'url',
                    'use_page_title_in_list',
                    'webdriver_js_execute_code',
                ]:
                    if res.get(k):
                        if k != 'css_filter':
                            apply_extras[k] = res[k]
                        else:
                            # We renamed the field and made it a list
                            apply_extras['include_filters'] = [res['css_filter']]

            except Exception as e:
                logger.error(f"Error fetching metadata for shared watch link {url} {str(e)}")
                # flash() needs a request context; without one it would raise from inside this handler
                if has_request_context():
                    flash(gettext("Error fetching metadata for {}").format(url), 'error')
                return False

        if not is_safe_valid_url(url):
            if has_request_context():
                flash(gettext('Watch protocol is not permitted or invalid URL format'), 'error')
            else:
                logger.error(f"add_watch: URL '{url}' is not permitted or invalid, skipping.")
            return None

        # Check PAGE_WATCH_LIMIT if set
        page_watch_limit = os.getenv('PAGE_WATCH_LIMIT')
        if page_watch_limit:
            try:
                page_watch_limit = int(page_watch_limit)
            except ValueError:
                logger.warning(f"Invalid PAGE_WATCH_LIMIT value: {page_watch_limit}, ignoring limit check")
            else:
                current_watch_count = len(self.__data['watching'])
                if current_watch_count >= page_watch_limit:
                    logger.error(f"Watch limit reached: {current_watch_count}/{page_watch_limit} watches. Cannot add {url}")
                    if has_request_context():
                        flash(gettext("Watch limit reached ({}/{} watches). Cannot add more watches.").format(current_watch_count, page_watch_limit), 'error')
                    return None

        if tag and isinstance(tag, str):
            # Then it's probably a string of the actual tag by name, split and add it
            for tag_name in tag.split(','):
                tag_uuid = self.add_tag(tag_name)
                # add_tag() returns False for a blank name (e.g. "a,,b"); never store that as a tag
                if tag_uuid:
                    apply_extras['tags'].append(tag_uuid)

        # Or if UUIDs given directly
        if tag_uuids:
            apply_extras['tags'] = list(apply_extras['tags']) + [t.strip() for t in tag_uuids]

        # Make any uuids unique (dict.fromkeys keeps first-seen order, so the result is deterministic)
        apply_extras['tags'] = list(dict.fromkeys(apply_extras['tags']))

        # If the processor also has its own Watch implementation
        watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor'))
        new_watch = watch_class(datastore_path=self.datastore_path, __datastore=self.__data, url=url)

        new_uuid = new_watch.get('uuid')

        logger.debug(f"Adding URL '{url}' - {new_uuid}")

        # State that must not be carried over from a copied/shared watch
        for k in ['uuid', 'history', 'last_checked', 'last_changed', 'newest_history_key', 'previous_md5', 'viewed']:
            apply_extras.pop(k, None)

        if not apply_extras.get('date_created'):
            apply_extras['date_created'] = int(time.time())

        new_watch.update(apply_extras)
        new_watch.ensure_data_dir_exists()
        self.__data['watching'][new_uuid] = new_watch

        if save_immediately:
            # Save immediately using commit
            new_watch.commit()
            logger.debug(f"Saved new watch {new_uuid}")

        logger.debug(f"Added '{url}'")

        return new_uuid

    def _watch_resource_exists(self, watch_uuid, resource_name):
        """
        Check if a watch-related resource exists.

        File backend implementation: checks if file exists in watch directory.

        Args:
            watch_uuid: Watch UUID
            resource_name: Name of resource (e.g., "last-screenshot.png")

        Returns:
            bool: True if resource exists
        """
        resource_path = os.path.join(self.datastore_path, watch_uuid, resource_name)
        return path.isfile(resource_path)

    def visualselector_data_is_ready(self, watch_uuid):
        """
        Check if visual selector data (screenshot + elements) is ready.

        Returns:
            bool: True if both screenshot and elements data exist
        """
        has_screenshot = self._watch_resource_exists(watch_uuid, "last-screenshot.png")
        has_elements = self._watch_resource_exists(watch_uuid, "elements.deflate")
        return has_screenshot and has_elements

    # Old sync_to_json and save_datastore methods removed - now handled by FileSavingDataStore parent class

    @property
    def proxy_list(self):
        """Available proxies keyed by ID, or None when there are none.

        Combines ``proxies.json`` from the datastore directory (read on every
        access) with the ``extra_proxies`` configured in the settings, and adds
        a "no-proxy" entry unless ENABLE_NO_PROXY_OPTION is disabled.
        """
        proxy_list = {}
        proxy_list_file = os.path.join(self.datastore_path, 'proxies.json')

        # Load from external config file
        if path.isfile(proxy_list_file):
            if HAS_ORJSON:
                # orjson.loads() expects UTF-8 encoded bytes #3611
                with open(proxy_list_file, 'rb') as f:
                    proxy_list = orjson.loads(f.read())
            else:
                with open(proxy_list_file, encoding='utf-8') as f:
                    proxy_list = json.load(f)

        # Mapping from UI config if available
        extras = self.data['settings']['requests'].get('extra_proxies')
        if extras:
            # NOTE(review): the counter in this key was never advanced (`i += 0`), so every
            # UI-defined proxy gets the prefix "ui-0". Watches reference these keys through
            # their 'proxy' value, so the prefix is kept unchanged here rather than renumbered.
            for proxy in extras:
                if proxy.get('proxy_name') and proxy.get('proxy_url'):
                    k = "ui-" + str(0) + proxy.get('proxy_name')
                    proxy_list[k] = {'label': proxy.get('proxy_name'), 'url': proxy.get('proxy_url')}

        if proxy_list and strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')):
            proxy_list["no-proxy"] = {'label': "No proxy", 'url': ''}

        return proxy_list if len(proxy_list) else None

    def get_preferred_proxy_for_watch(self, uuid):
        """
        Returns the preferred proxy by ID key

        Resolution order: the watch's own proxy (when it is a known key), the
        system-wide proxy from the settings, then the first available proxy.
        A watch set to "no-proxy" yields None while ENABLE_NO_PROXY_OPTION is on.
        An unknown ``uuid`` is treated like a watch without a proxy preference.

        :param uuid: UUID
        :return: proxy "key" id
        """
        # Read the property once: every access re-reads proxies.json
        proxies = self.proxy_list
        if proxies is None:
            return None

        # If it's a valid one
        watch = self.data['watching'].get(uuid)
        watch_proxy = watch.get('proxy') if watch else None

        if strtobool(os.getenv('ENABLE_NO_PROXY_OPTION', 'True')) and watch_proxy == "no-proxy":
            return None

        if watch_proxy and watch_proxy in proxies:
            return watch_proxy

        # not valid (including None), try the system one
        system_proxy_id = self.data['settings']['requests'].get('proxy')
        # Is not None and exists
        if proxies.get(system_proxy_id):
            return system_proxy_id

        # Fallback - Did not resolve anything, or doesnt exist, use the first available
        return next(iter(proxies))

    @property
    def has_extra_headers_file(self):
        """True when ``headers.txt`` exists in the datastore directory."""
        filepath = os.path.join(self.datastore_path, 'headers.txt')
        return os.path.isfile(filepath)

    def get_all_base_headers(self):
        """Return a new dict with the headers from the global application settings."""
        headers = {}
        # Global app settings
        headers.update(self.data['settings'].get('headers', {}))

        return headers

    def get_all_headers_in_textfile_for_watch(self, uuid):
        """Collect headers from the headers text files that apply to a watch.

        Later sources override earlier ones: the global ``headers.txt``, the
        watch's own ``headers.txt``, then one ``headers-<tagtitle>.txt`` per tag
        of the watch. Read errors are logged and skipped.

        Args:
            uuid: Watch UUID; for an unknown watch only the global file is used.

        Returns:
            dict: Merged headers.
        """
        from ..model.App import parse_headers_from_text_file
        headers = {}

        # Global in /datastore/headers.txt
        filepath = os.path.join(self.datastore_path, 'headers.txt')
        try:
            if os.path.isfile(filepath):
                headers.update(parse_headers_from_text_file(filepath))
        except Exception as e:
            logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")

        watch = self.data['watching'].get(uuid)
        if watch:

            # In /datastore/xyz-xyz/headers.txt
            filepath = os.path.join(watch.data_dir, 'headers.txt')
            try:
                if os.path.isfile(filepath):
                    headers.update(parse_headers_from_text_file(filepath))
            except Exception as e:
                logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")

            # In /datastore/tag-name.txt
            tags = self.get_all_tags_for_watch(uuid=uuid)
            for tag_uuid, tag in tags.items():
                # `or ''` so a tag without a title cannot make re.sub() raise outside the try below
                fname = "headers-" + re.sub(r'[\W_]', '', tag.get('title') or '').lower().strip() + ".txt"
                filepath = os.path.join(self.datastore_path, fname)
                try:
                    if os.path.isfile(filepath):
                        headers.update(parse_headers_from_text_file(filepath))
                except Exception as e:
                    logger.error(f"ERROR reading headers.txt at {filepath} {str(e)}")

        return headers

    def get_tag_overrides_for_watch(self, uuid, attr):
        """Return the concatenated values of ``attr`` from every tag of the watch (empty values skipped)."""
        tags = self.get_all_tags_for_watch(uuid=uuid)
        ret = []

        if tags:
            for tag_uuid, tag in tags.items():
                if attr in tag and tag[attr]:
                    ret.extend(tag[attr])

        return ret

    def add_tag(self, title):
        """Return the UUID of the tag with this title, creating the tag when needed.

        Titles are compared stripped and lower-cased.

        Args:
            title: Tag title.

        Returns:
            The tag UUID, or ``False`` when the title is blank.
        """
        # If name exists, return that
        n = title.strip().lower()
        logger.debug(f">>> Adding new tag - '{n}'")
        if not n:
            return False

        for uuid, tag in self.__data['settings']['application'].get('tags', {}).items():
            if n == tag.get('title', '').lower().strip():
                logger.warning(f"Tag '{title}' already exists, skipping creation.")
                return uuid

        # Eventually almost everything todo with a watch will apply as a Tag
        # So we use the same model as a Watch
        with self.lock:
            from ..model import Tag
            new_tag = Tag.model(
                datastore_path=self.datastore_path,
                __datastore=self.__data,
                default={
                    'title': title.strip(),
                    'date_created': int(time.time())
                }
            )

            new_uuid = new_tag.get('uuid')

            self.__data['settings']['application']['tags'][new_uuid] = new_tag

        # Save tag to its own tag.json file instead of settings
        new_tag.commit()
        return new_uuid

    def get_all_tags_for_watch(self, uuid):
        """This should be in Watch model but Watch doesn't have access to datastore, not sure how to solve that yet

        Returns:
            dict: Tag UUID → tag for every tag attached to the watch; empty for an unknown watch.
        """
        watch = self.data['watching'].get(uuid)

        # Should return a dict of full tag info linked by UUID
        if watch:
            return dictfilt(self.__data['settings']['application']['tags'], watch.get('tags') or [])

        return {}

    @property
    def extra_browsers(self):
        """List of ``("extra_browser_<name>", <name>)`` tuples for configured extra browsers.

        Only entries with both ``browser_name`` and ``browser_connection_url`` are included.
        """
        configured = self.__data['settings']['requests'].get('extra_browsers', [])
        return [
            ("extra_browser_" + browser['browser_name'], browser['browser_name'])
            for browser in configured
            if browser.get('browser_name') and browser.get('browser_connection_url')
        ]

    def tag_exists_by_name(self, tag_name):
        """Return the first tag whose title equals ``tag_name`` ignoring case, or None."""
        # Check if any tag dictionary has a 'title' attribute matching the provided tag_name
        tags = self.__data['settings']['application']['tags'].values()
        return next((v for v in tags if (v.get('title') or '').lower() == tag_name.lower()),
                    None)

    def any_watches_have_processor_by_name(self, processor_name):
        """Return True when at least one watch uses the named processor."""
        for watch in self.data['watching'].values():
            if watch.get('processor') == processor_name:
                return True
        return False

    def search_watches_for_url(self, query, tag_limit=None, partial=False):
        """Search watches by URL, title, or error messages

        Args:
            query (str): Search term to match against watch URLs, titles, and error messages
            tag_limit (str, optional): Optional tag name to limit search results;
                a name that matches no tag yields an empty result
            partial: (bool, optional): sub-string matching

        Returns:
            list: List of UUIDs of watches that match the search criteria
        """
        matching_uuids = []
        query = query.lower().strip()
        tag = self.tag_exists_by_name(tag_limit) if tag_limit else False

        # A limit on a tag that does not exist cannot match any watch
        if tag_limit and tag is None:
            return matching_uuids

        def is_hit(value):
            """Compare the lower-cased value with the query (substring or exact)."""
            value = value.lower()
            return query in value if partial else query == value

        for uuid, watch in self.data['watching'].items():
            # Filter by tag if requested
            if tag_limit and tag.get('uuid') not in (watch.get('tags') or []):
                continue

            # Search in URL, title, or error messages
            title = watch.get('title')
            last_error = watch.get('last_error')
            if ((title and is_hit(title))
                    or is_hit(watch.get('url', ''))
                    or (last_error and is_hit(last_error))):
                matching_uuids.append(uuid)

        return matching_uuids

    def get_unique_notification_tokens_available(self):
        """Merge the extra notification token values offered by one watch of each processor type."""
        # Ask each type of watch if they have any extra notification token to add to the validation
        extra_notification_tokens = {}
        watch_processors_checked = set()

        for watch_uuid, watch in self.__data['watching'].items():
            processor = watch.get('processor')
            if processor not in watch_processors_checked:
                extra_notification_tokens.update(watch.extra_notification_token_values())
                watch_processors_checked.add(processor)

        return extra_notification_tokens

    def get_unique_notification_token_placeholders_available(self):
        """Collect the token placeholder descriptions offered by one watch of each processor type."""
        # The actual description of the tokens, could be combined with get_unique_notification_tokens_available instead of doing this twice
        extra_notification_tokens = []
        watch_processors_checked = set()

        for watch_uuid, watch in self.__data['watching'].items():
            processor = watch.get('processor')
            if processor not in watch_processors_checked:
                extra_notification_tokens += watch.extra_notification_token_placeholder_info()
                watch_processors_checked.add(processor)

        return extra_notification_tokens

    def add_notification_url(self, notification_url):
        """Add a notification URL to the application settings unless it is already listed.

        Returns:
            The ``notification_url`` that was passed in.
        """
        logger.debug(f">>> Adding new notification_url - '{notification_url}'")

        # Cheap check first, without taking the lock
        notification_urls = self.data['settings']['application'].get('notification_urls', [])

        if notification_url in notification_urls:
            return notification_url

        with self.lock:
            # Re-check under the lock in case it was added in the meantime
            notification_urls = self.__data['settings']['application'].get('notification_urls', [])

            if notification_url in notification_urls:
                return notification_url

            # Append and update the datastore
            notification_urls.append(notification_url)
            self.__data['settings']['application']['notification_urls'] = notification_urls

        self.commit()
        return notification_url

    # Schema update methods moved to store/updates.py (DatastoreUpdatesMixin)
    # This includes: get_updates_available(), run_updates(), and update_1() through update_26()