Compare commits

...

6 Commits

6 changed files with 112 additions and 37 deletions

View File

@@ -210,7 +210,7 @@ jobs:
- name: Store container log
if: always()
uses: actions/upload-artifact@v1
uses: actions/upload-artifact@v4
with:
name: test-cdio-basic-tests-output
path: output-logs

View File

@@ -36,6 +36,13 @@ class BrowserConnectError(Exception):
logger.error(f"Browser connection error {msg}")
return
class BrowserFetchTimedOut(Exception):
    """Raised when the browser connected but page processing exceeded the allowed time."""
    # Class-level default kept for backward compatibility with code that reads
    # BrowserFetchTimedOut.msg before/without instantiation.
    msg = ''

    def __init__(self, msg):
        self.msg = msg
        # Populate Exception.args so str(e)/repr(e) carry the message instead
        # of being empty (the original skipped the super().__init__ call).
        super().__init__(msg)
        logger.error(f"Browser processing took too long - {msg}")
class BrowserStepsStepException(Exception):
def __init__(self, step_n, original_e):
self.step_n = step_n

View File

@@ -6,7 +6,7 @@ from urllib.parse import urlparse
from loguru import logger
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable, BrowserConnectError
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError
class fetcher(Fetcher):
@@ -41,15 +41,15 @@ class fetcher(Fetcher):
self.proxy = {'username': parsed.username, 'password': parsed.password}
# Add the proxy server chrome start option, the username and password never gets added here
# (It always goes in via await self.page.authenticate(self.proxy))
import urllib.parse
# @todo filter some injection attack?
# check /somepath?thisandthat
# check scheme when no scheme
h = urllib.parse.quote(parsed.scheme + "://") if parsed.scheme else ''
h += urllib.parse.quote(f"{parsed.hostname}:{parsed.port}{parsed.path}?{parsed.query}", safe='')
# @todo filter some injection attack?
# check scheme when no scheme
proxy_url = parsed.scheme + "://" if parsed.scheme else 'http://'
r = "?" if not '?' in self.browser_connection_url else '&'
self.browser_connection_url += f"{r}--proxy-server={h}"
port = ":"+str(parsed.port) if parsed.port else ''
q = "?"+parsed.query if parsed.query else ''
proxy_url += f"{parsed.hostname}{port}{parsed.path}{q}"
self.browser_connection_url += f"{r}--proxy-server={proxy_url}"
# def screenshot_step(self, step_n=''):
# screenshot = self.page.screenshot(type='jpeg', full_page=True, quality=85)
@@ -80,6 +80,7 @@ class fetcher(Fetcher):
from changedetectionio.content_fetchers import visualselector_xpath_selectors
self.delete_browser_steps_screenshots()
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
from pyppeteer import Pyppeteer
pyppeteer_instance = Pyppeteer()
@@ -88,7 +89,7 @@ class fetcher(Fetcher):
# @todo timeout
try:
browser = await pyppeteer_instance.connect(browserWSEndpoint=self.browser_connection_url,
defaultViewport={"width": 1024, "height": 768}
ignoreHTTPSErrors=True
)
except websockets.exceptions.InvalidStatusCode as e:
raise BrowserConnectError(msg=f"Error while trying to connect the browser, Code {e.status_code} (check your access)")
@@ -107,8 +108,8 @@ class fetcher(Fetcher):
# SOCKS5 with authentication is not supported (yet)
# https://github.com/microsoft/playwright/issues/10567
self.page.setDefaultNavigationTimeout(0)
if self.proxy:
await self.page.setCacheEnabled(True)
if self.proxy and self.proxy.get('username'):
# Setting Proxy-Authentication header is deprecated, and doing so can trigger header change errors from Puppeteer
# https://github.com/puppeteer/puppeteer/issues/676 ?
# https://help.brightdata.com/hc/en-us/articles/12632549957649-Proxy-Manager-How-to-Guides#h_01HAKWR4Q0AFS8RZTNYWRDFJC2
@@ -123,7 +124,7 @@ class fetcher(Fetcher):
# browsersteps_interface.page = self.page
response = await self.page.goto(url, waitUntil="load")
self.headers = response.headers
if response is None:
await self.page.close()
@@ -131,6 +132,8 @@ class fetcher(Fetcher):
logger.warning("Content Fetcher > Response object was none")
raise EmptyReply(url=url, status_code=None)
self.headers = response.headers
try:
if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
await self.page.evaluate(self.webdriver_js_execute_code)
@@ -142,9 +145,6 @@ class fetcher(Fetcher):
# This can be ok, we will try to grab what we could retrieve
raise PageUnloadable(url=url, status_code=None, message=str(e))
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
await asyncio.sleep(1 + extra_wait)
try:
self.status_code = response.status
except Exception as e:
@@ -221,14 +221,21 @@ class fetcher(Fetcher):
def run(self, url, timeout, request_headers, request_body, request_method, ignore_status_codes=False,
        current_include_filters=None, is_binary=False):
    """Drive the async main() fetch synchronously under an overall deadline.

    Raises:
        BrowserFetchTimedOut: when total processing exceeds
            PUPPETEER_MAX_PROCESSING_TIMEOUT_SECONDS (default 180 seconds).
    """
    #@todo make update_worker async which could run any of these content_fetchers within memory and time constraints
    # os.getenv returns a str whenever the variable is set - cast to int so
    # asyncio.wait_for(timeout=...) always receives a number, not a string.
    max_time = int(os.getenv('PUPPETEER_MAX_PROCESSING_TIMEOUT_SECONDS', 180))

    # This will work in 3.10 but not >= 3.11 because 3.11 wants tasks only
    try:
        asyncio.run(asyncio.wait_for(self.main(
            url=url,
            timeout=timeout,
            request_headers=request_headers,
            request_body=request_body,
            request_method=request_method,
            ignore_status_codes=ignore_status_codes,
            current_include_filters=current_include_filters,
            is_binary=is_binary
        ), timeout=max_time))
    except asyncio.TimeoutError:
        # raise is a statement, not a function call - drop the parentheses around it.
        raise BrowserFetchTimedOut(msg=f"Browser connected but was unable to process the page in {max_time} seconds.")

View File

@@ -1547,6 +1547,7 @@ def changedetection_app(config=None, datastore_o=None):
# @todo handle ctrl break
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
threading.Thread(target=notification_runner).start()
threading.Thread(target=thread_maintain_worker_thread_pool).start()
# Check for new release version, but not when running in test/build or pytest
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
@@ -1629,23 +1630,73 @@ def notification_runner():
# Trim the log length
notification_debug_log = notification_debug_log[-100:]
def thread_maintain_worker_thread_pool():
    """Periodically prune dead fetch-worker threads and re-spawn a full pool
    of FETCH_WORKERS workers when the pool has emptied out entirely."""
    from changedetectionio import update_worker

    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
    while not app.config.exit.is_set():
        # NOTE(review): workers are only (re)spawned when the pool is completely
        # empty; a partially-dead pool is left short-handed. Confirm intended.
        needed_threads = n_workers if not running_update_threads else 0

        # Rebuild the list in place, keeping only live threads. The original
        # deleted by ascending index, which shifts the remaining indices after
        # each `del` and removes the wrong (or out-of-range) entries.
        running_update_threads[:] = [t for t in running_update_threads if t.is_alive()]
        how_many_running_now = len(running_update_threads)

        for _ in range(needed_threads - how_many_running_now):
            logger.info("Adding new worker thread")
            new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
            running_update_threads.append(new_worker)
            new_worker.start()

        app.config.exit.wait(2)
def thread_maintain_worker_thread_pool():
    """Periodically prune dead fetch-worker threads and re-spawn a full pool
    of FETCH_WORKERS workers when the pool has emptied out entirely."""
    from changedetectionio import update_worker
    logger.info("Starting thread pool worker maintainer thread")

    n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
    while not app.config.exit.is_set():
        # NOTE(review): workers are only (re)spawned when the pool is completely
        # empty; a partially-dead pool is left short-handed. Confirm intended.
        needed_threads = n_workers if not running_update_threads else 0

        # Rebuild the list in place, keeping only live threads. The original
        # deleted by ascending index, which shifts the remaining indices after
        # each `del` and removes the wrong (or out-of-range) entries.
        running_update_threads[:] = [t for t in running_update_threads if t.is_alive()]
        how_many_running_now = len(running_update_threads)

        for _ in range(needed_threads - how_many_running_now):
            logger.info("Adding new worker thread")
            new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
            running_update_threads.append(new_worker)
            new_worker.start()

        app.config.exit.wait(2)
# Thread runner to check every minute, look for new watches to feed into the Queue.
def ticker_thread_check_time_launch_checks():
import random
from changedetectionio import update_worker
proxy_last_called_time = {}
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
# Spin up Workers that do the fetching
# Can be overriden by ENV or use the default settings
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
for _ in range(n_workers):
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
running_update_threads.append(new_worker)
new_worker.start()
while not app.config.exit.is_set():
@@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
priority = int(time.time())
logger.debug(
f"> Queued watch UUID {uuid} "
f"last checked at {watch['last_checked']} "
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
f"queued at {now:0.2f} priority {priority} "
f"jitter {watch.jitter_seconds:0.2f}s, "
f"{now - watch['last_checked']:0.2f}s since last checked")

View File

@@ -75,8 +75,12 @@ class difference_detection_processor():
proxy_url = None
if preferred_proxy_id:
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
# Custom browser endpoints should not have a proxy added
if not preferred_proxy_id.startswith('ui-'):
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
else:
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
# When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)

View File

@@ -369,6 +369,12 @@ class update_worker(threading.Thread):
}
)
process_changedetection_results = False
except content_fetchers.exceptions.BrowserFetchTimedOut as e:
self.datastore.update_watch(uuid=uuid,
update_obj={'last_error': e.msg
}
)
process_changedetection_results = False
except content_fetchers.exceptions.BrowserStepsStepException as e:
if not self.datastore.data['watching'].get(uuid):