mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-18 23:46:15 +00:00
Compare commits
1 Commits
rss-watch-
...
history-fi
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9356f9467e |
@@ -64,7 +64,7 @@ def count_words_in_history(watch):
|
|||||||
return 0
|
return 0
|
||||||
|
|
||||||
latest_key = list(watch.history.keys())[-1]
|
latest_key = list(watch.history.keys())[-1]
|
||||||
latest_content = watch.get_history_snapshot(latest_key)
|
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||||
return len(latest_content.split())
|
return len(latest_content.split())
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Error counting words: {str(e)}")
|
logger.error(f"Error counting words: {str(e)}")
|
||||||
|
|||||||
@@ -175,7 +175,7 @@ class WatchSingleHistory(Resource):
|
|||||||
response = make_response("No content found", 404)
|
response = make_response("No content found", 404)
|
||||||
response.mimetype = "text/plain"
|
response.mimetype = "text/plain"
|
||||||
else:
|
else:
|
||||||
content = watch.get_history_snapshot(timestamp)
|
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||||
response = make_response(content, 200)
|
response = make_response(content, 200)
|
||||||
response.mimetype = "text/plain"
|
response.mimetype = "text/plain"
|
||||||
|
|
||||||
|
|||||||
@@ -118,8 +118,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
fe.title(title=watch_label)
|
fe.title(title=watch_label)
|
||||||
try:
|
try:
|
||||||
|
|
||||||
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(dates[-2]),
|
html_diff = diff.render_diff(previous_version_file_contents=watch.get_history_snapshot(timestamp=dates[-2]),
|
||||||
newest_version_file_contents=watch.get_history_snapshot(dates[-1]),
|
newest_version_file_contents=watch.get_history_snapshot(timestamp=dates[-1]),
|
||||||
include_equal=False,
|
include_equal=False,
|
||||||
line_feed_sep="<br>"
|
line_feed_sep="<br>"
|
||||||
)
|
)
|
||||||
|
|||||||
@@ -106,7 +106,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
trigger_text = watch.get('trigger_text', [])
|
trigger_text = watch.get('trigger_text', [])
|
||||||
# Add text that was triggered
|
# Add text that was triggered
|
||||||
if len(dates):
|
if len(dates):
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||||
else:
|
else:
|
||||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||||
|
|
||||||
@@ -123,8 +123,8 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
|
|
||||||
|
|
||||||
if len(dates) > 1:
|
if len(dates) > 1:
|
||||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||||
|
|
||||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||||
current_snapshot=current_snapshot,
|
current_snapshot=current_snapshot,
|
||||||
|
|||||||
@@ -47,7 +47,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
|
|
||||||
try:
|
try:
|
||||||
versions = list(watch.history.keys())
|
versions = list(watch.history.keys())
|
||||||
content = watch.get_history_snapshot(timestamp)
|
content = watch.get_history_snapshot(timestamp=timestamp)
|
||||||
|
|
||||||
triggered_line_numbers = html_tools.strip_ignore_text(content=content,
|
triggered_line_numbers = html_tools.strip_ignore_text(content=content,
|
||||||
wordlist=watch['trigger_text'],
|
wordlist=watch['trigger_text'],
|
||||||
|
|||||||
@@ -14,7 +14,7 @@ def count_words_in_history(watch, incoming_text=None):
|
|||||||
elif watch.history.keys():
|
elif watch.history.keys():
|
||||||
# When called from UI extras to count latest snapshot
|
# When called from UI extras to count latest snapshot
|
||||||
latest_key = list(watch.history.keys())[-1]
|
latest_key = list(watch.history.keys())[-1]
|
||||||
latest_content = watch.get_history_snapshot(latest_key)
|
latest_content = watch.get_history_snapshot(timestamp=latest_key)
|
||||||
return len(latest_content.split())
|
return len(latest_content.split())
|
||||||
return 0
|
return 0
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -276,9 +276,17 @@ class model(watch_base):
|
|||||||
# When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
|
# When the 'last viewed' timestamp is less than the oldest snapshot, return oldest
|
||||||
return sorted_keys[-1]
|
return sorted_keys[-1]
|
||||||
|
|
||||||
def get_history_snapshot(self, timestamp):
|
def get_history_snapshot(self, timestamp=None, filepath=None):
|
||||||
|
"""
|
||||||
|
Accepts either timestamp or filepath
|
||||||
|
:param timestamp:
|
||||||
|
:param filepath:
|
||||||
|
:return:
|
||||||
|
"""
|
||||||
import brotli
|
import brotli
|
||||||
filepath = self.history[timestamp]
|
|
||||||
|
if not filepath:
|
||||||
|
filepath = self.history[timestamp]
|
||||||
|
|
||||||
# See if a brotli versions exists and switch to that
|
# See if a brotli versions exists and switch to that
|
||||||
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
|
if not filepath.endswith('.br') and os.path.isfile(f"{filepath}.br"):
|
||||||
@@ -382,7 +390,7 @@ class model(watch_base):
|
|||||||
# Compare each lines (set) against each history text file (set) looking for something new..
|
# Compare each lines (set) against each history text file (set) looking for something new..
|
||||||
existing_history = set({})
|
existing_history = set({})
|
||||||
for k, v in self.history.items():
|
for k, v in self.history.items():
|
||||||
content = self.get_history_snapshot(k)
|
content = self.get_history_snapshot(filepath=v)
|
||||||
|
|
||||||
if ignore_whitespace:
|
if ignore_whitespace:
|
||||||
alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
|
alist = set([line.translate(TRANSLATE_WHITESPACE_TABLE).lower() for line in content.splitlines()])
|
||||||
@@ -639,7 +647,7 @@ class model(watch_base):
|
|||||||
for k, fname in self.history.items():
|
for k, fname in self.history.items():
|
||||||
if os.path.isfile(fname):
|
if os.path.isfile(fname):
|
||||||
if True:
|
if True:
|
||||||
contents = self.get_history_snapshot(k)
|
contents = self.get_history_snapshot(timestamp=k)
|
||||||
res = re.findall(regex, contents, re.MULTILINE)
|
res = re.findall(regex, contents, re.MULTILINE)
|
||||||
if res:
|
if res:
|
||||||
if not csv_writer:
|
if not csv_writer:
|
||||||
@@ -732,7 +740,7 @@ class model(watch_base):
|
|||||||
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
# If a previous attempt doesnt yet exist, just snarf the previous snapshot instead
|
||||||
dates = list(self.history.keys())
|
dates = list(self.history.keys())
|
||||||
if len(dates):
|
if len(dates):
|
||||||
return self.get_history_snapshot(dates[-1])
|
return self.get_history_snapshot(timestamp=dates[-1])
|
||||||
else:
|
else:
|
||||||
return ''
|
return ''
|
||||||
|
|
||||||
|
|||||||
@@ -133,7 +133,7 @@ class NotificationService:
|
|||||||
|
|
||||||
# Add text that was triggered
|
# Add text that was triggered
|
||||||
if len(dates):
|
if len(dates):
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[-1])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[-1])
|
||||||
else:
|
else:
|
||||||
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
snapshot_contents = "No snapshot/history available, the watch should fetch atleast once."
|
||||||
|
|
||||||
@@ -154,8 +154,8 @@ class NotificationService:
|
|||||||
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
|
current_snapshot = "Example text: example test\nExample text: change detection is fantastic\nExample text: even more examples\nExample text: a lot more examples"
|
||||||
|
|
||||||
if len(dates) > 1:
|
if len(dates) > 1:
|
||||||
prev_snapshot = watch.get_history_snapshot(dates[-2])
|
prev_snapshot = watch.get_history_snapshot(timestamp=dates[-2])
|
||||||
current_snapshot = watch.get_history_snapshot(dates[-1])
|
current_snapshot = watch.get_history_snapshot(timestamp=dates[-1])
|
||||||
|
|
||||||
|
|
||||||
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
n_object.update(set_basic_notification_vars(snapshot_contents=snapshot_contents,
|
||||||
|
|||||||
@@ -353,7 +353,7 @@ def check_json_ext_filter(json_filter, client, live_server, datastore_path):
|
|||||||
|
|
||||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||||
|
|
||||||
assert snapshot_contents[0] == '['
|
assert snapshot_contents[0] == '['
|
||||||
|
|
||||||
@@ -439,7 +439,7 @@ def test_correct_header_detect(client, live_server, measure_memory_usage, datast
|
|||||||
|
|
||||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||||
|
|
||||||
assert b'"hello": 123,' in res.data # properly html escaped in the front end
|
assert b'"hello": 123,' in res.data # properly html escaped in the front end
|
||||||
|
|
||||||
|
|||||||
@@ -22,7 +22,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
|||||||
|
|
||||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||||
|
|
||||||
# PDF header should not be there (it was converted to text)
|
# PDF header should not be there (it was converted to text)
|
||||||
assert 'PDF' not in snapshot_contents
|
assert 'PDF' not in snapshot_contents
|
||||||
@@ -75,7 +75,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage, datastore_path):
|
|||||||
|
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
# new snapshot was also OK, no HTML
|
# new snapshot was also OK, no HTML
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[1])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[1])
|
||||||
assert 'html' not in snapshot_contents.lower()
|
assert 'html' not in snapshot_contents.lower()
|
||||||
assert f'Original file size - {os.path.getsize(os.path.join(datastore_path, "endpoint-test.pdf"))}' in snapshot_contents
|
assert f'Original file size - {os.path.getsize(os.path.join(datastore_path, "endpoint-test.pdf"))}' in snapshot_contents
|
||||||
assert f'here is a change' in snapshot_contents
|
assert f'here is a change' in snapshot_contents
|
||||||
|
|||||||
@@ -65,7 +65,7 @@ def test_rss_reader_mode(client, live_server, measure_memory_usage, datastore_pa
|
|||||||
|
|
||||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||||
assert 'Wet noodles escape' in snapshot_contents
|
assert 'Wet noodles escape' in snapshot_contents
|
||||||
assert '<br>' not in snapshot_contents
|
assert '<br>' not in snapshot_contents
|
||||||
assert '<' not in snapshot_contents
|
assert '<' not in snapshot_contents
|
||||||
@@ -91,7 +91,7 @@ def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_us
|
|||||||
|
|
||||||
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
dates = list(watch.history.keys())
|
dates = list(watch.history.keys())
|
||||||
snapshot_contents = watch.get_history_snapshot(dates[0])
|
snapshot_contents = watch.get_history_snapshot(timestamp=dates[0])
|
||||||
assert 'Wet noodles escape' not in snapshot_contents
|
assert 'Wet noodles escape' not in snapshot_contents
|
||||||
assert '<br>' not in snapshot_contents
|
assert '<br>' not in snapshot_contents
|
||||||
assert '<' not in snapshot_contents
|
assert '<' not in snapshot_contents
|
||||||
|
|||||||
@@ -55,8 +55,8 @@ class TestTriggerConditions(unittest.TestCase):
|
|||||||
self.assertEqual(len(history), 2)
|
self.assertEqual(len(history), 2)
|
||||||
|
|
||||||
# Retrieve and check snapshots
|
# Retrieve and check snapshots
|
||||||
#snapshot1 = watch.get_history_snapshot(str(timestamp1))
|
#snapshot1 = watch.get_history_snapshot(timestamp=str(timestamp1))
|
||||||
#snapshot2 = watch.get_history_snapshot(str(timestamp2))
|
#snapshot2 = watch.get_history_snapshot(timestamp=str(timestamp2))
|
||||||
|
|
||||||
self.store.data['watching'][self.watch_uuid].update(
|
self.store.data['watching'][self.watch_uuid].update(
|
||||||
{
|
{
|
||||||
|
|||||||
Reference in New Issue
Block a user