mirror of https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-07 01:56:53 +00:00

Compare commits: 2 commits (build-erro... → thread-rec...)

- 56b88624d7
- c3c0f62662
changedetectionio/__init__.py:

```diff
@@ -18,6 +18,7 @@ import threading
 import time
 from copy import deepcopy
 from threading import Event
+from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor

 import flask_login
 import logging
@@ -49,12 +50,12 @@ __version__ = '0.39.18'
 datastore = None

 # Local
-running_update_threads = []
+running_update_uuids = set()
 ticker_thread = None

 extra_stylesheets = []

-update_q = queue.PriorityQueue()
+pool = None

 notification_q = queue.Queue()

@@ -105,10 +106,9 @@ def init_app_secret(datastore_path):
 # running or something similar.
 @app.template_filter('format_last_checked_time')
 def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
-    # Worker thread tells us which UUID it is currently processing.
-    for t in running_update_threads:
-        if t.current_uuid == watch_obj['uuid']:
-            return '<span class="loader"></span><span> Checking now</span>'
+    if watch_obj['uuid'] in running_update_uuids:
+        return '<span class="loader"></span><span> Checking now</span>'

     if watch_obj['last_checked'] == 0:
         return 'Not yet'
@@ -178,13 +178,15 @@ class User(flask_login.UserMixin):

 def changedetection_app(config=None, datastore_o=None):
     global datastore
+    global pool
     datastore = datastore_o

     # so far just for read-only via tests, but this will be moved eventually to be the main source
     # (instead of the global var)
     app.config['DATASTORE']=datastore_o

-    #app.config.update(config or {})
+    pool = PriorityThreadPoolExecutor(max_workers=int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])))
+

     login_manager = flask_login.LoginManager(app)
     login_manager.login_view = 'login'
```
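The executor above replaces the old `update_q = queue.PriorityQueue()` plus a fixed set of worker threads. As a minimal sketch of how such a pool is driven, assuming the PyPI PriorityThreadPoolExecutor package, whose `submit()` accepts an extra `priority` keyword and (like the `queue.PriorityQueue` it presumably wraps) hands out the lowest value first; the job function and values below are illustrative only, not from the commit:

```python
# Minimal sketch, not from the commit: submitting prioritized jobs.
# Assumes the PyPI PriorityThreadPoolExecutor package, where submit()
# takes a `priority` keyword and lower values are picked up first.
import time
from PriorityThreadPoolExecutor import PriorityThreadPoolExecutor

def check_watch(uuid):
    print("checking", uuid)

pool = PriorityThreadPoolExecutor(max_workers=2)

# Scheduled checks use the epoch as priority, so older submissions drain first.
pool.submit(check_watch, "watch-aaa", priority=int(time.time()))

# An urgent recheck with a small priority value should jump ahead of them.
pool.submit(check_watch, "watch-bbb", priority=1)

# shutdown() is assumed to be inherited from concurrent.futures.ThreadPoolExecutor.
pool.shutdown(wait=True)
```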
```diff
@@ -193,20 +195,17 @@ def changedetection_app(config=None, datastore_o=None):

     watch_api.add_resource(api_v1.WatchSingleHistory,
                            '/api/v1/watch/<string:uuid>/history/<string:timestamp>',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})

     watch_api.add_resource(api_v1.WatchHistory,
                            '/api/v1/watch/<string:uuid>/history',
                            resource_class_kwargs={'datastore': datastore})

     watch_api.add_resource(api_v1.CreateWatch, '/api/v1/watch',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})

     watch_api.add_resource(api_v1.Watch, '/api/v1/watch/<string:uuid>',
-                           resource_class_kwargs={'datastore': datastore, 'update_q': update_q})
+                           resource_class_kwargs={'datastore': datastore, 'queue_single_watch': queue_single_watch})

-
-
-
     # Setup cors headers to allow all domains
@@ -417,8 +416,7 @@ def changedetection_app(config=None, datastore_o=None):
             # Don't link to hosting when we're on the hosting environment
             hosted_sticky=os.getenv("SALTED_PASS", False) == False,
             guid=datastore.data['app_guid'],
-            queued_uuids=[uuid for p,uuid in update_q.queue])
+            queued_uuids=get_uuids_in_queue())

-
         if session.get('share-link'):
             del(session['share-link'])
@@ -632,7 +630,7 @@ def changedetection_app(config=None, datastore_o=None):
             datastore.needs_write_urgent = True

             # Queue the watch for immediate recheck, with a higher priority
-            update_q.put((1, uuid))
+            queue_single_watch(uuid=uuid, priority=1)

             # Diff page [edit] link should go back to diff page
             if request.args.get("next") and request.args.get("next") == 'diff':
@@ -749,7 +747,7 @@ def changedetection_app(config=None, datastore_o=None):
             importer = import_url_list()
             importer.run(data=request.values.get('urls'), flash=flash, datastore=datastore)
             for uuid in importer.new_uuids:
-                update_q.put((1, uuid))
+                queue_single_watch(uuid=uuid, priority=1)

             if len(importer.remaining_data) == 0:
                 return redirect(url_for('index'))
@@ -762,7 +760,7 @@ def changedetection_app(config=None, datastore_o=None):
             d_importer = import_distill_io_json()
             d_importer.run(data=request.values.get('distill-io'), flash=flash, datastore=datastore)
             for uuid in d_importer.new_uuids:
-                update_q.put((1, uuid))
+                queue_single_watch(uuid=uuid, priority=1)



@@ -1107,7 +1105,7 @@ def changedetection_app(config=None, datastore_o=None):

         if not add_paused and new_uuid:
             # Straight into the queue.
-            update_q.put((1, new_uuid))
+            queue_single_watch(uuid=new_uuid, priority=1)
             flash("Watch added.")

         if add_paused:
@@ -1144,7 +1142,7 @@ def changedetection_app(config=None, datastore_o=None):
            uuid = list(datastore.data['watching'].keys()).pop()

        new_uuid = datastore.clone(uuid)
-       update_q.put((5, new_uuid))
+       queue_single_watch(uuid=uuid, priority=5)
        flash('Cloned.')

        return redirect(url_for('index'))
@@ -1157,31 +1155,25 @@ def changedetection_app(config=None, datastore_o=None):
         uuid = request.args.get('uuid')
         i = 0

-        running_uuids = []
-        for t in running_update_threads:
-            running_uuids.append(t.current_uuid)
-
-        # @todo check thread is running and skip
-
         if uuid:
-            if uuid not in running_uuids:
-                update_q.put((1, uuid))
+            if uuid not in get_uuids_in_queue():
+                queue_single_watch(uuid=uuid, priority=1)
                 i = 1

         elif tag != None:
             # Items that have this current tag
             for watch_uuid, watch in datastore.data['watching'].items():
                 if (tag != None and tag in watch['tag']):
-                    if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
-                        update_q.put((1, watch_uuid))
+                    if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
+                        queue_single_watch(uuid=watch_uuid, priority=1)
                         i += 1

         else:
             # No tag, no uuid, add everything.
             for watch_uuid, watch in datastore.data['watching'].items():

-                if watch_uuid not in running_uuids and not datastore.data['watching'][watch_uuid]['paused']:
-                    update_q.put((1, watch_uuid))
+                if watch_uuid not in get_uuids_in_queue() and not datastore.data['watching'][watch_uuid]['paused']:
+                    queue_single_watch(uuid=watch_uuid, priority=1)
                     i += 1
         flash("{} watches are queued for rechecking.".format(i))
         return redirect(url_for('index', tag=tag))
```
```diff
@@ -1346,33 +1338,31 @@ def notification_runner():
             # Trim the log length
             notification_debug_log = notification_debug_log[-100:]

-# Thread runner to check every minute, look for new watches to feed into the Queue.
+def queue_single_watch(uuid, priority=1):
+    pool.submit(process_single_watch, uuid, priority=int(time.time()) - priority)
+
+
+def process_single_watch(uuid):
+    running_update_uuids.add(uuid)
+    from changedetectionio import update_worker
+    worker = update_worker.update_worker(notification_q=notification_q, datastore=datastore)
+    worker.run(uuid)
+    running_update_uuids.remove(uuid)
+
+
+def get_uuids_in_queue():
+    return [workitem.args[0] for p, workitem in pool._work_queue.queue]
+
+
+# Thread runner to load watch jobs into the queue as they become ready/due for checking again
 def ticker_thread_check_time_launch_checks():
     import random
-    from changedetectionio import update_worker

     recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
     print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)

-    # Spin up Workers that do the fetching
-    # Can be overriden by ENV or use the default settings
-    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
-    for _ in range(n_workers):
-        new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
-        running_update_threads.append(new_worker)
-        new_worker.start()
-
     while not app.config.exit.is_set():

-        # Get a list of watches by UUID that are currently fetching data
-        running_uuids = []
-        for t in running_update_threads:
-            if t.current_uuid:
-                running_uuids.append(t.current_uuid)
-
         # Re #232 - Deepcopy the data incase it changes while we're iterating through it all
         watch_uuid_list = []
-        while True:
+        while not app.config.exit.is_set():
             try:
                 watch_uuid_list = datastore.data['watching'].keys()
             except RuntimeError as e:
```
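Note that `get_uuids_in_queue()` above peeks into the executor's private `_work_queue`: each pending entry is a `(priority, work item)` pair, and the work item's `args` holds whatever was passed positionally to `submit()`. A sketch of that idea, with the caveat that these attribute names are executor internals rather than a public API:

```python
# Sketch of the queue introspection used above. `pool._work_queue.queue` and
# `workitem.args` are private implementation details (hence fragile); the
# first positional submit() argument is assumed to be the watch UUID.
def uuids_in_queue(pool):
    return [workitem.args[0] for priority, workitem in pool._work_queue.queue]

# e.g. after pool.submit(process_single_watch, 'watch-aaa', priority=1),
# uuids_in_queue(pool) contains 'watch-aaa' until a worker thread takes it.
```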
```diff
@@ -1382,8 +1372,9 @@ def ticker_thread_check_time_launch_checks():
                 break

         # Re #438 - Don't place more watches in the queue to be checked if the queue is already large
-        while update_q.qsize() >= 2000:
-            time.sleep(1)
+        while pool._work_queue.qsize() >= 2000:
+            if not app.config.exit.is_set():
+                time.sleep(1)


         recheck_time_system_seconds = int(datastore.threshold_seconds)
@@ -1414,7 +1405,8 @@ def ticker_thread_check_time_launch_checks():

                 seconds_since_last_recheck = now - watch['last_checked']
                 if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds:
-                    if not uuid in running_uuids and uuid not in [q_uuid for p,q_uuid in update_q.queue]:
+                    #@todo check 'not in running_uuids'
+                    if not uuid and uuid not in get_uuids_in_queue():
                         # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it.
                         priority = int(time.time())
                         print(
@@ -1425,8 +1417,8 @@ def ticker_thread_check_time_launch_checks():
                             priority,
                             watch.jitter_seconds,
                             now - watch['last_checked']))
-                        # Into the queue with you
-                        update_q.put((priority, uuid))
+                        queue_single_watch(uuid=uuid, priority=priority)

                         # Reset for next time
                         watch.jitter_seconds = 0
```
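The "Use Epoch time as priority" comment above describes the ordering trick carried over from the old `update_q`: scheduled checks carry the current Unix time, so they drain oldest-first, while an explicit priority of 1 sorts ahead of any plausible timestamp. A small worked example with a plain `queue.PriorityQueue`, which is what `update_q` was before this change (values are illustrative):

```python
# Worked example of the epoch-as-priority ordering (illustrative values).
import queue
import time

update_q = queue.PriorityQueue()
update_q.put((int(time.time()), "scheduled-1"))       # e.g. (1700000000, ...)
update_q.put((int(time.time()) + 60, "scheduled-2"))  # queued a minute later
update_q.put((1, "urgent"))                           # 1 < any epoch timestamp

while not update_q.empty():
    print(update_q.get())  # "urgent" first, then "scheduled-1", "scheduled-2"
```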
changedetectionio/api/api_v1.py:

```diff
@@ -1,6 +1,8 @@
 from flask_restful import abort, Resource
 from flask import request, make_response

 import validators

 from . import auth
+
+
@@ -11,7 +13,7 @@ class Watch(Resource):
     def __init__(self, **kwargs):
         # datastore is a black box dependency
         self.datastore = kwargs['datastore']
-        self.update_q = kwargs['update_q']
+        self.queue_single_watch = kwargs['queue_single_watch']

     # Get information about a single watch, excluding the history list (can be large)
     # curl http://localhost:4000/api/v1/watch/<string:uuid>
@@ -24,7 +26,7 @@ class Watch(Resource):
             abort(404, message='No watch exists with the UUID of {}'.format(uuid))

         if request.args.get('recheck'):
-            self.update_q.put((1, uuid))
+            self.queue_single_watch(uuid, priority=1)
             return "OK", 200

         # Return without history, get that via another API call
```
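For reference, a recheck through this endpoint would look something like the snippet below. The host and port come from the curl comment above, while the UUID and the `x-api-key` header are placeholders: the exact auth header is handled by the `auth.check_token` decorator and is not shown in this diff.

```python
# Hypothetical client call for the ?recheck branch above (placeholder values).
import requests

resp = requests.get(
    "http://localhost:4000/api/v1/watch/0aab34d5-0000-0000-0000-000000000000",
    params={"recheck": "true"},
    headers={"x-api-key": "YOUR_API_KEY"},  # assumed auth header, see auth.check_token
)
print(resp.status_code, resp.text)  # the handler returns "OK", 200
```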
```diff
@@ -86,7 +88,7 @@ class CreateWatch(Resource):
     def __init__(self, **kwargs):
         # datastore is a black box dependency
         self.datastore = kwargs['datastore']
-        self.update_q = kwargs['update_q']
+        self.queue_single_watch = kwargs['queue_single_watch']

     @auth.check_token
     def post(self):
@@ -100,7 +102,7 @@ class CreateWatch(Resource):
         extras = {'title': json_data['title'].strip()} if json_data.get('title') else {}

         new_uuid = self.datastore.add_watch(url=json_data['url'].strip(), tag=tag, extras=extras)
-        self.update_q.put((1, new_uuid))
+        self.queue_single_watch(new_uuid, priority=1)
         return {'uuid': new_uuid}, 201

     # Return concise list of available watches and some very basic info
@@ -118,7 +120,7 @@ class CreateWatch(Resource):

         if request.args.get('recheck_all'):
             for uuid in self.datastore.data['watching'].keys():
-                self.update_q.put((1, uuid))
+                self.queue_single_watch(uuid, priority=1)
             return {'status': "OK"}, 200

         return list, 200
```
changedetectionio/update_worker.py:

```diff
@@ -1,8 +1,7 @@
+import logging
 import os
-import threading
-import queue
 import time
+logging.basicConfig(level=logging.DEBUG)
 from changedetectionio import content_fetcher
 from changedetectionio.html_tools import FilterNotFoundInResponse

@@ -12,15 +11,12 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
 # (another process inserts watches into the queue that are time-ready for checking)


-class update_worker(threading.Thread):
+class update_worker():
     current_uuid = None

-    def __init__(self, q, notification_q, app, datastore, *args, **kwargs):
-        self.q = q
-        self.app = app
+    def __init__(self, notification_q, datastore):
         self.notification_q = notification_q
         self.datastore = datastore
-        super().__init__(*args, **kwargs)

     def send_content_changed_notification(self, t, watch_uuid):
```
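Since `update_worker` is no longer a `threading.Thread` with its own polling loop, each check now constructs a short-lived worker and calls `run(uuid)` once, which is exactly what `process_single_watch()` does in the first file. A condensed sketch of the new dispatch shape; the stubs and the try/finally are added here for illustration only (the diff adds and removes the UUID without one):

```python
# Condensed sketch of the new dispatch path (mirrors process_single_watch in
# changedetectionio/__init__.py; stubs and try/finally are illustrative only).
import queue

running_update_uuids = set()
notification_q = queue.Queue()

def process_single_watch(uuid, datastore):
    running_update_uuids.add(uuid)      # drives the "Checking now" UI state
    try:
        from changedetectionio import update_worker
        worker = update_worker.update_worker(notification_q=notification_q,
                                             datastore=datastore)
        worker.run(uuid)                # one fetch/compare cycle, then done
    finally:
        running_update_uuids.remove(uuid)
```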
```diff
@@ -116,182 +112,168 @@ class update_worker(threading.Thread):
         if os.path.isfile(full_path):
             os.unlink(full_path)

-    def run(self):
+    def run(self, uuid):
         from changedetectionio import fetch_site_status

         update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)

-        while not self.app.config.exit.is_set():
-
-            try:
-                priority, uuid = self.q.get(block=False)
-            except queue.Empty:
-                pass
-
-            else:
-                self.current_uuid = uuid
-
-                if uuid in list(self.datastore.data['watching'].keys()):
-                    changed_detected = False
-                    contents = b''
-                    screenshot = False
-                    update_obj= {}
-                    xpath_data = False
-                    process_changedetection_results = True
-                    print("> Processing UUID {} Priority {} URL {}".format(uuid, priority, self.datastore.data['watching'][uuid]['url']))
-                    now = time.time()
-
-                    try:
-                        changed_detected, update_obj, contents = update_handler.run(uuid)
-                        # Re #342
-                        # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
-                        # We then convert/.decode('utf-8') for the notification etc
-                        if not isinstance(contents, (bytes, bytearray)):
-                            raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
-                    except PermissionError as e:
-                        self.app.logger.error("File permission error updating", uuid, str(e))
-                        process_changedetection_results = False
-                    except content_fetcher.ReplyWithContentButNoText as e:
-                        # Totally fine, it's by choice - just continue on, nothing more to care about
-                        # Page had elements/content but no renderable text
-                        # Backend (not filters) gave zero output
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
-                        process_changedetection_results = False
-
-                    except content_fetcher.Non200ErrorCodeReceived as e:
-                        if e.status_code == 403:
-                            err_text = "Error - 403 (Access denied) received"
-                        elif e.status_code == 404:
-                            err_text = "Error - 404 (Page not found) received"
-                        elif e.status_code == 500:
-                            err_text = "Error - 500 (Internal server Error) received"
-                        else:
-                            err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
-
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-                        if e.xpath_data:
-                            self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
-                        if e.page_text:
-                            self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
-
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           # So that we get a trigger when the content is added again
-                                                                           'previous_md5': ''})
-                        process_changedetection_results = False
-
-                    except FilterNotFoundInResponse as e:
-                        err_text = "Warning, filter '{}' not found".format(str(e))
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           # So that we get a trigger when the content is added again
-                                                                           'previous_md5': ''})
-
-                        # Only when enabled, send the notification
-                        if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
-                            c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
-                            c += 1
-                            # Send notification if we reached the threshold?
-                            threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
-                                                                                           0)
-                            print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
-                            if threshold > 0 and c >= threshold:
-                                if not self.datastore.data['watching'][uuid].get('notification_muted'):
-                                    self.send_filter_failure_notification(uuid)
-                                c = 0
-
-                            self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
-
-                        process_changedetection_results = True
-
-                    except content_fetcher.EmptyReply as e:
-                        # Some kind of custom to-str handler in the exception handler that does this?
-                        err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except content_fetcher.ScreenshotUnavailable as e:
-                        err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                        process_changedetection_results = False
-                    except content_fetcher.JSActionExceptions as e:
-                        err_text = "Error running JS Actions - Page request - "+e.message
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except content_fetcher.PageUnloadable as e:
-                        err_text = "Page request from server didnt respond correctly"
-                        if e.message:
-                            err_text = "{} - {}".format(err_text, e.message)
-
-                        if e.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
-
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
-                                                                           'last_check_status': e.status_code})
-                    except Exception as e:
-                        self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
-                        self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
-                        # Other serious error
-                        process_changedetection_results = False
-                    else:
-                        # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
-                        if not self.datastore.data['watching'].get(uuid):
-                            continue
-
-                        # Mark that we never had any failures
-                        if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
-                            update_obj['consecutive_filter_failures'] = 0
-
-                        self.cleanup_error_artifacts(uuid)
-
-                    # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
-                    if process_changedetection_results:
-                        try:
-                            watch = self.datastore.data['watching'][uuid]
-                            fname = "" # Saved history text filename
-
-                            # For the FIRST time we check a site, or a change detected, save the snapshot.
-                            if changed_detected or not watch['last_checked']:
-                                # A change was detected
-                                watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
-
-                            self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
-
-                            # A change was detected
-                            if changed_detected:
-                                print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
-
-                                # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
-                                if watch.history_n >= 2:
-                                    if not self.datastore.data['watching'][uuid].get('notification_muted'):
-                                        self.send_content_changed_notification(self, watch_uuid=uuid)
-
-                        except Exception as e:
-                            # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
-                            print("!!!! Exception in update_worker !!!\n", e)
-                            self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
-                            self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
-
-                        # Always record that we atleast tried
-                        self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
-                                                                           'last_checked': round(time.time())})
-
-                        # Always save the screenshot if it's available
-                        if update_handler.screenshot:
-                            self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
-                        if update_handler.xpath_data:
-                            self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
-
-                self.current_uuid = None # Done
-                self.q.task_done()
-
-                # Give the CPU time to interrupt
-                time.sleep(0.1)
-
-            self.app.config.exit.wait(1)
+        self.current_uuid = uuid
+
+        if uuid in list(self.datastore.data['watching'].keys()):
+            changed_detected = False
+            contents = b''
+            screenshot = False
+            update_obj= {}
+            xpath_data = False
+            process_changedetection_results = True
+            print("> Processing UUID {} Priority {} URL {}".format(uuid, 1, self.datastore.data['watching'][uuid]['url']))
+            now = time.time()
+
+            try:
+                changed_detected, update_obj, contents = update_handler.run(uuid)
+                # Re #342
+                # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
+                # We then convert/.decode('utf-8') for the notification etc
+                if not isinstance(contents, (bytes, bytearray)):
+                    raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
+            except PermissionError as e:
+                logging.error("File permission error updating", uuid, str(e))
+                process_changedetection_results = False
+            except content_fetcher.ReplyWithContentButNoText as e:
+                # Totally fine, it's by choice - just continue on, nothing more to care about
+                # Page had elements/content but no renderable text
+                # Backend (not filters) gave zero output
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found (With {} reply code).".format(e.status_code)})
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot)
+                process_changedetection_results = False
+
+            except content_fetcher.Non200ErrorCodeReceived as e:
+                if e.status_code == 403:
+                    err_text = "Error - 403 (Access denied) received"
+                elif e.status_code == 404:
+                    err_text = "Error - 404 (Page not found) received"
+                elif e.status_code == 500:
+                    err_text = "Error - 500 (Internal server Error) received"
+                else:
+                    err_text = "Error - Request returned a HTTP error code {}".format(str(e.status_code))
+
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+                if e.xpath_data:
+                    self.datastore.save_xpath_data(watch_uuid=uuid, data=e.xpath_data, as_error=True)
+                if e.page_text:
+                    self.datastore.save_error_text(watch_uuid=uuid, contents=e.page_text)
+
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   # So that we get a trigger when the content is added again
+                                                                   'previous_md5': ''})
+                process_changedetection_results = False
+
+            except FilterNotFoundInResponse as e:
+                err_text = "Warning, filter '{}' not found".format(str(e))
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   # So that we get a trigger when the content is added again
+                                                                   'previous_md5': ''})
+
+                # Only when enabled, send the notification
+                if self.datastore.data['watching'][uuid].get('filter_failure_notification_send', False):
+                    c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
+                    c += 1
+                    # Send notification if we reached the threshold?
+                    threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts',
+                                                                                   0)
+                    print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c))
+                    if threshold > 0 and c >= threshold:
+                        if not self.datastore.data['watching'][uuid].get('notification_muted'):
+                            self.send_filter_failure_notification(uuid)
+                        c = 0
+
+                    self.datastore.update_watch(uuid=uuid, update_obj={'consecutive_filter_failures': c})
+
+                process_changedetection_results = True
+
+            except content_fetcher.EmptyReply as e:
+                # Some kind of custom to-str handler in the exception handler that does this?
+                err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except content_fetcher.ScreenshotUnavailable as e:
+                err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+                process_changedetection_results = False
+            except content_fetcher.JSActionExceptions as e:
+                err_text = "Error running JS Actions - Page request - "+e.message
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except content_fetcher.PageUnloadable as e:
+                err_text = "Page request from server didnt respond correctly"
+                if e.message:
+                    err_text = "{} - {}".format(err_text, e.message)
+
+                if e.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=e.screenshot, as_error=True)
+
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
+                                                                   'last_check_status': e.status_code})
+            except Exception as e:
+                logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
+                self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
+                # Other serious error
+                process_changedetection_results = False
+            else:
+                # Crash protection, the watch entry could have been removed by this point (during a slow chrome fetch etc)
+                if not self.datastore.data['watching'].get(uuid):
+                    return
+
+                # Mark that we never had any failures
+                if not self.datastore.data['watching'][uuid].get('ignore_status_codes'):
+                    update_obj['consecutive_filter_failures'] = 0
+
+                self.cleanup_error_artifacts(uuid)
+
+            # Different exceptions mean that we may or may not want to bump the snapshot, trigger notifications etc
+            if process_changedetection_results:
+                try:
+                    watch = self.datastore.data['watching'][uuid]
+                    fname = "" # Saved history text filename
+
+                    # For the FIRST time we check a site, or a change detected, save the snapshot.
+                    if changed_detected or not watch['last_checked']:
+                        # A change was detected
+                        watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
+
+                    self.datastore.update_watch(uuid=uuid, update_obj=update_obj)
+
+                    # A change was detected
+                    if changed_detected:
+                        print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
+
+                        # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
+                        if watch.history_n >= 2:
+                            if not self.datastore.data['watching'][uuid].get('notification_muted'):
+                                self.send_content_changed_notification(self, watch_uuid=uuid)
+
+                except Exception as e:
+                    # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
+                    print("!!!! Exception in update_worker !!!\n", e)
+                    logging.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
+                    self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
+
+                # Always record that we atleast tried
+                self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
+                                                                   'last_checked': round(time.time())})
+
+                # Always save the screenshot if it's available
+                if update_handler.screenshot:
+                    self.datastore.save_screenshot(watch_uuid=uuid, screenshot=update_handler.screenshot)
+                if update_handler.xpath_data:
+                    self.datastore.save_xpath_data(watch_uuid=uuid, data=update_handler.xpath_data)
+
+        self.current_uuid = None
```
requirements.txt:

```diff
@@ -33,6 +33,8 @@ bs4
 # XPath filtering, lxml is required by bs4 anyway, but put it here to be safe.
 lxml

+PriorityThreadPoolExecutor
+
 # 3.141 was missing socksVersion, 3.150 was not in pypi, so we try 4.1.0
 selenium ~= 4.1.0

```