mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 00:27:48 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			python-sli
			...
			threading-
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					9d8558fbc9 | ||
| 
						 | 
					a7a8ba58ed | ||
| 
						 | 
					7823140442 | 
@@ -105,9 +105,10 @@ def init_app_secret(datastore_path):
 | 
			
		||||
# running or something similar.
 | 
			
		||||
@app.template_filter('format_last_checked_time')
 | 
			
		||||
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
 | 
			
		||||
 | 
			
		||||
    # Worker thread tells us which UUID it is currently processing.
 | 
			
		||||
    for t in running_update_threads:
 | 
			
		||||
        if t.current_uuid == watch_obj['uuid']:
 | 
			
		||||
    for t in threading.enumerate():
 | 
			
		||||
        if t.name == 'update_worker' and t.current_uuid == watch_obj['uuid']:
 | 
			
		||||
            return '<span class="loader"></span><span> Checking now</span>'
 | 
			
		||||
 | 
			
		||||
    if watch_obj['last_checked'] == 0:
 | 
			
		||||
@@ -1213,6 +1214,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
 | 
			
		||||
    # @todo handle ctrl break
 | 
			
		||||
    ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
 | 
			
		||||
    threading.Thread(target=ticker_thread_job_queue_processor).start()
 | 
			
		||||
 | 
			
		||||
    threading.Thread(target=notification_runner).start()
 | 
			
		||||
 | 
			
		||||
@@ -1288,25 +1290,63 @@ def notification_runner():
 | 
			
		||||
            # Trim the log length
 | 
			
		||||
            notification_debug_log = notification_debug_log[-100:]
 | 
			
		||||
 | 
			
		||||
# Check the queue, when a job exists, start a fresh thread of update_worker
 | 
			
		||||
def ticker_thread_job_queue_processor():
    """Start one fresh ``update_worker`` thread for each queued watch UUID.

    Loops until ``app.config.exit`` is set, waking roughly every 0.3 seconds.
    On each pass it counts the live threads named ``'update_worker'``; when
    that count is below the configured worker limit it takes one UUID off
    ``update_q`` (non-blocking) and starts a new ``update_worker`` thread for it.

    ``app``, ``datastore``, ``update_q`` and ``notification_q`` are not defined
    in this function - presumably module-level globals; verify against the
    enclosing module.
    """
    from changedetectionio import update_worker

    # The FETCH_WORKERS environment variable takes precedence over the stored setting
    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))

    while not app.config.exit.is_set():
        time.sleep(0.3)

        # Check that some threads are free
        running = sum(1 for t in threading.enumerate() if t.name == 'update_worker')
        if running >= n_workers:
            continue

        try:
            uuid = update_q.get(block=False)
        except queue.Empty:
            # Go back to waiting for exit and/or another entry from the queue
            continue

        print("Starting a thread fetch")

        try:
            # Launch the update_worker thread that will handle picking items off a queue and sending them off
            # in the event that playwright or others have a memory leak, this should clean it up better than gc.collect()
            # (By letting it exit entirely)
            update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start()
        except Exception as e:
            # Best-effort: report the failure and keep servicing the queue
            print("Error launching update_worker for UUID {}.".format(uuid))
            print(str(e))

        # 'running' was counted before the launch above, so it excludes the thread just started
        print("Running now {}".format(running))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
 | 
			
		||||
def ticker_thread_check_time_launch_checks():
 | 
			
		||||
    import random
 | 
			
		||||
    from changedetectionio import update_worker
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
 | 
			
		||||
    print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
 | 
			
		||||
 | 
			
		||||
    # Can go in its own function
 | 
			
		||||
 | 
			
		||||
    # Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch
 | 
			
		||||
    # This is to be totally sure that they don't leak memory
 | 
			
		||||
    # Spin up Workers that do the fetching
 | 
			
		||||
    # Can be overridden by ENV or use the default settings
 | 
			
		||||
    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
 | 
			
		||||
    for _ in range(n_workers):
 | 
			
		||||
        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
 | 
			
		||||
        running_update_threads.append(new_worker)
 | 
			
		||||
        new_worker.start()
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    while not app.config.exit.is_set():
 | 
			
		||||
 | 
			
		||||
        # Get a list of watches by UUID that are currently fetching data
 | 
			
		||||
        # Update our list of watches by UUID that are currently fetching data, used in the UI
 | 
			
		||||
        running_uuids = []
 | 
			
		||||
        for t in running_update_threads:
 | 
			
		||||
            if t.current_uuid:
 | 
			
		||||
 
 | 
			
		||||
@@ -4,8 +4,6 @@ from typing import List
 | 
			
		||||
from bs4 import BeautifulSoup
 | 
			
		||||
from jsonpath_ng.ext import parse
 | 
			
		||||
import re
 | 
			
		||||
from inscriptis import get_text
 | 
			
		||||
from inscriptis.model.config import ParserConfig
 | 
			
		||||
 | 
			
		||||
class FilterNotFoundInResponse(ValueError):
 | 
			
		||||
    def __init__(self, msg):
 | 
			
		||||
@@ -183,9 +181,16 @@ def strip_ignore_text(content, wordlist, mode="content"):
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
 | 
			
		||||
    import multiprocessing
 | 
			
		||||
 | 
			
		||||
    from inscriptis.model.config import ParserConfig
 | 
			
		||||
 | 
			
		||||
    """Converts html string to a string with just the text. If ignoring
 | 
			
		||||
    rendering anchor tag content is enable, anchor tag content are also
 | 
			
		||||
    included in the text
 | 
			
		||||
    
 | 
			
		||||
    @NOTE: HORRIBLE LXML INDUCED MEMORY LEAK WORKAROUND HERE 
 | 
			
		||||
           https://www.reddit.com/r/Python/comments/j0gl8t/psa_pythonlxml_memory_leaks_and_a_solution/ 
 | 
			
		||||
 | 
			
		||||
    :param html_content: string with html content
 | 
			
		||||
    :param render_anchor_tag_content: boolean flag indicating whether to extract
 | 
			
		||||
@@ -207,8 +212,19 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
 | 
			
		||||
    else:
 | 
			
		||||
        parser_config = None
 | 
			
		||||
 | 
			
		||||
    # get text and annotations via inscriptis
 | 
			
		||||
    text_content = get_text(html_content, config=parser_config)
 | 
			
		||||
 | 
			
		||||
    def parse_function(html_content, parser_config, results_queue):
 | 
			
		||||
        from inscriptis import get_text
 | 
			
		||||
        # get text and annotations via inscriptis
 | 
			
		||||
        text_content = get_text(html_content, config=parser_config)
 | 
			
		||||
        results_queue.put(text_content)
 | 
			
		||||
 | 
			
		||||
    results_queue = multiprocessing.Queue()
 | 
			
		||||
    parse_process = multiprocessing.Process(target=parse_function, args=(html_content, parser_config, results_queue))
 | 
			
		||||
    parse_process.daemon = True
 | 
			
		||||
    parse_process.start()
 | 
			
		||||
    text_content = results_queue.get()  # blocks until results are available
 | 
			
		||||
    parse_process.terminate()
 | 
			
		||||
 | 
			
		||||
    return text_content
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
@@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
 | 
			
		||||
 | 
			
		||||
# A single update worker
 | 
			
		||||
#
 | 
			
		||||
# Requests for checking on a single site(watch) from a queue of watches
 | 
			
		||||
# (another process inserts watches into the queue that are time-ready for checking)
 | 
			
		||||
 | 
			
		||||
#
 | 
			
		||||
 | 
			
		||||
class update_worker(threading.Thread):
 | 
			
		||||
    current_uuid = None
 | 
			
		||||
 | 
			
		||||
    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
 | 
			
		||||
    def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs):
 | 
			
		||||
        self.q = q
 | 
			
		||||
 | 
			
		||||
        self.app = app
 | 
			
		||||
        self.notification_q = notification_q
 | 
			
		||||
        self.datastore = datastore
 | 
			
		||||
        self.current_uuid = uuid
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
        self.name = "update_worker"
 | 
			
		||||
 | 
			
		||||
    def send_filter_failure_notification(self, uuid):
 | 
			
		||||
 | 
			
		||||
@@ -47,169 +48,170 @@ class update_worker(threading.Thread):
 | 
			
		||||
            self.notification_q.put(n_object)
 | 
			
		||||
            print("Sent filter not found notification for {}".format(uuid))
 | 
			
		||||
 | 
			
		||||
    # Pick one job off the list, process it threaded, then exit
 | 
			
		||||
    def run(self):
 | 
			
		||||
        # Go talk to the website
 | 
			
		||||
        self.perform_site_update()
 | 
			
		||||
 | 
			
		||||
        self.current_uuid = None  # Done
 | 
			
		||||
        self.q.task_done()
 | 
			
		||||
 | 
			
		||||
        # Let the thread die after processing 1
 | 
			
		||||
        # We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc)
 | 
			
		||||
        print ("EXITING THREAD!")
 | 
			
		||||
        self.app.config.exit.wait(1)
 | 
			
		||||
        return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def perform_site_update(self):
 | 
			
		||||
 | 
			
		||||
        from changedetectionio import fetch_site_status
 | 
			
		||||
 | 
			
		||||
        if not self.current_uuid in list(self.datastore.data['watching'].keys()):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        changed_detected = False
 | 
			
		||||
        contents = ""
 | 
			
		||||
        screenshot = False
 | 
			
		||||
        update_obj= {}
 | 
			
		||||
        xpath_data = False
 | 
			
		||||
        now = time.time()
 | 
			
		||||
 | 
			
		||||
        update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
 | 
			
		||||
        try:
 | 
			
		||||
            changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid)
 | 
			
		||||
            # Re #342
 | 
			
		||||
            # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
 | 
			
		||||
            # We then convert/.decode('utf-8') for the notification etc
 | 
			
		||||
            if not isinstance(contents, (bytes, bytearray)):
 | 
			
		||||
                raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
 | 
			
		||||
        except PermissionError as e:
 | 
			
		||||
            self.app.logger.error("File permission error updating", self.current_uuid, str(e))
 | 
			
		||||
        except content_fetcher.ReplyWithContentButNoText as e:
 | 
			
		||||
            # Totally fine, it's by choice - just continue on, nothing more to care about
 | 
			
		||||
            # Page had elements/content but no renderable text
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."})
 | 
			
		||||
        except FilterNotFoundInResponse as e:
 | 
			
		||||
            err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
 | 
			
		||||
            c = 0
 | 
			
		||||
            if self.datastore.data['watching'].get(self.current_uuid, False):
 | 
			
		||||
                c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5)
 | 
			
		||||
            c += 1
 | 
			
		||||
 | 
			
		||||
        while not self.app.config.exit.is_set():
 | 
			
		||||
            # Send notification if we reached the threshold?
 | 
			
		||||
            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
 | 
			
		||||
            print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c))
 | 
			
		||||
            if threshold >0 and c >= threshold:
 | 
			
		||||
                self.send_filter_failure_notification(self.current_uuid)
 | 
			
		||||
                c = 0
 | 
			
		||||
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                               'consecutive_filter_failures': c})
 | 
			
		||||
        except content_fetcher.EmptyReply as e:
 | 
			
		||||
            # Some kind of custom to-str handler in the exception handler that does this?
 | 
			
		||||
            err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                               'last_check_status': e.status_code})
 | 
			
		||||
        except content_fetcher.ScreenshotUnavailable as e:
 | 
			
		||||
            err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                               'last_check_status': e.status_code})
 | 
			
		||||
        except content_fetcher.PageUnloadable as e:
 | 
			
		||||
            err_text = "Page request from server didnt respond correctly"
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                               'last_check_status': e.status_code})
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            try:
 | 
			
		||||
                uuid = self.q.get(block=False)
 | 
			
		||||
            except queue.Empty:
 | 
			
		||||
                pass
 | 
			
		||||
                watch = self.datastore.data['watching'][self.current_uuid]
 | 
			
		||||
                fname = "" # Saved history text filename
 | 
			
		||||
 | 
			
		||||
            else:
 | 
			
		||||
                self.current_uuid = uuid
 | 
			
		||||
                # For the FIRST time we check a site, or a change detected, save the snapshot.
 | 
			
		||||
                if changed_detected or not watch['last_checked']:
 | 
			
		||||
                    # A change was detected
 | 
			
		||||
                    fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
 | 
			
		||||
 | 
			
		||||
                if uuid in list(self.datastore.data['watching'].keys()):
 | 
			
		||||
                # Generally update anything interesting returned
 | 
			
		||||
                update_obj['consecutive_filter_failures'] = 0
 | 
			
		||||
                self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj)
 | 
			
		||||
 | 
			
		||||
                    changed_detected = False
 | 
			
		||||
                    contents = ""
 | 
			
		||||
                    screenshot = False
 | 
			
		||||
                    update_obj= {}
 | 
			
		||||
                    xpath_data = False
 | 
			
		||||
                    now = time.time()
 | 
			
		||||
                # A change was detected
 | 
			
		||||
                if changed_detected:
 | 
			
		||||
                    n_object = {}
 | 
			
		||||
                    print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url']))
 | 
			
		||||
 | 
			
		||||
                    try:
 | 
			
		||||
                        changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid)
 | 
			
		||||
                        # Re #342
 | 
			
		||||
                        # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
 | 
			
		||||
                        # We then convert/.decode('utf-8') for the notification etc
 | 
			
		||||
                        if not isinstance(contents, (bytes, bytearray)):
 | 
			
		||||
                            raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
 | 
			
		||||
                    except PermissionError as e:
 | 
			
		||||
                        self.app.logger.error("File permission error updating", uuid, str(e))
 | 
			
		||||
                    except content_fetcher.ReplyWithContentButNoText as e:
 | 
			
		||||
                        # Totally fine, it's by choice - just continue on, nothing more to care about
 | 
			
		||||
                        # Page had elements/content but no renderable text
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."})
 | 
			
		||||
                    except FilterNotFoundInResponse as e:
 | 
			
		||||
                        err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
 | 
			
		||||
                        c = 0
 | 
			
		||||
                        if self.datastore.data['watching'].get(uuid, False):
 | 
			
		||||
                            c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
 | 
			
		||||
                        c += 1
 | 
			
		||||
                    # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
 | 
			
		||||
                    if watch.history_n >= 2:
 | 
			
		||||
                        # At least 2 means there really was a change
 | 
			
		||||
                        self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)})
 | 
			
		||||
 | 
			
		||||
                        # Send notification if we reached the threshold?
 | 
			
		||||
                        threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
 | 
			
		||||
                        print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
 | 
			
		||||
                        if threshold >0 and c >= threshold:
 | 
			
		||||
                            self.send_filter_failure_notification(uuid)
 | 
			
		||||
                            c = 0
 | 
			
		||||
                        watch_history = watch.history
 | 
			
		||||
                        dates = list(watch_history.keys())
 | 
			
		||||
                        # Theoretically it's possible that this could be just 1 long,
 | 
			
		||||
                        # - In the case that the timestamp key was not unique
 | 
			
		||||
                        if len(dates) == 1:
 | 
			
		||||
                            raise ValueError(
 | 
			
		||||
                                "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
 | 
			
		||||
                            )
 | 
			
		||||
                        prev_fname = watch_history[dates[-2]]
 | 
			
		||||
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'consecutive_filter_failures': c})
 | 
			
		||||
                    except content_fetcher.EmptyReply as e:
 | 
			
		||||
                        # Some kind of custom to-str handler in the exception handler that does this?
 | 
			
		||||
                        err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except content_fetcher.ScreenshotUnavailable as e:
 | 
			
		||||
                        err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except content_fetcher.PageUnloadable as e:
 | 
			
		||||
                        err_text = "Page request from server didnt respond correctly"
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
                        # Did it have any notification alerts to hit?
 | 
			
		||||
                        if len(watch['notification_urls']):
 | 
			
		||||
                            print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid))
 | 
			
		||||
                            n_object['notification_urls'] = watch['notification_urls']
 | 
			
		||||
                            n_object['notification_title'] = watch['notification_title']
 | 
			
		||||
                            n_object['notification_body'] = watch['notification_body']
 | 
			
		||||
                            n_object['notification_format'] = watch['notification_format']
 | 
			
		||||
 | 
			
		||||
                    else:
 | 
			
		||||
                        try:
 | 
			
		||||
                            watch = self.datastore.data['watching'][uuid]
 | 
			
		||||
                            fname = "" # Saved history text filename
 | 
			
		||||
                        # No? Maybe there's a global setting, queue them all
 | 
			
		||||
                        elif len(self.datastore.data['settings']['application']['notification_urls']):
 | 
			
		||||
                            print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid))
 | 
			
		||||
                            n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
 | 
			
		||||
                            n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
 | 
			
		||||
                            n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
 | 
			
		||||
                            n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
 | 
			
		||||
                        else:
 | 
			
		||||
                            print(">>> NO notifications queued, watch and global notification URLs were empty.")
 | 
			
		||||
 | 
			
		||||
                            # For the FIRST time we check a site, or a change detected, save the snapshot.
 | 
			
		||||
                            if changed_detected or not watch['last_checked']:
 | 
			
		||||
                                # A change was detected
 | 
			
		||||
                                fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
 | 
			
		||||
                        # Only prepare to notify if the rules above matched
 | 
			
		||||
                        if 'notification_urls' in n_object:
 | 
			
		||||
                            # HTML needs linebreak, but MarkDown and Text can use a linefeed
 | 
			
		||||
                            if n_object['notification_format'] == 'HTML':
 | 
			
		||||
                                line_feed_sep = "</br>"
 | 
			
		||||
                            else:
 | 
			
		||||
                                line_feed_sep = "\n"
 | 
			
		||||
 | 
			
		||||
                            # Generally update anything interesting returned
 | 
			
		||||
                            update_obj['consecutive_filter_failures'] = 0
 | 
			
		||||
                            self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
 | 
			
		||||
                            from changedetectionio import diff
 | 
			
		||||
                            n_object.update({
 | 
			
		||||
                                'watch_url': watch['url'],
 | 
			
		||||
                                'uuid': self.current_uuid,
 | 
			
		||||
                                'current_snapshot': contents.decode('utf-8'),
 | 
			
		||||
                                'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
 | 
			
		||||
                                'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
 | 
			
		||||
                            })
 | 
			
		||||
 | 
			
		||||
                            # A change was detected
 | 
			
		||||
                            if changed_detected:
 | 
			
		||||
                                n_object = {}
 | 
			
		||||
                                print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
 | 
			
		||||
                            self.notification_q.put(n_object)
 | 
			
		||||
 | 
			
		||||
                                # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
 | 
			
		||||
                                if watch.history_n >= 2:
 | 
			
		||||
                                    # At least 2 means there really was a change
 | 
			
		||||
                                    self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)})
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
 | 
			
		||||
                print("!!!! Exception in update_worker !!!\n", e)
 | 
			
		||||
                self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
 | 
			
		||||
                self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
 | 
			
		||||
                                    watch_history = watch.history
 | 
			
		||||
                                    dates = list(watch_history.keys())
 | 
			
		||||
                                    # Theoretically it's possible that this could be just 1 long,
 | 
			
		||||
                                    # - In the case that the timestamp key was not unique
 | 
			
		||||
                                    if len(dates) == 1:
 | 
			
		||||
                                        raise ValueError(
 | 
			
		||||
                                            "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
 | 
			
		||||
                                        )
 | 
			
		||||
                                    prev_fname = watch_history[dates[-2]]
 | 
			
		||||
        finally:
 | 
			
		||||
            # Always record that we at least tried
 | 
			
		||||
            self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3),
 | 
			
		||||
                                                               'last_checked': round(time.time())})
 | 
			
		||||
 | 
			
		||||
                                    # Did it have any notification alerts to hit?
 | 
			
		||||
                                    if len(watch['notification_urls']):
 | 
			
		||||
                                        print(">>> Notifications queued for UUID from watch {}".format(uuid))
 | 
			
		||||
                                        n_object['notification_urls'] = watch['notification_urls']
 | 
			
		||||
                                        n_object['notification_title'] = watch['notification_title']
 | 
			
		||||
                                        n_object['notification_body'] = watch['notification_body']
 | 
			
		||||
                                        n_object['notification_format'] = watch['notification_format']
 | 
			
		||||
 | 
			
		||||
                                    # No? Maybe there's a global setting, queue them all
 | 
			
		||||
                                    elif len(self.datastore.data['settings']['application']['notification_urls']):
 | 
			
		||||
                                        print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
 | 
			
		||||
                                        n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
 | 
			
		||||
                                        n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
 | 
			
		||||
                                        n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
 | 
			
		||||
                                        n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
 | 
			
		||||
                                    else:
 | 
			
		||||
                                        print(">>> NO notifications queued, watch and global notification URLs were empty.")
 | 
			
		||||
 | 
			
		||||
                                    # Only prepare to notify if the rules above matched
 | 
			
		||||
                                    if 'notification_urls' in n_object:
 | 
			
		||||
                                        # HTML needs linebreak, but MarkDown and Text can use a linefeed
 | 
			
		||||
                                        if n_object['notification_format'] == 'HTML':
 | 
			
		||||
                                            line_feed_sep = "</br>"
 | 
			
		||||
                                        else:
 | 
			
		||||
                                            line_feed_sep = "\n"
 | 
			
		||||
 | 
			
		||||
                                        from changedetectionio import diff
 | 
			
		||||
                                        n_object.update({
 | 
			
		||||
                                            'watch_url': watch['url'],
 | 
			
		||||
                                            'uuid': uuid,
 | 
			
		||||
                                            'current_snapshot': contents.decode('utf-8'),
 | 
			
		||||
                                            'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
 | 
			
		||||
                                            'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
 | 
			
		||||
                                        })
 | 
			
		||||
 | 
			
		||||
                                        self.notification_q.put(n_object)
 | 
			
		||||
 | 
			
		||||
                        except Exception as e:
 | 
			
		||||
                            # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
 | 
			
		||||
                            print("!!!! Exception in update_worker !!!\n", e)
 | 
			
		||||
                            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
 | 
			
		||||
                    finally:
 | 
			
		||||
                        # Always record that we at least tried
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
 | 
			
		||||
                                                                           'last_checked': round(time.time())})
 | 
			
		||||
 | 
			
		||||
                        # Always save the screenshot if it's available
 | 
			
		||||
                        if screenshot:
 | 
			
		||||
                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot)
 | 
			
		||||
                        if xpath_data:
 | 
			
		||||
                            self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data)
 | 
			
		||||
            # Always save the screenshot if it's available
 | 
			
		||||
            if screenshot:
 | 
			
		||||
                self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot)
 | 
			
		||||
            if xpath_data:
 | 
			
		||||
                self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                self.current_uuid = None  # Done
 | 
			
		||||
                self.q.task_done()
 | 
			
		||||
 | 
			
		||||
                # Give the CPU time to interrupt
 | 
			
		||||
                time.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
            self.app.config.exit.wait(1)
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user