Compare commits

...

3 Commits

Author SHA1 Message Date
dgtlmoon
9d8558fbc9 LXML memory leak workaround 2022-07-27 15:35:05 +02:00
dgtlmoon
a7a8ba58ed fix status 2022-07-27 13:11:14 +02:00
dgtlmoon
7823140442 WIP 2022-07-27 13:05:23 +02:00
3 changed files with 219 additions and 161 deletions

View File

@@ -105,9 +105,10 @@ def init_app_secret(datastore_path):
# running or something similar. # running or something similar.
@app.template_filter('format_last_checked_time') @app.template_filter('format_last_checked_time')
def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"):
# Worker thread tells us which UUID it is currently processing. # Worker thread tells us which UUID it is currently processing.
for t in running_update_threads: for t in threading.enumerate():
if t.current_uuid == watch_obj['uuid']: if t.name == 'update_worker' and t.current_uuid == watch_obj['uuid']:
return '<span class="loader"></span><span> Checking now</span>' return '<span class="loader"></span><span> Checking now</span>'
if watch_obj['last_checked'] == 0: if watch_obj['last_checked'] == 0:
@@ -1213,6 +1214,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break # @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start() ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=ticker_thread_job_queue_processor).start()
threading.Thread(target=notification_runner).start() threading.Thread(target=notification_runner).start()
@@ -1288,25 +1290,63 @@ def notification_runner():
# Trim the log length # Trim the log length
notification_debug_log = notification_debug_log[-100:] notification_debug_log = notification_debug_log[-100:]
# Check the queue, when a job exists, start a fresh thread of update_worker
def ticker_thread_job_queue_processor():
from changedetectionio import update_worker
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
while not app.config.exit.is_set():
time.sleep(0.3)
# Check that some threads are free
running = 0
for t in threading.enumerate():
if t.name == 'update_worker':
running += 1
if running >= n_workers:
continue
try:
uuid = update_q.get(block=False)
except queue.Empty:
# Go back to waiting for exit and/or another entry from the queue
continue
print ("Starting a thread fetch")
try:
# Launch the update_worker thread that will handle picking items off a queue and sending them off
# in the event that playwright or others have a memory leak, this should clean it up better than gc.collect()
# (By letting it exit entirely)
update_worker.update_worker(update_q, notification_q, app, datastore, uuid).start()
except Exception as e:
print ("Error launching update_worker for UUID {}.".format(uuid))
print (str(e))
print ("Running now {}", running)
# Thread runner to check every minute, look for new watches to feed into the Queue. # Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks(): def ticker_thread_check_time_launch_checks():
import random import random
from changedetectionio import update_worker
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20)) recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds) print("System env MINIMUM_SECONDS_RECHECK_TIME", recheck_time_minimum_seconds)
# Can go in its own function
# Always maintain the minimum number of threads, each thread will terminate when it has processed exactly 1 queued watch
# This is to be totally sure that they don't leak memory
# Spin up Workers that do the fetching # Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings # Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set(): while not app.config.exit.is_set():
# Get a list of watches by UUID that are currently fetching data # Update our list of watches by UUID that are currently fetching data, used in the UI
running_uuids = [] running_uuids = []
for t in running_update_threads: for t in running_update_threads:
if t.current_uuid: if t.current_uuid:

View File

@@ -4,8 +4,6 @@ from typing import List
from bs4 import BeautifulSoup from bs4 import BeautifulSoup
from jsonpath_ng.ext import parse from jsonpath_ng.ext import parse
import re import re
from inscriptis import get_text
from inscriptis.model.config import ParserConfig
class FilterNotFoundInResponse(ValueError): class FilterNotFoundInResponse(ValueError):
def __init__(self, msg): def __init__(self, msg):
@@ -183,9 +181,16 @@ def strip_ignore_text(content, wordlist, mode="content"):
def html_to_text(html_content: str, render_anchor_tag_content=False) -> str: def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
import multiprocessing
from inscriptis.model.config import ParserConfig
"""Converts html string to a string with just the text. If ignoring """Converts html string to a string with just the text. If ignoring
rendering anchor tag content is enable, anchor tag content are also rendering anchor tag content is enable, anchor tag content are also
included in the text included in the text
@NOTE: HORRIBLE LXML INDUCED MEMORY LEAK WORKAROUND HERE
https://www.reddit.com/r/Python/comments/j0gl8t/psa_pythonlxml_memory_leaks_and_a_solution/
:param html_content: string with html content :param html_content: string with html content
:param render_anchor_tag_content: boolean flag indicating whether to extract :param render_anchor_tag_content: boolean flag indicating whether to extract
@@ -207,8 +212,19 @@ def html_to_text(html_content: str, render_anchor_tag_content=False) -> str:
else: else:
parser_config = None parser_config = None
# get text and annotations via inscriptis
text_content = get_text(html_content, config=parser_config) def parse_function(html_content, parser_config, results_queue):
from inscriptis import get_text
# get text and annotations via inscriptis
text_content = get_text(html_content, config=parser_config)
results_queue.put(text_content)
results_queue = multiprocessing.Queue()
parse_process = multiprocessing.Process(target=parse_function, args=(html_content, parser_config, results_queue))
parse_process.daemon = True
parse_process.start()
text_content = results_queue.get() # blocks until results are available
parse_process.terminate()
return text_content return text_content

View File

@@ -7,19 +7,20 @@ from changedetectionio.html_tools import FilterNotFoundInResponse
# A single update worker # A single update worker
# #
# Requests for checking on a single site(watch) from a queue of watches #
# (another process inserts watches into the queue that are time-ready for checking)
class update_worker(threading.Thread): class update_worker(threading.Thread):
current_uuid = None current_uuid = None
def __init__(self, q, notification_q, app, datastore, *args, **kwargs): def __init__(self, q, notification_q, app, datastore, uuid, *args, **kwargs):
self.q = q self.q = q
self.app = app self.app = app
self.notification_q = notification_q self.notification_q = notification_q
self.datastore = datastore self.datastore = datastore
self.current_uuid = uuid
super().__init__(*args, **kwargs) super().__init__(*args, **kwargs)
self.name = "update_worker"
def send_filter_failure_notification(self, uuid): def send_filter_failure_notification(self, uuid):
@@ -47,169 +48,170 @@ class update_worker(threading.Thread):
self.notification_q.put(n_object) self.notification_q.put(n_object)
print("Sent filter not found notification for {}".format(uuid)) print("Sent filter not found notification for {}".format(uuid))
# Pick one job off the list, process it threaded, exist
def run(self): def run(self):
# Go talk to the website
self.perform_site_update()
self.current_uuid = None # Done
self.q.task_done()
# Let the thread die after processing 1
# We will launch nice juicy fresh threads every time to prevent memory leaks in complex runner code (playwright etc)
print ("EXITING THREAD!")
self.app.config.exit.wait(1)
return
def perform_site_update(self):
from changedetectionio import fetch_site_status from changedetectionio import fetch_site_status
if not self.current_uuid in list(self.datastore.data['watching'].keys()):
return
changed_detected = False
contents = ""
screenshot = False
update_obj= {}
xpath_data = False
now = time.time()
update_handler = fetch_site_status.perform_site_check(datastore=self.datastore) update_handler = fetch_site_status.perform_site_check(datastore=self.datastore)
try:
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(self.current_uuid)
# Re #342
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes.
# We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e:
self.app.logger.error("File permission error updating", self.current_uuid, str(e))
except content_fetcher.ReplyWithContentButNoText as e:
# Totally fine, it's by choice - just continue on, nothing more to care about
# Page had elements/content but no renderable text
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': "Got HTML content but no text found."})
except FilterNotFoundInResponse as e:
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
c = 0
if self.datastore.data['watching'].get(self.current_uuid, False):
c = self.datastore.data['watching'][self.current_uuid].get('consecutive_filter_failures', 5)
c += 1
while not self.app.config.exit.is_set(): # Send notification if we reached the threshold?
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0)
print("Filter for {} not found, consecutive_filter_failures: {}".format(self.current_uuid, c))
if threshold >0 and c >= threshold:
self.send_filter_failure_notification(self.current_uuid)
c = 0
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'consecutive_filter_failures': c})
except content_fetcher.EmptyReply as e:
# Some kind of custom to-str handler in the exception handler that does this?
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code)
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except content_fetcher.ScreenshotUnavailable as e:
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except content_fetcher.PageUnloadable as e:
err_text = "Page request from server didnt respond correctly"
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except Exception as e:
self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
else:
try: try:
uuid = self.q.get(block=False) watch = self.datastore.data['watching'][self.current_uuid]
except queue.Empty: fname = "" # Saved history text filename
pass
else: # For the FIRST time we check a site, or a change detected, save the snapshot.
self.current_uuid = uuid if changed_detected or not watch['last_checked']:
# A change was detected
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time())))
if uuid in list(self.datastore.data['watching'].keys()): # Generally update anything interesting returned
update_obj['consecutive_filter_failures'] = 0
self.datastore.update_watch(uuid=self.current_uuid, update_obj=update_obj)
changed_detected = False # A change was detected
contents = "" if changed_detected:
screenshot = False n_object = {}
update_obj= {} print (">> Change detected in UUID {} - {}".format(self.current_uuid, watch['url']))
xpath_data = False
now = time.time()
try: # Notifications should only trigger on the second time (first time, we gather the initial snapshot)
changed_detected, update_obj, contents, screenshot, xpath_data = update_handler.run(uuid) if watch.history_n >= 2:
# Re #342 # Atleast 2, means there really was a change
# In Python 3, all strings are sequences of Unicode characters. There is a bytes type that holds raw bytes. self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_changed': round(now)})
# We then convert/.decode('utf-8') for the notification etc
if not isinstance(contents, (bytes, bytearray)):
raise Exception("Error - returned data from the fetch handler SHOULD be bytes")
except PermissionError as e:
self.app.logger.error("File permission error updating", uuid, str(e))
except content_fetcher.ReplyWithContentButNoText as e:
# Totally fine, it's by choice - just continue on, nothing more to care about
# Page had elements/content but no renderable text
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': "Got HTML content but no text found."})
except FilterNotFoundInResponse as e:
err_text = "Filter '{}' not found - Did the page change its layout?".format(str(e))
c = 0
if self.datastore.data['watching'].get(uuid, False):
c = self.datastore.data['watching'][uuid].get('consecutive_filter_failures', 5)
c += 1
# Send notification if we reached the threshold? watch_history = watch.history
threshold = self.datastore.data['settings']['application'].get('filter_failure_notification_threshold_attempts', 0) dates = list(watch_history.keys())
print("Filter for {} not found, consecutive_filter_failures: {}".format(uuid, c)) # Theoretically it's possible that this could be just 1 long,
if threshold >0 and c >= threshold: # - In the case that the timestamp key was not unique
self.send_filter_failure_notification(uuid) if len(dates) == 1:
c = 0 raise ValueError(
"History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
)
prev_fname = watch_history[dates[-2]]
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, # Did it have any notification alerts to hit?
'consecutive_filter_failures': c}) if len(watch['notification_urls']):
except content_fetcher.EmptyReply as e: print(">>> Notifications queued for UUID from watch {}".format(self.current_uuid))
# Some kind of custom to-str handler in the exception handler that does this? n_object['notification_urls'] = watch['notification_urls']
err_text = "EmptyReply - try increasing 'Wait seconds before extracting text', Status Code {}".format(e.status_code) n_object['notification_title'] = watch['notification_title']
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text, n_object['notification_body'] = watch['notification_body']
'last_check_status': e.status_code}) n_object['notification_format'] = watch['notification_format']
except content_fetcher.ScreenshotUnavailable as e:
err_text = "Screenshot unavailable, page did not render fully in the expected time - try increasing 'Wait seconds before extracting text'"
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except content_fetcher.PageUnloadable as e:
err_text = "Page request from server didnt respond correctly"
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': err_text,
'last_check_status': e.status_code})
except Exception as e:
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
else: # No? maybe theres a global setting, queue them all
try: elif len(self.datastore.data['settings']['application']['notification_urls']):
watch = self.datastore.data['watching'][uuid] print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(self.current_uuid))
fname = "" # Saved history text filename n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
else:
print(">>> NO notifications queued, watch and global notification URLs were empty.")
# For the FIRST time we check a site, or a change detected, save the snapshot. # Only prepare to notify if the rules above matched
if changed_detected or not watch['last_checked']: if 'notification_urls' in n_object:
# A change was detected # HTML needs linebreak, but MarkDown and Text can use a linefeed
fname = watch.save_history_text(contents=contents, timestamp=str(round(time.time()))) if n_object['notification_format'] == 'HTML':
line_feed_sep = "</br>"
else:
line_feed_sep = "\n"
# Generally update anything interesting returned from changedetectionio import diff
update_obj['consecutive_filter_failures'] = 0 n_object.update({
self.datastore.update_watch(uuid=uuid, update_obj=update_obj) 'watch_url': watch['url'],
'uuid': self.current_uuid,
'current_snapshot': contents.decode('utf-8'),
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
})
# A change was detected self.notification_q.put(n_object)
if changed_detected:
n_object = {}
print (">> Change detected in UUID {} - {}".format(uuid, watch['url']))
# Notifications should only trigger on the second time (first time, we gather the initial snapshot) except Exception as e:
if watch.history_n >= 2: # Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
# Atleast 2, means there really was a change print("!!!! Exception in update_worker !!!\n", e)
self.datastore.update_watch(uuid=uuid, update_obj={'last_changed': round(now)}) self.app.logger.error("Exception reached processing watch UUID: %s - %s", self.current_uuid, str(e))
self.datastore.update_watch(uuid=self.current_uuid, update_obj={'last_error': str(e)})
watch_history = watch.history finally:
dates = list(watch_history.keys()) # Always record that we atleast tried
# Theoretically it's possible that this could be just 1 long, self.datastore.update_watch(uuid=self.current_uuid, update_obj={'fetch_time': round(time.time() - now, 3),
# - In the case that the timestamp key was not unique 'last_checked': round(time.time())})
if len(dates) == 1:
raise ValueError(
"History index had 2 or more, but only 1 date loaded, timestamps were not unique? maybe two of the same timestamps got written, needs more delay?"
)
prev_fname = watch_history[dates[-2]]
# Did it have any notification alerts to hit? # Always save the screenshot if it's available
if len(watch['notification_urls']): if screenshot:
print(">>> Notifications queued for UUID from watch {}".format(uuid)) self.datastore.save_screenshot(watch_uuid=self.current_uuid, screenshot=screenshot)
n_object['notification_urls'] = watch['notification_urls'] if xpath_data:
n_object['notification_title'] = watch['notification_title'] self.datastore.save_xpath_data(watch_uuid=self.current_uuid, data=xpath_data)
n_object['notification_body'] = watch['notification_body']
n_object['notification_format'] = watch['notification_format']
# No? maybe theres a global setting, queue them all
elif len(self.datastore.data['settings']['application']['notification_urls']):
print(">>> Watch notification URLs were empty, using GLOBAL notifications for UUID: {}".format(uuid))
n_object['notification_urls'] = self.datastore.data['settings']['application']['notification_urls']
n_object['notification_title'] = self.datastore.data['settings']['application']['notification_title']
n_object['notification_body'] = self.datastore.data['settings']['application']['notification_body']
n_object['notification_format'] = self.datastore.data['settings']['application']['notification_format']
else:
print(">>> NO notifications queued, watch and global notification URLs were empty.")
# Only prepare to notify if the rules above matched
if 'notification_urls' in n_object:
# HTML needs linebreak, but MarkDown and Text can use a linefeed
if n_object['notification_format'] == 'HTML':
line_feed_sep = "</br>"
else:
line_feed_sep = "\n"
from changedetectionio import diff
n_object.update({
'watch_url': watch['url'],
'uuid': uuid,
'current_snapshot': contents.decode('utf-8'),
'diff': diff.render_diff(prev_fname, fname, line_feed_sep=line_feed_sep),
'diff_full': diff.render_diff(prev_fname, fname, True, line_feed_sep=line_feed_sep)
})
self.notification_q.put(n_object)
except Exception as e:
# Catch everything possible here, so that if a worker crashes, we don't lose it until restart!
print("!!!! Exception in update_worker !!!\n", e)
self.app.logger.error("Exception reached processing watch UUID: %s - %s", uuid, str(e))
self.datastore.update_watch(uuid=uuid, update_obj={'last_error': str(e)})
finally:
# Always record that we atleast tried
self.datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - now, 3),
'last_checked': round(time.time())})
# Always save the screenshot if it's available
if screenshot:
self.datastore.save_screenshot(watch_uuid=uuid, screenshot=screenshot)
if xpath_data:
self.datastore.save_xpath_data(watch_uuid=uuid, data=xpath_data)
self.current_uuid = None # Done
self.q.task_done()
# Give the CPU time to interrupt
time.sleep(0.1)
self.app.config.exit.wait(1)