mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-06 17:46:06 +00:00
Compare commits
3 Commits
0.49.11
...
threading-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
9d8558fbc9 | ||
|
|
a7a8ba58ed | ||
|
|
7823140442 |
@@ -105,9 +105,10 @@ def init_app_secret(datastore_path):
|
|||||||
# running or something similar.
|
# running or something similar.
|
||||||
@app.template_filter('format_last_checked_time')
|
@app.template_filter('format_last_checked_time')
|
||||||
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
|
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
|
||||||
|
|
||||||
# Worker thread tells us which UUID it is currently processing.
|
# Worker thread tells us which UUID it is currently processing.
|
||||||
for t in running_update_threads:
|
for t in threading.enumerate():
|
||||||
if t.current_uuid == watch_obj['uuid']:
|
if t.name == 'update_worker' and t.current_uuid == watch_obj['uuid']:
|
||||||
return '<span class="loader"></span><span> Checking now</span>'
|
return '<span class="loader"></span><span> Checking now</span>'
|
||||||
|
|
||||||
if watch_obj['last_checked'] == 0:
|
if watch_obj['last_checked'] == 0:
|
||||||
@@ -1213,6 +1214,7 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
|
|
||||||
# @todo handle ctrl break
|
# @todo handle ctrl break
|
||||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||||
|
threading.Thread(target=ticker_thread_job_queue_processor).start()
|
||||||
|
|
||||||
threading.Thread(target=notification_runner).start()
|
threading.Thread(target=notification_runner).start()
|
||||||
|
|
||||||
@@ -1288,25 +1290,63 @@ def notification_runner():
|
|||||||
# Trim the log length
|
# Trim the log length
|
||||||
notification_debug_log = notification_debug_log[-100:]
|
notification_debug_log = notification_debug_log[-100:]
|
||||||
|
|
||||||
|
# Check the queue, when a job exists, start a fresh thread of update_worker
|
||||||
|
def ticker_thread_job_queue_processor():
|
||||||
|
|
||||||
|
from changedetectionio import update_worker
|
||||||
|
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||||
|
|
||||||
|
while not app.config.exit.is_set():
|
||||||
|
time.sleep(0.3)
|
||||||
|
|
||||||
|
# Check that some threads are free
|
||||||
|
running = 0
|
||||||
|
for t in threading.enumerate():
|
||||||
|
if t.name == 'update_worker':
|
||||||
|
running += 1
|
||||||
|
|
||||||
|
if running >= n_workers:
|
||||||
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
uuid = update_q.get(block=False)
|
||||||
|
except queue.Empty:
|
||||||
|
# Go back to waiting for exit and/or another entry from the queue
|
||||||
|
continue
|
||||||
|
print ("Starting a thread fetch")
|
||||||
|
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Launch the update_worker thread that will handle picking items off a queue and sending them off
|
||||||
|
# in the event that playwright or others have a memory leak, this should clean it up better than gc.collect()
|
||||||
|
# (By letting it exit entirely)
|
||||||
|
update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start()
|
||||||
|
except Exception as e:
|
||||||
|
print ("Error launching update_worker for UUID {}.".format(uuid))
|
||||||
|
print (str(e))
|
||||||
|
|
||||||
|
print ("Running now {}", running)
|
||||||
|
|
||||||
|
|
||||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
||||||
def ticker_thread_check_time_launch_checks():
|
def ticker_thread_check_time_launch_checks():
|
||||||
import random
|
import random
|
||||||
from changedetectionio import update_worker
|
|
||||||
|
|
||||||
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
||||||
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
|
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
|
||||||
|
|
||||||
|
# Can go in its own function
|
||||||
|
|
||||||
|
# Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch
|
||||||
|
# This is to be totally sure that they don't leak memory
|
||||||
# Spin up Workers that do the fetching
|
# Spin up Workers that do the fetching
|
||||||
# Can be overriden by ENV or use the default settings
|
# Can be overriden by ENV or use the default settings
|
||||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
|
||||||
for _ in range(n_workers):
|
|
||||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
|
||||||
running_update_threads.append(new_worker)
|
|
||||||
new_worker.start()
|
|
||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
|
|
||||||
# Get a list of watches by UUID that are currently fetching data
|
# Update our list of watches by UUID that are currently fetching data, used in the UI
|
||||||
running_uuids = []
|
running_uuids = []
|
||||||
for t in running_update_threads:
|
for t in running_update_threads:
|
||||||
if t.current_uuid:
|
if t.current_uuid:
|
||||||
|
|||||||
@@ -4,8 +4,6 @@ from typing import List
|
|||||||
from bs4 import BeautifulSoup
|
from bs4 import BeautifulSoup
|
||||||
from jsonpath_ng.ext import parse
|
from jsonpath_ng.ext import parse
|
||||||
import re
|
import re
|
||||||
from inscriptis import get_text
|
|
||||||
from inscriptis.model.config import ParserConfig
|
|
||||||
|
|
||||||
class FilterNotFoundInResponse(ValueError):
|
class FilterNotFoundInResponse(ValueError):
|
||||||
def __init__(self, msg):
|
def __init__(self, msg):
|
||||||
@@ -183,9 +181,16 @@ def strip_ignore_text(content, wordlist, mode="content"):
|
|||||||
|
|
||||||
|
|
||||||
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
|
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
|
||||||
|
import multiprocessing
|
||||||
|
|
||||||
|
from inscriptis.model.config import ParserConfig
|
||||||
|
|
||||||
"""Converts html string to a string with just the text. If ignoring
|
"""Converts html string to a string with just the text. If ignoring
|
||||||
rendering anchor tag content is enable, anchor tag content are also
|
rendering anchor tag content is enable, anchor tag content are also
|
||||||
included in the text
|
included in the text
|
||||||
|
|
||||||
|
@NOTE: HORRIBLE LXML INDUCED MEMORY LEAK WORKAROUND HERE
|
||||||
|
https://www.reddit.com/r/Python/comments/j0gl8t/psa_pythonlxml_memory_leaks_and_a_solution/
|
||||||
|
|
||||||
:param html_content: string with html content
|
:param html_content: string with html content
|
||||||
:param render_anchor_tag_content: boolean flag indicating whether to extract
|
:param render_anchor_tag_content: boolean flag indicating whether to extract
|
||||||
@@ -207,8 +212,19 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
|
|||||||
else:
|
else:
|
||||||
parser_config = None
|
parser_config = None
|
||||||
|
|
||||||
# get text and annotations via inscriptis
|
|
||||||
text_content = get_text(html_content, config=parser_config)
|
def parse_function(html_content, parser_config, results_queue):
|
||||||
|
from inscriptis import get_text
|
||||||
|
# get text and annotations via inscriptis
|
||||||
|
text_content = get_text(html_content, config=parser_config)
|
||||||
|
results_queue.put(text_content)
|
||||||
|
|
||||||
|
results_queue = multiprocessing.Queue()
|
||||||
|
parse_process = multiprocessing.Process(target=parse_function, args=(html_content, parser_config, results_queue))
|
||||||
|
parse_process.daemon = True
|
||||||
|
parse_process.start()
|
||||||
|
text_content = results_queue.get() # blocks until results are available
|
||||||
|
parse_process.terminate()
|
||||||
|
|
||||||
return text_content
|
return text_content
|
||||||
|
|
||||||
|
|||||||
@@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
|
|||||||
|
|
||||||
# A single update worker
|
# A single update worker
|
||||||
#
|
#
|
||||||
# Requests for checking on a single site(watch) from a queue of watches
|
#
|
||||||
# (another process inserts watches into the queue that are time-ready for checking)
|
|
||||||
|
|
||||||
|
|
||||||
class update_worker(threading.Thread):
|
class update_worker(threading.Thread):
|
||||||
current_uuid = None
|
current_uuid = None
|
||||||
|
|
||||||
def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
|
def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs):
|
||||||
self.q = q
|
self.q = q
|
||||||
|
|
||||||
self.app = app
|
self.app = app
|
||||||
self.notification_q = notification_q
|
self.notification_q = notification_q
|
||||||
self.datastore = datastore
|
self.datastore = datastore
|
||||||
|
self.current_uuid = uuid
|
||||||
super().__init__(*args, **kwargs)
|
super().__init__(*args, **kwargs)
|
||||||
|
self.name = "update_worker"
|
||||||
|
|
||||||
def send_filter_failure_notification(self, uuid):
|
def send_filter_failure_notification(self, uuid):
|
||||||
|
|
||||||
@@ -47,169 +48,170 @@ class update_worker(threading.Thread):
|
|||||||
self.notification_q.put(n_object)
|
self.notification_q.put(n_object)
|
||||||
print("Sent filter not found notification for {}".format(uuid))
|
print("Sent filter not found notification for {}".format(uuid))
|
||||||
|
|
||||||
|
# Pick one job off the list, process it threaded, exist
|
||||||
def run(self):
|
def run(self):
|
||||||
|
# Go talk to the website
|
||||||
|
self.perform_site_update()
|
||||||
|
|
||||||
|
self.current_uuid = None # Done
|
||||||
|
self.q.task_done()
|
||||||
|
|
||||||
|
# Let the thread die after processing 1
|
||||||
|
# We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc)
|
||||||
|
print ("EXITING THREAD!")
|
||||||
|
self.app.config.exit.wait(1)
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def perform_site_update(self):
|
||||||
|
|
||||||
from changedetectionio import fetch_site_status
|
from changedetectionio import fetch_site_status
|
||||||
|
|
||||||
|
if not self.current_uuid in list(self.datastore.data['watching'].keys()):
|
||||||
|
return
|
||||||
|
|
||||||
|
|
||||||
|
changed_detected = False
|
||||||
|
contents = ""
|
||||||
|
screenshot = False
|
||||||
|
update_obj= {}
|
||||||
|
xpath_data = False
|
||||||
|
now = time.time()
|
||||||
|
|
||||||
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
|
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
|
||||||
|
try:
|
||||||
|
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid)
|
||||||
|
# Re #342
|
||||||
|
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
|
||||||
|
# We then convert/.decode('utf-8') for the notification etc
|
||||||
|
if not isinstance(contents, (bytes, bytearray)):
|
||||||
|
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
|
||||||
|
except PermissionError as e:
|
||||||
|
self.app.logger.error("File permission error updating", self.current_uuid, str(e))
|
||||||
|
except content_fetcher.ReplyWithContentButNoText as e:
|
||||||
|
# Totally fine, it's by choice - just continue on, nothing more to care about
|
||||||
|
# Page had elements/content but no renderable text
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."})
|
||||||
|
except FilterNotFoundInResponse as e:
|
||||||
|
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
|
||||||
|
c = 0
|
||||||
|
if self.datastore.data['watching'].get(self.current_uuid, False):
|
||||||
|
c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5)
|
||||||
|
c += 1
|
||||||
|
|
||||||
while not self.app.config.exit.is_set():
|
# Send notification if we reached the threshold?
|
||||||
|
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
|
||||||
|
print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c))
|
||||||
|
if threshold >0 and c >= threshold:
|
||||||
|
self.send_filter_failure_notification(self.current_uuid)
|
||||||
|
c = 0
|
||||||
|
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
|
||||||
|
'consecutive_filter_failures': c})
|
||||||
|
except content_fetcher.EmptyReply as e:
|
||||||
|
# Some kind of custom to-str handler in the exception handler that does this?
|
||||||
|
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
|
||||||
|
'last_check_status': e.status_code})
|
||||||
|
except content_fetcher.ScreenshotUnavailable as e:
|
||||||
|
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
|
||||||
|
'last_check_status': e.status_code})
|
||||||
|
except content_fetcher.PageUnloadable as e:
|
||||||
|
err_text = "Page request from server didnt respond correctly"
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
|
||||||
|
'last_check_status': e.status_code})
|
||||||
|
except Exception as e:
|
||||||
|
self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
|
||||||
|
|
||||||
|
else:
|
||||||
try:
|
try:
|
||||||
uuid = self.q.get(block=False)
|
watch = self.datastore.data['watching'][self.current_uuid]
|
||||||
except queue.Empty:
|
fname = "" # Saved history text filename
|
||||||
pass
|
|
||||||
|
|
||||||
else:
|
# For the FIRST time we check a site, or a change detected, save the snapshot.
|
||||||
self.current_uuid = uuid
|
if changed_detected or not watch['last_checked']:
|
||||||
|
# A change was detected
|
||||||
|
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
|
||||||
|
|
||||||
if uuid in list(self.datastore.data['watching'].keys()):
|
# Generally update anything interesting returned
|
||||||
|
update_obj['consecutive_filter_failures'] = 0
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj)
|
||||||
|
|
||||||
changed_detected = False
|
# A change was detected
|
||||||
contents = ""
|
if changed_detected:
|
||||||
screenshot = False
|
n_object = {}
|
||||||
update_obj= {}
|
print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url']))
|
||||||
xpath_data = False
|
|
||||||
now = time.time()
|
|
||||||
|
|
||||||
try:
|
# Notifications should only trigger on the second time (first time, we gather the initial snapshot)
|
||||||
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid)
|
if watch.history_n >= 2:
|
||||||
# Re #342
|
# Atleast 2, means there really was a change
|
||||||
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)})
|
||||||
# We then convert/.decode('utf-8') for the notification etc
|
|
||||||
if not isinstance(contents, (bytes, bytearray)):
|
|
||||||
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
|
|
||||||
except PermissionError as e:
|
|
||||||
self.app.logger.error("File permission error updating", uuid, str(e))
|
|
||||||
except content_fetcher.ReplyWithContentButNoText as e:
|
|
||||||
# Totally fine, it's by choice - just continue on, nothing more to care about
|
|
||||||
# Page had elements/content but no renderable text
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."})
|
|
||||||
except FilterNotFoundInResponse as e:
|
|
||||||
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
|
|
||||||
c = 0
|
|
||||||
if self.datastore.data['watching'].get(uuid, False):
|
|
||||||
c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
|
|
||||||
c += 1
|
|
||||||
|
|
||||||
# Send notification if we reached the threshold?
|
watch_history = watch.history
|
||||||
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
|
dates = list(watch_history.keys())
|
||||||
print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
|
# Theoretically it's possible that this could be just 1 long,
|
||||||
if threshold >0 and c >= threshold:
|
# - In the case that the timestamp key was not unique
|
||||||
self.send_filter_failure_notification(uuid)
|
if len(dates) == 1:
|
||||||
c = 0
|
raise ValueError(
|
||||||
|
"History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
|
||||||
|
)
|
||||||
|
prev_fname = watch_history[dates[-2]]
|
||||||
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
|
# Did it have any notification alerts to hit?
|
||||||
'consecutive_filter_failures': c})
|
if len(watch['notification_urls']):
|
||||||
except content_fetcher.EmptyReply as e:
|
print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid))
|
||||||
# Some kind of custom to-str handler in the exception handler that does this?
|
n_object['notification_urls'] = watch['notification_urls']
|
||||||
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
|
n_object['notification_title'] = watch['notification_title']
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
|
n_object['notification_body'] = watch['notification_body']
|
||||||
'last_check_status': e.status_code})
|
n_object['notification_format'] = watch['notification_format']
|
||||||
except content_fetcher.ScreenshotUnavailable as e:
|
|
||||||
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
|
|
||||||
'last_check_status': e.status_code})
|
|
||||||
except content_fetcher.PageUnloadable as e:
|
|
||||||
err_text = "Page request from server didnt respond correctly"
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
|
|
||||||
'last_check_status': e.status_code})
|
|
||||||
except Exception as e:
|
|
||||||
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
|
|
||||||
|
|
||||||
else:
|
# No? maybe theres a global setting, queue them all
|
||||||
try:
|
elif len(self.datastore.data['settings']['application']['notification_urls']):
|
||||||
watch = self.datastore.data['watching'][uuid]
|
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid))
|
||||||
fname = "" # Saved history text filename
|
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
|
||||||
|
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
|
||||||
|
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
|
||||||
|
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
|
||||||
|
else:
|
||||||
|
print(">>> NO notifications queued, watch and global notification URLs were empty.")
|
||||||
|
|
||||||
# For the FIRST time we check a site, or a change detected, save the snapshot.
|
# Only prepare to notify if the rules above matched
|
||||||
if changed_detected or not watch['last_checked']:
|
if 'notification_urls' in n_object:
|
||||||
# A change was detected
|
# HTML needs linebreak, but MarkDown and Text can use a linefeed
|
||||||
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
|
if n_object['notification_format'] == 'HTML':
|
||||||
|
line_feed_sep = "</br>"
|
||||||
|
else:
|
||||||
|
line_feed_sep = "\n"
|
||||||
|
|
||||||
# Generally update anything interesting returned
|
from changedetectionio import diff
|
||||||
update_obj['consecutive_filter_failures'] = 0
|
n_object.update({
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
|
'watch_url': watch['url'],
|
||||||
|
'uuid': self.current_uuid,
|
||||||
|
'current_snapshot': contents.decode('utf-8'),
|
||||||
|
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
|
||||||
|
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
|
||||||
|
})
|
||||||
|
|
||||||
# A change was detected
|
self.notification_q.put(n_object)
|
||||||
if changed_detected:
|
|
||||||
n_object = {}
|
|
||||||
print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
|
|
||||||
|
|
||||||
# Notifications should only trigger on the second time (first time, we gather the initial snapshot)
|
except Exception as e:
|
||||||
if watch.history_n >= 2:
|
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
|
||||||
# Atleast 2, means there really was a change
|
print("!!!! Exception in update_worker !!!\n", e)
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)})
|
self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
|
||||||
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
|
||||||
|
|
||||||
watch_history = watch.history
|
finally:
|
||||||
dates = list(watch_history.keys())
|
# Always record that we atleast tried
|
||||||
# Theoretically it's possible that this could be just 1 long,
|
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3),
|
||||||
# - In the case that the timestamp key was not unique
|
'last_checked': round(time.time())})
|
||||||
if len(dates) == 1:
|
|
||||||
raise ValueError(
|
|
||||||
"History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
|
|
||||||
)
|
|
||||||
prev_fname = watch_history[dates[-2]]
|
|
||||||
|
|
||||||
# Did it have any notification alerts to hit?
|
# Always save the screenshot if it's available
|
||||||
if len(watch['notification_urls']):
|
if screenshot:
|
||||||
print(">>> Notifications queued for UUID from watch {}".format(uuid))
|
self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot)
|
||||||
n_object['notification_urls'] = watch['notification_urls']
|
if xpath_data:
|
||||||
n_object['notification_title'] = watch['notification_title']
|
self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data)
|
||||||
n_object['notification_body'] = watch['notification_body']
|
|
||||||
n_object['notification_format'] = watch['notification_format']
|
|
||||||
|
|
||||||
# No? maybe theres a global setting, queue them all
|
|
||||||
elif len(self.datastore.data['settings']['application']['notification_urls']):
|
|
||||||
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
|
|
||||||
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
|
|
||||||
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
|
|
||||||
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
|
|
||||||
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
|
|
||||||
else:
|
|
||||||
print(">>> NO notifications queued, watch and global notification URLs were empty.")
|
|
||||||
|
|
||||||
# Only prepare to notify if the rules above matched
|
|
||||||
if 'notification_urls' in n_object:
|
|
||||||
# HTML needs linebreak, but MarkDown and Text can use a linefeed
|
|
||||||
if n_object['notification_format'] == 'HTML':
|
|
||||||
line_feed_sep = "</br>"
|
|
||||||
else:
|
|
||||||
line_feed_sep = "\n"
|
|
||||||
|
|
||||||
from changedetectionio import diff
|
|
||||||
n_object.update({
|
|
||||||
'watch_url': watch['url'],
|
|
||||||
'uuid': uuid,
|
|
||||||
'current_snapshot': contents.decode('utf-8'),
|
|
||||||
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
|
|
||||||
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
|
|
||||||
})
|
|
||||||
|
|
||||||
self.notification_q.put(n_object)
|
|
||||||
|
|
||||||
except Exception as e:
|
|
||||||
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
|
|
||||||
print("!!!! Exception in update_worker !!!\n", e)
|
|
||||||
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
|
|
||||||
|
|
||||||
finally:
|
|
||||||
# Always record that we atleast tried
|
|
||||||
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
|
|
||||||
'last_checked': round(time.time())})
|
|
||||||
|
|
||||||
# Always save the screenshot if it's available
|
|
||||||
if screenshot:
|
|
||||||
self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot)
|
|
||||||
if xpath_data:
|
|
||||||
self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data)
|
|
||||||
|
|
||||||
|
|
||||||
self.current_uuid = None # Done
|
|
||||||
self.q.task_done()
|
|
||||||
|
|
||||||
# Give the CPU time to interrupt
|
|
||||||
time.sleep(0.1)
|
|
||||||
|
|
||||||
self.app.config.exit.wait(1)
|
|
||||||
|
|||||||
Reference in New Issue
Block a user