mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-07 01:56:53 +00:00
Compare commits
1 Commits
orjson
...
history-fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9356f9467e |
@@ -64,7 +64,7 @@ def count_words_in_history(watch):
|
||||
return 0
|
||||
|
||||
latest_key = list(watch.history.keys())[-1]
|
||||
latest_content = watch.get_history_snapshot(latest_key)
|
||||
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||
return len(latest_content.split())
|
||||
except Exception as e:
|
||||
logger.error(f"Error counting words: {str(e)}")
|
||||
|
||||
@@ -175,7 +175,7 @@ class WatchSingleHistory(Resource):
|
||||
response = make_response("No content found", 404)
|
||||
response.mimetype = "text/plain"
|
||||
else:
|
||||
content = watch.get_history_snapshot(timestamp)
|
||||
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||
response = make_response(content, 200)
|
||||
response.mimetype = "text/plain"
|
||||
|
||||
|
||||
@@ -118,8 +118,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
fe.title(title=watch_label)
|
||||
try:
|
||||
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
|
||||
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
|
||||
include_equal=False,
|
||||
line_feed_sep="<br>"
|
||||
)
|
||||
|
||||
@@ -106,7 +106,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
trigger_text = watch.get('trigger_text', [])
|
||||
# Add text that was triggered
|
||||
if len(dates):
|
||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||
|
||||
@@ -123,8 +123,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
||||
|
||||
|
||||
if len(dates) > 1:
|
||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
||||
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
|
||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||
current_snapshot=current_snapshot,
|
||||
|
||||
@@ -47,7 +47,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
||||
|
||||
try:
|
||||
versions = list(watch.history.keys())
|
||||
content = watch.get_history_snapshot(timestamp)
|
||||
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||
|
||||
triggered_line_numbers = html_tools.strip_ignore_text(content=content,
|
||||
wordlist=watch['trigger_text'],
|
||||
|
||||
@@ -14,7 +14,7 @@ def count_words_in_history(watch, incoming_text=None):
|
||||
elif watch.history.keys():
|
||||
# When called from UI extras to count latest snapshot
|
||||
latest_key = list(watch.history.keys())[-1]
|
||||
latest_content = watch.get_history_snapshot(latest_key)
|
||||
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||
return len(latest_content.split())
|
||||
return 0
|
||||
except Exception as e:
|
||||
|
||||
@@ -276,9 +276,17 @@ class model(watch_base):
|
||||
# When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
|
||||
return sorted_keys[-1]
|
||||
|
||||
def get_history_snapshot(self, timestamp):
|
||||
def get_history_snapshot(self, timestamp=None, filepath=None):
|
||||
"""
|
||||
Accepts either timestamp or filepath
|
||||
:param timestamp:
|
||||
:param filepath:
|
||||
:return:
|
||||
"""
|
||||
import brotli
|
||||
filepath = self.history[timestamp]
|
||||
|
||||
if not filepath:
|
||||
filepath = self.history[timestamp]
|
||||
|
||||
# See if a brotli versions exists and switch to that
|
||||
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
|
||||
@@ -382,7 +390,7 @@ class model(watch_base):
|
||||
# Compare each lines (set) against each history text file (set) looking for something new..
|
||||
existing_history = set({})
|
||||
for k, v in self.history.items():
|
||||
content = self.get_history_snapshot(k)
|
||||
content = self.get_history_snapshot(filepath=v)
|
||||
|
||||
if ignore_whitespace:
|
||||
alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
|
||||
@@ -639,7 +647,7 @@ class model(watch_base):
|
||||
for k, fname in self.history.items():
|
||||
if os.path.isfile(fname):
|
||||
if True:
|
||||
contents = self.get_history_snapshot(k)
|
||||
contents = self.get_history_snapshot(timestamp=k)
|
||||
res = re.findall(regex, contents, re.MULTILINE)
|
||||
if res:
|
||||
if not csv_writer:
|
||||
@@ -732,7 +740,7 @@ class model(watch_base):
|
||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||
dates = list(self.history.keys())
|
||||
if len(dates):
|
||||
return self.get_history_snapshot(dates[-1])
|
||||
return self.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
return ''
|
||||
|
||||
|
||||
@@ -133,7 +133,7 @@ class NotificationService:
|
||||
|
||||
# Add text that was triggered
|
||||
if len(dates):
|
||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
else:
|
||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||
|
||||
@@ -154,8 +154,8 @@ class NotificationService:
|
||||
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
|
||||
|
||||
if len(dates) > 1:
|
||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
||||
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||
|
||||
|
||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||
|
||||
@@ -353,7 +353,7 @@ def check_json_ext_filter(json_filter, client, live_server, datastore_path):
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
assert snapshot_contents[0] == '['
|
||||
|
||||
@@ -439,7 +439,7 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
assert b'"hello": 123,' in res.data # properly html escaped in the front end
|
||||
|
||||
|
||||
@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
|
||||
# PDF header should not be there (it was converted to text)
|
||||
assert 'PDF' not in snapshot_contents
|
||||
@@ -75,7 +75,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
||||
|
||||
dates = list(watch.history.keys())
|
||||
# new snapshot was also OK, no HTML
|
||||
snapshot_contents = watch.get_history_snapshot(dates[1])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[1])
|
||||
assert 'html' not in snapshot_contents.lower()
|
||||
assert f'Original file size - {os.path.getsize(os.path.join(datastore_path, "endpoint-test.pdf"))}' in snapshot_contents
|
||||
assert f'here is a change' in snapshot_contents
|
||||
|
||||
@@ -65,7 +65,7 @@ def test_rss_reader_mode(client, live_server, measure_memory_usage, datastore_pa
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
assert 'Wet noodles escape' in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
@@ -91,7 +91,7 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
|
||||
|
||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||
dates = list(watch.history.keys())
|
||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||
assert 'Wet noodles escape' not in snapshot_contents
|
||||
assert '<br>' not in snapshot_contents
|
||||
assert '<' not in snapshot_contents
|
||||
|
||||
@@ -55,8 +55,8 @@ class TestTriggerConditions(unittest.TestCase):
|
||||
self.assertEqual(len(history), 2)
|
||||
|
||||
# Retrieve and check snapshots
|
||||
#snapshot1 = watch.get_history_snapshot(str(timestamp1))
|
||||
#snapshot2 = watch.get_history_snapshot(str(timestamp2))
|
||||
#snapshot1 = watch.get_history_snapshot(timestamp=str(timestamp1))
|
||||
#snapshot2 = watch.get_history_snapshot(timestamp=str(timestamp2))
|
||||
|
||||
self.store.data['watching'][self.watch_uuid].update(
|
||||
{
|
||||
|
||||
Reference in New Issue
Block a user