Compare commits

...

3 Commits

Author SHA1 Message Date
dgtlmoon
9d8558fbc9 LXML memory leak workaround 2022-07-27 15:35:05 +02:00
dgtlmoon
a7a8ba58ed fix status 2022-07-27 13:11:14 +02:00
dgtlmoon
7823140442 WIP 2022-07-27 13:05:23 +02:00
3 changed files with 219 additions and 161 deletions

View File

@@ -105,9 +105,10 @@ def init_app_secret(datastore_path):
# running or something similar. # running or something similar.
@app.template_filter('format_last_checked_time') @app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
# Worker thread tells us which UUID it is currently processing. # Worker thread tells us which UUID it is currently processing.
for t in running_update_threads: for t in threading.enumerate():
if t.current_uuid == watch_obj['uuid']: if t.name == 'update_worker' and t.current_uuid == watch_obj['uuid']:
return '<span class="loader"></span><span> Checking now</span>' return '<span class="loader"></span><span> Checking now</span>'
if watch_obj['last_checked'] == 0: if watch_obj['last_checked'] == 0:
@@ -1213,6 +1214,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break # @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=ticker_thread_job_queue_processor).start()
threading.Thread(target=notification_runner).start() threading.Thread(target=notification_runner).start()
@@ -1288,25 +1290,63 @@ def notification_runner():
# Trim the log length # Trim the log length
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Check the queue, when a job exists, start a fresh thread of update_worker
def ticker_thread_job_queue_processor():
from changedetectionio import update_worker
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
while not app.config.exit.is_set():
time.sleep(0.3)
# Check that some threads are free
running = 0
for t in threading.enumerate():
if t.name == 'update_worker':
running += 1
if running >= n_workers:
continue
try:
uuid = update_q.get(block=False)
except queue.Empty:
# Go back to waiting for exit and/or another entry from the queue
continue
print ("Starting a thread fetch")
try:
# Launch the update_worker thread that will handle picking items off a queue and sending them off
# in the event that playwright or others have a memory leak, this should clean it up better than gc.collect()
# (By letting it exit entirely)
update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start()
except Exception as e:
print ("Error launching update_worker for UUID {}.".format(uuid))
print (str(e))
print ("Running now {}", running)
# Thread runner to check every minute, look for new watches to feed into the Queue. # Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks(): def ticker_thread_check_time_launch_checks():
import random import random
from changedetectionio import update_worker
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
# Can go in its own function
# Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch
# This is to be totally sure that they don't leak memory
# Spin up Workers that do the fetching # Spin up Workers that do the fetching
# Can be overridden by ENV or use the default settings # Can be overridden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set(): while not app.config.exit.is_set():
# Get a list of watches by UUID that are currently fetching data # Update our list of watches by UUID that are currently fetching data, used in the UI
running_uuids = [] running_uuids = []
for t in running_update_threads: for t in running_update_threads:
if t.current_uuid: if t.current_uuid:

View File

@@ -4,8 +4,6 @@ from typing import List
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from jsonpath_ng.ext import parse from jsonpath_ng.ext import parse
import re import re
from inscriptis import get_text
from inscriptis.model.config import ParserConfig
class FilterNotFoundInResponse(ValueError): class FilterNotFoundInResponse(ValueError):
def __init__(self, msg): def __init__(self, msg):
@@ -183,10 +181,17 @@ def strip_ignore_text(content, wordlist, mode="content"):
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
import multiprocessing
from inscriptis.model.config import ParserConfig
"""Converts html string to a string with just the text. If ignoring """Converts html string to a string with just the text. If ignoring
rendering anchor tag content is enabled, anchor tag content is also rendering anchor tag content is enabled, anchor tag content is also
included in the text included in the text
@NOTE: HORRIBLE LXML INDUCED MEMORY LEAK WORKAROUND HERE
https://www.reddit.com/r/Python/comments/j0gl8t/psa_pythonlxml_memory_leaks_and_a_solution/
:param html_content: string with html content :param html_content: string with html content
:param render_anchor_tag_content: boolean flag indicating whether to extract :param render_anchor_tag_content: boolean flag indicating whether to extract
hyperlinks (the anchor tag content) together with text. This refers to the hyperlinks (the anchor tag content) together with text. This refers to the
@@ -207,8 +212,19 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
else: else:
parser_config = None parser_config = None
def parse_function(html_content, parser_config, results_queue):
from inscriptis import get_text
# get text and annotations via inscriptis # get text and annotations via inscriptis
text_content = get_text(html_content, config=parser_config) text_content = get_text(html_content, config=parser_config)
results_queue.put(text_content)
results_queue = multiprocessing.Queue()
parse_process = multiprocessing.Process(target=parse_function, args=(html_content, parser_config, results_queue))
parse_process.daemon = True
parse_process.start()
text_content = results_queue.get() # blocks until results are available
parse_process.terminate()
return text_content return text_content

View File

@@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
# A single update worker # A single update worker
# #
# Requests for checking on a single site(watch) from a queue of watches #
# (another process inserts watches into the queue that are time-ready for checking)
class update_worker(threading.Thread): class update_worker(threading.Thread):
current_uuid = None current_uuid = None
def __init__(self, q, notification_q, app, datastore, *args, **kwargs): def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs):
self.q = q self.q = q
self.app = app self.app = app
self.notification_q = notification_q self.notification_q = notification_q
self.datastore = datastore self.datastore = datastore
self.current_uuid = uuid
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.name = "update_worker"
def send_filter_failure_notification(self, uuid): def send_filter_failure_notification(self, uuid):
@@ -47,22 +48,29 @@ class update_worker(threading.Thread):
self.notification_q.put(n_object) self.notification_q.put(n_object)
print("Sent filter not found notification for {}".format(uuid)) print("Sent filter not found notification for {}".format(uuid))
# Pick one job off the list, process it threaded, exit
def run(self): def run(self):
# Go talk to the website
self.perform_site_update()
self.current_uuid = None # Done
self.q.task_done()
# Let the thread die after processing 1
# We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc)
print ("EXITING THREAD!")
self.app.config.exit.wait(1)
return
def perform_site_update(self):
from changedetectionio import fetch_site_status from changedetectionio import fetch_site_status
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) if not self.current_uuid in list(self.datastore.data['watching'].keys()):
return
while not self.app.config.exit.is_set():
try:
uuid = self.q.get(block=False)
except queue.Empty:
pass
else:
self.current_uuid = uuid
if uuid in list(self.datastore.data['watching'].keys()):
changed_detected = False changed_detected = False
contents = "" contents = ""
@@ -71,55 +79,56 @@ class update_worker(threading.Thread):
xpath_data = False xpath_data = False
now = time.time() now = time.time()
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
try: try:
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid) changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid)
# Re #342 # Re #342
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. # In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
# We then convert/.decode('utf-8') for the notification etc # We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)): if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes") raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e: except PermissionError as e:
self.app.logger.error("File permission error updating", uuid, str(e)) self.app.logger.error("File permission error updating", self.current_uuid, str(e))
except content_fetcher.ReplyWithContentButNoText as e: except content_fetcher.ReplyWithContentButNoText as e:
# Totally fine, it's by choice - just continue on, nothing more to care about # Totally fine, it's by choice - just continue on, nothing more to care about
# Page had elements/content but no renderable text # Page had elements/content but no renderable text
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."}) self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."})
except FilterNotFoundInResponse as e: except FilterNotFoundInResponse as e:
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e)) err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
c = 0 c = 0
if self.datastore.data['watching'].get(uuid, False): if self.datastore.data['watching'].get(self.current_uuid, False):
c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5) c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5)
c += 1 c += 1
# Send notification if we reached the threshold? # Send notification if we reached the threshold?
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c))
if threshold >0 and c >= threshold: if threshold >0 and c >= threshold:
self.send_filter_failure_notification(uuid) self.send_filter_failure_notification(self.current_uuid)
c = 0 c = 0
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'consecutive_filter_failures': c}) 'consecutive_filter_failures': c})
except content_fetcher.EmptyReply as e: except content_fetcher.EmptyReply as e:
# Some kind of custom to-str handler in the exception handler that does this? # Some kind of custom to-str handler in the exception handler that does this?
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code}) 'last_check_status': e.status_code})
except content_fetcher.ScreenshotUnavailable as e: except content_fetcher.ScreenshotUnavailable as e:
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'" err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code}) 'last_check_status': e.status_code})
except content_fetcher.PageUnloadable as e: except content_fetcher.PageUnloadable as e:
err_text = "Page request from server didnt respond correctly" err_text = "Page request from server didnt respond correctly"
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code}) 'last_check_status': e.status_code})
except Exception as e: except Exception as e:
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
else: else:
try: try:
watch = self.datastore.data['watching'][uuid] watch = self.datastore.data['watching'][self.current_uuid]
fname = "" # Saved history text filename fname = "" # Saved history text filename
# For the FIRST time we check a site, or a change detected, save the snapshot. # For the FIRST time we check a site, or a change detected, save the snapshot.
@@ -129,17 +138,17 @@ class update_worker(threading.Thread):
# Generally update anything interesting returned # Generally update anything interesting returned
update_obj['consecutive_filter_failures'] = 0 update_obj['consecutive_filter_failures'] = 0
self.datastore.update_watch(uuid=uuid, update_obj=update_obj) self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj)
# A change was detected # A change was detected
if changed_detected: if changed_detected:
n_object = {} n_object = {}
print (">> Change detected in UUID {} - {}".format(uuid, watch['url'])) print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url']))
# Notifications should only trigger on the second time (first time, we gather the initial snapshot) # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
if watch.history_n >= 2: if watch.history_n >= 2:
# At least 2, means there really was a change # At least 2, means there really was a change
self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)}) self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)})
watch_history = watch.history watch_history = watch.history
dates = list(watch_history.keys()) dates = list(watch_history.keys())
@@ -153,7 +162,7 @@ class update_worker(threading.Thread):
# Did it have any notification alerts to hit? # Did it have any notification alerts to hit?
if len(watch['notification_urls']): if len(watch['notification_urls']):
print(">>> Notifications queued for UUID from watch {}".format(uuid)) print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid))
n_object['notification_urls'] = watch['notification_urls'] n_object['notification_urls'] = watch['notification_urls']
n_object['notification_title'] = watch['notification_title'] n_object['notification_title'] = watch['notification_title']
n_object['notification_body'] = watch['notification_body'] n_object['notification_body'] = watch['notification_body']
@@ -161,7 +170,7 @@ class update_worker(threading.Thread):
# No? maybe theres a global setting, queue them all # No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']): elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid)) print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid))
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls'] n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title'] n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body'] n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
@@ -180,7 +189,7 @@ class update_worker(threading.Thread):
from changedetectionio import diff from changedetectionio import diff
n_object.update({ n_object.update({
'watch_url': watch['url'], 'watch_url': watch['url'],
'uuid': uuid, 'uuid': self.current_uuid,
'current_snapshot': contents.decode('utf-8'), 'current_snapshot': contents.decode('utf-8'),
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep), 'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep) 'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
@@ -191,25 +200,18 @@ class update_worker(threading.Thread):
except Exception as e: except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart! # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e) print("!!!! Exception in update_worker !!!\n", e)
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e)) self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)}) self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
finally: finally:
# Always record that we at least tried # Always record that we at least tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3), self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3),
'last_checked': round(time.time())}) 'last_checked': round(time.time())})
# Always save the screenshot if it's available # Always save the screenshot if it's available
if screenshot: if screenshot:
self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot) self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot)
if xpath_data: if xpath_data:
self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data) self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data)
self.current_uuid = None # Done
self.q.task_done()
# Give the CPU time to interrupt
time.sleep(0.1)
self.app.config.exit.wait(1)