mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-06 14:26:07 +00:00
Compare commits
6 Commits
jinja2-esc
...
watch-memo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
799818dd40 | ||
|
|
b06797636c | ||
|
|
fcd07e23f3 | ||
|
|
4fd477a60c | ||
|
|
dc8b387f40 | ||
|
|
2149a6fe3b |
@@ -66,47 +66,42 @@ class Watch(Resource):
|
||||
@validate_openapi_request('getWatch')
|
||||
def get(self, uuid):
|
||||
"""Get information about a single watch, recheck, pause, or mute."""
|
||||
import time
|
||||
from copy import deepcopy
|
||||
watch = None
|
||||
# Retry up to 20 times if dict is being modified
|
||||
# With sleep(0), this is fast: ~200µs best case, ~20ms worst case under heavy load
|
||||
for attempt in range(20):
|
||||
try:
|
||||
watch = deepcopy(self.datastore.data['watching'].get(uuid))
|
||||
break
|
||||
except RuntimeError:
|
||||
# Dict changed during deepcopy, retry after yielding to scheduler
|
||||
# sleep(0) releases GIL and yields - no fixed delay, just lets other threads run
|
||||
if attempt < 19: # Don't yield on last attempt
|
||||
time.sleep(0) # Yield to scheduler (microseconds, not milliseconds)
|
||||
|
||||
if not watch:
|
||||
# Get watch reference first (for pause/mute operations)
|
||||
watch_obj = self.datastore.data['watching'].get(uuid)
|
||||
if not watch_obj:
|
||||
abort(404, message='No watch exists with the UUID of {}'.format(uuid))
|
||||
|
||||
# Create a dict copy for JSON response (with lock for thread safety)
|
||||
# This is much faster than deepcopy and doesn't copy the datastore reference
|
||||
# WARNING: dict() is a SHALLOW copy - nested dicts are shared with original!
|
||||
# Only safe because we only ADD scalar properties (line 97-101), never modify nested dicts
|
||||
# If you need to modify nested dicts, use: from copy import deepcopy; watch = deepcopy(dict(watch_obj))
|
||||
with self.datastore.lock:
|
||||
watch = dict(watch_obj)
|
||||
|
||||
if request.args.get('recheck'):
|
||||
worker_pool.queue_item_async_safe(self.update_q, queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': uuid}))
|
||||
return "OK", 200
|
||||
if request.args.get('paused', '') == 'paused':
|
||||
self.datastore.data['watching'].get(uuid).pause()
|
||||
watch_obj.pause()
|
||||
return "OK", 200
|
||||
elif request.args.get('paused', '') == 'unpaused':
|
||||
self.datastore.data['watching'].get(uuid).unpause()
|
||||
watch_obj.unpause()
|
||||
return "OK", 200
|
||||
if request.args.get('muted', '') == 'muted':
|
||||
self.datastore.data['watching'].get(uuid).mute()
|
||||
watch_obj.mute()
|
||||
return "OK", 200
|
||||
elif request.args.get('muted', '') == 'unmuted':
|
||||
self.datastore.data['watching'].get(uuid).unmute()
|
||||
watch_obj.unmute()
|
||||
return "OK", 200
|
||||
|
||||
# Return without history, get that via another API call
|
||||
# Properties are not returned as a JSON, so add the required props manually
|
||||
watch['history_n'] = watch.history_n
|
||||
watch['history_n'] = watch_obj.history_n
|
||||
# attr .last_changed will check for the last written text snapshot on change
|
||||
watch['last_changed'] = watch.last_changed
|
||||
watch['viewed'] = watch.viewed
|
||||
watch['link'] = watch.link,
|
||||
watch['last_changed'] = watch_obj.last_changed
|
||||
watch['viewed'] = watch_obj.viewed
|
||||
watch['link'] = watch_obj.link,
|
||||
|
||||
return watch
|
||||
|
||||
|
||||
@@ -59,6 +59,14 @@
|
||||
{{ _('Set to') }} <strong>0</strong> {{ _('to disable') }}
|
||||
</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.application.form.history_snapshot_max_length, class="history_snapshot_max_length") }}
|
||||
<span class="pure-form-message-inline">{{ _('Limit collection of history snapshots for each watch to this number of history items.') }}
|
||||
<br>
|
||||
{{ _('Set to empty to disable / no limit') }}
|
||||
</span>
|
||||
</div>
|
||||
|
||||
<div class="pure-control-group">
|
||||
{% if not hide_remove_pass %}
|
||||
{% if current_user.is_authenticated %}
|
||||
|
||||
@@ -197,7 +197,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
# Recast it if need be to right data Watch handler
|
||||
watch_class = processors.get_custom_watch_obj_for_processor(form.data.get('processor'))
|
||||
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, default=datastore.data['watching'][uuid])
|
||||
datastore.data['watching'][uuid] = watch_class(datastore_path=datastore.datastore_path, __datastore=datastore.data, default=datastore.data['watching'][uuid])
|
||||
flash(gettext("Updated watch - unpaused!") if request.args.get('unpause_on_save') else gettext("Updated watch."))
|
||||
|
||||
# Cleanup any browsersteps session for this watch
|
||||
|
||||
@@ -115,6 +115,13 @@
|
||||
{{ _('Sends a notification when the filter can no longer be seen on the page, good for knowing when the page changed and your filter will not work anymore.') }}
|
||||
</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_field(form.history_snapshot_max_length, class="history_snapshot_max_length") }}
|
||||
<span class="pure-form-message-inline">{{ _('Limit collection of history snapshots for each watch to this number of history items.') }}
|
||||
<br>
|
||||
{{ _('Set to empty to use system settings default') }}
|
||||
</span>
|
||||
</div>
|
||||
<div class="pure-control-group">
|
||||
{{ render_ternary_field(form.use_page_title_in_list) }}
|
||||
</div>
|
||||
|
||||
@@ -837,6 +837,8 @@ class processor_text_json_diff_form(commonSettingsForm):
|
||||
conditions = FieldList(FormField(ConditionFormRow), min_entries=1) # Add rule logic here
|
||||
use_page_title_in_list = TernaryNoneBooleanField(_l('Use page <title> in list'), default=None)
|
||||
|
||||
history_snapshot_max_length = IntegerField(_l('Number of history items per watch to keep'), render_kw={"style": "width: 5em;"}, validators=[validators.Optional(), validators.NumberRange(min=2)])
|
||||
|
||||
def extra_tab_content(self):
|
||||
return None
|
||||
|
||||
@@ -1034,6 +1036,8 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
||||
render_kw={"style": "width: 5em;"},
|
||||
validators=[validators.NumberRange(min=0,
|
||||
message=_l("Should contain zero or more attempts"))])
|
||||
|
||||
history_snapshot_max_length = IntegerField(_l('Number of history items per watch to keep'), render_kw={"style": "width: 5em;"}, validators=[validators.Optional(), validators.NumberRange(min=2)])
|
||||
ui = FormField(globalSettingsApplicationUIForm)
|
||||
|
||||
|
||||
|
||||
@@ -46,6 +46,7 @@ class model(dict):
|
||||
'filter_failure_notification_threshold_attempts': _FILTER_FAILURE_THRESHOLD_ATTEMPTS_DEFAULT,
|
||||
'global_ignore_text': [], # List of text to ignore when calculating the comparison checksum
|
||||
'global_subtractive_selectors': [],
|
||||
'history_snapshot_max_length': None,
|
||||
'ignore_whitespace': True,
|
||||
'ignore_status_codes': False, #@todo implement, as ternary.
|
||||
'ssim_threshold': '0.96', # Default SSIM threshold for screenshot comparison
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
import gc
|
||||
from copy import copy
|
||||
|
||||
from blinker import signal
|
||||
from changedetectionio.validate_url import is_safe_valid_url
|
||||
|
||||
@@ -13,7 +16,7 @@ from .. import jinja2_custom as safe_jinja
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
|
||||
FAVICON_RESAVE_THRESHOLD_SECONDS=86400
|
||||
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024))
|
||||
BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD', 1024*20))
|
||||
|
||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||
@@ -109,8 +112,15 @@ class model(watch_base):
|
||||
self.__datastore_path = kw.get('datastore_path')
|
||||
if kw.get('datastore_path'):
|
||||
del kw['datastore_path']
|
||||
|
||||
|
||||
self.__datastore = kw.get('__datastore')
|
||||
if not self.__datastore:
|
||||
raise ValueError("Watch object requires '__datastore' reference - cannot access global settings without it")
|
||||
if kw.get('__datastore'):
|
||||
del kw['__datastore']
|
||||
|
||||
super(model, self).__init__(*arg, **kw)
|
||||
|
||||
if kw.get('default'):
|
||||
self.update(kw['default'])
|
||||
del kw['default']
|
||||
@@ -121,6 +131,95 @@ class model(watch_base):
|
||||
# Be sure the cached timestamp is ready
|
||||
bump = self.history
|
||||
|
||||
def __deepcopy__(self, memo):
|
||||
"""
|
||||
Custom deepcopy that excludes __datastore to prevent memory leaks.
|
||||
|
||||
CRITICAL FIX: Without this, deepcopy(watch) copies the entire datastore
|
||||
(which contains all other watches), causing exponential memory growth.
|
||||
With 100 watches, this creates 10,000 watch objects in memory (100²).
|
||||
|
||||
This is called by:
|
||||
- api/Watch.py:76 (API endpoint)
|
||||
- processors/base.py:26 (EVERY processor run)
|
||||
- store/__init__.py:544 (clone watch)
|
||||
- And 4+ other locations
|
||||
"""
|
||||
from copy import deepcopy
|
||||
|
||||
# Create a new instance without calling __init__ (avoids __datastore requirement)
|
||||
cls = self.__class__
|
||||
new_watch = cls.__new__(cls)
|
||||
memo[id(self)] = new_watch
|
||||
|
||||
# Copy the dict data (all the watch settings)
|
||||
for key, value in self.items():
|
||||
new_watch[key] = deepcopy(value, memo)
|
||||
|
||||
# Copy instance attributes EXCEPT the datastore references
|
||||
# These are cached/computed values that need to be preserved
|
||||
new_watch._model__newest_history_key = self._model__newest_history_key
|
||||
new_watch._model__history_n = self._model__history_n
|
||||
new_watch.jitter_seconds = self.jitter_seconds
|
||||
|
||||
# Copy datastore_path (string, safe to copy)
|
||||
new_watch._model__datastore_path = self._model__datastore_path
|
||||
|
||||
# CRITICAL: Share the datastore reference (don't copy it!)
|
||||
# This is safe because we never modify the datastore through the watch
|
||||
new_watch._model__datastore = self._model__datastore
|
||||
|
||||
# Do NOT copy favicon cache - let it be regenerated on demand
|
||||
# This is just a performance cache (prevents repeated glob operations)
|
||||
# and will be rebuilt automatically on first access
|
||||
|
||||
return new_watch
|
||||
|
||||
def __getstate__(self):
|
||||
"""
|
||||
Custom pickle serialization that excludes __datastore.
|
||||
|
||||
This handles pickle/unpickle (used by multiprocessing, caching, etc.)
|
||||
and ensures the datastore reference is never serialized.
|
||||
"""
|
||||
# Get the dict data
|
||||
state = dict(self)
|
||||
|
||||
# Add the instance attributes we want to preserve
|
||||
state['__watch_metadata__'] = {
|
||||
'newest_history_key': self._model__newest_history_key,
|
||||
'history_n': self._model__history_n,
|
||||
'jitter_seconds': self.jitter_seconds,
|
||||
'datastore_path': self._model__datastore_path,
|
||||
}
|
||||
|
||||
# NOTE: __datastore and _favicon_filename_cache are intentionally excluded
|
||||
# Both will be regenerated/restored as needed
|
||||
return state
|
||||
|
||||
def __setstate__(self, state):
|
||||
"""
|
||||
Custom pickle deserialization.
|
||||
|
||||
WARNING: This creates a Watch without a __datastore reference!
|
||||
The caller MUST set watch._model__datastore after unpickling.
|
||||
"""
|
||||
# Extract metadata
|
||||
metadata = state.pop('__watch_metadata__', {})
|
||||
|
||||
# Restore dict data
|
||||
self.update(state)
|
||||
|
||||
# Restore instance attributes
|
||||
self._model__newest_history_key = metadata.get('newest_history_key')
|
||||
self._model__history_n = metadata.get('history_n', 0)
|
||||
self.jitter_seconds = metadata.get('jitter_seconds', 0)
|
||||
self._model__datastore_path = metadata.get('datastore_path')
|
||||
|
||||
# __datastore is NOT restored - caller must set it!
|
||||
# _favicon_filename_cache is NOT restored - will regenerate on demand
|
||||
self._model__datastore = None
|
||||
|
||||
@property
|
||||
def viewed(self):
|
||||
# Don't return viewed when last_viewed is 0 and newest_key is 0
|
||||
@@ -423,16 +522,49 @@ class model(watch_base):
|
||||
with open(filepath, 'r', encoding='utf-8', errors='ignore') as f:
|
||||
return f.read()
|
||||
|
||||
def _write_atomic(self, dest, data):
|
||||
def _write_atomic(self, dest, data, mode='wb'):
|
||||
"""Write data atomically to dest using a temp file"""
|
||||
if not os.path.exists(dest):
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile('wb', delete=False, dir=self.watch_data_dir) as tmp:
|
||||
tmp.write(data)
|
||||
tmp.flush()
|
||||
os.fsync(tmp.fileno())
|
||||
tmp_path = tmp.name
|
||||
os.replace(tmp_path, dest)
|
||||
import tempfile
|
||||
with tempfile.NamedTemporaryFile(mode, delete=False, dir=self.watch_data_dir) as tmp:
|
||||
tmp.write(data)
|
||||
tmp.flush()
|
||||
os.fsync(tmp.fileno())
|
||||
tmp_path = tmp.name
|
||||
os.replace(tmp_path, dest)
|
||||
|
||||
def history_trim(self, newest_n_items):
|
||||
from pathlib import Path
|
||||
|
||||
# Sort by timestamp (key)
|
||||
sorted_items = sorted(self.history.items(), key=lambda x: int(x[0]))
|
||||
|
||||
keep_part = dict(sorted_items[-newest_n_items:])
|
||||
delete_part = dict(sorted_items[:-newest_n_items])
|
||||
logger.info( f"[{self.get('uuid')}] Trimming history to most recent {newest_n_items} items, keeping {len(keep_part)} items deleting {len(delete_part)} items.")
|
||||
|
||||
if delete_part:
|
||||
for item in delete_part.items():
|
||||
try:
|
||||
Path(item[1]).unlink(missing_ok=True)
|
||||
except Exception as e:
|
||||
logger.critical(f"{str(e)}")
|
||||
finally:
|
||||
logger.debug(f"[{self.get('uuid')}] Deleted {item[1]} history snapshot")
|
||||
try:
|
||||
dest = os.path.join(self.watch_data_dir, self.history_index_filename)
|
||||
output = "\r\n".join(
|
||||
f"{k},{Path(v).name}"
|
||||
for k, v in keep_part.items()
|
||||
)+"\r\n"
|
||||
self._write_atomic(dest=dest, data=output, mode='w')
|
||||
except Exception as e:
|
||||
logger.critical(f"{str(e)}")
|
||||
finally:
|
||||
logger.debug(f"[{self.get('uuid')}] Updated history index {dest}")
|
||||
|
||||
# reimport
|
||||
bump = self.history
|
||||
gc.collect()
|
||||
|
||||
# Save some text file to the appropriate path and bump the history
|
||||
# result_obj from fetch_site_status.run()
|
||||
@@ -441,7 +573,6 @@ class model(watch_base):
|
||||
logger.trace(f"{self.get('uuid')} - Updating {self.history_index_filename} with timestamp {timestamp}")
|
||||
|
||||
self.ensure_data_dir_exists()
|
||||
|
||||
skip_brotli = strtobool(os.getenv('DISABLE_BROTLI_TEXT_SNAPSHOT', 'False'))
|
||||
|
||||
# Binary data - detect file type and save without compression
|
||||
@@ -501,6 +632,15 @@ class model(watch_base):
|
||||
self.__newest_history_key = timestamp
|
||||
self.__history_n += 1
|
||||
|
||||
|
||||
maxlen = (
|
||||
self.get('history_snapshot_max_length')
|
||||
or (self.__datastore and self.__datastore['settings']['application'].get('history_snapshot_max_length'))
|
||||
)
|
||||
|
||||
if maxlen and self.__history_n and self.__history_n > maxlen:
|
||||
self.history_trim(newest_n_items=maxlen)
|
||||
|
||||
# @todo bump static cache of the last timestamp so we dont need to examine the file to set a proper ''viewed'' status
|
||||
return snapshot_fname
|
||||
|
||||
|
||||
@@ -32,6 +32,7 @@ class watch_base(dict):
|
||||
'filter_text_replaced': True,
|
||||
'follow_price_changes': True,
|
||||
'has_ldjson_price_data': None,
|
||||
'history_snapshot_max_length': None,
|
||||
'headers': {}, # Extra headers to send
|
||||
'ignore_text': [], # List of text to ignore when calculating the comparison checksum
|
||||
'ignore_status_codes': None,
|
||||
|
||||
@@ -23,7 +23,14 @@ class difference_detection_processor():
|
||||
def __init__(self, datastore, watch_uuid):
|
||||
self.datastore = datastore
|
||||
self.watch_uuid = watch_uuid
|
||||
|
||||
# Create a stable snapshot of the watch for processing
|
||||
# Why deepcopy?
|
||||
# 1. Prevents "dict changed during iteration" errors if watch is modified during processing
|
||||
# 2. Preserves Watch object with properties (.link, .is_pdf, etc.) - can't use dict()
|
||||
# 3. Safe now: Watch.__deepcopy__() shares datastore ref (no memory leak) but copies dict data
|
||||
self.watch = deepcopy(self.datastore.data['watching'].get(watch_uuid))
|
||||
|
||||
# Generic fetcher that should be extended (requests, playwright etc)
|
||||
self.fetcher = Fetcher()
|
||||
|
||||
|
||||
@@ -64,7 +64,7 @@ def prepare_filter_prevew(datastore, watch_uuid, form_data):
|
||||
# Only update vars that came in via the AJAX post
|
||||
p = {k: v for k, v in form.data.items() if k in form_data.keys()}
|
||||
tmp_watch.update(p)
|
||||
blank_watch_no_filters = watch_model()
|
||||
blank_watch_no_filters = watch_model(datastore_path=datastore.datastore_path, __datastore=datastore.data)
|
||||
blank_watch_no_filters['url'] = tmp_watch.get('url')
|
||||
|
||||
latest_filename = next(reversed(tmp_watch.history))
|
||||
|
||||
@@ -67,7 +67,7 @@ echo "-------------------- Running rest of tests in parallel -------------------
|
||||
# REMOVE_REQUESTS_OLD_SCREENSHOTS disabled so that we can write a screenshot and send it in test_notifications.py without a real browser
|
||||
FETCH_WORKERS=2 REMOVE_REQUESTS_OLD_SCREENSHOTS=false \
|
||||
pytest tests/test_*.py \
|
||||
-n 18 \
|
||||
-n 8 \
|
||||
--dist=load \
|
||||
-vvv \
|
||||
-s \
|
||||
|
||||
@@ -9,18 +9,14 @@ from flask import (
|
||||
)
|
||||
from flask_babel import gettext
|
||||
|
||||
from ..blueprint.rss import RSS_CONTENT_FORMAT_DEFAULT
|
||||
from ..html_tools import TRANSLATE_WHITESPACE_TABLE
|
||||
from ..model import App, Watch, USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH
|
||||
from copy import deepcopy, copy
|
||||
from ..model import App, Watch
|
||||
from copy import deepcopy
|
||||
from os import path, unlink
|
||||
from threading import Lock
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import secrets
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid as uuid_builder
|
||||
from loguru import logger
|
||||
@@ -35,7 +31,6 @@ except ImportError:
|
||||
HAS_ORJSON = False
|
||||
|
||||
from ..processors import get_custom_watch_obj_for_processor
|
||||
from ..processors.restock_diff import Restock
|
||||
|
||||
# Import the base class and helpers
|
||||
from .file_saving_datastore import FileSavingDataStore, load_all_watches, save_watch_atomic, save_json_atomic
|
||||
@@ -137,6 +132,19 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
)
|
||||
logger.info(f"Tag: {uuid} {tag['title']}")
|
||||
|
||||
def _rehydrate_watches(self):
|
||||
"""Rehydrate watch entities from stored data (converts dicts to Watch objects)."""
|
||||
watch_count = len(self.__data.get('watching', {}))
|
||||
if watch_count == 0:
|
||||
return
|
||||
|
||||
logger.info(f"Rehydrating {watch_count} watches...")
|
||||
watching_rehydrated = {}
|
||||
for uuid, watch_dict in self.__data.get('watching', {}).items():
|
||||
watching_rehydrated[uuid] = self.rehydrate_entity(uuid, watch_dict)
|
||||
self.__data['watching'] = watching_rehydrated
|
||||
logger.success(f"Rehydrated {watch_count} watches into Watch objects")
|
||||
|
||||
|
||||
def _load_state(self):
|
||||
"""
|
||||
@@ -174,7 +182,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
self.json_store_path = os.path.join(self.datastore_path, "changedetection.json")
|
||||
|
||||
# Base definition for all watchers (deepcopy part of #569)
|
||||
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, default={}))
|
||||
self.generic_definition = deepcopy(Watch.model(datastore_path=datastore_path, __datastore=self.__data, default={}))
|
||||
|
||||
# Load build SHA if available (Docker deployments)
|
||||
if path.isfile('changedetectionio/source.txt'):
|
||||
@@ -218,23 +226,29 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
logger.critical(f"Legacy datastore detected at {self.datastore_path}/url-watches.json")
|
||||
logger.critical("Migration will be triggered via update_26")
|
||||
|
||||
# Load the legacy datastore to get its schema_version
|
||||
# Load the legacy datastore
|
||||
from .legacy_loader import load_legacy_format
|
||||
legacy_path = os.path.join(self.datastore_path, "url-watches.json")
|
||||
with open(legacy_path) as f:
|
||||
self.__data = json.load(f)
|
||||
legacy_data = load_legacy_format(legacy_path)
|
||||
|
||||
if not self.__data:
|
||||
if not legacy_data:
|
||||
raise Exception("Failed to load legacy datastore from url-watches.json")
|
||||
|
||||
# update_26 will load the legacy data again and migrate to new format
|
||||
# Only run updates AFTER the legacy schema version (e.g., if legacy is at 25, only run 26+)
|
||||
# Store the loaded data
|
||||
self.__data = legacy_data
|
||||
|
||||
# CRITICAL: Rehydrate watches from dicts into Watch objects
|
||||
# This ensures watches have their methods available during migration
|
||||
self._rehydrate_watches()
|
||||
|
||||
# update_26 will save watches to individual files and create changedetection.json
|
||||
# Next startup will load from new format normally
|
||||
self.run_updates()
|
||||
|
||||
|
||||
else:
|
||||
# Fresh install - create new datastore
|
||||
logger.critical(f"No datastore found, creating new datastore at {self.datastore_path}")
|
||||
logger.warning(f"No datastore found, creating new datastore at {self.datastore_path}")
|
||||
|
||||
# Set schema version to latest (no updates needed)
|
||||
updates_available = self.get_updates_available()
|
||||
@@ -308,7 +322,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
if entity.get('processor') != 'text_json_diff':
|
||||
logger.trace(f"Loading Watch object '{watch_class.__module__}.{watch_class.__name__}' for UUID {uuid}")
|
||||
|
||||
entity = watch_class(datastore_path=self.datastore_path, default=entity)
|
||||
entity = watch_class(datastore_path=self.datastore_path, __datastore=self.__data, default=entity)
|
||||
return entity
|
||||
|
||||
# ============================================================================
|
||||
@@ -527,7 +541,11 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
# Clone a watch by UUID
|
||||
def clone(self, uuid):
|
||||
url = self.data['watching'][uuid].get('url')
|
||||
extras = deepcopy(self.data['watching'][uuid])
|
||||
# No need to deepcopy here - add_watch() will deepcopy extras anyway (line 569)
|
||||
# Just pass a dict copy (with lock for thread safety)
|
||||
# NOTE: dict() is shallow copy but safe since add_watch() deepcopies it
|
||||
with self.lock:
|
||||
extras = dict(self.data['watching'][uuid])
|
||||
new_uuid = self.add_watch(url=url, extras=extras)
|
||||
watch = self.data['watching'][new_uuid]
|
||||
return new_uuid
|
||||
@@ -639,7 +657,7 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
|
||||
|
||||
# If the processor also has its own Watch implementation
|
||||
watch_class = get_custom_watch_obj_for_processor(apply_extras.get('processor'))
|
||||
new_watch = watch_class(datastore_path=self.datastore_path, url=url)
|
||||
new_watch = watch_class(datastore_path=self.datastore_path, __datastore=self.__data, url=url)
|
||||
|
||||
new_uuid = new_watch.get('uuid')
|
||||
|
||||
|
||||
@@ -182,3 +182,86 @@ def test_check_text_history_view(client, live_server, measure_memory_usage, data
|
||||
assert b'test-one' not in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_history_trim_global_only(client, live_server, measure_memory_usage, datastore_path):
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = None
|
||||
limit = 3
|
||||
|
||||
for i in range(0, 10):
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(f"<html>test {i}</html>")
|
||||
if not uuid:
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
if i ==8:
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
history_n = len(list(watch.history.keys()))
|
||||
logger.debug(f"History length should be at limit {limit} and it is {history_n}")
|
||||
assert history_n == limit
|
||||
|
||||
if i == 6:
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-history_snapshot_max_length": limit},
|
||||
follow_redirects=True
|
||||
)
|
||||
# It will need to detect one more change to start trimming it, which is really at 'start of 7'
|
||||
assert b'Settings updated' in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
def test_history_trim_global_override_in_watch(client, live_server, measure_memory_usage, datastore_path):
|
||||
# Add our URL to the import page
|
||||
test_url = url_for('test_endpoint', _external=True)
|
||||
uuid = None
|
||||
limit = 3
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-history_snapshot_max_length": 10000},
|
||||
follow_redirects=True
|
||||
)
|
||||
# It will need to detect one more change to start trimming it, which is really at 'start of 7'
|
||||
assert b'Settings updated' in res.data
|
||||
|
||||
|
||||
for i in range(0, 10):
|
||||
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:
|
||||
f.write(f"<html>test {i}</html>")
|
||||
if not uuid:
|
||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||
res = client.post(
|
||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||
data={"include_filters": "", "url": test_url, "tags": "", "headers": "", 'fetch_backend': "html_requests",
|
||||
"time_between_check_use_default": "y", "history_snapshot_max_length": str(limit)},
|
||||
follow_redirects=True
|
||||
)
|
||||
assert b"Updated watch." in res.data
|
||||
|
||||
wait_for_all_checks(client)
|
||||
|
||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||
wait_for_all_checks(client)
|
||||
|
||||
if i == 8:
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
history_n = len(list(watch.history.keys()))
|
||||
logger.debug(f"History length should be at limit {limit} and it is {history_n}")
|
||||
assert history_n == limit
|
||||
|
||||
if i == 6:
|
||||
res = client.post(
|
||||
url_for("settings.settings_page"),
|
||||
data={"application-history_snapshot_max_length": limit},
|
||||
follow_redirects=True
|
||||
)
|
||||
# It will need to detect one more change to start trimming it, which is really at 'start of 7'
|
||||
assert b'Settings updated' in res.data
|
||||
|
||||
delete_all_watches(client)
|
||||
|
||||
|
||||
@@ -13,7 +13,14 @@ class TestDiffBuilder(unittest.TestCase):
|
||||
|
||||
def test_watch_get_suggested_from_diff_timestamp(self):
|
||||
import uuid as uuid_builder
|
||||
watch = Watch.model(datastore_path='/tmp', default={})
|
||||
# Create minimal mock datastore for tests
|
||||
mock_datastore = {
|
||||
'settings': {
|
||||
'application': {}
|
||||
},
|
||||
'watching': {}
|
||||
}
|
||||
watch = Watch.model(datastore_path='/tmp', __datastore=mock_datastore, default={})
|
||||
watch.ensure_data_dir_exists()
|
||||
|
||||
|
||||
@@ -49,7 +56,7 @@ class TestDiffBuilder(unittest.TestCase):
|
||||
assert p == "109", "Correct when its the same time"
|
||||
|
||||
# new empty one
|
||||
watch = Watch.model(datastore_path='/tmp', default={})
|
||||
watch = Watch.model(datastore_path='/tmp', __datastore=mock_datastore, default={})
|
||||
p = watch.get_from_version_based_on_last_viewed
|
||||
assert p == None, "None when no history available"
|
||||
|
||||
|
||||
@@ -370,8 +370,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
except Exception as e:
|
||||
import traceback
|
||||
logger.error(f"Worker {worker_id} exception processing watch UUID: {uuid}")
|
||||
logger.error(str(e))
|
||||
logger.error(traceback.format_exc())
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': "Exception: " + str(e)})
|
||||
process_changedetection_results = False
|
||||
|
||||
@@ -435,8 +434,9 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
await send_content_changed_notification(uuid, notification_q, datastore)
|
||||
|
||||
except Exception as e:
|
||||
|
||||
logger.critical(f"Worker {worker_id} exception in process_changedetection_results")
|
||||
logger.critical(str(e))
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
|
||||
|
||||
# Always record attempt count
|
||||
@@ -451,6 +451,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
logger.debug(f"UUID: {uuid} Page <title> is '{page_title}'")
|
||||
datastore.update_watch(uuid=uuid, update_obj={'page_title': page_title})
|
||||
except Exception as e:
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
logger.warning(f"UUID: {uuid} Exception when extracting <title> - {str(e)}")
|
||||
|
||||
# Record server header
|
||||
@@ -485,12 +486,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
gc.collect()
|
||||
|
||||
except Exception as e:
|
||||
import traceback
|
||||
logger.error(traceback.format_exc())
|
||||
|
||||
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
|
||||
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
|
||||
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
|
||||
# Also update the watch with error information
|
||||
if datastore and uuid in datastore.data['watching']:
|
||||
datastore.update_watch(uuid=uuid, update_obj={'last_error': f"Worker error: {str(e)}"})
|
||||
@@ -504,6 +503,8 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
await update_handler.fetcher.quit(watch=watch)
|
||||
except Exception as e:
|
||||
logger.error(f"Exception while cleaning/quit after calling browser: {e}")
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
|
||||
try:
|
||||
# Release UUID from processing (thread-safe)
|
||||
worker_pool.release_uuid_from_processing(uuid, worker_id=worker_id)
|
||||
@@ -532,6 +533,7 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
||||
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
|
||||
except Exception as cleanup_error:
|
||||
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")
|
||||
logger.exception(f"Worker {worker_id} full exception details:")
|
||||
|
||||
del(uuid)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user