mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 00:27:48 +00:00 
			
		
		
		
	Compare commits
	
		
			2 Commits
		
	
	
		
			0.50.26
			...
			thread-rec
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					56b88624d7 | ||
| 
						 | 
					c3c0f62662 | 
@@ -18,6 +18,7 @@ import threading
 | 
			
		||||
import time
 | 
			
		||||
from copy import deepcopy
 | 
			
		||||
from threading import Event
 | 
			
		||||
from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor
 | 
			
		||||
 | 
			
		||||
import flask_login
 | 
			
		||||
import logging
 | 
			
		||||
@@ -49,12 +50,12 @@ __version__ = '0.39.18'
 | 
			
		||||
datastore = None
 | 
			
		||||
 | 
			
		||||
# Local
 | 
			
		||||
running_update_threads = []
 | 
			
		||||
running_update_uuids = set()
 | 
			
		||||
ticker_thread = None
 | 
			
		||||
 | 
			
		||||
extra_stylesheets = []
 | 
			
		||||
 | 
			
		||||
update_q = queue.PriorityQueue()
 | 
			
		||||
pool = None
 | 
			
		||||
 | 
			
		||||
notification_q = queue.Queue()
 | 
			
		||||
 | 
			
		||||
@@ -105,10 +106,9 @@ def init_app_secret(datastore_path):
 | 
			
		||||
# running or something similar.
 | 
			
		||||
@app.template_filter('format_last_checked_time')
 | 
			
		||||
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
 | 
			
		||||
    # Worker thread tells us which UUID it is currently processing.
 | 
			
		||||
    for t in running_update_threads:
 | 
			
		||||
        if t.current_uuid == watch_obj['uuid']:
 | 
			
		||||
            return '<span class="loader"></span><span> Checking now</span>'
 | 
			
		||||
 | 
			
		||||
    if watch_obj['uuid'] in running_update_uuids:
 | 
			
		||||
        return '<span class="loader"></span><span> Checking now</span>'
 | 
			
		||||
 | 
			
		||||
    if watch_obj['last_checked'] == 0:
 | 
			
		||||
        return 'Not yet'
 | 
			
		||||
@@ -178,13 +178,15 @@ class User(flask_login.UserMixin):
 | 
			
		||||
 | 
			
		||||
def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
    global datastore
 | 
			
		||||
    global pool
 | 
			
		||||
    datastore = datastore_o
 | 
			
		||||
 | 
			
		||||
    # so far just for read-only via tests, but this will be moved eventually to be the main source
 | 
			
		||||
    # (instead of the global var)
 | 
			
		||||
    app.config['DATASTORE']=datastore_o
 | 
			
		||||
 | 
			
		||||
    #app.config.update(config or {})
 | 
			
		||||
    pool = PriorityThreadPoolExecutor(max_workers=int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])))
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    login_manager = flask_login.LoginManager(app)
 | 
			
		||||
    login_manager.login_view = 'login'
 | 
			
		||||
@@ -193,20 +195,17 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
 | 
			
		||||
    watch_api.add_resource(api_v1.WatchSingleHistory,
 | 
			
		||||
                           '/api/v1/watch/<string:uuid>/history/<string:timestamp>',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 | 
			
		||||
 | 
			
		||||
    watch_api.add_resource(api_v1.WatchHistory,
 | 
			
		||||
                           '/api/v1/watch/<string:uuid>/history',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore})
 | 
			
		||||
 | 
			
		||||
    watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 | 
			
		||||
 | 
			
		||||
    watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>',
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    # Setup cors headers to allow all domains
 | 
			
		||||
@@ -417,8 +416,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
                                 # Don't link to hosting when we're on the hosting environment
 | 
			
		||||
                                 hosted_sticky=os.getenv("SALTED_PASS", False) == False,
 | 
			
		||||
                                 guid=datastore.data['app_guid'],
 | 
			
		||||
                                 queued_uuids=[uuid for p,uuid in update_q.queue])
 | 
			
		||||
 | 
			
		||||
                                 queued_uuids=get_uuids_in_queue())
 | 
			
		||||
 | 
			
		||||
        if session.get('share-link'):
 | 
			
		||||
            del(session['share-link'])
 | 
			
		||||
@@ -632,7 +630,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
            datastore.needs_write_urgent = True
 | 
			
		||||
 | 
			
		||||
            # Queue the watch for immediate recheck, with a higher priority
 | 
			
		||||
            update_q.put((1, uuid))
 | 
			
		||||
            queue_single_watch(uuid=uuid, priority=1)
 | 
			
		||||
 | 
			
		||||
            # Diff page [edit] link should go back to diff page
 | 
			
		||||
            if request.args.get("next") and request.args.get("next") == 'diff':
 | 
			
		||||
@@ -749,7 +747,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
                importer = import_url_list()
 | 
			
		||||
                importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore)
 | 
			
		||||
                for uuid in importer.new_uuids:
 | 
			
		||||
                    update_q.put((1, uuid))
 | 
			
		||||
                    queue_single_watch(uuid=uuid, priority=1)
 | 
			
		||||
 | 
			
		||||
                if len(importer.remaining_data) == 0:
 | 
			
		||||
                    return redirect(url_for('index'))
 | 
			
		||||
@@ -762,7 +760,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
                d_importer = import_distill_io_json()
 | 
			
		||||
                d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
 | 
			
		||||
                for uuid in d_importer.new_uuids:
 | 
			
		||||
                    update_q.put((1, uuid))
 | 
			
		||||
                    queue_single_watch(uuid=uuid, priority=1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -1107,7 +1105,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
 | 
			
		||||
        if not add_paused and new_uuid:
 | 
			
		||||
            # Straight into the queue.
 | 
			
		||||
            update_q.put((1, new_uuid))
 | 
			
		||||
            queue_single_watch(uuid=new_uuid, priority=1)
 | 
			
		||||
            flash("Watch added.")
 | 
			
		||||
 | 
			
		||||
        if add_paused:
 | 
			
		||||
@@ -1144,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
            uuid = list(datastore.data['watching'].keys()).pop()
 | 
			
		||||
 | 
			
		||||
        new_uuid = datastore.clone(uuid)
 | 
			
		||||
        update_q.put((5, new_uuid))
 | 
			
		||||
        queue_single_watch(uuid=uuid, priority=5)
 | 
			
		||||
        flash('Cloned.')
 | 
			
		||||
 | 
			
		||||
        return redirect(url_for('index'))
 | 
			
		||||
@@ -1157,31 +1155,25 @@ def changedetection_app(config=None, datastore_o=None):
 | 
			
		||||
        uuid = request.args.get('uuid')
 | 
			
		||||
        i = 0
 | 
			
		||||
 | 
			
		||||
        running_uuids = []
 | 
			
		||||
        for t in running_update_threads:
 | 
			
		||||
            running_uuids.append(t.current_uuid)
 | 
			
		||||
 | 
			
		||||
        # @todo check thread is running and skip
 | 
			
		||||
 | 
			
		||||
        if uuid:
 | 
			
		||||
            if uuid not in running_uuids:
 | 
			
		||||
                update_q.put((1, uuid))
 | 
			
		||||
            if uuid not in get_uuids_in_queue():
 | 
			
		||||
                queue_single_watch(uuid=uuid, priority=1)
 | 
			
		||||
            i = 1
 | 
			
		||||
 | 
			
		||||
        elif tag != None:
 | 
			
		||||
            # Items that have this current tag
 | 
			
		||||
            for watch_uuid, watch in datastore.data['watching'].items():
 | 
			
		||||
                if (tag != None and tag in watch['tag']):
 | 
			
		||||
                    if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
 | 
			
		||||
                        update_q.put((1, watch_uuid))
 | 
			
		||||
                    if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
 | 
			
		||||
                        queue_single_watch(uuid=watch_uuid, priority=1)
 | 
			
		||||
                        i += 1
 | 
			
		||||
 | 
			
		||||
        else:
 | 
			
		||||
            # No tag, no uuid, add everything.
 | 
			
		||||
            for watch_uuid, watch in datastore.data['watching'].items():
 | 
			
		||||
 | 
			
		||||
                if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
 | 
			
		||||
                    update_q.put((1, watch_uuid))
 | 
			
		||||
                if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
 | 
			
		||||
                    queue_single_watch(uuid=watch_uuid, priority=1)
 | 
			
		||||
                    i += 1
 | 
			
		||||
        flash("{} watches are queued for rechecking.".format(i))
 | 
			
		||||
        return redirect(url_for('index', tag=tag))
 | 
			
		||||
@@ -1346,33 +1338,31 @@ def notification_runner():
 | 
			
		||||
            # Trim the log length
 | 
			
		||||
            notification_debug_log = notification_debug_log[-100:]
 | 
			
		||||
 | 
			
		||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
 | 
			
		||||
def queue_single_watch(uuid, priority=1):
 | 
			
		||||
    pool.submit(process_single_watch, uuid, priority=int(time.time()) - priority)
 | 
			
		||||
 | 
			
		||||
def process_single_watch(uuid):
 | 
			
		||||
    running_update_uuids.add(uuid)
 | 
			
		||||
    from changedetectionio import update_worker
 | 
			
		||||
    worker = update_worker.update_worker(notification_q=notification_q, datastore=datastore)
 | 
			
		||||
    worker.run(uuid)
 | 
			
		||||
    running_update_uuids.remove(uuid)
 | 
			
		||||
 | 
			
		||||
def get_uuids_in_queue():
 | 
			
		||||
    return [workitem.args[0] for p, workitem in pool._work_queue.queue]
 | 
			
		||||
 | 
			
		||||
# Thread runner to load watch jobs into the queue as they become ready/due for checking again
 | 
			
		||||
def ticker_thread_check_time_launch_checks():
 | 
			
		||||
    import random
 | 
			
		||||
    from changedetectionio import update_worker
 | 
			
		||||
 | 
			
		||||
    recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
 | 
			
		||||
    print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
 | 
			
		||||
 | 
			
		||||
    # Spin up Workers that do the fetching
 | 
			
		||||
    # Can be overriden by ENV or use the default settings
 | 
			
		||||
    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
 | 
			
		||||
    for _ in range(n_workers):
 | 
			
		||||
        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
 | 
			
		||||
        running_update_threads.append(new_worker)
 | 
			
		||||
        new_worker.start()
 | 
			
		||||
 | 
			
		||||
    while not app.config.exit.is_set():
 | 
			
		||||
 | 
			
		||||
        # Get a list of watches by UUID that are currently fetching data
 | 
			
		||||
        running_uuids = []
 | 
			
		||||
        for t in running_update_threads:
 | 
			
		||||
            if t.current_uuid:
 | 
			
		||||
                running_uuids.append(t.current_uuid)
 | 
			
		||||
 | 
			
		||||
        # Re #232 - Deepcopy the data incase it changes while we're iterating through it all
 | 
			
		||||
        watch_uuid_list = []
 | 
			
		||||
        while True:
 | 
			
		||||
        while not app.config.exit.is_set():
 | 
			
		||||
            try:
 | 
			
		||||
                watch_uuid_list = datastore.data['watching'].keys()
 | 
			
		||||
            except RuntimeError as e:
 | 
			
		||||
@@ -1382,8 +1372,9 @@ def ticker_thread_check_time_launch_checks():
 | 
			
		||||
                break
 | 
			
		||||
 | 
			
		||||
        # Re #438 - Don't place more watches in the queue to be checked if the queue is already large
 | 
			
		||||
        while update_q.qsize() >= 2000:
 | 
			
		||||
            time.sleep(1)
 | 
			
		||||
        while pool._work_queue.qsize() >= 2000:
 | 
			
		||||
            if not app.config.exit.is_set():
 | 
			
		||||
                time.sleep(1)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
        recheck_time_system_seconds = int(datastore.threshold_seconds)
 | 
			
		||||
@@ -1414,7 +1405,8 @@ def ticker_thread_check_time_launch_checks():
 | 
			
		||||
 | 
			
		||||
            seconds_since_last_recheck = now - watch['last_checked']
 | 
			
		||||
            if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
 | 
			
		||||
                if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]:
 | 
			
		||||
                #@todo check 'not in running_uuids'
 | 
			
		||||
                if not uuid and uuid not in get_uuids_in_queue():
 | 
			
		||||
                    # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
 | 
			
		||||
                    priority = int(time.time())
 | 
			
		||||
                    print(
 | 
			
		||||
@@ -1425,8 +1417,8 @@ def ticker_thread_check_time_launch_checks():
 | 
			
		||||
                            priority,
 | 
			
		||||
                            watch.jitter_seconds,
 | 
			
		||||
                            now - watch['last_checked']))
 | 
			
		||||
                    # Into the queue with you
 | 
			
		||||
                    update_q.put((priority, uuid))
 | 
			
		||||
 | 
			
		||||
                    queue_single_watch(uuid=uuid, priority=priority)
 | 
			
		||||
 | 
			
		||||
                    # Reset for next time
 | 
			
		||||
                    watch.jitter_seconds = 0
 | 
			
		||||
 
 | 
			
		||||
@@ -1,6 +1,8 @@
 | 
			
		||||
from flask_restful import abort, Resource
 | 
			
		||||
from flask import request, make_response
 | 
			
		||||
 | 
			
		||||
import validators
 | 
			
		||||
 | 
			
		||||
from . import auth
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
@@ -11,7 +13,7 @@ class Watch(Resource):
 | 
			
		||||
    def __init__(self, **kwargs):
 | 
			
		||||
        # datastore is a black box dependency
 | 
			
		||||
        self.datastore = kwargs['datastore']
 | 
			
		||||
        self.update_q = kwargs['update_q']
 | 
			
		||||
        self.queue_single_watch = kwargs['queue_single_watch']
 | 
			
		||||
 | 
			
		||||
    # Get information about a single watch, excluding the history list (can be large)
 | 
			
		||||
    # curl http://localhost:4000/api/v1/watch/<string:uuid>
 | 
			
		||||
@@ -24,7 +26,7 @@ class Watch(Resource):
 | 
			
		||||
            abort(404, message='No watch exists with the UUID of {}'.format(uuid))
 | 
			
		||||
 | 
			
		||||
        if request.args.get('recheck'):
 | 
			
		||||
            self.update_q.put((1, uuid))
 | 
			
		||||
            self.queue_single_watch(uuid, priority=1)
 | 
			
		||||
            return "OK", 200
 | 
			
		||||
 | 
			
		||||
        # Return without history, get that via another API call
 | 
			
		||||
@@ -86,7 +88,7 @@ class CreateWatch(Resource):
 | 
			
		||||
    def __init__(self, **kwargs):
 | 
			
		||||
        # datastore is a black box dependency
 | 
			
		||||
        self.datastore = kwargs['datastore']
 | 
			
		||||
        self.update_q = kwargs['update_q']
 | 
			
		||||
        self.queue_single_watch = kwargs['queue_single_watch']
 | 
			
		||||
 | 
			
		||||
    @auth.check_token
 | 
			
		||||
    def post(self):
 | 
			
		||||
@@ -100,7 +102,7 @@ class CreateWatch(Resource):
 | 
			
		||||
        extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}
 | 
			
		||||
 | 
			
		||||
        new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
 | 
			
		||||
        self.update_q.put((1, new_uuid))
 | 
			
		||||
        self.queue_single_watch(new_uuid, priority=1)
 | 
			
		||||
        return {'uuid': new_uuid}, 201
 | 
			
		||||
 | 
			
		||||
    # Return concise list of available watches and some very basic info
 | 
			
		||||
@@ -118,7 +120,7 @@ class CreateWatch(Resource):
 | 
			
		||||
 | 
			
		||||
        if request.args.get('recheck_all'):
 | 
			
		||||
            for uuid in self.datastore.data['watching'].keys():
 | 
			
		||||
                self.update_q.put((1, uuid))
 | 
			
		||||
                self.queue_single_watch(uuid, priority=1)
 | 
			
		||||
            return {'status': "OK"}, 200
 | 
			
		||||
 | 
			
		||||
        return list, 200
 | 
			
		||||
 
 | 
			
		||||
@@ -1,8 +1,7 @@
 | 
			
		||||
import logging
 | 
			
		||||
import os
 | 
			
		||||
import threading
 | 
			
		||||
import queue
 | 
			
		||||
import time
 | 
			
		||||
 | 
			
		||||
logging.basicConfig(level=logging.DEBUG)
 | 
			
		||||
from changedetectionio import content_fetcher
 | 
			
		||||
from changedetectionio.html_tools import FilterNotFoundInResponse
 | 
			
		||||
 | 
			
		||||
@@ -12,15 +11,12 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
 | 
			
		||||
# (another process inserts watches into the queue that are time-ready for checking)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
class update_worker(threading.Thread):
 | 
			
		||||
class update_worker():
 | 
			
		||||
    current_uuid = None
 | 
			
		||||
 | 
			
		||||
    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
 | 
			
		||||
        self.q = q
 | 
			
		||||
        self.app = app
 | 
			
		||||
    def __init__(self,  notification_q, datastore):
 | 
			
		||||
        self.notification_q = notification_q
 | 
			
		||||
        self.datastore = datastore
 | 
			
		||||
        super().__init__(*args, **kwargs)
 | 
			
		||||
 | 
			
		||||
    def send_content_changed_notification(self, t, watch_uuid):
 | 
			
		||||
 | 
			
		||||
@@ -116,182 +112,168 @@ class update_worker(threading.Thread):
 | 
			
		||||
            if os.path.isfile(full_path):
 | 
			
		||||
                os.unlink(full_path)
 | 
			
		||||
 | 
			
		||||
    def run(self):
 | 
			
		||||
    def run(self, uuid):
 | 
			
		||||
        from changedetectionio import fetch_site_status
 | 
			
		||||
 | 
			
		||||
        update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
 | 
			
		||||
 | 
			
		||||
        while not self.app.config.exit.is_set():
 | 
			
		||||
        self.current_uuid = uuid
 | 
			
		||||
 | 
			
		||||
        if uuid in list(self.datastore.data['watching'].keys()):
 | 
			
		||||
            changed_detected = False
 | 
			
		||||
            contents = b''
 | 
			
		||||
            screenshot = False
 | 
			
		||||
            update_obj= {}
 | 
			
		||||
            xpath_data = False
 | 
			
		||||
            process_changedetection_results = True
 | 
			
		||||
            print("> Processing UUID {} Priority {} URL {}".format(uuid, 1, self.datastore.data['watching'][uuid]['url']))
 | 
			
		||||
            now = time.time()
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                priority, uuid = self.q.get(block=False)
 | 
			
		||||
            except queue.Empty:
 | 
			
		||||
                pass
 | 
			
		||||
                changed_detected, update_obj, contents = update_handler.run(uuid)
 | 
			
		||||
                # Re #342
 | 
			
		||||
                # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
 | 
			
		||||
                # We then convert/.decode('utf-8') for the notification etc
 | 
			
		||||
                if not isinstance(contents, (bytes, bytearray)):
 | 
			
		||||
                    raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
 | 
			
		||||
            except PermissionError as e:
 | 
			
		||||
                logging.error("File permission error updating", uuid, str(e))
 | 
			
		||||
                process_changedetection_results = False
 | 
			
		||||
            except content_fetcher.ReplyWithContentButNoText as e:
 | 
			
		||||
                # Totally fine, it's by choice - just continue on, nothing more to care about
 | 
			
		||||
                # Page had elements/content but no renderable text
 | 
			
		||||
                # Backend (not filters) gave zero output
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
 | 
			
		||||
                if e.screenshot:
 | 
			
		||||
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
 | 
			
		||||
                process_changedetection_results = False
 | 
			
		||||
 | 
			
		||||
            except content_fetcher.Non200ErrorCodeReceived as e:
 | 
			
		||||
                if e.status_code == 403:
 | 
			
		||||
                    err_text = "Error - 403 (Access denied) received"
 | 
			
		||||
                elif e.status_code == 404:
 | 
			
		||||
                    err_text = "Error - 404 (Page not found) received"
 | 
			
		||||
                elif e.status_code == 500:
 | 
			
		||||
                    err_text = "Error - 500 (Internal server Error) received"
 | 
			
		||||
                else:
 | 
			
		||||
                    err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
 | 
			
		||||
 | 
			
		||||
                if e.screenshot:
 | 
			
		||||
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
                if e.xpath_data:
 | 
			
		||||
                    self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
 | 
			
		||||
                if e.page_text:
 | 
			
		||||
                    self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
 | 
			
		||||
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   # So that we get a trigger when the content is added again
 | 
			
		||||
                                                                   'previous_md5': ''})
 | 
			
		||||
                process_changedetection_results = False
 | 
			
		||||
 | 
			
		||||
            except FilterNotFoundInResponse as e:
 | 
			
		||||
                err_text = "Warning, filter '{}' not found".format(str(e))
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   # So that we get a trigger when the content is added again
 | 
			
		||||
                                                                   'previous_md5': ''})
 | 
			
		||||
 | 
			
		||||
                # Only when enabled, send the notification
 | 
			
		||||
                if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
 | 
			
		||||
                    c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
 | 
			
		||||
                    c += 1
 | 
			
		||||
                    # Send notification if we reached the threshold?
 | 
			
		||||
                    threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
 | 
			
		||||
                                                                                   0)
 | 
			
		||||
                    print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
 | 
			
		||||
                    if threshold > 0 and c >= threshold:
 | 
			
		||||
                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
 | 
			
		||||
                            self.send_filter_failure_notification(uuid)
 | 
			
		||||
                        c = 0
 | 
			
		||||
 | 
			
		||||
                    self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
 | 
			
		||||
 | 
			
		||||
                process_changedetection_results = True
 | 
			
		||||
 | 
			
		||||
            except content_fetcher.EmptyReply as e:
 | 
			
		||||
                # Some kind of custom to-str handler in the exception handler that does this?
 | 
			
		||||
                err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   'last_check_status': e.status_code})
 | 
			
		||||
            except content_fetcher.ScreenshotUnavailable as e:
 | 
			
		||||
                err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   'last_check_status': e.status_code})
 | 
			
		||||
                process_changedetection_results = False
 | 
			
		||||
            except content_fetcher.JSActionExceptions as e:
 | 
			
		||||
                err_text = "Error running JS Actions - Page request - "+e.message
 | 
			
		||||
                if e.screenshot:
 | 
			
		||||
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   'last_check_status': e.status_code})
 | 
			
		||||
            except content_fetcher.PageUnloadable as e:
 | 
			
		||||
                err_text = "Page request from server didnt respond correctly"
 | 
			
		||||
                if e.message:
 | 
			
		||||
                    err_text = "{} - {}".format(err_text, e.message)
 | 
			
		||||
 | 
			
		||||
                if e.screenshot:
 | 
			
		||||
                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                   'last_check_status': e.status_code})
 | 
			
		||||
            except Exception as e:
 | 
			
		||||
                logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
                # Other serious error
 | 
			
		||||
                process_changedetection_results = False
 | 
			
		||||
            else:
 | 
			
		||||
                self.current_uuid = uuid
 | 
			
		||||
                # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
 | 
			
		||||
                if not self.datastore.data['watching'].get(uuid):
 | 
			
		||||
                    return
 | 
			
		||||
 | 
			
		||||
                if uuid in list(self.datastore.data['watching'].keys()):
 | 
			
		||||
                    changed_detected = False
 | 
			
		||||
                    contents = b''
 | 
			
		||||
                    screenshot = False
 | 
			
		||||
                    update_obj= {}
 | 
			
		||||
                    xpath_data = False
 | 
			
		||||
                    process_changedetection_results = True
 | 
			
		||||
                    print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url']))
 | 
			
		||||
                    now = time.time()
 | 
			
		||||
                # Mark that we never had any failures
 | 
			
		||||
                if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
 | 
			
		||||
                    update_obj['consecutive_filter_failures'] = 0
 | 
			
		||||
 | 
			
		||||
                    try:
 | 
			
		||||
                        changed_detected, update_obj, contents = update_handler.run(uuid)
 | 
			
		||||
                        # Re #342
 | 
			
		||||
                        # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
 | 
			
		||||
                        # We then convert/.decode('utf-8') for the notification etc
 | 
			
		||||
                        if not isinstance(contents, (bytes, bytearray)):
 | 
			
		||||
                            raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
 | 
			
		||||
                    except PermissionError as e:
 | 
			
		||||
                        self.app.logger.error("File permission error updating", uuid, str(e))
 | 
			
		||||
                        process_changedetection_results = False
 | 
			
		||||
                    except content_fetcher.ReplyWithContentButNoText as e:
 | 
			
		||||
                        # Totally fine, it's by choice - just continue on, nothing more to care about
 | 
			
		||||
                        # Page had elements/content but no renderable text
 | 
			
		||||
                        # Backend (not filters) gave zero output
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
 | 
			
		||||
                        if e.screenshot:
 | 
			
		||||
                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
 | 
			
		||||
                        process_changedetection_results = False
 | 
			
		||||
                self.cleanup_error_artifacts(uuid)
 | 
			
		||||
 | 
			
		||||
                    except content_fetcher.Non200ErrorCodeReceived as e:
 | 
			
		||||
                        if e.status_code == 403:
 | 
			
		||||
                            err_text = "Error - 403 (Access denied) received"
 | 
			
		||||
                        elif e.status_code == 404:
 | 
			
		||||
                            err_text = "Error - 404 (Page not found) received"
 | 
			
		||||
                        elif e.status_code == 500:
 | 
			
		||||
                            err_text = "Error - 500 (Internal server Error) received"
 | 
			
		||||
                        else:
 | 
			
		||||
                            err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
 | 
			
		||||
            # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
 | 
			
		||||
            if process_changedetection_results:
 | 
			
		||||
                try:
 | 
			
		||||
                    watch = self.datastore.data['watching'][uuid]
 | 
			
		||||
                    fname = "" # Saved history text filename
 | 
			
		||||
 | 
			
		||||
                        if e.screenshot:
 | 
			
		||||
                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
                        if e.xpath_data:
 | 
			
		||||
                            self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
 | 
			
		||||
                        if e.page_text:
 | 
			
		||||
                            self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
 | 
			
		||||
                    # For the FIRST time we check a site, or a change detected, save the snapshot.
 | 
			
		||||
                    if changed_detected or not watch['last_checked']:
 | 
			
		||||
                        # A change was detected
 | 
			
		||||
                        watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
 | 
			
		||||
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           # So that we get a trigger when the content is added again
 | 
			
		||||
                                                                           'previous_md5': ''})
 | 
			
		||||
                        process_changedetection_results = False
 | 
			
		||||
                    self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
 | 
			
		||||
 | 
			
		||||
                    except FilterNotFoundInResponse as e:
 | 
			
		||||
                        err_text = "Warning, filter '{}' not found".format(str(e))
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           # So that we get a trigger when the content is added again
 | 
			
		||||
                                                                           'previous_md5': ''})
 | 
			
		||||
                    # A change was detected
 | 
			
		||||
                    if changed_detected:
 | 
			
		||||
                        print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
 | 
			
		||||
 | 
			
		||||
                        # Only when enabled, send the notification
 | 
			
		||||
                        if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
 | 
			
		||||
                            c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
 | 
			
		||||
                            c += 1
 | 
			
		||||
                            # Send notification if we reached the threshold?
 | 
			
		||||
                            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
 | 
			
		||||
                                                                                           0)
 | 
			
		||||
                            print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
 | 
			
		||||
                            if threshold > 0 and c >= threshold:
 | 
			
		||||
                                if not self.datastore.data['watching'][uuid].get('notification_muted'):
 | 
			
		||||
                                    self.send_filter_failure_notification(uuid)
 | 
			
		||||
                                c = 0
 | 
			
		||||
 | 
			
		||||
                            self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
 | 
			
		||||
 | 
			
		||||
                        process_changedetection_results = True
 | 
			
		||||
 | 
			
		||||
                    except content_fetcher.EmptyReply as e:
 | 
			
		||||
                        # Some kind of custom to-str handler in the exception handler that does this?
 | 
			
		||||
                        err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except content_fetcher.ScreenshotUnavailable as e:
 | 
			
		||||
                        err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                        process_changedetection_results = False
 | 
			
		||||
                    except content_fetcher.JSActionExceptions as e:
 | 
			
		||||
                        err_text = "Error running JS Actions - Page request - "+e.message
 | 
			
		||||
                        if e.screenshot:
 | 
			
		||||
                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except content_fetcher.PageUnloadable as e:
 | 
			
		||||
                        err_text = "Page request from server didnt respond correctly"
 | 
			
		||||
                        if e.message:
 | 
			
		||||
                            err_text = "{} - {}".format(err_text, e.message)
 | 
			
		||||
 | 
			
		||||
                        if e.screenshot:
 | 
			
		||||
                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
 | 
			
		||||
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
 | 
			
		||||
                                                                           'last_check_status': e.status_code})
 | 
			
		||||
                    except Exception as e:
 | 
			
		||||
                        self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
                        # Other serious error
 | 
			
		||||
                        process_changedetection_results = False
 | 
			
		||||
                    else:
 | 
			
		||||
                        # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
 | 
			
		||||
                        if not self.datastore.data['watching'].get(uuid):
 | 
			
		||||
                            continue
 | 
			
		||||
 | 
			
		||||
                        # Mark that we never had any failures
 | 
			
		||||
                        if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
 | 
			
		||||
                            update_obj['consecutive_filter_failures'] = 0
 | 
			
		||||
 | 
			
		||||
                        self.cleanup_error_artifacts(uuid)
 | 
			
		||||
 | 
			
		||||
                    # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
 | 
			
		||||
                    if process_changedetection_results:
 | 
			
		||||
                        try:
 | 
			
		||||
                            watch = self.datastore.data['watching'][uuid]
 | 
			
		||||
                            fname = "" # Saved history text filename
 | 
			
		||||
 | 
			
		||||
                            # For the FIRST time we check a site, or a change detected, save the snapshot.
 | 
			
		||||
                            if changed_detected or not watch['last_checked']:
 | 
			
		||||
                                # A change was detected
 | 
			
		||||
                                watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
 | 
			
		||||
 | 
			
		||||
                            self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
 | 
			
		||||
 | 
			
		||||
                            # A change was detected
 | 
			
		||||
                            if changed_detected:
 | 
			
		||||
                                print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
 | 
			
		||||
 | 
			
		||||
                                # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
 | 
			
		||||
                                if watch.history_n >= 2:
 | 
			
		||||
                                    if not self.datastore.data['watching'][uuid].get('notification_muted'):
 | 
			
		||||
                                        self.send_content_changed_notification(self, watch_uuid=uuid)
 | 
			
		||||
                        # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
 | 
			
		||||
                        if watch.history_n >= 2:
 | 
			
		||||
                            if not self.datastore.data['watching'][uuid].get('notification_muted'):
 | 
			
		||||
                                self.send_content_changed_notification(self, watch_uuid=uuid)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                        except Exception as e:
 | 
			
		||||
                            # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
 | 
			
		||||
                            print("!!!! Exception in update_worker !!!\n", e)
 | 
			
		||||
                            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
                except Exception as e:
 | 
			
		||||
                    # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
 | 
			
		||||
                    print("!!!! Exception in update_worker !!!\n", e)
 | 
			
		||||
                    logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
 | 
			
		||||
                    self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                    # Always record that we atleast tried
 | 
			
		||||
                    self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
 | 
			
		||||
                                                                       'last_checked': round(time.time())})
 | 
			
		||||
            # Always record that we atleast tried
 | 
			
		||||
            self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
 | 
			
		||||
                                                               'last_checked': round(time.time())})
 | 
			
		||||
 | 
			
		||||
                    # Always save the screenshot if it's available
 | 
			
		||||
                    if update_handler.screenshot:
 | 
			
		||||
                        self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
 | 
			
		||||
                    if update_handler.xpath_data:
 | 
			
		||||
                        self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
 | 
			
		||||
            # Always save the screenshot if it's available
 | 
			
		||||
            if update_handler.screenshot:
 | 
			
		||||
                self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
 | 
			
		||||
            if update_handler.xpath_data:
 | 
			
		||||
                self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
                self.current_uuid = None  # Done
 | 
			
		||||
                self.q.task_done()
 | 
			
		||||
 | 
			
		||||
                # Give the CPU time to interrupt
 | 
			
		||||
                time.sleep(0.1)
 | 
			
		||||
 | 
			
		||||
            self.app.config.exit.wait(1)
 | 
			
		||||
        self.current_uuid = None
 | 
			
		||||
@@ -33,6 +33,8 @@ bs4
 | 
			
		||||
# XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
 | 
			
		||||
lxml
 | 
			
		||||
 | 
			
		||||
PriorityThreadPoolExecutor
 | 
			
		||||
 | 
			
		||||
# 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
 | 
			
		||||
selenium ~= 4.1.0
 | 
			
		||||
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user