mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 06:37:41 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			puppeteer-
			...
			thread-rec
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 56b88624d7 | ||
|   | c3c0f62662 | 
| @@ -18,6 +18,7 @@ import threading | ||||
| import time | ||||
| from copy import deepcopy | ||||
| from threading import Event | ||||
| from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor | ||||
|  | ||||
| import flask_login | ||||
| import logging | ||||
| @@ -49,12 +50,12 @@ __version__ = '0.39.18' | ||||
| datastore = None | ||||
|  | ||||
| # Local | ||||
| running_update_threads = [] | ||||
| running_update_uuids = set() | ||||
| ticker_thread = None | ||||
|  | ||||
| extra_stylesheets = [] | ||||
|  | ||||
| update_q = queue.PriorityQueue() | ||||
| pool = None | ||||
|  | ||||
| notification_q = queue.Queue() | ||||
|  | ||||
| @@ -105,10 +106,9 @@ def init_app_secret(datastore_path): | ||||
| # running or something similar. | ||||
| @app.template_filter('format_last_checked_time') | ||||
| def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): | ||||
|     # Worker thread tells us which UUID it is currently processing. | ||||
|     for t in running_update_threads: | ||||
|         if t.current_uuid == watch_obj['uuid']: | ||||
|             return '<span class="loader"></span><span> Checking now</span>' | ||||
|  | ||||
|     if watch_obj['uuid'] in running_update_uuids: | ||||
|         return '<span class="loader"></span><span> Checking now</span>' | ||||
|  | ||||
|     if watch_obj['last_checked'] == 0: | ||||
|         return 'Not yet' | ||||
| @@ -178,13 +178,15 @@ class User(flask_login.UserMixin): | ||||
|  | ||||
| def changedetection_app(config=None, datastore_o=None): | ||||
|     global datastore | ||||
|     global pool | ||||
|     datastore = datastore_o | ||||
|  | ||||
|     # so far just for read-only via tests, but this will be moved eventually to be the main source | ||||
|     # (instead of the global var) | ||||
|     app.config['DATASTORE']=datastore_o | ||||
|  | ||||
|     #app.config.update(config or {}) | ||||
|     pool = PriorityThreadPoolExecutor(max_workers=int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))) | ||||
|  | ||||
|  | ||||
|     login_manager = flask_login.LoginManager(app) | ||||
|     login_manager.login_view = 'login' | ||||
| @@ -193,20 +195,17 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|  | ||||
|     watch_api.add_resource(api_v1.WatchSingleHistory, | ||||
|                            '/api/v1/watch/<string:uuid>/history/<string:timestamp>', | ||||
|                            resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) | ||||
|                            resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch}) | ||||
|  | ||||
|     watch_api.add_resource(api_v1.WatchHistory, | ||||
|                            '/api/v1/watch/<string:uuid>/history', | ||||
|                            resource_class_kwargs={'datastore': datastore}) | ||||
|  | ||||
|     watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch', | ||||
|                            resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) | ||||
|                            resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch}) | ||||
|  | ||||
|     watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>', | ||||
|                            resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) | ||||
|  | ||||
|  | ||||
|  | ||||
|                            resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch}) | ||||
|  | ||||
|  | ||||
|     # Setup cors headers to allow all domains | ||||
| @@ -417,8 +416,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|                                  # Don't link to hosting when we're on the hosting environment | ||||
|                                  hosted_sticky=os.getenv("SALTED_PASS", False) == False, | ||||
|                                  guid=datastore.data['app_guid'], | ||||
|                                  queued_uuids=[uuid for p,uuid in update_q.queue]) | ||||
|  | ||||
|                                  queued_uuids=get_uuids_in_queue()) | ||||
|  | ||||
|         if session.get('share-link'): | ||||
|             del(session['share-link']) | ||||
| @@ -632,7 +630,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|             datastore.needs_write_urgent = True | ||||
|  | ||||
|             # Queue the watch for immediate recheck, with a higher priority | ||||
|             update_q.put((1, uuid)) | ||||
|             queue_single_watch(uuid=uuid, priority=1) | ||||
|  | ||||
|             # Diff page [edit] link should go back to diff page | ||||
|             if request.args.get("next") and request.args.get("next") == 'diff': | ||||
| @@ -749,7 +747,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|                 importer = import_url_list() | ||||
|                 importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore) | ||||
|                 for uuid in importer.new_uuids: | ||||
|                     update_q.put((1, uuid)) | ||||
|                     queue_single_watch(uuid=uuid, priority=1) | ||||
|  | ||||
|                 if len(importer.remaining_data) == 0: | ||||
|                     return redirect(url_for('index')) | ||||
| @@ -762,7 +760,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|                 d_importer = import_distill_io_json() | ||||
|                 d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore) | ||||
|                 for uuid in d_importer.new_uuids: | ||||
|                     update_q.put((1, uuid)) | ||||
|                     queue_single_watch(uuid=uuid, priority=1) | ||||
|  | ||||
|  | ||||
|  | ||||
| @@ -1107,7 +1105,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|  | ||||
|         if not add_paused and new_uuid: | ||||
|             # Straight into the queue. | ||||
|             update_q.put((1, new_uuid)) | ||||
|             queue_single_watch(uuid=new_uuid, priority=1) | ||||
|             flash("Watch added.") | ||||
|  | ||||
|         if add_paused: | ||||
| @@ -1144,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|             uuid = list(datastore.data['watching'].keys()).pop() | ||||
|  | ||||
|         new_uuid = datastore.clone(uuid) | ||||
|         update_q.put((5, new_uuid)) | ||||
|         queue_single_watch(uuid=uuid, priority=5) | ||||
|         flash('Cloned.') | ||||
|  | ||||
|         return redirect(url_for('index')) | ||||
| @@ -1157,31 +1155,25 @@ def changedetection_app(config=None, datastore_o=None): | ||||
|         uuid = request.args.get('uuid') | ||||
|         i = 0 | ||||
|  | ||||
|         running_uuids = [] | ||||
|         for t in running_update_threads: | ||||
|             running_uuids.append(t.current_uuid) | ||||
|  | ||||
|         # @todo check thread is running and skip | ||||
|  | ||||
|         if uuid: | ||||
|             if uuid not in running_uuids: | ||||
|                 update_q.put((1, uuid)) | ||||
|             if uuid not in get_uuids_in_queue(): | ||||
|                 queue_single_watch(uuid=uuid, priority=1) | ||||
|             i = 1 | ||||
|  | ||||
|         elif tag != None: | ||||
|             # Items that have this current tag | ||||
|             for watch_uuid, watch in datastore.data['watching'].items(): | ||||
|                 if (tag != None and tag in watch['tag']): | ||||
|                     if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: | ||||
|                         update_q.put((1, watch_uuid)) | ||||
|                     if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']: | ||||
|                         queue_single_watch(uuid=watch_uuid, priority=1) | ||||
|                         i += 1 | ||||
|  | ||||
|         else: | ||||
|             # No tag, no uuid, add everything. | ||||
|             for watch_uuid, watch in datastore.data['watching'].items(): | ||||
|  | ||||
|                 if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']: | ||||
|                     update_q.put((1, watch_uuid)) | ||||
|                 if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']: | ||||
|                     queue_single_watch(uuid=watch_uuid, priority=1) | ||||
|                     i += 1 | ||||
|         flash("{} watches are queued for rechecking.".format(i)) | ||||
|         return redirect(url_for('index', tag=tag)) | ||||
| @@ -1346,33 +1338,31 @@ def notification_runner(): | ||||
|             # Trim the log length | ||||
|             notification_debug_log = notification_debug_log[-100:] | ||||
|  | ||||
| # Thread runner to check every minute, look for new watches to feed into the Queue. | ||||
| def queue_single_watch(uuid, priority=1): | ||||
|     pool.submit(process_single_watch, uuid, priority=int(time.time()) - priority) | ||||
|  | ||||
| def process_single_watch(uuid): | ||||
|     running_update_uuids.add(uuid) | ||||
|     from changedetectionio import update_worker | ||||
|     worker = update_worker.update_worker(notification_q=notification_q, datastore=datastore) | ||||
|     worker.run(uuid) | ||||
|     running_update_uuids.remove(uuid) | ||||
|  | ||||
| def get_uuids_in_queue(): | ||||
|     return [workitem.args[0] for p, workitem in pool._work_queue.queue] | ||||
|  | ||||
| # Thread runner to load watch jobs into the queue as they become ready/due for checking again | ||||
| def ticker_thread_check_time_launch_checks(): | ||||
|     import random | ||||
|     from changedetectionio import update_worker | ||||
|  | ||||
|     recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) | ||||
|     print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) | ||||
|  | ||||
|     # Spin up Workers that do the fetching | ||||
|     # Can be overriden by ENV or use the default settings | ||||
|     n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) | ||||
|     for _ in range(n_workers): | ||||
|         new_worker = update_worker.update_worker(update_q, notification_q, app, datastore) | ||||
|         running_update_threads.append(new_worker) | ||||
|         new_worker.start() | ||||
|  | ||||
|     while not app.config.exit.is_set(): | ||||
|  | ||||
|         # Get a list of watches by UUID that are currently fetching data | ||||
|         running_uuids = [] | ||||
|         for t in running_update_threads: | ||||
|             if t.current_uuid: | ||||
|                 running_uuids.append(t.current_uuid) | ||||
|  | ||||
|         # Re #232 - Deepcopy the data incase it changes while we're iterating through it all | ||||
|         watch_uuid_list = [] | ||||
|         while True: | ||||
|         while not app.config.exit.is_set(): | ||||
|             try: | ||||
|                 watch_uuid_list = datastore.data['watching'].keys() | ||||
|             except RuntimeError as e: | ||||
| @@ -1382,8 +1372,9 @@ def ticker_thread_check_time_launch_checks(): | ||||
|                 break | ||||
|  | ||||
|         # Re #438 - Don't place more watches in the queue to be checked if the queue is already large | ||||
|         while update_q.qsize() >= 2000: | ||||
|             time.sleep(1) | ||||
|         while pool._work_queue.qsize() >= 2000: | ||||
|             if not app.config.exit.is_set(): | ||||
|                 time.sleep(1) | ||||
|  | ||||
|  | ||||
|         recheck_time_system_seconds = int(datastore.threshold_seconds) | ||||
| @@ -1414,7 +1405,8 @@ def ticker_thread_check_time_launch_checks(): | ||||
|  | ||||
|             seconds_since_last_recheck = now - watch['last_checked'] | ||||
|             if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds: | ||||
|                 if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]: | ||||
|                 #@todo check 'not in running_uuids' | ||||
|                 if not uuid and uuid not in get_uuids_in_queue(): | ||||
|                     # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it. | ||||
|                     priority = int(time.time()) | ||||
|                     print( | ||||
| @@ -1425,8 +1417,8 @@ def ticker_thread_check_time_launch_checks(): | ||||
|                             priority, | ||||
|                             watch.jitter_seconds, | ||||
|                             now - watch['last_checked'])) | ||||
|                     # Into the queue with you | ||||
|                     update_q.put((priority, uuid)) | ||||
|  | ||||
|                     queue_single_watch(uuid=uuid, priority=priority) | ||||
|  | ||||
|                     # Reset for next time | ||||
|                     watch.jitter_seconds = 0 | ||||
|   | ||||
| @@ -1,6 +1,8 @@ | ||||
| from flask_restful import abort, Resource | ||||
| from flask import request, make_response | ||||
|  | ||||
| import validators | ||||
|  | ||||
| from . import auth | ||||
|  | ||||
|  | ||||
| @@ -11,7 +13,7 @@ class Watch(Resource): | ||||
|     def __init__(self, **kwargs): | ||||
|         # datastore is a black box dependency | ||||
|         self.datastore = kwargs['datastore'] | ||||
|         self.update_q = kwargs['update_q'] | ||||
|         self.queue_single_watch = kwargs['queue_single_watch'] | ||||
|  | ||||
|     # Get information about a single watch, excluding the history list (can be large) | ||||
|     # curl http://localhost:4000/api/v1/watch/<string:uuid> | ||||
| @@ -24,7 +26,7 @@ class Watch(Resource): | ||||
|             abort(404, message='No watch exists with the UUID of {}'.format(uuid)) | ||||
|  | ||||
|         if request.args.get('recheck'): | ||||
|             self.update_q.put((1, uuid)) | ||||
|             self.queue_single_watch(uuid, priority=1) | ||||
|             return "OK", 200 | ||||
|  | ||||
|         # Return without history, get that via another API call | ||||
| @@ -86,7 +88,7 @@ class CreateWatch(Resource): | ||||
|     def __init__(self, **kwargs): | ||||
|         # datastore is a black box dependency | ||||
|         self.datastore = kwargs['datastore'] | ||||
|         self.update_q = kwargs['update_q'] | ||||
|         self.queue_single_watch = kwargs['queue_single_watch'] | ||||
|  | ||||
|     @auth.check_token | ||||
|     def post(self): | ||||
| @@ -100,7 +102,7 @@ class CreateWatch(Resource): | ||||
|         extras = {'title': json_data['title'].strip()} if json_data.get('title') else {} | ||||
|  | ||||
|         new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras) | ||||
|         self.update_q.put((1, new_uuid)) | ||||
|         self.queue_single_watch(new_uuid, priority=1) | ||||
|         return {'uuid': new_uuid}, 201 | ||||
|  | ||||
|     # Return concise list of available watches and some very basic info | ||||
| @@ -118,7 +120,7 @@ class CreateWatch(Resource): | ||||
|  | ||||
|         if request.args.get('recheck_all'): | ||||
|             for uuid in self.datastore.data['watching'].keys(): | ||||
|                 self.update_q.put((1, uuid)) | ||||
|                 self.queue_single_watch(uuid, priority=1) | ||||
|             return {'status': "OK"}, 200 | ||||
|  | ||||
|         return list, 200 | ||||
|   | ||||
| @@ -1,8 +1,7 @@ | ||||
| import logging | ||||
| import os | ||||
| import threading | ||||
| import queue | ||||
| import time | ||||
|  | ||||
| logging.basicConfig(level=logging.DEBUG) | ||||
| from changedetectionio import content_fetcher | ||||
| from changedetectionio.html_tools import FilterNotFoundInResponse | ||||
|  | ||||
| @@ -12,15 +11,12 @@ from changedetectionio.html_tools import FilterNotFoundInResponse | ||||
| # (another process inserts watches into the queue that are time-ready for checking) | ||||
|  | ||||
|  | ||||
| class update_worker(threading.Thread): | ||||
| class update_worker(): | ||||
|     current_uuid = None | ||||
|  | ||||
|     def __init__(self, q, notification_q, app, datastore, *args, **kwargs): | ||||
|         self.q = q | ||||
|         self.app = app | ||||
|     def __init__(self,  notification_q, datastore): | ||||
|         self.notification_q = notification_q | ||||
|         self.datastore = datastore | ||||
|         super().__init__(*args, **kwargs) | ||||
|  | ||||
|     def send_content_changed_notification(self, t, watch_uuid): | ||||
|  | ||||
| @@ -116,182 +112,168 @@ class update_worker(threading.Thread): | ||||
|             if os.path.isfile(full_path): | ||||
|                 os.unlink(full_path) | ||||
|  | ||||
|     def run(self): | ||||
|     def run(self, uuid): | ||||
|         from changedetectionio import fetch_site_status | ||||
|  | ||||
|         update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) | ||||
|  | ||||
|         while not self.app.config.exit.is_set(): | ||||
|         self.current_uuid = uuid | ||||
|  | ||||
|         if uuid in list(self.datastore.data['watching'].keys()): | ||||
|             changed_detected = False | ||||
|             contents = b'' | ||||
|             screenshot = False | ||||
|             update_obj= {} | ||||
|             xpath_data = False | ||||
|             process_changedetection_results = True | ||||
|             print("> Processing UUID {} Priority {} URL {}".format(uuid, 1, self.datastore.data['watching'][uuid]['url'])) | ||||
|             now = time.time() | ||||
|  | ||||
|             try: | ||||
|                 priority, uuid = self.q.get(block=False) | ||||
|             except queue.Empty: | ||||
|                 pass | ||||
|                 changed_detected, update_obj, contents = update_handler.run(uuid) | ||||
|                 # Re #342 | ||||
|                 # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. | ||||
|                 # We then convert/.decode('utf-8') for the notification etc | ||||
|                 if not isinstance(contents, (bytes, bytearray)): | ||||
|                     raise Exception("Error - returned data from the fetch handler SHOULD be bytes") | ||||
|             except PermissionError as e: | ||||
|                 logging.error("File permission error updating", uuid, str(e)) | ||||
|                 process_changedetection_results = False | ||||
|             except content_fetcher.ReplyWithContentButNoText as e: | ||||
|                 # Totally fine, it's by choice - just continue on, nothing more to care about | ||||
|                 # Page had elements/content but no renderable text | ||||
|                 # Backend (not filters) gave zero output | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)}) | ||||
|                 if e.screenshot: | ||||
|                     self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot) | ||||
|                 process_changedetection_results = False | ||||
|  | ||||
|             except content_fetcher.Non200ErrorCodeReceived as e: | ||||
|                 if e.status_code == 403: | ||||
|                     err_text = "Error - 403 (Access denied) received" | ||||
|                 elif e.status_code == 404: | ||||
|                     err_text = "Error - 404 (Page not found) received" | ||||
|                 elif e.status_code == 500: | ||||
|                     err_text = "Error - 500 (Internal server Error) received" | ||||
|                 else: | ||||
|                     err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code)) | ||||
|  | ||||
|                 if e.screenshot: | ||||
|                     self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|                 if e.xpath_data: | ||||
|                     self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True) | ||||
|                 if e.page_text: | ||||
|                     self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text) | ||||
|  | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    # So that we get a trigger when the content is added again | ||||
|                                                                    'previous_md5': ''}) | ||||
|                 process_changedetection_results = False | ||||
|  | ||||
|             except FilterNotFoundInResponse as e: | ||||
|                 err_text = "Warning, filter '{}' not found".format(str(e)) | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    # So that we get a trigger when the content is added again | ||||
|                                                                    'previous_md5': ''}) | ||||
|  | ||||
|                 # Only when enabled, send the notification | ||||
|                 if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False): | ||||
|                     c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5) | ||||
|                     c += 1 | ||||
|                     # Send notification if we reached the threshold? | ||||
|                     threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', | ||||
|                                                                                    0) | ||||
|                     print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) | ||||
|                     if threshold > 0 and c >= threshold: | ||||
|                         if not self.datastore.data['watching'][uuid].get('notification_muted'): | ||||
|                             self.send_filter_failure_notification(uuid) | ||||
|                         c = 0 | ||||
|  | ||||
|                     self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c}) | ||||
|  | ||||
|                 process_changedetection_results = True | ||||
|  | ||||
|             except content_fetcher.EmptyReply as e: | ||||
|                 # Some kind of custom to-str handler in the exception handler that does this? | ||||
|                 err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    'last_check_status': e.status_code}) | ||||
|             except content_fetcher.ScreenshotUnavailable as e: | ||||
|                 err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    'last_check_status': e.status_code}) | ||||
|                 process_changedetection_results = False | ||||
|             except content_fetcher.JSActionExceptions as e: | ||||
|                 err_text = "Error running JS Actions - Page request - "+e.message | ||||
|                 if e.screenshot: | ||||
|                     self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    'last_check_status': e.status_code}) | ||||
|             except content_fetcher.PageUnloadable as e: | ||||
|                 err_text = "Page request from server didnt respond correctly" | ||||
|                 if e.message: | ||||
|                     err_text = "{} - {}".format(err_text, e.message) | ||||
|  | ||||
|                 if e.screenshot: | ||||
|                     self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|  | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                    'last_check_status': e.status_code}) | ||||
|             except Exception as e: | ||||
|                 logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                 self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|                 # Other serious error | ||||
|                 process_changedetection_results = False | ||||
|             else: | ||||
|                 self.current_uuid = uuid | ||||
|                 # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc) | ||||
|                 if not self.datastore.data['watching'].get(uuid): | ||||
|                     return | ||||
|  | ||||
|                 if uuid in list(self.datastore.data['watching'].keys()): | ||||
|                     changed_detected = False | ||||
|                     contents = b'' | ||||
|                     screenshot = False | ||||
|                     update_obj= {} | ||||
|                     xpath_data = False | ||||
|                     process_changedetection_results = True | ||||
|                     print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url'])) | ||||
|                     now = time.time() | ||||
|                 # Mark that we never had any failures | ||||
|                 if not self.datastore.data['watching'][uuid].get('ignore_status_codes'): | ||||
|                     update_obj['consecutive_filter_failures'] = 0 | ||||
|  | ||||
|                     try: | ||||
|                         changed_detected, update_obj, contents = update_handler.run(uuid) | ||||
|                         # Re #342 | ||||
|                         # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. | ||||
|                         # We then convert/.decode('utf-8') for the notification etc | ||||
|                         if not isinstance(contents, (bytes, bytearray)): | ||||
|                             raise Exception("Error - returned data from the fetch handler SHOULD be bytes") | ||||
|                     except PermissionError as e: | ||||
|                         self.app.logger.error("File permission error updating", uuid, str(e)) | ||||
|                         process_changedetection_results = False | ||||
|                     except content_fetcher.ReplyWithContentButNoText as e: | ||||
|                         # Totally fine, it's by choice - just continue on, nothing more to care about | ||||
|                         # Page had elements/content but no renderable text | ||||
|                         # Backend (not filters) gave zero output | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)}) | ||||
|                         if e.screenshot: | ||||
|                             self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot) | ||||
|                         process_changedetection_results = False | ||||
|                 self.cleanup_error_artifacts(uuid) | ||||
|  | ||||
|                     except content_fetcher.Non200ErrorCodeReceived as e: | ||||
|                         if e.status_code == 403: | ||||
|                             err_text = "Error - 403 (Access denied) received" | ||||
|                         elif e.status_code == 404: | ||||
|                             err_text = "Error - 404 (Page not found) received" | ||||
|                         elif e.status_code == 500: | ||||
|                             err_text = "Error - 500 (Internal server Error) received" | ||||
|                         else: | ||||
|                             err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code)) | ||||
|             # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc | ||||
|             if process_changedetection_results: | ||||
|                 try: | ||||
|                     watch = self.datastore.data['watching'][uuid] | ||||
|                     fname = "" # Saved history text filename | ||||
|  | ||||
|                         if e.screenshot: | ||||
|                             self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|                         if e.xpath_data: | ||||
|                             self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True) | ||||
|                         if e.page_text: | ||||
|                             self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text) | ||||
|                     # For the FIRST time we check a site, or a change detected, save the snapshot. | ||||
|                     if changed_detected or not watch['last_checked']: | ||||
|                         # A change was detected | ||||
|                         watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) | ||||
|  | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            # So that we get a trigger when the content is added again | ||||
|                                                                            'previous_md5': ''}) | ||||
|                         process_changedetection_results = False | ||||
|                     self.datastore.update_watch(uuid=uuid, update_obj=update_obj) | ||||
|  | ||||
|                     except FilterNotFoundInResponse as e: | ||||
|                         err_text = "Warning, filter '{}' not found".format(str(e)) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            # So that we get a trigger when the content is added again | ||||
|                                                                            'previous_md5': ''}) | ||||
|                     # A change was detected | ||||
|                     if changed_detected: | ||||
|                         print (">> Change detected in UUID {} - {}".format(uuid, watch['url'])) | ||||
|  | ||||
|                         # Only when enabled, send the notification | ||||
|                         if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False): | ||||
|                             c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5) | ||||
|                             c += 1 | ||||
|                             # Send notification if we reached the threshold? | ||||
|                             threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', | ||||
|                                                                                            0) | ||||
|                             print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) | ||||
|                             if threshold > 0 and c >= threshold: | ||||
|                                 if not self.datastore.data['watching'][uuid].get('notification_muted'): | ||||
|                                     self.send_filter_failure_notification(uuid) | ||||
|                                 c = 0 | ||||
|  | ||||
|                             self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c}) | ||||
|  | ||||
|                         process_changedetection_results = True | ||||
|  | ||||
|                     except content_fetcher.EmptyReply as e: | ||||
|                         # Some kind of custom to-str handler in the exception handler that does this? | ||||
|                         err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except content_fetcher.ScreenshotUnavailable as e: | ||||
|                         err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                         process_changedetection_results = False | ||||
|                     except content_fetcher.JSActionExceptions as e: | ||||
|                         err_text = "Error running JS Actions - Page request - "+e.message | ||||
|                         if e.screenshot: | ||||
|                             self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except content_fetcher.PageUnloadable as e: | ||||
|                         err_text = "Page request from server didnt respond correctly" | ||||
|                         if e.message: | ||||
|                             err_text = "{} - {}".format(err_text, e.message) | ||||
|  | ||||
|                         if e.screenshot: | ||||
|                             self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True) | ||||
|  | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, | ||||
|                                                                            'last_check_status': e.status_code}) | ||||
|                     except Exception as e: | ||||
|                         self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                         self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|                         # Other serious error | ||||
|                         process_changedetection_results = False | ||||
|                     else: | ||||
|                         # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc) | ||||
|                         if not self.datastore.data['watching'].get(uuid): | ||||
|                             continue | ||||
|  | ||||
|                         # Mark that we never had any failures | ||||
|                         if not self.datastore.data['watching'][uuid].get('ignore_status_codes'): | ||||
|                             update_obj['consecutive_filter_failures'] = 0 | ||||
|  | ||||
|                         self.cleanup_error_artifacts(uuid) | ||||
|  | ||||
|                     # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc | ||||
|                     if process_changedetection_results: | ||||
|                         try: | ||||
|                             watch = self.datastore.data['watching'][uuid] | ||||
|                             fname = "" # Saved history text filename | ||||
|  | ||||
|                             # For the FIRST time we check a site, or a change detected, save the snapshot. | ||||
|                             if changed_detected or not watch['last_checked']: | ||||
|                                 # A change was detected | ||||
|                                 watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) | ||||
|  | ||||
|                             self.datastore.update_watch(uuid=uuid, update_obj=update_obj) | ||||
|  | ||||
|                             # A change was detected | ||||
|                             if changed_detected: | ||||
|                                 print (">> Change detected in UUID {} - {}".format(uuid, watch['url'])) | ||||
|  | ||||
|                                 # Notifications should only trigger on the second time (first time, we gather the initial snapshot) | ||||
|                                 if watch.history_n >= 2: | ||||
|                                     if not self.datastore.data['watching'][uuid].get('notification_muted'): | ||||
|                                         self.send_content_changed_notification(self, watch_uuid=uuid) | ||||
|                         # Notifications should only trigger on the second time (first time, we gather the initial snapshot) | ||||
|                         if watch.history_n >= 2: | ||||
|                             if not self.datastore.data['watching'][uuid].get('notification_muted'): | ||||
|                                 self.send_content_changed_notification(self, watch_uuid=uuid) | ||||
|  | ||||
|  | ||||
|                         except Exception as e: | ||||
|                             # Catch everything possible here, so that if a worker crashes, we don't lose it until restart! | ||||
|                             print("!!!! Exception in update_worker !!!\n", e) | ||||
|                             self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                             self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|                 except Exception as e: | ||||
|                     # Catch everything possible here, so that if a worker crashes, we don't lose it until restart! | ||||
|                     print("!!!! Exception in update_worker !!!\n", e) | ||||
|                     logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) | ||||
|                     self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) | ||||
|  | ||||
|  | ||||
|                     # Always record that we atleast tried | ||||
|                     self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3), | ||||
|                                                                        'last_checked': round(time.time())}) | ||||
|             # Always record that we atleast tried | ||||
|             self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3), | ||||
|                                                                'last_checked': round(time.time())}) | ||||
|  | ||||
|                     # Always save the screenshot if it's available | ||||
|                     if update_handler.screenshot: | ||||
|                         self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot) | ||||
|                     if update_handler.xpath_data: | ||||
|                         self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data) | ||||
|             # Always save the screenshot if it's available | ||||
|             if update_handler.screenshot: | ||||
|                 self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot) | ||||
|             if update_handler.xpath_data: | ||||
|                 self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data) | ||||
|  | ||||
|  | ||||
|                 self.current_uuid = None  # Done | ||||
|                 self.q.task_done() | ||||
|  | ||||
|                 # Give the CPU time to interrupt | ||||
|                 time.sleep(0.1) | ||||
|  | ||||
|             self.app.config.exit.wait(1) | ||||
|         self.current_uuid = None | ||||
| @@ -33,6 +33,8 @@ bs4 | ||||
| # XPath filtering, lxml is required by bs4 anyway, but put it here to be safe. | ||||
| lxml | ||||
|  | ||||
| PriorityThreadPoolExecutor | ||||
|  | ||||
| # 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0 | ||||
| selenium ~= 4.1.0 | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user