mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-30 14:17:40 +00:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			0.50.34
			...
			individual
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 6a5e6c7c8b | 
| @@ -71,6 +71,7 @@ jobs: | ||||
|           docker run test-changedetectionio  bash -c 'python3 -m unittest changedetectionio.tests.unit.test_watch_model' | ||||
|           docker run test-changedetectionio  bash -c 'python3 -m unittest changedetectionio.tests.unit.test_jinja2_security' | ||||
|           docker run test-changedetectionio  bash -c 'python3 -m unittest changedetectionio.tests.unit.test_semver' | ||||
|           docker run test-changedetectionio  bash -c 'python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge' | ||||
|  | ||||
|       - name: Test built container with Pytest (generally as requests/plaintext fetching) | ||||
|         run: | | ||||
|   | ||||
| @@ -303,7 +303,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, worker_handle | ||||
|         watch['ignore_text'] += datastore.data['settings']['application']['global_ignore_text'] | ||||
|         watch['subtractive_selectors'] += datastore.data['settings']['application']['global_subtractive_selectors'] | ||||
|  | ||||
|         watch_json = json.dumps(watch) | ||||
|         watch_json = json.dumps(dict(watch)) | ||||
|  | ||||
|         try: | ||||
|             r = requests.request(method="POST", | ||||
|   | ||||
| @@ -43,14 +43,14 @@ class model(watch_base): | ||||
|         self.__datastore_path = kw.get('datastore_path') | ||||
|         if kw.get('datastore_path'): | ||||
|             del kw['datastore_path'] | ||||
|              | ||||
|          | ||||
|         # Save default before passing to parent, since parent will delete it | ||||
|         default_values = kw.get('default') | ||||
|          | ||||
|         super(model, self).__init__(*arg, **kw) | ||||
|         if kw.get('default'): | ||||
|             self.update(kw['default']) | ||||
|             del kw['default'] | ||||
|  | ||||
|         if self.get('default'): | ||||
|             del self['default'] | ||||
|          | ||||
|         if default_values: | ||||
|             self.update(default_values) | ||||
|  | ||||
|         # Be sure the cached timestamp is ready | ||||
|         bump = self.history | ||||
| @@ -227,8 +227,8 @@ class model(watch_base): | ||||
|  | ||||
|     @property | ||||
|     def has_history(self): | ||||
|         fname = os.path.join(self.watch_data_dir, "history.txt") | ||||
|         return os.path.isfile(fname) | ||||
|         fname = self._get_data_file_path("history.txt") | ||||
|         return fname and os.path.isfile(fname) | ||||
|  | ||||
|     @property | ||||
|     def has_browser_steps(self): | ||||
| @@ -405,16 +405,16 @@ class model(watch_base): | ||||
|         return not local_lines.issubset(existing_history) | ||||
|  | ||||
|     def get_screenshot(self): | ||||
|         fname = os.path.join(self.watch_data_dir, "last-screenshot.png") | ||||
|         if os.path.isfile(fname): | ||||
|         fname = self._get_data_file_path("last-screenshot.png") | ||||
|         if fname and os.path.isfile(fname): | ||||
|             return fname | ||||
|  | ||||
|         # False is not an option for AppRise, must be type None | ||||
|         return None | ||||
|  | ||||
|     def __get_file_ctime(self, filename): | ||||
|         fname = os.path.join(self.watch_data_dir, filename) | ||||
|         if os.path.isfile(fname): | ||||
|         fname = self._get_data_file_path(filename) | ||||
|         if fname and os.path.isfile(fname): | ||||
|             return int(os.path.getmtime(fname)) | ||||
|         return False | ||||
|  | ||||
| @@ -441,20 +441,28 @@ class model(watch_base): | ||||
|     @property | ||||
|     def watch_data_dir(self): | ||||
|         # The base dir of the watch data | ||||
|         return os.path.join(self.__datastore_path, self['uuid']) if self.__datastore_path else None | ||||
|         if self.__datastore_path and self.get('uuid'): | ||||
|             return os.path.join(self.__datastore_path, self['uuid']) | ||||
|         return None | ||||
|  | ||||
|     def _get_data_file_path(self, filename): | ||||
|         """Safely get the full path to a data file, returns None if watch_data_dir is None""" | ||||
|         if self.watch_data_dir: | ||||
|             return os.path.join(self.watch_data_dir, filename) | ||||
|         return None | ||||
|  | ||||
|     def get_error_text(self): | ||||
|         """Return the text saved from a previous request that resulted in a non-200 error""" | ||||
|         fname = os.path.join(self.watch_data_dir, "last-error.txt") | ||||
|         if os.path.isfile(fname): | ||||
|         fname = self._get_data_file_path("last-error.txt") | ||||
|         if fname and os.path.isfile(fname): | ||||
|             with open(fname, 'r') as f: | ||||
|                 return f.read() | ||||
|         return False | ||||
|  | ||||
|     def get_error_snapshot(self): | ||||
|         """Return path to the screenshot that resulted in a non-200 error""" | ||||
|         fname = os.path.join(self.watch_data_dir, "last-error-screenshot.png") | ||||
|         if os.path.isfile(fname): | ||||
|         fname = self._get_data_file_path("last-error-screenshot.png") | ||||
|         if fname and os.path.isfile(fname): | ||||
|             return fname | ||||
|         return False | ||||
|  | ||||
|   | ||||
| @@ -1,13 +1,14 @@ | ||||
| import os | ||||
| import uuid | ||||
| import json | ||||
|  | ||||
| from changedetectionio import strtobool | ||||
| default_notification_format_for_watch = 'System default' | ||||
|  | ||||
| class watch_base(dict): | ||||
| class watch_base: | ||||
|  | ||||
|     def __init__(self, *arg, **kw): | ||||
|         self.update({ | ||||
|         self.__data = { | ||||
|             # Custom notification content | ||||
|             # Re #110, so then if this is set to None, we know to use the default value instead | ||||
|             # Requires setting to None on submit if it's the same as the default | ||||
| @@ -128,9 +129,78 @@ class watch_base(dict): | ||||
|             'uuid': str(uuid.uuid4()), | ||||
|             'webdriver_delay': None, | ||||
|             'webdriver_js_execute_code': None,  # Run before change-detection | ||||
|         }) | ||||
|         } | ||||
|          | ||||
|         if len(arg) == 1 and (isinstance(arg[0], dict) or hasattr(arg[0], 'keys')): | ||||
|             self.__data.update(arg[0]) | ||||
|         if kw: | ||||
|             self.__data.update(kw) | ||||
|  | ||||
|         super(watch_base, self).__init__(*arg, **kw) | ||||
|         if self.__data.get('default'): | ||||
|             del self.__data['default'] | ||||
|  | ||||
|         if self.get('default'): | ||||
|             del self['default'] | ||||
|     def __getitem__(self, key): | ||||
|         return self.__data[key] | ||||
|  | ||||
|     def __setitem__(self, key, value): | ||||
|         self.__data[key] = value | ||||
|  | ||||
|     def __delitem__(self, key): | ||||
|         del self.__data[key] | ||||
|  | ||||
|     def __iter__(self): | ||||
|         return iter(self.__data) | ||||
|  | ||||
|     def __len__(self): | ||||
|         return len(self.__data) | ||||
|  | ||||
|     def __contains__(self, key): | ||||
|         return key in self.__data | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return repr(self.__data) | ||||
|  | ||||
|     def __str__(self): | ||||
|         return str(self.__data) | ||||
|  | ||||
|     def keys(self): | ||||
|         return self.__data.keys() | ||||
|  | ||||
|     def values(self): | ||||
|         return self.__data.values() | ||||
|  | ||||
|     def items(self): | ||||
|         return self.__data.items() | ||||
|  | ||||
|     def get(self, key, default=None): | ||||
|         return self.__data.get(key, default) | ||||
|  | ||||
|     def pop(self, key, *args): | ||||
|         return self.__data.pop(key, *args) | ||||
|  | ||||
|     def popitem(self): | ||||
|         return self.__data.popitem() | ||||
|  | ||||
|     def clear(self): | ||||
|         self.__data.clear() | ||||
|  | ||||
|     def update(self, *args, **kwargs): | ||||
|         self.__data.update(*args, **kwargs) | ||||
|  | ||||
|     def setdefault(self, key, default=None): | ||||
|         return self.__data.setdefault(key, default) | ||||
|  | ||||
|     def copy(self): | ||||
|         return self.__data.copy() | ||||
|  | ||||
|     def __deepcopy__(self, memo): | ||||
|         from copy import deepcopy | ||||
|         new_instance = self.__class__() | ||||
|         new_instance.__data = deepcopy(self.__data, memo) | ||||
|         return new_instance | ||||
|  | ||||
|     def __reduce__(self): | ||||
|         return (self.__class__, (self.__data,)) | ||||
|  | ||||
|     def to_dict(self): | ||||
|         return dict(self.__data) | ||||
| @@ -42,10 +42,10 @@ class Restock(dict): | ||||
|  | ||||
|         # Update with any provided positional arguments (dictionaries) | ||||
|         if args: | ||||
|             if len(args) == 1 and isinstance(args[0], dict): | ||||
|             if len(args) == 1 and (isinstance(args[0], dict) or hasattr(args[0], 'keys')): | ||||
|                 self.update(args[0]) | ||||
|             else: | ||||
|                 raise ValueError("Only one positional argument of type 'dict' is allowed") | ||||
|                 raise ValueError("Only one positional argument of type 'dict' or dict-like is allowed") | ||||
|  | ||||
|     def __setitem__(self, key, value): | ||||
|         # Custom logic to handle setting price and original_price | ||||
|   | ||||
| @@ -23,6 +23,13 @@ from blinker import signal | ||||
| from .processors import get_custom_watch_obj_for_processor | ||||
| from .processors.restock_diff import Restock | ||||
|  | ||||
| class WatchEncoder(json.JSONEncoder): | ||||
|     def default(self, obj): | ||||
|         from .model import watch_base | ||||
|         if isinstance(obj, watch_base): | ||||
|             return dict(obj) | ||||
|         return super().default(obj) | ||||
|  | ||||
| # Because the server will run as a daemon and wont know the URL for notification links when firing off a notification | ||||
| BASE_URL_NOT_SET_TEXT = '("Base URL" not set - see settings - notifications)' | ||||
|  | ||||
| @@ -51,9 +58,6 @@ class ChangeDetectionStore: | ||||
|         self.needs_write = False | ||||
|         self.start_time = time.time() | ||||
|         self.stop_thread = False | ||||
|         # Base definition for all watchers | ||||
|         # deepcopy part of #569 - not sure why its needed exactly | ||||
|         self.generic_definition = deepcopy(Watch.model(datastore_path = datastore_path, default={})) | ||||
|  | ||||
|         if path.isfile('changedetectionio/source.txt'): | ||||
|             with open('changedetectionio/source.txt') as f: | ||||
| @@ -174,6 +178,14 @@ class ChangeDetectionStore: | ||||
|         self.__data['settings']['application']['password'] = False | ||||
|         self.needs_write = True | ||||
|  | ||||
|     def _deep_merge(self, target, source): | ||||
|         """Recursively merge source dict into target dict""" | ||||
|         for key, value in source.items(): | ||||
|             if key in target and isinstance(target[key], dict) and isinstance(value, dict): | ||||
|                 self._deep_merge(target[key], value) | ||||
|             else: | ||||
|                 target[key] = value | ||||
|  | ||||
|     def update_watch(self, uuid, update_obj): | ||||
|  | ||||
|         # It's possible that the watch could be deleted before update | ||||
| @@ -181,15 +193,8 @@ class ChangeDetectionStore: | ||||
|             return | ||||
|  | ||||
|         with self.lock: | ||||
|  | ||||
|             # In python 3.9 we have the |= dict operator, but that still will lose data on nested structures... | ||||
|             for dict_key, d in self.generic_definition.items(): | ||||
|                 if isinstance(d, dict): | ||||
|                     if update_obj is not None and dict_key in update_obj: | ||||
|                         self.__data['watching'][uuid][dict_key].update(update_obj[dict_key]) | ||||
|                         del (update_obj[dict_key]) | ||||
|  | ||||
|             self.__data['watching'][uuid].update(update_obj) | ||||
|             # Use recursive merge to handle nested dictionaries properly | ||||
|             self._deep_merge(self.__data['watching'][uuid], update_obj) | ||||
|         self.needs_write = True | ||||
|  | ||||
|     @property | ||||
| @@ -393,6 +398,51 @@ class ChangeDetectionStore: | ||||
|  | ||||
|         return False | ||||
|  | ||||
|     import json | ||||
|     import os | ||||
|     import tempfile | ||||
|     from pathlib import Path  # just for nicer paths | ||||
|  | ||||
|     JSON_INDENT = 2  # or None in production | ||||
|     ENCODER = WatchEncoder  # your custom encoder | ||||
|  | ||||
|     def save_json_atomic(self, save_path: str | os.PathLike, data) -> None: | ||||
|         """ | ||||
|         Atomically (re)write *path* with *data* encoded as JSON. | ||||
|         The original file is left untouched if anything fails. | ||||
|         """ | ||||
|         import tempfile | ||||
|         from pathlib import Path  # just for nicer paths | ||||
|  | ||||
|         JSON_INDENT = 2  # or None in production | ||||
|         ENCODER = WatchEncoder  # your custom encoder | ||||
|  | ||||
|         datapath = Path(save_path) | ||||
|         directory = datapath.parent | ||||
|  | ||||
|         # 1. create a unique temp file in the same directory | ||||
|         fd, tmp_name = tempfile.mkstemp( | ||||
|             dir=directory, | ||||
|             prefix=f"{datapath.name}.", | ||||
|             suffix=".tmp", | ||||
|         ) | ||||
|         try: | ||||
|             with os.fdopen(fd, "w", encoding="utf-8") as tmp: | ||||
|                 json.dump(data, tmp, indent=JSON_INDENT, cls=ENCODER) | ||||
|                 if os.getenv('JSON_SAVE_FORCE_FLUSH'): | ||||
|                     tmp.flush()  # push Python buffers | ||||
|                     os.fsync(tmp.fileno())  # force kernel to write to disk | ||||
|             os.replace(tmp_name, datapath) | ||||
|  | ||||
|         except Exception as e: | ||||
|             logger.critical(f"Failed to write JSON to {datapath} - {str(e)}") | ||||
|             # if anything above blew up, ensure we don't leave junk lying around | ||||
|             try: | ||||
|                 os.unlink(tmp_name) | ||||
|             finally: | ||||
|                 raise | ||||
|  | ||||
|  | ||||
|     def sync_to_json(self): | ||||
|         logger.info("Saving JSON..") | ||||
|         try: | ||||
| @@ -404,18 +454,7 @@ class ChangeDetectionStore: | ||||
|             self.sync_to_json() | ||||
|             return | ||||
|         else: | ||||
|  | ||||
|             try: | ||||
|                 # Re #286  - First write to a temp file, then confirm it looks OK and rename it | ||||
|                 # This is a fairly basic strategy to deal with the case that the file is corrupted, | ||||
|                 # system was out of memory, out of RAM etc | ||||
|                 with open(self.json_store_path+".tmp", 'w') as json_file: | ||||
|                     # Use compact JSON in production for better performance | ||||
|                     json.dump(data, json_file, indent=2) | ||||
|                     os.replace(self.json_store_path+".tmp", self.json_store_path) | ||||
|             except Exception as e: | ||||
|                 logger.error(f"Error writing JSON!! (Main JSON file save was skipped) : {str(e)}") | ||||
|  | ||||
|             self.save_json_atomic(save_path = self.json_store_path, data =data) | ||||
|             self.needs_write = False | ||||
|             self.needs_write_urgent = False | ||||
|  | ||||
|   | ||||
							
								
								
									
										291
									
								
								changedetectionio/tests/unit/test_update_watch_deep_merge.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										291
									
								
								changedetectionio/tests/unit/test_update_watch_deep_merge.py
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,291 @@ | ||||
| #!/usr/bin/env python3 | ||||
|  | ||||
| # run from dir above changedetectionio/ dir | ||||
| # python3 -m unittest changedetectionio.tests.unit.test_update_watch_deep_merge | ||||
|  | ||||
| import unittest | ||||
| import os | ||||
| import tempfile | ||||
| import shutil | ||||
| from unittest.mock import patch | ||||
|  | ||||
| from changedetectionio import store | ||||
|  | ||||
|  | ||||
| class TestUpdateWatchDeepMerge(unittest.TestCase): | ||||
|  | ||||
|     def setUp(self): | ||||
|         # Create a temporary directory for test data | ||||
|         self.test_datastore_path = tempfile.mkdtemp() | ||||
|         self.datastore = store.ChangeDetectionStore(datastore_path=self.test_datastore_path, include_default_watches=False) | ||||
|          | ||||
|         # Create a test watch with known nested structure | ||||
|         self.test_uuid = self.datastore.add_watch(url='http://example.com') | ||||
|          | ||||
|         # Set up known initial nested structure | ||||
|         initial_data = { | ||||
|             'time_between_check': {'weeks': None, 'days': 1, 'hours': 6, 'minutes': 30, 'seconds': None}, | ||||
|             'headers': {'user-agent': 'test-browser', 'accept': 'text/html'}, | ||||
|             'time_schedule_limit': { | ||||
|                 'enabled': True, | ||||
|                 'monday': { | ||||
|                     'enabled': True, | ||||
|                     'start_time': '09:00', | ||||
|                     'duration': {'hours': '8', 'minutes': '00'} | ||||
|                 }, | ||||
|                 'tuesday': { | ||||
|                     'enabled': False, | ||||
|                     'start_time': '10:00', | ||||
|                     'duration': {'hours': '6', 'minutes': '30'} | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, initial_data) | ||||
|  | ||||
|     def tearDown(self): | ||||
|         self.datastore.stop_thread = True | ||||
|         # Clean up the temporary directory | ||||
|         shutil.rmtree(self.test_datastore_path, ignore_errors=True) | ||||
|  | ||||
|     def test_simple_flat_update(self): | ||||
|         """Test that simple flat updates work as before""" | ||||
|         update_obj = {'url': 'http://newexample.com', 'paused': True} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         self.assertEqual(watch['url'], 'http://newexample.com') | ||||
|         self.assertEqual(watch['paused'], True) | ||||
|  | ||||
|     def test_time_between_check_partial_update(self): | ||||
|         """Test partial update of time_between_check preserves existing keys""" | ||||
|         # Update only hours, should preserve other existing values | ||||
|         update_obj = {'time_between_check': {'hours': 2}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         time_check = watch['time_between_check'] | ||||
|          | ||||
|         # Updated value | ||||
|         self.assertEqual(time_check['hours'], 2) | ||||
|         # Preserved existing values | ||||
|         self.assertEqual(time_check['days'], 1) | ||||
|         self.assertEqual(time_check['minutes'], 30) | ||||
|         self.assertEqual(time_check['weeks'], None) | ||||
|         self.assertEqual(time_check['seconds'], None) | ||||
|  | ||||
|     def test_time_between_check_multiple_partial_updates(self): | ||||
|         """Test multiple partial updates to time_between_check""" | ||||
|         # First update | ||||
|         update_obj1 = {'time_between_check': {'minutes': 45}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj1) | ||||
|          | ||||
|         # Second update | ||||
|         update_obj2 = {'time_between_check': {'seconds': 15}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj2) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         time_check = watch['time_between_check'] | ||||
|          | ||||
|         # Both updates should be preserved | ||||
|         self.assertEqual(time_check['minutes'], 45) | ||||
|         self.assertEqual(time_check['seconds'], 15) | ||||
|         # Original values should be preserved | ||||
|         self.assertEqual(time_check['days'], 1) | ||||
|         self.assertEqual(time_check['hours'], 6) | ||||
|  | ||||
|     def test_headers_partial_update(self): | ||||
|         """Test partial update of headers preserves existing headers""" | ||||
|         update_obj = {'headers': {'authorization': 'Bearer token123'}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         headers = watch['headers'] | ||||
|          | ||||
|         # New header added | ||||
|         self.assertEqual(headers['authorization'], 'Bearer token123') | ||||
|         # Existing headers preserved | ||||
|         self.assertEqual(headers['user-agent'], 'test-browser') | ||||
|         self.assertEqual(headers['accept'], 'text/html') | ||||
|  | ||||
|     def test_headers_update_existing_key(self): | ||||
|         """Test updating an existing header key""" | ||||
|         update_obj = {'headers': {'user-agent': 'new-browser'}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         headers = watch['headers'] | ||||
|          | ||||
|         # Updated existing header | ||||
|         self.assertEqual(headers['user-agent'], 'new-browser') | ||||
|         # Other headers preserved | ||||
|         self.assertEqual(headers['accept'], 'text/html') | ||||
|  | ||||
|     def test_time_schedule_limit_deep_nested_update(self): | ||||
|         """Test deep nested update of time_schedule_limit structure""" | ||||
|         update_obj = { | ||||
|             'time_schedule_limit': { | ||||
|                 'monday': { | ||||
|                     'duration': {'hours': '10'}  # Only update hours, preserve minutes | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         schedule = watch['time_schedule_limit'] | ||||
|          | ||||
|         # Deep nested update applied | ||||
|         self.assertEqual(schedule['monday']['duration']['hours'], '10') | ||||
|         # Existing nested values preserved | ||||
|         self.assertEqual(schedule['monday']['duration']['minutes'], '00') | ||||
|         self.assertEqual(schedule['monday']['start_time'], '09:00') | ||||
|         self.assertEqual(schedule['monday']['enabled'], True) | ||||
|         # Other days preserved | ||||
|         self.assertEqual(schedule['tuesday']['enabled'], False) | ||||
|         self.assertEqual(schedule['enabled'], True) | ||||
|  | ||||
|     def test_mixed_flat_and_nested_update(self): | ||||
|         """Test update with both flat and nested properties""" | ||||
|         update_obj = { | ||||
|             'url': 'http://mixed-update.com', | ||||
|             'paused': False, | ||||
|             'time_between_check': {'days': 2, 'minutes': 15}, | ||||
|             'headers': {'cookie': 'session=abc123'} | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|          | ||||
|         # Flat updates | ||||
|         self.assertEqual(watch['url'], 'http://mixed-update.com') | ||||
|         self.assertEqual(watch['paused'], False) | ||||
|          | ||||
|         # Nested updates | ||||
|         time_check = watch['time_between_check'] | ||||
|         self.assertEqual(time_check['days'], 2) | ||||
|         self.assertEqual(time_check['minutes'], 15) | ||||
|         self.assertEqual(time_check['hours'], 6)  # preserved | ||||
|          | ||||
|         headers = watch['headers'] | ||||
|         self.assertEqual(headers['cookie'], 'session=abc123') | ||||
|         self.assertEqual(headers['user-agent'], 'test-browser')  # preserved | ||||
|  | ||||
|     def test_overwrite_nested_with_flat(self): | ||||
|         """Test that providing a non-dict value overwrites the entire nested structure""" | ||||
|         update_obj = {'time_between_check': 'invalid_value'} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         # Should completely replace the nested dict with the string | ||||
|         self.assertEqual(watch['time_between_check'], 'invalid_value') | ||||
|  | ||||
|     def test_add_new_nested_structure(self): | ||||
|         """Test adding a completely new nested dictionary""" | ||||
|         update_obj = { | ||||
|             'custom_config': { | ||||
|                 'option1': 'value1', | ||||
|                 'nested': { | ||||
|                     'suboption': 'subvalue' | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         self.assertEqual(watch['custom_config']['option1'], 'value1') | ||||
|         self.assertEqual(watch['custom_config']['nested']['suboption'], 'subvalue') | ||||
|  | ||||
|     def test_empty_dict_update(self): | ||||
|         """Test updating with empty dictionaries""" | ||||
|         update_obj = {'headers': {}} | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         # Empty dict should preserve existing headers (no keys to merge) | ||||
|         self.assertEqual(watch['headers']['user-agent'], 'test-browser') | ||||
|         self.assertEqual(watch['headers']['accept'], 'text/html') | ||||
|  | ||||
|     def test_none_values_in_nested_update(self): | ||||
|         """Test handling None values in nested updates""" | ||||
|         update_obj = { | ||||
|             'time_between_check': { | ||||
|                 'hours': None, | ||||
|                 'days': 3 | ||||
|             } | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         time_check = watch['time_between_check'] | ||||
|          | ||||
|         self.assertEqual(time_check['hours'], None) | ||||
|         self.assertEqual(time_check['days'], 3) | ||||
|         self.assertEqual(time_check['minutes'], 30)  # preserved | ||||
|  | ||||
|     def test_real_world_api_update_scenario(self): | ||||
|         """Test a real-world API update scenario from the codebase analysis""" | ||||
|         # Based on actual API call patterns found in the codebase | ||||
|         update_obj = { | ||||
|             "title": "Updated API Watch", | ||||
|             'time_between_check': {'minutes': 60}, | ||||
|             'headers': {'authorization': 'Bearer api-token', 'user-agent': 'api-client'}, | ||||
|             'notification_urls': ['https://webhook.example.com'] | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|          | ||||
|         # Verify all updates | ||||
|         self.assertEqual(watch['title'], 'Updated API Watch') | ||||
|         self.assertEqual(watch['time_between_check']['minutes'], 60) | ||||
|         self.assertEqual(watch['time_between_check']['days'], 1)  # preserved | ||||
|         self.assertEqual(watch['headers']['authorization'], 'Bearer api-token') | ||||
|         self.assertEqual(watch['headers']['user-agent'], 'api-client')  # overwrote existing | ||||
|         self.assertEqual(watch['headers']['accept'], 'text/html')  # preserved | ||||
|         self.assertEqual(watch['notification_urls'], ['https://webhook.example.com']) | ||||
|  | ||||
|     def test_watch_not_found(self): | ||||
|         """Test update_watch with non-existent UUID""" | ||||
|         # Should not raise an error, just return silently | ||||
|         fake_uuid = 'non-existent-uuid' | ||||
|         update_obj = {'url': 'http://should-not-update.com'} | ||||
|          | ||||
|         # Should not raise an exception | ||||
|         self.datastore.update_watch(fake_uuid, update_obj) | ||||
|          | ||||
|         # Verify no changes were made to existing watch | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|         self.assertNotEqual(watch['url'], 'http://should-not-update.com') | ||||
|  | ||||
|     def test_processor_style_update(self): | ||||
|         """Test the type of updates made by processors during check operations""" | ||||
|         # Based on async_update_worker.py patterns | ||||
|         update_obj = { | ||||
|             'last_notification_error': False, | ||||
|             'last_error': False, | ||||
|             'previous_md5': 'abc123def456', | ||||
|             'content-type': 'application/json', | ||||
|             'consecutive_filter_failures': 0, | ||||
|             'fetch_time': 1.234, | ||||
|             'check_count': 42 | ||||
|         } | ||||
|         self.datastore.update_watch(self.test_uuid, update_obj) | ||||
|          | ||||
|         watch = self.datastore.data['watching'][self.test_uuid] | ||||
|          | ||||
|         # Verify processor updates | ||||
|         self.assertEqual(watch['last_notification_error'], False) | ||||
|         self.assertEqual(watch['last_error'], False) | ||||
|         self.assertEqual(watch['previous_md5'], 'abc123def456') | ||||
|         self.assertEqual(watch['content-type'], 'application/json') | ||||
|         self.assertEqual(watch['consecutive_filter_failures'], 0) | ||||
|         self.assertEqual(watch['fetch_time'], 1.234) | ||||
|         self.assertEqual(watch['check_count'], 42) | ||||
|          | ||||
|         # Verify nested structures weren't affected | ||||
|         self.assertEqual(watch['time_between_check']['days'], 1) | ||||
|         self.assertEqual(watch['headers']['user-agent'], 'test-browser') | ||||
|  | ||||
|  | ||||
| if __name__ == '__main__': | ||||
|     unittest.main() | ||||
		Reference in New Issue
	
	Block a user