mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-04-15 07:28:21 +00:00
Compare commits
6 Commits
misc-fixes
...
adjustable
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
8aa675cbf0 | ||
|
|
ccb42bcb12 | ||
|
|
4163030805 | ||
|
|
140d375ad0 | ||
|
|
1a608d0ae6 | ||
|
|
e6ed91cfe3 |
2
.github/workflows/test-only.yml
vendored
2
.github/workflows/test-only.yml
vendored
@@ -210,7 +210,7 @@ jobs:
|
||||
|
||||
- name: Store container log
|
||||
if: always()
|
||||
uses: actions/upload-artifact@v1
|
||||
uses: actions/upload-artifact@v4
|
||||
with:
|
||||
name: test-cdio-basic-tests-output
|
||||
path: output-logs
|
||||
|
||||
@@ -36,6 +36,13 @@ class BrowserConnectError(Exception):
|
||||
logger.error(f"Browser connection error {msg}")
|
||||
return
|
||||
|
||||
class BrowserFetchTimedOut(Exception):
|
||||
msg = ''
|
||||
def __init__(self, msg):
|
||||
self.msg = msg
|
||||
logger.error(f"Browser processing took too long - {msg}")
|
||||
return
|
||||
|
||||
class BrowserStepsStepException(Exception):
|
||||
def __init__(self, step_n, original_e):
|
||||
self.step_n = step_n
|
||||
|
||||
@@ -6,7 +6,7 @@ from urllib.parse import urlparse
|
||||
|
||||
from loguru import logger
|
||||
from changedetectionio.content_fetchers.base import Fetcher
|
||||
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, ScreenshotUnavailable, BrowserConnectError
|
||||
from changedetectionio.content_fetchers.exceptions import PageUnloadable, Non200ErrorCodeReceived, EmptyReply, BrowserFetchTimedOut, BrowserConnectError
|
||||
|
||||
|
||||
class fetcher(Fetcher):
|
||||
@@ -41,15 +41,15 @@ class fetcher(Fetcher):
|
||||
self.proxy = {'username': parsed.username, 'password': parsed.password}
|
||||
# Add the proxy server chrome start option, the username and password never gets added here
|
||||
# (It always goes in via await self.page.authenticate(self.proxy))
|
||||
import urllib.parse
|
||||
# @todo filter some injection attack?
|
||||
# check /somepath?thisandthat
|
||||
# check scheme when no scheme
|
||||
h = urllib.parse.quote(parsed.scheme + "://") if parsed.scheme else ''
|
||||
h += urllib.parse.quote(f"{parsed.hostname}:{parsed.port}{parsed.path}?{parsed.query}", safe='')
|
||||
|
||||
# @todo filter some injection attack?
|
||||
# check scheme when no scheme
|
||||
proxy_url = parsed.scheme + "://" if parsed.scheme else 'http://'
|
||||
r = "?" if not '?' in self.browser_connection_url else '&'
|
||||
self.browser_connection_url += f"{r}--proxy-server={h}"
|
||||
port = ":"+str(parsed.port) if parsed.port else ''
|
||||
q = "?"+parsed.query if parsed.query else ''
|
||||
proxy_url += f"{parsed.hostname}{port}{parsed.path}{q}"
|
||||
self.browser_connection_url += f"{r}--proxy-server={proxy_url}"
|
||||
|
||||
# def screenshot_step(self, step_n=''):
|
||||
# screenshot = self.page.screenshot(type='jpeg', full_page=True, quality=85)
|
||||
@@ -80,6 +80,7 @@ class fetcher(Fetcher):
|
||||
|
||||
from changedetectionio.content_fetchers import visualselector_xpath_selectors
|
||||
self.delete_browser_steps_screenshots()
|
||||
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
|
||||
|
||||
from pyppeteer import Pyppeteer
|
||||
pyppeteer_instance = Pyppeteer()
|
||||
@@ -88,7 +89,7 @@ class fetcher(Fetcher):
|
||||
# @todo timeout
|
||||
try:
|
||||
browser = await pyppeteer_instance.connect(browserWSEndpoint=self.browser_connection_url,
|
||||
defaultViewport={"width": 1024, "height": 768}
|
||||
ignoreHTTPSErrors=True
|
||||
)
|
||||
except websockets.exceptions.InvalidStatusCode as e:
|
||||
raise BrowserConnectError(msg=f"Error while trying to connect the browser, Code {e.status_code} (check your access)")
|
||||
@@ -107,8 +108,8 @@ class fetcher(Fetcher):
|
||||
# SOCKS5 with authentication is not supported (yet)
|
||||
# https://github.com/microsoft/playwright/issues/10567
|
||||
self.page.setDefaultNavigationTimeout(0)
|
||||
|
||||
if self.proxy:
|
||||
await self.page.setCacheEnabled(True)
|
||||
if self.proxy and self.proxy.get('username'):
|
||||
# Setting Proxy-Authentication header is deprecated, and doing so can trigger header change errors from Puppeteer
|
||||
# https://github.com/puppeteer/puppeteer/issues/676 ?
|
||||
# https://help.brightdata.com/hc/en-us/articles/12632549957649-Proxy-Manager-How-to-Guides#h_01HAKWR4Q0AFS8RZTNYWRDFJC2
|
||||
@@ -123,7 +124,7 @@ class fetcher(Fetcher):
|
||||
# browsersteps_interface.page = self.page
|
||||
|
||||
response = await self.page.goto(url, waitUntil="load")
|
||||
self.headers = response.headers
|
||||
|
||||
|
||||
if response is None:
|
||||
await self.page.close()
|
||||
@@ -131,6 +132,8 @@ class fetcher(Fetcher):
|
||||
logger.warning("Content Fetcher > Response object was none")
|
||||
raise EmptyReply(url=url, status_code=None)
|
||||
|
||||
self.headers = response.headers
|
||||
|
||||
try:
|
||||
if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
|
||||
await self.page.evaluate(self.webdriver_js_execute_code)
|
||||
@@ -142,9 +145,6 @@ class fetcher(Fetcher):
|
||||
# This can be ok, we will try to grab what we could retrieve
|
||||
raise PageUnloadable(url=url, status_code=None, message=str(e))
|
||||
|
||||
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
|
||||
await asyncio.sleep(1 + extra_wait)
|
||||
|
||||
try:
|
||||
self.status_code = response.status
|
||||
except Exception as e:
|
||||
@@ -221,14 +221,21 @@ class fetcher(Fetcher):
|
||||
def run(self, url, timeout, request_headers, request_body, request_method, ignore_status_codes=False,
|
||||
current_include_filters=None, is_binary=False):
|
||||
|
||||
#@todo make update_worker async which could run any of these content_fetchers within memory and time constraints
|
||||
max_time = os.getenv('PUPPETEER_MAX_PROCESSING_TIMEOUT_SECONDS', 180)
|
||||
|
||||
# This will work in 3.10 but not >= 3.11 because 3.11 wants tasks only
|
||||
asyncio.run(self.main(
|
||||
url=url,
|
||||
timeout=timeout,
|
||||
request_headers=request_headers,
|
||||
request_body=request_body,
|
||||
request_method=request_method,
|
||||
ignore_status_codes=ignore_status_codes,
|
||||
current_include_filters=current_include_filters,
|
||||
is_binary=is_binary
|
||||
))
|
||||
try:
|
||||
asyncio.run(asyncio.wait_for(self.main(
|
||||
url=url,
|
||||
timeout=timeout,
|
||||
request_headers=request_headers,
|
||||
request_body=request_body,
|
||||
request_method=request_method,
|
||||
ignore_status_codes=ignore_status_codes,
|
||||
current_include_filters=current_include_filters,
|
||||
is_binary=is_binary
|
||||
), timeout=max_time))
|
||||
except asyncio.TimeoutError:
|
||||
raise(BrowserFetchTimedOut(msg=f"Browser connected but was unable to process the page in {max_time} seconds."))
|
||||
|
||||
|
||||
@@ -1547,6 +1547,7 @@ def changedetection_app(config=None, datastore_o=None):
|
||||
# @todo handle ctrl break
|
||||
ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks).start()
|
||||
threading.Thread(target=notification_runner).start()
|
||||
threading.Thread(target=thread_maintain_worker_thread_pool).start()
|
||||
|
||||
# Check for new release version, but not when running in test/build or pytest
|
||||
if not os.getenv("GITHUB_REF", False) and not config.get('disable_checkver') == True:
|
||||
@@ -1629,23 +1630,73 @@ def notification_runner():
|
||||
# Trim the log length
|
||||
notification_debug_log = notification_debug_log[-100:]
|
||||
|
||||
|
||||
def thread_maintain_worker_thread_pool():
|
||||
from changedetectionio import update_worker
|
||||
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
needed_threads = n_workers if not running_update_threads else 0
|
||||
how_many_running_now = 0
|
||||
dead_threads = []
|
||||
|
||||
for i, t in enumerate(running_update_threads):
|
||||
if t.is_alive():
|
||||
how_many_running_now += 1
|
||||
else:
|
||||
dead_threads.append(i)
|
||||
|
||||
for i in dead_threads:
|
||||
del running_update_threads[i]
|
||||
|
||||
for _ in range(needed_threads - how_many_running_now):
|
||||
logger.info("Adding new worker thread")
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
app.config.exit.wait(2)
|
||||
|
||||
|
||||
|
||||
def thread_maintain_worker_thread_pool():
|
||||
from changedetectionio import update_worker
|
||||
|
||||
logger.info("Starting thread pool worker maintainer thread")
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
needed_threads = n_workers if not running_update_threads else 0
|
||||
how_many_running_now = 0
|
||||
dead_threads = []
|
||||
|
||||
for i, t in enumerate(running_update_threads):
|
||||
if t.is_alive():
|
||||
how_many_running_now += 1
|
||||
else:
|
||||
dead_threads.append(i)
|
||||
|
||||
for i in dead_threads:
|
||||
del running_update_threads[i]
|
||||
|
||||
for _ in range(needed_threads - how_many_running_now):
|
||||
logger.info("Adding new worker thread")
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
app.config.exit.wait(2)
|
||||
|
||||
# Thread runner to check every minute, look for new watches to feed into the Queue.
|
||||
def ticker_thread_check_time_launch_checks():
|
||||
import random
|
||||
from changedetectionio import update_worker
|
||||
|
||||
proxy_last_called_time = {}
|
||||
|
||||
recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 20))
|
||||
logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}")
|
||||
|
||||
# Spin up Workers that do the fetching
|
||||
# Can be overriden by ENV or use the default settings
|
||||
n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers']))
|
||||
for _ in range(n_workers):
|
||||
new_worker = update_worker.update_worker(update_q, notification_q, app, datastore)
|
||||
running_update_threads.append(new_worker)
|
||||
new_worker.start()
|
||||
|
||||
while not app.config.exit.is_set():
|
||||
|
||||
@@ -1728,7 +1779,7 @@ def ticker_thread_check_time_launch_checks():
|
||||
priority = int(time.time())
|
||||
logger.debug(
|
||||
f"> Queued watch UUID {uuid} "
|
||||
f"last checked at {watch['last_checked']} "
|
||||
f"last checked at {watch['last_checked']} ({seconds_since_last_recheck} seconds ago!) recheck min was :{recheck_time_minimum_seconds} "
|
||||
f"queued at {now:0.2f} priority {priority} "
|
||||
f"jitter {watch.jitter_seconds:0.2f}s, "
|
||||
f"{now - watch['last_checked']:0.2f}s since last checked")
|
||||
|
||||
@@ -75,8 +75,12 @@ class difference_detection_processor():
|
||||
|
||||
proxy_url = None
|
||||
if preferred_proxy_id:
|
||||
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
|
||||
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
|
||||
# Custom browser endpoints should not have a proxy added
|
||||
if not preferred_proxy_id.startswith('ui-'):
|
||||
proxy_url = self.datastore.proxy_list.get(preferred_proxy_id).get('url')
|
||||
logger.debug(f"Selected proxy key '{preferred_proxy_id}' as proxy URL '{proxy_url}' for {url}")
|
||||
else:
|
||||
logger.debug(f"Skipping adding proxy data when custom Browser endpoint is specified.")
|
||||
|
||||
# Now call the fetcher (playwright/requests/etc) with arguments that only a fetcher would need.
|
||||
# When browser_connection_url is None, it method should default to working out whats the best defaults (os env vars etc)
|
||||
|
||||
@@ -369,6 +369,12 @@ class update_worker(threading.Thread):
|
||||
}
|
||||
)
|
||||
process_changedetection_results = False
|
||||
except content_fetchers.exceptions.BrowserFetchTimedOut as e:
|
||||
self.datastore.update_watch(uuid=uuid,
|
||||
update_obj={'last_error': e.msg
|
||||
}
|
||||
)
|
||||
process_changedetection_results = False
|
||||
except content_fetchers.exceptions.BrowserStepsStepException as e:
|
||||
|
||||
if not self.datastore.data['watching'].get(uuid):
|
||||
|
||||
Reference in New Issue
Block a user