mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-30 22:27:52 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			crash-prot
			...
			threading-
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 9d8558fbc9 | ||
|   | a7a8ba58ed | ||
|   | 7823140442 | 
| @@ -105,9 +105,10 @@ def init_app_secret(datastore_path): | ||||
| # running or something similar. | ||||
| @app.template_filter('format_last_checked_time') | ||||
| def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): | ||||
|  | ||||
|     # Worker thread tells us which UUID it is currently processing. | ||||
|     for t in running_update_threads: | ||||
|         if t.current_uuid == watch_obj['uuid']: | ||||
|     for t in threading.enumerate(): | ||||
|         if t.name == 'update_worker' and t.current_uuid == watch_obj['uuid']: | ||||
|             return '<span class="loader"></span><span> Checking now</span>' | ||||
|  | ||||
|     if watch_obj['last_checked'] == 0: | ||||
| @@ -1213,6 +1214,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|  | ||||
|     # @todo handle ctrl break | ||||
|     ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() | ||||
|     threading.Thread(target=ticker_thread_job_queue_processor).start() | ||||
|  | ||||
|     threading.Thread(target=notification_runner).start() | ||||
|  | ||||
| @@ -1288,25 +1290,63 @@ def notification_runner(): | ||||
|             # Trim the log length | ||||
|             notification_debug_log = notification_debug_log[-100:] | ||||
|  | ||||
| # Check the queue, when a job exists, start a fresh thread of update_worker | ||||
| def ticker_thread_job_queue_processor(): | ||||
|  | ||||
|     from changedetectionio import update_worker | ||||
|     n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) | ||||
|  | ||||
|     while not app.config.exit.is_set(): | ||||
|         time.sleep(0.3) | ||||
|  | ||||
|         # Check that some threads are free | ||||
|         running = 0 | ||||
|         for t in threading.enumerate(): | ||||
|             if t.name == 'update_worker': | ||||
|                 running += 1 | ||||
|  | ||||
|         if running >= n_workers: | ||||
|             continue | ||||
|  | ||||
|         try: | ||||
|             uuid = update_q.get(block=False) | ||||
|         except queue.Empty: | ||||
|             # Go back to waiting for exit and/or another entry from the queue | ||||
|             continue | ||||
|         print ("Starting a thread fetch") | ||||
|  | ||||
|  | ||||
|         try: | ||||
|             # Launch the update_worker thread that will handle picking items off a queue and sending them off | ||||
|             # in the event that playwright or others have a memory leak, this should clean it up better than gc.collect() | ||||
|             # (By letting it exit entirely) | ||||
|             update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start() | ||||
|         except Exception as e: | ||||
|             print ("Error launching update_worker for UUID {}.".format(uuid)) | ||||
|             print (str(e)) | ||||
|  | ||||
|         print ("Running now {}", running) | ||||
|  | ||||
|  | ||||
| # Thread runner to check every minute, look for new watches to feed into the Queue. | ||||
| def ticker_thread_check_time_launch_checks(): | ||||
|     import random | ||||
|     from changedetectionio import update_worker | ||||
|  | ||||
|  | ||||
|     recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) | ||||
|     print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) | ||||
|  | ||||
|     # Can go in its own function | ||||
|  | ||||
|     # Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch | ||||
|     # This is to be totally sure that they don't leak memory | ||||
|     # Spin up Workers that do the fetching | ||||
|     # Can be overridden by ENV or use the default settings | ||||
|     n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) | ||||
|     for _ in range(n_workers): | ||||
|         new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) | ||||
|         running_update_threads.append(new_worker) | ||||
|         new_worker.start() | ||||
|  | ||||
|  | ||||
|     while not app.config.exit.is_set(): | ||||
|  | ||||
|         # Get a list of watches by UUID that are currently fetching data | ||||
|         # Update our list of watches by UUID that are currently fetching data, used in the UI | ||||
|         running_uuids = [] | ||||
|         for t in running_update_threads: | ||||
|             if t.current_uuid: | ||||
|   | ||||
| @@ -4,8 +4,6 @@ from typing import List | ||||
| from bs4 import BeautifulSoup | ||||
| from jsonpath_ng.ext import parse | ||||
| import re | ||||
| from inscriptis import get_text | ||||
| from inscriptis.model.config import ParserConfig | ||||
|  | ||||
| class FilterNotFoundInResponse(ValueError): | ||||
|     def __init__(self, msg): | ||||
| @@ -183,10 +181,17 @@ def strip_ignore_text(content, wordlist, mode="content"): | ||||
|  | ||||
|  | ||||
| def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: | ||||
|     import multiprocessing | ||||
|  | ||||
|     from inscriptis.model.config import ParserConfig | ||||
|  | ||||
|     """Converts an HTML string to a string with just the text. If | ||||
|     rendering of anchor tag content is enabled, the anchor tag content is also | ||||
|     included in the text | ||||
|      | ||||
|     @NOTE: HORRIBLE LXML INDUCED MEMORY LEAK WORKAROUND HERE  | ||||
|            https://www.reddit.com/r/Python/comments/j0gl8t/psa_pythonlxml_memory_leaks_and_a_solution/  | ||||
|  | ||||
|     :param html_content: string with html content | ||||
|     :param render_anchor_tag_content: boolean flag indicating whether to extract | ||||
|     hyperlinks (the anchor tag content) together with text. This refers to the | ||||
| @@ -207,8 +212,19 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: | ||||
|     else: | ||||
|         parser_config = None | ||||
|  | ||||
|     # get text and annotations via inscriptis | ||||
|     text_content = get_text(html_content, config=parser_config) | ||||
|  | ||||
|     def parse_function(html_content, parser_config, results_queue): | ||||
|         from inscriptis import get_text | ||||
|         # get text and annotations via inscriptis | ||||
|         text_content = get_text(html_content, config=parser_config) | ||||
|         results_queue.put(text_content) | ||||
|  | ||||
|     results_queue = multiprocessing.Queue() | ||||
|     parse_process = multiprocessing.Process(target=parse_function, args=(html_content, parser_config, results_queue)) | ||||
|     parse_process.daemon = True | ||||
|     parse_process.start() | ||||
|     text_content = results_queue.get()  # blocks until results are available | ||||
|     parse_process.terminate() | ||||
|  | ||||
|     return text_content | ||||
|  | ||||
|   | ||||
| @@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse | ||||
|  | ||||
| # A single update worker | ||||
| # | ||||
| # Requests for checking on a single site(watch) from a queue of watches | ||||
| # (another process inserts watches into the queue that are time-ready for checking) | ||||
|  | ||||
| # | ||||
|  | ||||
| class update_worker(threading.Thread): | ||||
|     current_uuid = None | ||||
|  | ||||
|     def __init__(self, q, notification_q, app, datastore, *args, **kwargs): | ||||
|     def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs): | ||||
|         self.q = q | ||||
|  | ||||
|         self.app = app | ||||
|         self.notification_q = notification_q | ||||
|         self.datastore = datastore | ||||
|         self.current_uuid = uuid | ||||
|         super().__init__(*args, **kwargs) | ||||
|         self.name = "update_worker" | ||||
|  | ||||
|     def send_filter_failure_notification(self, uuid): | ||||
|  | ||||
| @@ -47,169 +48,170 @@ class update_worker(threading.Thread): | ||||
|             self.notification_q.put(n_object) | ||||
|             print("Sent filter not found notification for {}".format(uuid)) | ||||
|  | ||||
|     # Pick one job off the list, process it threaded, then exit | ||||
|     def run(self): | ||||
|         # Go talk to the website | ||||
|         self.perform_site_update() | ||||
|  | ||||
|         self.current_uuid = None  # Done | ||||
|         self.q.task_done() | ||||
|  | ||||
|         # Let the thread die after processing 1 | ||||
|         # We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc) | ||||
|         print ("EXITING THREAD!") | ||||
|         self.app.config.exit.wait(1) | ||||
|         return | ||||
|  | ||||
|  | ||||
|  | ||||
|     def perform_site_update(self): | ||||
|  | ||||
|         from changedetectionio import fetch_site_status | ||||
|  | ||||
|         if not self.current_uuid in list(self.datastore.data['watching'].keys()): | ||||
|             return | ||||
|  | ||||
|  | ||||
|         changed_detected = False | ||||
|         contents = "" | ||||
|         screenshot = False | ||||
|         update_obj= {} | ||||
|         xpath_data = False | ||||
|         now = time.time() | ||||
|  | ||||
|         update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) | ||||
|         try: | ||||
|             changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid) | ||||
|             # Re #342 | ||||
|             # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. | ||||
|             # We then convert/.decode('utf-8') for the notification etc | ||||
|             if not isinstance(contents, (bytes, bytearray)): | ||||
|                 raise Exception("Error - returned data from the fetch handler SHOULD be bytes") | ||||
|         except PermissionError as e: | ||||
|             self.app.logger.error("File permission error updating", self.current_uuid, str(e)) | ||||
|         except content_fetcher.ReplyWithContentButNoText as e: | ||||
|             # Totally fine, it's by choice - just continue on, nothing more to care about | ||||
|             # Page had elements/content but no renderable text | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."}) | ||||
|         except FilterNotFoundInResponse as e: | ||||
|             err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e)) | ||||
|             c = 0 | ||||
|             if self.datastore.data['watching'].get(self.current_uuid, False): | ||||
|                 c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5) | ||||
|             c += 1 | ||||
|  | ||||
|         while not self.app.config.exit.is_set(): | ||||
|             # Send notification if we reached the threshold? | ||||
|             threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) | ||||
|             print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c)) | ||||
|             if threshold >0 and c >= threshold: | ||||
|                 self.send_filter_failure_notification(self.current_uuid) | ||||
|                 c = 0 | ||||
|  | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text, | ||||
|                                                                'consecutive_filter_failures': c}) | ||||
|         except content_fetcher.EmptyReply as e: | ||||
|             # Some kind of custom to-str handler in the exception handler that does this? | ||||
|             err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text, | ||||
|                                                                'last_check_status': e.status_code}) | ||||
|         except content_fetcher.ScreenshotUnavailable as e: | ||||
|             err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text, | ||||
|                                                                'last_check_status': e.status_code}) | ||||
|         except content_fetcher.PageUnloadable as e: | ||||
|             err_text = "Page request from server didnt respond correctly" | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text, | ||||
|                                                                'last_check_status': e.status_code}) | ||||
|         except Exception as e: | ||||
|             self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e)) | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)}) | ||||
|  | ||||
|         else: | ||||
|             try: | ||||
|                 uuid = self.q.get(block=False) | ||||
|             except queue.Empty: | ||||
|                 pass | ||||
|                 watch = self.datastore.data['watching'][self.current_uuid] | ||||
|                 fname = "" # Saved history text filename | ||||
|  | ||||
|             else: | ||||
|                 self.current_uuid = uuid | ||||
|                 # For the FIRST time we check a site, or a change detected, save the snapshot. | ||||
|                 if changed_detected or not watch['last_checked']: | ||||
|                     # A change was detected | ||||
|                     fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) | ||||
|  | ||||
|                 if uuid in list(self.datastore.data['watching'].keys()): | ||||
|                 # Generally update anything interesting returned | ||||
|                 update_obj['consecutive_filter_failures'] = 0 | ||||
|                 self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj) | ||||
|  | ||||
|                     changed_detected = False | ||||
|                     contents = "" | ||||
|                     screenshot = False | ||||
|                     update_obj= {} | ||||
|                     xpath_data = False | ||||
|                     now = time.time() | ||||
|                 # A change was detected | ||||
|                 if changed_detected: | ||||
|                     n_object = {} | ||||
|                     print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url'])) | ||||
|  | ||||
|                     try: | ||||
|                         changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid) | ||||
|                         # Re #342 | ||||
|                         # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. | ||||
|                         # We then convert/.decode('utf-8') for the notification etc | ||||
|                         if not isinstance(contents, (bytes, bytearray)): | ||||
|                             raise Exception("Error - returned data from the fetch handler SHOULD be bytes") | ||||
|                     except PermissionError as e: | ||||
|                         self.app.logger.error("File permission error updating", uuid, str(e)) | ||||
|                     except content_fetcher.ReplyWithContentButNoText as e: | ||||
|                         # Totally fine, it's by choice - just continue on, nothing more to care about | ||||
|                         # Page had elements/content but no renderable text | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."}) | ||||
|                     except FilterNotFoundInResponse as e: | ||||
|                         err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e)) | ||||
|                         c = 0 | ||||
|                         if self.datastore.data['watching'].get(uuid, False): | ||||
|                             c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5) | ||||
|                         c += 1 | ||||
|                     # Notifications should only trigger on the second time (first time, we gather the initial snapshot) | ||||
|                     if watch.history_n >= 2: | ||||
|                         # At least 2 means there really was a change | ||||
|                         self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)}) | ||||
|  | ||||
|                         # Send notification if we reached the threshold? | ||||
|                         threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) | ||||
|                         print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) | ||||
|                         if threshold >0 and c >= threshold: | ||||
|                             self.send_filter_failure_notification(uuid) | ||||
|                             c = 0 | ||||
|                         watch_history = watch.history | ||||
|                         dates = list(watch_history.keys()) | ||||
|                         # Theoretically it's possible that this could be just 1 long, | ||||
|                         # - In the case that the timestamp key was not unique | ||||
|                         if len(dates) == 1: | ||||
|                             raise ValueError( | ||||
|                                 "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?" | ||||
|                             ) | ||||
|                         prev_fname = watch_history[dates[-2]] | ||||
|  | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'consecutive_filter_failures': c}) | ||||
|                     except content_fetcher.EmptyReply as e: | ||||
|                         # Some kind of custom to-str handler in the exception handler that does this? | ||||
|                         err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except content_fetcher.ScreenshotUnavailable as e: | ||||
|                         err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except content_fetcher.PageUnloadable as e: | ||||
|                         err_text = "Page request from server didnt respond correctly" | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except Exception as e: | ||||
|                         self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|                         # Did it have any notification alerts to hit? | ||||
|                         if len(watch['notification_urls']): | ||||
|                             print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid)) | ||||
|                             n_object['notification_urls'] = watch['notification_urls'] | ||||
|                             n_object['notification_title'] = watch['notification_title'] | ||||
|                             n_object['notification_body'] = watch['notification_body'] | ||||
|                             n_object['notification_format'] = watch['notification_format'] | ||||
|  | ||||
|                     else: | ||||
|                         try: | ||||
|                             watch = self.datastore.data['watching'][uuid] | ||||
|                             fname = "" # Saved history text filename | ||||
|                         # No? Maybe there's a global setting; queue them all | ||||
|                         elif len(self.datastore.data['settings']['application']['notification_urls']): | ||||
|                             print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid)) | ||||
|                             n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] | ||||
|                             n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title'] | ||||
|                             n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body'] | ||||
|                             n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format'] | ||||
|                         else: | ||||
|                             print(">>> NO notifications queued, watch and global notification URLs were empty.") | ||||
|  | ||||
|                             # For the FIRST time we check a site, or a change detected, save the snapshot. | ||||
|                             if changed_detected or not watch['last_checked']: | ||||
|                                 # A change was detected | ||||
|                                 fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) | ||||
|                         # Only prepare to notify if the rules above matched | ||||
|                         if 'notification_urls' in n_object: | ||||
|                             # HTML needs linebreak, but MarkDown and Text can use a linefeed | ||||
|                             if n_object['notification_format'] == 'HTML': | ||||
|                                 line_feed_sep = "</br>" | ||||
|                             else: | ||||
|                                 line_feed_sep = "\n" | ||||
|  | ||||
|                             # Generally update anything interesting returned | ||||
|                             update_obj['consecutive_filter_failures'] = 0 | ||||
|                             self.datastore.update_watch(uuid=uuid, update_obj=update_obj) | ||||
|                             from changedetectionio import diff | ||||
|                             n_object.update({ | ||||
|                                 'watch_url': watch['url'], | ||||
|                                 'uuid': self.current_uuid, | ||||
|                                 'current_snapshot': contents.decode('utf-8'), | ||||
|                                 'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep), | ||||
|                                 'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep) | ||||
|                             }) | ||||
|  | ||||
|                             # A change was detected | ||||
|                             if changed_detected: | ||||
|                                 n_object = {} | ||||
|                                 print (">> Change detected in UUID {} - {}".format(uuid, watch['url'])) | ||||
|                             self.notification_q.put(n_object) | ||||
|  | ||||
|                                 # Notifications should only trigger on the second time (first time, we gather the initial snapshot) | ||||
|                                 if watch.history_n >= 2: | ||||
|                                     # At least 2 means there really was a change | ||||
|                                     self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)}) | ||||
|             except Exception as e: | ||||
|                 # Catch everything possible here, so that if a worker crashes, we don't lose it until restart! | ||||
|                 print("!!!! Exception in update_worker !!!\n", e) | ||||
|                 self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e)) | ||||
|                 self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)}) | ||||
|  | ||||
|                                     watch_history = watch.history | ||||
|                                     dates = list(watch_history.keys()) | ||||
|                                     # Theoretically it's possible that this could be just 1 long, | ||||
|                                     # - In the case that the timestamp key was not unique | ||||
|                                     if len(dates) == 1: | ||||
|                                         raise ValueError( | ||||
|                                             "History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?" | ||||
|                                         ) | ||||
|                                     prev_fname = watch_history[dates[-2]] | ||||
|         finally: | ||||
|             # Always record that we at least tried | ||||
|             self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3), | ||||
|                                                                'last_checked': round(time.time())}) | ||||
|  | ||||
|                                     # Did it have any notification alerts to hit? | ||||
|                                     if len(watch['notification_urls']): | ||||
|                                         print(">>> Notifications queued for UUID from watch {}".format(uuid)) | ||||
|                                         n_object['notification_urls'] = watch['notification_urls'] | ||||
|                                         n_object['notification_title'] = watch['notification_title'] | ||||
|                                         n_object['notification_body'] = watch['notification_body'] | ||||
|                                         n_object['notification_format'] = watch['notification_format'] | ||||
|  | ||||
|                                     # No? Maybe there's a global setting; queue them all | ||||
|                                     elif len(self.datastore.data['settings']['application']['notification_urls']): | ||||
|                                         print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid)) | ||||
|                                         n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] | ||||
|                                         n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title'] | ||||
|                                         n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body'] | ||||
|                                         n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format'] | ||||
|                                     else: | ||||
|                                         print(">>> NO notifications queued, watch and global notification URLs were empty.") | ||||
|  | ||||
|                                     # Only prepare to notify if the rules above matched | ||||
|                                     if 'notification_urls' in n_object: | ||||
|                                         # HTML needs linebreak, but MarkDown and Text can use a linefeed | ||||
|                                         if n_object['notification_format'] == 'HTML': | ||||
|                                             line_feed_sep = "</br>" | ||||
|                                         else: | ||||
|                                             line_feed_sep = "\n" | ||||
|  | ||||
|                                         from changedetectionio import diff | ||||
|                                         n_object.update({ | ||||
|                                             'watch_url': watch['url'], | ||||
|                                             'uuid': uuid, | ||||
|                                             'current_snapshot': contents.decode('utf-8'), | ||||
|                                             'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep), | ||||
|                                             'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep) | ||||
|                                         }) | ||||
|  | ||||
|                                         self.notification_q.put(n_object) | ||||
|  | ||||
|                         except Exception as e: | ||||
|                             # Catch everything possible here, so that if a worker crashes, we don't lose it until restart! | ||||
|                             print("!!!! Exception in update_worker !!!\n", e) | ||||
|                             self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                             self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|  | ||||
|                     finally: | ||||
|                         # Always record that we at least tried | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3), | ||||
|                                                                            'last_checked': round(time.time())}) | ||||
|  | ||||
|                         # Always save the screenshot if it's available | ||||
|                         if screenshot: | ||||
|                             self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot) | ||||
|                         if xpath_data: | ||||
|                             self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data) | ||||
|             # Always save the screenshot if it's available | ||||
|             if screenshot: | ||||
|                 self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot) | ||||
|             if xpath_data: | ||||
|                 self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data) | ||||
|  | ||||
|  | ||||
|                 self.current_uuid = None  # Done | ||||
|                 self.q.task_done() | ||||
|  | ||||
|                 # Give the CPU time to interrupt | ||||
|                 time.sleep(0.1) | ||||
|  | ||||
|             self.app.config.exit.wait(1) | ||||
|   | ||||
		Reference in New Issue
	
	Block a user