mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-04-30 14:50:39 +00:00
383 lines
17 KiB
Python
383 lines
17 KiB
Python
#!/usr/bin/env python3
|
||
|
||
# run from dir above changedetectionio/ dir
|
||
# python3 -m unittest changedetectionio.tests.unit.test_notification_diff
|
||
|
||
import unittest
|
||
import os
|
||
import pickle
|
||
from copy import deepcopy
|
||
|
||
from changedetectionio.model import Watch, Tag
|
||
|
||
# mostly
|
||
class TestDiffBuilder(unittest.TestCase):
    """Unit tests for the Watch model: last-viewed version lookup,
    deepcopy/pickle handling of the shared __datastore reference, and
    deepcopy performance."""

    def test_watch_get_suggested_from_diff_timestamp(self):
        """get_from_version_based_on_last_viewed should return the newest
        snapshot timestamp at-or-before 'last_viewed', clamping sensibly at
        either end of the available history."""
        import uuid as uuid_builder
        # Create minimal mock datastore so Watch.model can be built standalone
        mock_datastore = {
            'settings': {
                'application': {}
            },
            'watching': {}
        }
        watch = Watch.model(datastore_path='/tmp', __datastore=mock_datastore, default={})
        watch.ensure_data_dir_exists()

        # Contents from the browser are always returned from the browser/requests/etc as str, str is basically UTF-16 in python
        watch.save_history_blob(contents="hello world", timestamp=100, snapshot_id=str(uuid_builder.uuid4()))
        watch.save_history_blob(contents="hello world", timestamp=105, snapshot_id=str(uuid_builder.uuid4()))
        watch.save_history_blob(contents="hello world", timestamp=109, snapshot_id=str(uuid_builder.uuid4()))
        watch.save_history_blob(contents="hello world", timestamp=112, snapshot_id=str(uuid_builder.uuid4()))
        watch.save_history_blob(contents="hello world", timestamp=115, snapshot_id=str(uuid_builder.uuid4()))
        watch.save_history_blob(contents="hello world", timestamp=117, snapshot_id=str(uuid_builder.uuid4()))

        p = watch.get_from_version_based_on_last_viewed
        assert p == "100", "Correct 'last viewed' timestamp was detected"

        watch['last_viewed'] = 110
        p = watch.get_from_version_based_on_last_viewed
        assert p == "109", "Correct 'last viewed' timestamp was detected"

        watch['last_viewed'] = 116
        p = watch.get_from_version_based_on_last_viewed
        assert p == "115", "Correct 'last viewed' timestamp was detected"

        watch['last_viewed'] = 99
        p = watch.get_from_version_based_on_last_viewed
        assert p == "100", "When the 'last viewed' timestamp is less than the oldest snapshot, return oldest"

        watch['last_viewed'] = 200
        p = watch.get_from_version_based_on_last_viewed
        assert p == "115", "When the 'last viewed' timestamp is greater than the newest snapshot, return second newest"

        watch['last_viewed'] = 109
        p = watch.get_from_version_based_on_last_viewed
        assert p == "109", "Correct when its the same time"

        # new empty one
        watch = Watch.model(datastore_path='/tmp', __datastore=mock_datastore, default={})
        p = watch.get_from_version_based_on_last_viewed
        # Fixed: PEP 8 identity comparison ('is None'), not '== None'
        assert p is None, "None when no history available"

        watch.save_history_blob(contents="hello world", timestamp=100, snapshot_id=str(uuid_builder.uuid4()))
        p = watch.get_from_version_based_on_last_viewed
        assert p == "100", "Correct with only one history snapshot"

        watch['last_viewed'] = 200
        p = watch.get_from_version_based_on_last_viewed
        assert p == "100", "Correct with only one history snapshot"

    def test_watch_deepcopy_doesnt_copy_datastore(self):
        """
        CRITICAL: Ensure deepcopy(watch) shares __datastore instead of copying it.

        Without this, deepcopy causes exponential memory growth:
        - 100 watches × deepcopy each = 10,000 watch objects in memory (100²)
        - Memory grows from 120MB → 2GB

        This test prevents regressions in the __deepcopy__ implementation.
        """
        # Create mock datastore with multiple watches
        mock_datastore = {
            'settings': {'application': {'history_snapshot_max_length': 10}},
            'watching': {}
        }

        # Create 3 watches that all reference the same datastore
        watches = []
        for i in range(3):
            watch = Watch.model(
                __datastore=mock_datastore,
                datastore_path='/tmp/test',
                default={'url': f'https://example{i}.com', 'title': f'Watch {i}'}
            )
            mock_datastore['watching'][watch['uuid']] = watch
            watches.append(watch)

        # Test 1: Deepcopy shares datastore reference (doesn't copy it)
        watch_copy = deepcopy(watches[0])

        self.assertIsNotNone(watch_copy._datastore,
                             "__datastore should exist in copied watch")
        self.assertIs(watch_copy._datastore, watches[0]._datastore,
                      "__datastore should be SHARED (same object), not copied")
        self.assertIs(watch_copy._datastore, mock_datastore,
                      "__datastore should reference the original datastore")

        # Test 2: Dict data is properly copied (not shared)
        self.assertEqual(watch_copy['title'], 'Watch 0', "Dict data should be copied")
        watch_copy['title'] = 'MODIFIED'
        self.assertNotEqual(watches[0]['title'], 'MODIFIED',
                            "Modifying copy should not affect original")

        # Test 3: Verify no nested datastore copies in watch dict
        # The dict should only contain watch settings, not the datastore
        watch_dict = dict(watch_copy)
        self.assertNotIn('__datastore', watch_dict,
                         "__datastore should not be in dict keys")
        self.assertNotIn('_model__datastore', watch_dict,
                         "_model__datastore should not be in dict keys")

        # Test 4: Multiple deepcopies don't cause exponential memory growth
        # If datastore was copied, each copy would contain 3 watches,
        # and those watches would contain the datastore, etc. (infinite recursion)
        copies = []
        for _ in range(5):
            copies.append(deepcopy(watches[0]))

        # All copies should share the same datastore
        # Fixed: loop variable renamed so it no longer shadows the 'copy' module name
        for watch_clone in copies:
            self.assertIs(watch_clone._datastore, mock_datastore,
                          "All copies should share the original datastore")

    def test_watch_pickle_doesnt_serialize_datastore(self):
        """
        Ensure pickle/unpickle doesn't serialize __datastore.

        This is important for multiprocessing and caching - we don't want
        to serialize the entire datastore when pickling a watch.
        """
        mock_datastore = {
            'settings': {'application': {}},
            'watching': {}
        }

        watch = Watch.model(
            __datastore=mock_datastore,
            datastore_path='/tmp/test',
            default={'url': 'https://example.com', 'title': 'Test Watch'}
        )

        # Pickle and unpickle
        pickled = pickle.dumps(watch)
        unpickled_watch = pickle.loads(pickled)

        # Test 1: Watch data is preserved
        self.assertEqual(unpickled_watch['url'], 'https://example.com',
                         "Dict data should be preserved after pickle/unpickle")

        # Test 2: __datastore is NOT serialized (attribute shouldn't exist after unpickle)
        self.assertFalse(hasattr(unpickled_watch, '_datastore'),
                         "__datastore attribute should not exist after unpickle (not serialized)")

        # Test 3: Pickled data shouldn't contain the large datastore object
        # If datastore was serialized, the pickle size would be much larger
        pickle_size = len(pickled)
        # A single watch should be small (< 10KB), not include entire datastore
        self.assertLess(pickle_size, 10000,
                        f"Pickled watch too large ({pickle_size} bytes) - might include datastore")

    def test_tag_deepcopy_works(self):
        """
        Ensure Tag objects (which also inherit from watch_base) can be deepcopied.

        Tags now have optional __datastore for consistency with Watch objects.
        """
        mock_datastore = {
            'settings': {'application': {}},
            'watching': {}
        }

        # Test 1: Tag without datastore (backward compatibility)
        tag_without_ds = Tag.model(
            datastore_path='/tmp/test',
            default={'title': 'Test Tag', 'overrides_watch': True}
        )
        tag_copy1 = deepcopy(tag_without_ds)
        self.assertEqual(tag_copy1['title'], 'Test Tag', "Tag data should be copied")

        # Test 2: Tag with datastore (new pattern for consistency)
        tag_with_ds = Tag.model(
            datastore_path='/tmp/test',
            __datastore=mock_datastore,
            default={'title': 'Test Tag With DS', 'overrides_watch': True}
        )

        # Deepcopy should work
        tag_copy2 = deepcopy(tag_with_ds)

        # Test 3: Dict data is copied
        self.assertEqual(tag_copy2['title'], 'Test Tag With DS', "Tag data should be copied")

        # Test 4: Modifications to copy don't affect original
        tag_copy2['title'] = 'MODIFIED'
        self.assertNotEqual(tag_with_ds['title'], 'MODIFIED',
                            "Modifying copy should not affect original")

        # Test 5: Tag with datastore shares it (doesn't copy it)
        if hasattr(tag_with_ds, '_datastore'):
            self.assertIs(tag_copy2._datastore, tag_with_ds._datastore,
                          "Tag should share __datastore reference like Watch does")

    def test_watch_copy_performance(self):
        """
        Verify that our __deepcopy__ implementation doesn't cause performance issues.

        With the fix, deepcopy should be fast because we're sharing datastore
        instead of copying it.
        """
        import time

        # Create a watch with large datastore (many watches)
        mock_datastore = {
            'settings': {'application': {}},
            'watching': {}
        }

        # Add 100 watches to the datastore
        for i in range(100):
            w = Watch.model(
                __datastore=mock_datastore,
                datastore_path='/tmp/test',
                default={'url': f'https://example{i}.com'}
            )
            mock_datastore['watching'][w['uuid']] = w

        # Time how long deepcopy takes
        watch = list(mock_datastore['watching'].values())[0]

        start = time.time()
        for _ in range(10):
            _ = deepcopy(watch)
        elapsed = time.time() - start

        # Should be fast (< 0.5 seconds for 10 copies)  -- fixed: comment now matches the assertion threshold
        # If datastore was copied, it would take much longer
        self.assertLess(elapsed, 0.5,
                        f"Deepcopy too slow ({elapsed:.3f}s for 10 copies) - might be copying datastore")
|
||
|
||
class TestLLMDiffSummaryCache(unittest.TestCase):
    """Verify the LLM diff-summary cache: entries are keyed by the
    (from_version, to_version) pair plus a short hash of the prompt text, so
    a new prompt or an unknown version pair is always a cache miss."""

    PROMPT = 'List what changed as bullet points'

    def _make_watch(self):
        # Build a throwaway Watch backed by an in-memory mock datastore.
        ds = {'settings': {'application': {}}, 'watching': {}}
        w = Watch.model(datastore_path='/tmp', __datastore=ds, default={})
        w.ensure_data_dir_exists()
        return w

    def test_returns_empty_when_no_file_exists(self):
        # No summary was ever saved, so the lookup must come back empty.
        w = self._make_watch()
        self.assertEqual(w.get_llm_diff_summary('1000', '2000', prompt=self.PROMPT), '')

    def test_save_and_retrieve(self):
        # A round-trip of the same pair + prompt returns what was stored.
        w = self._make_watch()
        w.save_llm_diff_summary('Price dropped to $199', '1000', '2000', prompt=self.PROMPT)
        self.assertEqual(
            w.get_llm_diff_summary('1000', '2000', prompt=self.PROMPT),
            'Price dropped to $199')

    def test_different_version_pairs_are_independent(self):
        # Two distinct version pairs must not clobber each other's entries.
        w = self._make_watch()
        w.save_llm_diff_summary('Summary A', '1000', '2000', prompt=self.PROMPT)
        w.save_llm_diff_summary('Summary B', '2000', '3000', prompt=self.PROMPT)
        self.assertEqual(w.get_llm_diff_summary('1000', '2000', prompt=self.PROMPT), 'Summary A')
        self.assertEqual(w.get_llm_diff_summary('2000', '3000', prompt=self.PROMPT), 'Summary B')

    def test_unknown_pair_returns_empty(self):
        # Looking up a pair that was never saved is a miss, not a crash.
        w = self._make_watch()
        w.save_llm_diff_summary('Summary A', '1000', '2000', prompt=self.PROMPT)
        self.assertEqual(w.get_llm_diff_summary('9999', '8888', prompt=self.PROMPT), '')

    def test_changed_prompt_is_a_cache_miss(self):
        """Changing the prompt must invalidate the cached summary for the same version pair."""
        w = self._make_watch()
        w.save_llm_diff_summary('Old summary', '1000', '2000', prompt='original prompt')
        # Different prompt → different hash → different filename → miss
        self.assertEqual(w.get_llm_diff_summary('1000', '2000', prompt='new different prompt'), '')

    def test_file_named_by_versions_and_prompt_hash(self):
        """Cache file must be named change-summary-{from}-to-{to}-{hash}.txt."""
        import hashlib
        w = self._make_watch()
        the_prompt = 'my summary prompt'
        w.save_llm_diff_summary('Test summary', '1776000000', '1776001000', prompt=the_prompt)
        # First 8 hex chars of the MD5 of the prompt form the filename suffix.
        digest = hashlib.md5(the_prompt.encode()).hexdigest()[:8]
        expected_path = os.path.join(
            w.data_dir,
            f'change-summary-1776000000-to-1776001000-{digest}.txt'
        )
        self.assertTrue(os.path.isfile(expected_path),
                        f"Expected cache file not found: {expected_path}")
        with open(expected_path, 'r') as fh:
            self.assertEqual(fh.read().strip(), 'Test summary')

    def test_overwrite_same_pair_and_prompt(self):
        # Re-saving the same pair + prompt replaces the earlier summary.
        w = self._make_watch()
        w.save_llm_diff_summary('First summary', '1000', '2000', prompt=self.PROMPT)
        w.save_llm_diff_summary('Updated summary', '1000', '2000', prompt=self.PROMPT)
        self.assertEqual(w.get_llm_diff_summary('1000', '2000', prompt=self.PROMPT), 'Updated summary')
|
||
|
||
|
||
class TestHistoryPathTraversal(unittest.TestCase):
    """GHSA-8757-69j2-hx56: entries in history.txt must never resolve to a
    file outside the watch's own data directory, whether written as absolute
    paths or as ../ traversal sequences."""

    def _make_watch(self):
        # Build a throwaway Watch backed by an in-memory mock datastore.
        ds = {'settings': {'application': {}}, 'watching': {}}
        w = Watch.model(datastore_path='/tmp', __datastore=ds, default={})
        w.ensure_data_dir_exists()
        return w

    def _write_history_txt(self, watch, lines):
        """Directly write raw lines to history.txt to simulate a restored backup."""
        index_path = os.path.join(watch.data_dir, watch.history_index_filename)
        with open(index_path, 'w', encoding='utf-8') as fh:
            fh.writelines(lines)

    def test_absolute_path_in_history_is_rejected(self):
        """An absolute path like /etc/passwd must not appear in history."""
        w = self._make_watch()
        self._write_history_txt(w, ['1000000000,/etc/passwd\n'])
        self.assertEqual(w.history, {}, "Absolute path entry must be rejected")

    def test_traversal_path_in_history_is_rejected(self):
        """A relative traversal path like ../../etc/passwd must not appear in history."""
        w = self._make_watch()
        self._write_history_txt(w, ['1000000000,../../etc/passwd\n'])
        self.assertEqual(w.history, {}, "Path traversal entry must be rejected")

    def test_normal_snapshot_entry_is_accepted(self):
        """A bare filename written by save_history_blob must still load correctly."""
        import uuid as uuid_builder
        w = self._make_watch()
        w.save_history_blob(contents="hello world", timestamp=1000000000, snapshot_id=str(uuid_builder.uuid4()))
        loaded = w.history
        self.assertEqual(len(loaded), 1, "Normal snapshot entry must be accepted")
        first_path = list(loaded.values())[0]
        self.assertTrue(
            first_path.startswith(w.data_dir),
            "Resolved path must be inside the watch data directory"
        )

    def test_get_history_snapshot_blocks_outside_path_directly(self):
        """get_history_snapshot(filepath=...) must raise if the path escapes data_dir."""
        w = self._make_watch()
        with self.assertRaises(PermissionError):
            w.get_history_snapshot(filepath='/etc/passwd')

    def test_get_history_snapshot_blocks_traversal_directly(self):
        """get_history_snapshot(filepath=...) must raise on ../../ traversal paths."""
        w = self._make_watch()
        escape_path = os.path.join(w.data_dir, '../../etc/passwd')
        with self.assertRaises(PermissionError):
            w.get_history_snapshot(filepath=escape_path)

    def test_resolved_path_stays_inside_data_dir(self):
        """All resolved history paths must reside within the watch's data_dir."""
        import uuid as uuid_builder
        w = self._make_watch()
        for ts in [1000000001, 1000000002, 1000000003]:
            w.save_history_blob(contents=f"content {ts}", timestamp=ts, snapshot_id=str(uuid_builder.uuid4()))
        base_dir = os.path.realpath(w.data_dir)
        for resolved in w.history.values():
            self.assertTrue(
                os.path.realpath(resolved).startswith(base_dir),
                f"Path {resolved!r} escapes the watch data directory"
            )
|
||
|
||
|
||
# Allow running this file directly (as well as via python3 -m unittest ...)
if __name__ == '__main__':
    unittest.main()
|