mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-08 18:47:32 +00:00
Compare commits
1 Commits
3376-clean
...
individual
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
6a5e6c7c8b |
@@ -71,6 +71,7 @@ jobs:
|
|||||||
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model'
|
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model'
|
||||||
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security'
|
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security'
|
||||||
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_semver'
|
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_semver'
|
||||||
|
docker run test-changedetectionio bash -c 'python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge'
|
||||||
|
|
||||||
- name: Test built container with Pytest (generally as requests/plaintext fetching)
|
- name: Test built container with Pytest (generally as requests/plaintext fetching)
|
||||||
run: |
|
run: |
|
||||||
|
|||||||
@@ -303,7 +303,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle
|
|||||||
watch['ignore_text'] += datastore.data['settings']['application']['global_ignore_text']
|
watch['ignore_text'] += datastore.data['settings']['application']['global_ignore_text']
|
||||||
watch['subtractive_selectors'] += datastore.data['settings']['application']['global_subtractive_selectors']
|
watch['subtractive_selectors'] += datastore.data['settings']['application']['global_subtractive_selectors']
|
||||||
|
|
||||||
watch_json = json.dumps(watch)
|
watch_json = json.dumps(dict(watch))
|
||||||
|
|
||||||
try:
|
try:
|
||||||
r = requests.request(method="POST",
|
r = requests.request(method="POST",
|
||||||
|
|||||||
@@ -44,13 +44,13 @@ class model(watch_base):
|
|||||||
if kw.get('datastore_path'):
|
if kw.get('datastore_path'):
|
||||||
del kw['datastore_path']
|
del kw['datastore_path']
|
||||||
|
|
||||||
super(model, self).__init__(*arg, **kw)
|
# Save default before passing to parent, since parent will delete it
|
||||||
if kw.get('default'):
|
default_values = kw.get('default')
|
||||||
self.update(kw['default'])
|
|
||||||
del kw['default']
|
|
||||||
|
|
||||||
if self.get('default'):
|
super(model, self).__init__(*arg, **kw)
|
||||||
del self['default']
|
|
||||||
|
if default_values:
|
||||||
|
self.update(default_values)
|
||||||
|
|
||||||
# Be sure the cached timestamp is ready
|
# Be sure the cached timestamp is ready
|
||||||
bump = self.history
|
bump = self.history
|
||||||
@@ -227,8 +227,8 @@ class model(watch_base):
|
|||||||
|
|
||||||
@property
|
@property
|
||||||
def has_history(self):
|
def has_history(self):
|
||||||
fname = os.path.join(self.watch_data_dir, "history.txt")
|
fname = self._get_data_file_path("history.txt")
|
||||||
return os.path.isfile(fname)
|
return fname and os.path.isfile(fname)
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def has_browser_steps(self):
|
def has_browser_steps(self):
|
||||||
@@ -405,16 +405,16 @@ class model(watch_base):
|
|||||||
return not local_lines.issubset(existing_history)
|
return not local_lines.issubset(existing_history)
|
||||||
|
|
||||||
def get_screenshot(self):
|
def get_screenshot(self):
|
||||||
fname = os.path.join(self.watch_data_dir, "last-screenshot.png")
|
fname = self._get_data_file_path("last-screenshot.png")
|
||||||
if os.path.isfile(fname):
|
if fname and os.path.isfile(fname):
|
||||||
return fname
|
return fname
|
||||||
|
|
||||||
# False is not an option for AppRise, must be type None
|
# False is not an option for AppRise, must be type None
|
||||||
return None
|
return None
|
||||||
|
|
||||||
def __get_file_ctime(self, filename):
|
def __get_file_ctime(self, filename):
|
||||||
fname = os.path.join(self.watch_data_dir, filename)
|
fname = self._get_data_file_path(filename)
|
||||||
if os.path.isfile(fname):
|
if fname and os.path.isfile(fname):
|
||||||
return int(os.path.getmtime(fname))
|
return int(os.path.getmtime(fname))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
@@ -441,20 +441,28 @@ class model(watch_base):
|
|||||||
@property
|
@property
|
||||||
def watch_data_dir(self):
|
def watch_data_dir(self):
|
||||||
# The base dir of the watch data
|
# The base dir of the watch data
|
||||||
return os.path.join(self.__datastore_path, self['uuid']) if self.__datastore_path else None
|
if self.__datastore_path and self.get('uuid'):
|
||||||
|
return os.path.join(self.__datastore_path, self['uuid'])
|
||||||
|
return None
|
||||||
|
|
||||||
|
def _get_data_file_path(self, filename):
|
||||||
|
"""Safely get the full path to a data file, returns None if watch_data_dir is None"""
|
||||||
|
if self.watch_data_dir:
|
||||||
|
return os.path.join(self.watch_data_dir, filename)
|
||||||
|
return None
|
||||||
|
|
||||||
def get_error_text(self):
|
def get_error_text(self):
|
||||||
"""Return the text saved from a previous request that resulted in a non-200 error"""
|
"""Return the text saved from a previous request that resulted in a non-200 error"""
|
||||||
fname = os.path.join(self.watch_data_dir, "last-error.txt")
|
fname = self._get_data_file_path("last-error.txt")
|
||||||
if os.path.isfile(fname):
|
if fname and os.path.isfile(fname):
|
||||||
with open(fname, 'r') as f:
|
with open(fname, 'r') as f:
|
||||||
return f.read()
|
return f.read()
|
||||||
return False
|
return False
|
||||||
|
|
||||||
def get_error_snapshot(self):
|
def get_error_snapshot(self):
|
||||||
"""Return path to the screenshot that resulted in a non-200 error"""
|
"""Return path to the screenshot that resulted in a non-200 error"""
|
||||||
fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png")
|
fname = self._get_data_file_path("last-error-screenshot.png")
|
||||||
if os.path.isfile(fname):
|
if fname and os.path.isfile(fname):
|
||||||
return fname
|
return fname
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|||||||
@@ -1,13 +1,14 @@
|
|||||||
import os
|
import os
|
||||||
import uuid
|
import uuid
|
||||||
|
import json
|
||||||
|
|
||||||
from changedetectionio import strtobool
|
from changedetectionio import strtobool
|
||||||
default_notification_format_for_watch = 'System default'
|
default_notification_format_for_watch = 'System default'
|
||||||
|
|
||||||
class watch_base(dict):
|
class watch_base:
|
||||||
|
|
||||||
def __init__(self, *arg, **kw):
|
def __init__(self, *arg, **kw):
|
||||||
self.update({
|
self.__data = {
|
||||||
# Custom notification content
|
# Custom notification content
|
||||||
# Re #110, so then if this is set to None, we know to use the default value instead
|
# Re #110, so then if this is set to None, we know to use the default value instead
|
||||||
# Requires setting to None on submit if it's the same as the default
|
# Requires setting to None on submit if it's the same as the default
|
||||||
@@ -128,9 +129,78 @@ class watch_base(dict):
|
|||||||
'uuid': str(uuid.uuid4()),
|
'uuid': str(uuid.uuid4()),
|
||||||
'webdriver_delay': None,
|
'webdriver_delay': None,
|
||||||
'webdriver_js_execute_code': None, # Run before change-detection
|
'webdriver_js_execute_code': None, # Run before change-detection
|
||||||
})
|
}
|
||||||
|
|
||||||
super(watch_base, self).__init__(*arg, **kw)
|
if len(arg) == 1 and (isinstance(arg[0], dict) or hasattr(arg[0], 'keys')):
|
||||||
|
self.__data.update(arg[0])
|
||||||
|
if kw:
|
||||||
|
self.__data.update(kw)
|
||||||
|
|
||||||
if self.get('default'):
|
if self.__data.get('default'):
|
||||||
del self['default']
|
del self.__data['default']
|
||||||
|
|
||||||
|
def __getitem__(self, key):
|
||||||
|
return self.__data[key]
|
||||||
|
|
||||||
|
def __setitem__(self, key, value):
|
||||||
|
self.__data[key] = value
|
||||||
|
|
||||||
|
def __delitem__(self, key):
|
||||||
|
del self.__data[key]
|
||||||
|
|
||||||
|
def __iter__(self):
|
||||||
|
return iter(self.__data)
|
||||||
|
|
||||||
|
def __len__(self):
|
||||||
|
return len(self.__data)
|
||||||
|
|
||||||
|
def __contains__(self, key):
|
||||||
|
return key in self.__data
|
||||||
|
|
||||||
|
def __repr__(self):
|
||||||
|
return repr(self.__data)
|
||||||
|
|
||||||
|
def __str__(self):
|
||||||
|
return str(self.__data)
|
||||||
|
|
||||||
|
def keys(self):
|
||||||
|
return self.__data.keys()
|
||||||
|
|
||||||
|
def values(self):
|
||||||
|
return self.__data.values()
|
||||||
|
|
||||||
|
def items(self):
|
||||||
|
return self.__data.items()
|
||||||
|
|
||||||
|
def get(self, key, default=None):
|
||||||
|
return self.__data.get(key, default)
|
||||||
|
|
||||||
|
def pop(self, key, *args):
|
||||||
|
return self.__data.pop(key, *args)
|
||||||
|
|
||||||
|
def popitem(self):
|
||||||
|
return self.__data.popitem()
|
||||||
|
|
||||||
|
def clear(self):
|
||||||
|
self.__data.clear()
|
||||||
|
|
||||||
|
def update(self, *args, **kwargs):
|
||||||
|
self.__data.update(*args, **kwargs)
|
||||||
|
|
||||||
|
def setdefault(self, key, default=None):
|
||||||
|
return self.__data.setdefault(key, default)
|
||||||
|
|
||||||
|
def copy(self):
|
||||||
|
return self.__data.copy()
|
||||||
|
|
||||||
|
def __deepcopy__(self, memo):
|
||||||
|
from copy import deepcopy
|
||||||
|
new_instance = self.__class__()
|
||||||
|
new_instance.__data = deepcopy(self.__data, memo)
|
||||||
|
return new_instance
|
||||||
|
|
||||||
|
def __reduce__(self):
|
||||||
|
return (self.__class__, (self.__data,))
|
||||||
|
|
||||||
|
def to_dict(self):
|
||||||
|
return dict(self.__data)
|
||||||
@@ -42,10 +42,10 @@ class Restock(dict):
|
|||||||
|
|
||||||
# Update with any provided positional arguments (dictionaries)
|
# Update with any provided positional arguments (dictionaries)
|
||||||
if args:
|
if args:
|
||||||
if len(args) == 1 and isinstance(args[0], dict):
|
if len(args) == 1 and (isinstance(args[0], dict) or hasattr(args[0], 'keys')):
|
||||||
self.update(args[0])
|
self.update(args[0])
|
||||||
else:
|
else:
|
||||||
raise ValueError("Only one positional argument of type 'dict' is allowed")
|
raise ValueError("Only one positional argument of type 'dict' or dict-like is allowed")
|
||||||
|
|
||||||
def __setitem__(self, key, value):
|
def __setitem__(self, key, value):
|
||||||
# Custom logic to handle setting price and original_price
|
# Custom logic to handle setting price and original_price
|
||||||
|
|||||||
@@ -23,6 +23,13 @@ from blinker import signal
|
|||||||
from .processors import get_custom_watch_obj_for_processor
|
from .processors import get_custom_watch_obj_for_processor
|
||||||
from .processors.restock_diff import Restock
|
from .processors.restock_diff import Restock
|
||||||
|
|
||||||
|
class WatchEncoder(json.JSONEncoder):
|
||||||
|
def default(self, obj):
|
||||||
|
from .model import watch_base
|
||||||
|
if isinstance(obj, watch_base):
|
||||||
|
return dict(obj)
|
||||||
|
return super().default(obj)
|
||||||
|
|
||||||
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
|
# Because the server will run as a daemon and wont know the URL for notification links when firing off a notification
|
||||||
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
|
BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)'
|
||||||
|
|
||||||
@@ -51,9 +58,6 @@ class ChangeDetectionStore:
|
|||||||
self.needs_write = False
|
self.needs_write = False
|
||||||
self.start_time = time.time()
|
self.start_time = time.time()
|
||||||
self.stop_thread = False
|
self.stop_thread = False
|
||||||
# Base definition for all watchers
|
|
||||||
# deepcopy part of #569 - not sure why its needed exactly
|
|
||||||
self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={}))
|
|
||||||
|
|
||||||
if path.isfile('changedetectionio/source.txt'):
|
if path.isfile('changedetectionio/source.txt'):
|
||||||
with open('changedetectionio/source.txt') as f:
|
with open('changedetectionio/source.txt') as f:
|
||||||
@@ -174,6 +178,14 @@ class ChangeDetectionStore:
|
|||||||
self.__data['settings']['application']['password'] = False
|
self.__data['settings']['application']['password'] = False
|
||||||
self.needs_write = True
|
self.needs_write = True
|
||||||
|
|
||||||
|
def _deep_merge(self, target, source):
|
||||||
|
"""Recursively merge source dict into target dict"""
|
||||||
|
for key, value in source.items():
|
||||||
|
if key in target and isinstance(target[key], dict) and isinstance(value, dict):
|
||||||
|
self._deep_merge(target[key], value)
|
||||||
|
else:
|
||||||
|
target[key] = value
|
||||||
|
|
||||||
def update_watch(self, uuid, update_obj):
|
def update_watch(self, uuid, update_obj):
|
||||||
|
|
||||||
# It's possible that the watch could be deleted before update
|
# It's possible that the watch could be deleted before update
|
||||||
@@ -181,15 +193,8 @@ class ChangeDetectionStore:
|
|||||||
return
|
return
|
||||||
|
|
||||||
with self.lock:
|
with self.lock:
|
||||||
|
# Use recursive merge to handle nested dictionaries properly
|
||||||
# In python 3.9 we have the |= dict operator, but that still will lose data on nested structures...
|
self._deep_merge(self.__data['watching'][uuid], update_obj)
|
||||||
for dict_key, d in self.generic_definition.items():
|
|
||||||
if isinstance(d, dict):
|
|
||||||
if update_obj is not None and dict_key in update_obj:
|
|
||||||
self.__data['watching'][uuid][dict_key].update(update_obj[dict_key])
|
|
||||||
del (update_obj[dict_key])
|
|
||||||
|
|
||||||
self.__data['watching'][uuid].update(update_obj)
|
|
||||||
self.needs_write = True
|
self.needs_write = True
|
||||||
|
|
||||||
@property
|
@property
|
||||||
@@ -393,6 +398,51 @@ class ChangeDetectionStore:
|
|||||||
|
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
import json
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path # just for nicer paths
|
||||||
|
|
||||||
|
JSON_INDENT = 2 # or None in production
|
||||||
|
ENCODER = WatchEncoder # your custom encoder
|
||||||
|
|
||||||
|
def save_json_atomic(self, save_path: str | os.PathLike, data) -> None:
|
||||||
|
"""
|
||||||
|
Atomically (re)write *path* with *data* encoded as JSON.
|
||||||
|
The original file is left untouched if anything fails.
|
||||||
|
"""
|
||||||
|
import tempfile
|
||||||
|
from pathlib import Path # just for nicer paths
|
||||||
|
|
||||||
|
JSON_INDENT = 2 # or None in production
|
||||||
|
ENCODER = WatchEncoder # your custom encoder
|
||||||
|
|
||||||
|
datapath = Path(save_path)
|
||||||
|
directory = datapath.parent
|
||||||
|
|
||||||
|
# 1. create a unique temp file in the same directory
|
||||||
|
fd, tmp_name = tempfile.mkstemp(
|
||||||
|
dir=directory,
|
||||||
|
prefix=f"{datapath.name}.",
|
||||||
|
suffix=".tmp",
|
||||||
|
)
|
||||||
|
try:
|
||||||
|
with os.fdopen(fd, "w", encoding="utf-8") as tmp:
|
||||||
|
json.dump(data, tmp, indent=JSON_INDENT, cls=ENCODER)
|
||||||
|
if os.getenv('JSON_SAVE_FORCE_FLUSH'):
|
||||||
|
tmp.flush() # push Python buffers
|
||||||
|
os.fsync(tmp.fileno()) # force kernel to write to disk
|
||||||
|
os.replace(tmp_name, datapath)
|
||||||
|
|
||||||
|
except Exception as e:
|
||||||
|
logger.critical(f"Failed to write JSON to {datapath} - {str(e)}")
|
||||||
|
# if anything above blew up, ensure we don't leave junk lying around
|
||||||
|
try:
|
||||||
|
os.unlink(tmp_name)
|
||||||
|
finally:
|
||||||
|
raise
|
||||||
|
|
||||||
|
|
||||||
def sync_to_json(self):
|
def sync_to_json(self):
|
||||||
logger.info("Saving JSON..")
|
logger.info("Saving JSON..")
|
||||||
try:
|
try:
|
||||||
@@ -404,18 +454,7 @@ class ChangeDetectionStore:
|
|||||||
self.sync_to_json()
|
self.sync_to_json()
|
||||||
return
|
return
|
||||||
else:
|
else:
|
||||||
|
self.save_json_atomic(save_path = self.json_store_path, data =data)
|
||||||
try:
|
|
||||||
# Re #286 - First write to a temp file, then confirm it looks OK and rename it
|
|
||||||
# This is a fairly basic strategy to deal with the case that the file is corrupted,
|
|
||||||
# system was out of memory, out of RAM etc
|
|
||||||
with open(self.json_store_path+".tmp", 'w') as json_file:
|
|
||||||
# Use compact JSON in production for better performance
|
|
||||||
json.dump(data, json_file, indent=2)
|
|
||||||
os.replace(self.json_store_path+".tmp", self.json_store_path)
|
|
||||||
except Exception as e:
|
|
||||||
logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}")
|
|
||||||
|
|
||||||
self.needs_write = False
|
self.needs_write = False
|
||||||
self.needs_write_urgent = False
|
self.needs_write_urgent = False
|
||||||
|
|
||||||
|
|||||||
291
changedetectionio/tests/unit/test_update_watch_deep_merge.py
Normal file
291
changedetectionio/tests/unit/test_update_watch_deep_merge.py
Normal file
@@ -0,0 +1,291 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
# run from dir above changedetectionio/ dir
|
||||||
|
# python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge
|
||||||
|
|
||||||
|
import unittest
|
||||||
|
import os
|
||||||
|
import tempfile
|
||||||
|
import shutil
|
||||||
|
from unittest.mock import patch
|
||||||
|
|
||||||
|
from changedetectionio import store
|
||||||
|
|
||||||
|
|
||||||
|
class TestUpdateWatchDeepMerge(unittest.TestCase):
|
||||||
|
|
||||||
|
def setUp(self):
|
||||||
|
# Create a temporary directory for test data
|
||||||
|
self.test_datastore_path = tempfile.mkdtemp()
|
||||||
|
self.datastore = store.ChangeDetectionStore(datastore_path=self.test_datastore_path, include_default_watches=False)
|
||||||
|
|
||||||
|
# Create a test watch with known nested structure
|
||||||
|
self.test_uuid = self.datastore.add_watch(url='http://example.com')
|
||||||
|
|
||||||
|
# Set up known initial nested structure
|
||||||
|
initial_data = {
|
||||||
|
'time_between_check': {'weeks': None, 'days': 1, 'hours': 6, 'minutes': 30, 'seconds': None},
|
||||||
|
'headers': {'user-agent': 'test-browser', 'accept': 'text/html'},
|
||||||
|
'time_schedule_limit': {
|
||||||
|
'enabled': True,
|
||||||
|
'monday': {
|
||||||
|
'enabled': True,
|
||||||
|
'start_time': '09:00',
|
||||||
|
'duration': {'hours': '8', 'minutes': '00'}
|
||||||
|
},
|
||||||
|
'tuesday': {
|
||||||
|
'enabled': False,
|
||||||
|
'start_time': '10:00',
|
||||||
|
'duration': {'hours': '6', 'minutes': '30'}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, initial_data)
|
||||||
|
|
||||||
|
def tearDown(self):
|
||||||
|
self.datastore.stop_thread = True
|
||||||
|
# Clean up the temporary directory
|
||||||
|
shutil.rmtree(self.test_datastore_path, ignore_errors=True)
|
||||||
|
|
||||||
|
def test_simple_flat_update(self):
|
||||||
|
"""Test that simple flat updates work as before"""
|
||||||
|
update_obj = {'url': 'http://newexample.com', 'paused': True}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
self.assertEqual(watch['url'], 'http://newexample.com')
|
||||||
|
self.assertEqual(watch['paused'], True)
|
||||||
|
|
||||||
|
def test_time_between_check_partial_update(self):
|
||||||
|
"""Test partial update of time_between_check preserves existing keys"""
|
||||||
|
# Update only hours, should preserve other existing values
|
||||||
|
update_obj = {'time_between_check': {'hours': 2}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
time_check = watch['time_between_check']
|
||||||
|
|
||||||
|
# Updated value
|
||||||
|
self.assertEqual(time_check['hours'], 2)
|
||||||
|
# Preserved existing values
|
||||||
|
self.assertEqual(time_check['days'], 1)
|
||||||
|
self.assertEqual(time_check['minutes'], 30)
|
||||||
|
self.assertEqual(time_check['weeks'], None)
|
||||||
|
self.assertEqual(time_check['seconds'], None)
|
||||||
|
|
||||||
|
def test_time_between_check_multiple_partial_updates(self):
|
||||||
|
"""Test multiple partial updates to time_between_check"""
|
||||||
|
# First update
|
||||||
|
update_obj1 = {'time_between_check': {'minutes': 45}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj1)
|
||||||
|
|
||||||
|
# Second update
|
||||||
|
update_obj2 = {'time_between_check': {'seconds': 15}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj2)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
time_check = watch['time_between_check']
|
||||||
|
|
||||||
|
# Both updates should be preserved
|
||||||
|
self.assertEqual(time_check['minutes'], 45)
|
||||||
|
self.assertEqual(time_check['seconds'], 15)
|
||||||
|
# Original values should be preserved
|
||||||
|
self.assertEqual(time_check['days'], 1)
|
||||||
|
self.assertEqual(time_check['hours'], 6)
|
||||||
|
|
||||||
|
def test_headers_partial_update(self):
|
||||||
|
"""Test partial update of headers preserves existing headers"""
|
||||||
|
update_obj = {'headers': {'authorization': 'Bearer token123'}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
headers = watch['headers']
|
||||||
|
|
||||||
|
# New header added
|
||||||
|
self.assertEqual(headers['authorization'], 'Bearer token123')
|
||||||
|
# Existing headers preserved
|
||||||
|
self.assertEqual(headers['user-agent'], 'test-browser')
|
||||||
|
self.assertEqual(headers['accept'], 'text/html')
|
||||||
|
|
||||||
|
def test_headers_update_existing_key(self):
|
||||||
|
"""Test updating an existing header key"""
|
||||||
|
update_obj = {'headers': {'user-agent': 'new-browser'}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
headers = watch['headers']
|
||||||
|
|
||||||
|
# Updated existing header
|
||||||
|
self.assertEqual(headers['user-agent'], 'new-browser')
|
||||||
|
# Other headers preserved
|
||||||
|
self.assertEqual(headers['accept'], 'text/html')
|
||||||
|
|
||||||
|
def test_time_schedule_limit_deep_nested_update(self):
|
||||||
|
"""Test deep nested update of time_schedule_limit structure"""
|
||||||
|
update_obj = {
|
||||||
|
'time_schedule_limit': {
|
||||||
|
'monday': {
|
||||||
|
'duration': {'hours': '10'} # Only update hours, preserve minutes
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
schedule = watch['time_schedule_limit']
|
||||||
|
|
||||||
|
# Deep nested update applied
|
||||||
|
self.assertEqual(schedule['monday']['duration']['hours'], '10')
|
||||||
|
# Existing nested values preserved
|
||||||
|
self.assertEqual(schedule['monday']['duration']['minutes'], '00')
|
||||||
|
self.assertEqual(schedule['monday']['start_time'], '09:00')
|
||||||
|
self.assertEqual(schedule['monday']['enabled'], True)
|
||||||
|
# Other days preserved
|
||||||
|
self.assertEqual(schedule['tuesday']['enabled'], False)
|
||||||
|
self.assertEqual(schedule['enabled'], True)
|
||||||
|
|
||||||
|
def test_mixed_flat_and_nested_update(self):
|
||||||
|
"""Test update with both flat and nested properties"""
|
||||||
|
update_obj = {
|
||||||
|
'url': 'http://mixed-update.com',
|
||||||
|
'paused': False,
|
||||||
|
'time_between_check': {'days': 2, 'minutes': 15},
|
||||||
|
'headers': {'cookie': 'session=abc123'}
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
|
||||||
|
# Flat updates
|
||||||
|
self.assertEqual(watch['url'], 'http://mixed-update.com')
|
||||||
|
self.assertEqual(watch['paused'], False)
|
||||||
|
|
||||||
|
# Nested updates
|
||||||
|
time_check = watch['time_between_check']
|
||||||
|
self.assertEqual(time_check['days'], 2)
|
||||||
|
self.assertEqual(time_check['minutes'], 15)
|
||||||
|
self.assertEqual(time_check['hours'], 6) # preserved
|
||||||
|
|
||||||
|
headers = watch['headers']
|
||||||
|
self.assertEqual(headers['cookie'], 'session=abc123')
|
||||||
|
self.assertEqual(headers['user-agent'], 'test-browser') # preserved
|
||||||
|
|
||||||
|
def test_overwrite_nested_with_flat(self):
|
||||||
|
"""Test that providing a non-dict value overwrites the entire nested structure"""
|
||||||
|
update_obj = {'time_between_check': 'invalid_value'}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
# Should completely replace the nested dict with the string
|
||||||
|
self.assertEqual(watch['time_between_check'], 'invalid_value')
|
||||||
|
|
||||||
|
def test_add_new_nested_structure(self):
|
||||||
|
"""Test adding a completely new nested dictionary"""
|
||||||
|
update_obj = {
|
||||||
|
'custom_config': {
|
||||||
|
'option1': 'value1',
|
||||||
|
'nested': {
|
||||||
|
'suboption': 'subvalue'
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
self.assertEqual(watch['custom_config']['option1'], 'value1')
|
||||||
|
self.assertEqual(watch['custom_config']['nested']['suboption'], 'subvalue')
|
||||||
|
|
||||||
|
def test_empty_dict_update(self):
|
||||||
|
"""Test updating with empty dictionaries"""
|
||||||
|
update_obj = {'headers': {}}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
# Empty dict should preserve existing headers (no keys to merge)
|
||||||
|
self.assertEqual(watch['headers']['user-agent'], 'test-browser')
|
||||||
|
self.assertEqual(watch['headers']['accept'], 'text/html')
|
||||||
|
|
||||||
|
def test_none_values_in_nested_update(self):
|
||||||
|
"""Test handling None values in nested updates"""
|
||||||
|
update_obj = {
|
||||||
|
'time_between_check': {
|
||||||
|
'hours': None,
|
||||||
|
'days': 3
|
||||||
|
}
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
time_check = watch['time_between_check']
|
||||||
|
|
||||||
|
self.assertEqual(time_check['hours'], None)
|
||||||
|
self.assertEqual(time_check['days'], 3)
|
||||||
|
self.assertEqual(time_check['minutes'], 30) # preserved
|
||||||
|
|
||||||
|
def test_real_world_api_update_scenario(self):
|
||||||
|
"""Test a real-world API update scenario from the codebase analysis"""
|
||||||
|
# Based on actual API call patterns found in the codebase
|
||||||
|
update_obj = {
|
||||||
|
"title": "Updated API Watch",
|
||||||
|
'time_between_check': {'minutes': 60},
|
||||||
|
'headers': {'authorization': 'Bearer api-token', 'user-agent': 'api-client'},
|
||||||
|
'notification_urls': ['https://webhook.example.com']
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
|
||||||
|
# Verify all updates
|
||||||
|
self.assertEqual(watch['title'], 'Updated API Watch')
|
||||||
|
self.assertEqual(watch['time_between_check']['minutes'], 60)
|
||||||
|
self.assertEqual(watch['time_between_check']['days'], 1) # preserved
|
||||||
|
self.assertEqual(watch['headers']['authorization'], 'Bearer api-token')
|
||||||
|
self.assertEqual(watch['headers']['user-agent'], 'api-client') # overwrote existing
|
||||||
|
self.assertEqual(watch['headers']['accept'], 'text/html') # preserved
|
||||||
|
self.assertEqual(watch['notification_urls'], ['https://webhook.example.com'])
|
||||||
|
|
||||||
|
def test_watch_not_found(self):
|
||||||
|
"""Test update_watch with non-existent UUID"""
|
||||||
|
# Should not raise an error, just return silently
|
||||||
|
fake_uuid = 'non-existent-uuid'
|
||||||
|
update_obj = {'url': 'http://should-not-update.com'}
|
||||||
|
|
||||||
|
# Should not raise an exception
|
||||||
|
self.datastore.update_watch(fake_uuid, update_obj)
|
||||||
|
|
||||||
|
# Verify no changes were made to existing watch
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
self.assertNotEqual(watch['url'], 'http://should-not-update.com')
|
||||||
|
|
||||||
|
def test_processor_style_update(self):
|
||||||
|
"""Test the type of updates made by processors during check operations"""
|
||||||
|
# Based on async_update_worker.py patterns
|
||||||
|
update_obj = {
|
||||||
|
'last_notification_error': False,
|
||||||
|
'last_error': False,
|
||||||
|
'previous_md5': 'abc123def456',
|
||||||
|
'content-type': 'application/json',
|
||||||
|
'consecutive_filter_failures': 0,
|
||||||
|
'fetch_time': 1.234,
|
||||||
|
'check_count': 42
|
||||||
|
}
|
||||||
|
self.datastore.update_watch(self.test_uuid, update_obj)
|
||||||
|
|
||||||
|
watch = self.datastore.data['watching'][self.test_uuid]
|
||||||
|
|
||||||
|
# Verify processor updates
|
||||||
|
self.assertEqual(watch['last_notification_error'], False)
|
||||||
|
self.assertEqual(watch['last_error'], False)
|
||||||
|
self.assertEqual(watch['previous_md5'], 'abc123def456')
|
||||||
|
self.assertEqual(watch['content-type'], 'application/json')
|
||||||
|
self.assertEqual(watch['consecutive_filter_failures'], 0)
|
||||||
|
self.assertEqual(watch['fetch_time'], 1.234)
|
||||||
|
self.assertEqual(watch['check_count'], 42)
|
||||||
|
|
||||||
|
# Verify nested structures weren't affected
|
||||||
|
self.assertEqual(watch['time_between_check']['days'], 1)
|
||||||
|
self.assertEqual(watch['headers']['user-agent'], 'test-browser')
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == '__main__':
|
||||||
|
unittest.main()
|
||||||
Reference in New Issue
Block a user