Compare commits

...

4 Commits

Author SHA1 Message Date
dgtlmoon
8df61f5eaa 0.49.16
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-05-03 16:43:04 +02:00
dgtlmoon
162f573967 Fixes to ensure proxy errors are handled correctly (#3168) 2025-05-03 16:05:40 +02:00
dgtlmoon
eada0ef08d UI - Custom headers should have validation (#3172) 2025-05-03 13:57:42 +02:00
dgtlmoon
f57bc10973 Update selenium library (#3170)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-05-02 14:05:23 +02:00
12 changed files with 286 additions and 239 deletions

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.49.15'
__version__ = '0.49.16'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -168,9 +168,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
step_optional_value = request.form.get('optional_value')
is_last_step = strtobool(request.form.get('is_last_step'))
# @todo try.. accept.. nice errors not popups..
try:
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation,
selector=step_selector,
optional_value=step_optional_value)

View File

@@ -61,23 +61,6 @@ class steppable_browser_interface():
def __init__(self, start_url):
self.start_url = start_url
def safe_page_operation(self, operation_fn, default_return=None):
"""Safely execute a page operation with error handling"""
if self.page is None:
logger.warning("Attempted operation on None page object")
return default_return
try:
return operation_fn()
except Exception as e:
logger.debug(f"Page operation failed: {str(e)}")
# Try to reclaim memory if possible
try:
self.page.request_gc()
except:
pass
return default_return
# Convert and perform "Click Button" for example
def call_action(self, action_name, selector=None, optional_value=None):
@@ -109,20 +92,11 @@ class steppable_browser_interface():
if optional_value and ('{%' in optional_value or '{{' in optional_value):
optional_value = jinja_render(template_str=optional_value)
try:
action_handler(selector, optional_value)
# Safely wait for timeout
def wait_timeout():
self.page.wait_for_timeout(1.5 * 1000)
self.safe_page_operation(wait_timeout)
logger.debug(f"Call action done in {time.time()-now:.2f}s")
except Exception as e:
logger.error(f"Error executing action '{call_action_name}': {str(e)}")
# Request garbage collection to free up resources after error
try:
self.page.request_gc()
except:
pass
action_handler(selector, optional_value)
# Safely wait for timeout
self.page.wait_for_timeout(1.5 * 1000)
logger.debug(f"Call action done in {time.time()-now:.2f}s")
def action_goto_url(self, selector=None, value=None):
if not value:
@@ -130,11 +104,7 @@ class steppable_browser_interface():
return None
now = time.time()
def goto_operation():
return self.page.goto(value, timeout=0, wait_until='load')
response = self.safe_page_operation(goto_operation)
response = self.page.goto(value, timeout=0, wait_until='load')
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
return response
@@ -147,53 +117,40 @@ class steppable_browser_interface():
if not value or not len(value.strip()):
return
def click_operation():
elem = self.page.get_by_text(value)
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
self.safe_page_operation(click_operation)
elem = self.page.get_by_text(value)
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
logger.debug("Clicking element containing text if exists")
if not value or not len(value.strip()):
return
def click_if_exists_operation():
elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
self.safe_page_operation(click_if_exists_operation)
def action_enter_text_in_field(self, selector, value):
if not selector or not len(selector.strip()):
return
def fill_operation():
self.page.fill(selector, value, timeout=self.action_timeout)
self.safe_page_operation(fill_operation)
self.page.fill(selector, value, timeout=self.action_timeout)
def action_execute_js(self, selector, value):
if not value:
return None
def evaluate_operation():
return self.page.evaluate(value)
return self.safe_page_operation(evaluate_operation)
return self.page.evaluate(value)
def action_click_element(self, selector, value):
logger.debug("Clicking element")
if not selector or not len(selector.strip()):
return
def click_operation():
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
self.safe_page_operation(click_operation)
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
def action_click_element_if_exists(self, selector, value):
import playwright._impl._errors as _api_types
@@ -201,16 +158,14 @@ class steppable_browser_interface():
if not selector or not len(selector.strip()):
return
def click_if_exists_operation():
try:
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
except _api_types.TimeoutError:
return
except _api_types.Error:
# Element was there, but page redrew and now its long long gone
return
try:
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
except _api_types.TimeoutError:
return
except _api_types.Error:
# Element was there, but page redrew and now its long long gone
return
self.safe_page_operation(click_if_exists_operation)
def action_click_x_y(self, selector, value):
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
@@ -222,10 +177,8 @@ class steppable_browser_interface():
x = int(float(x.strip()))
y = int(float(y.strip()))
def click_xy_operation():
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
self.safe_page_operation(click_xy_operation)
except Exception as e:
logger.error(f"Error parsing x,y coordinates: {str(e)}")
@@ -233,27 +186,17 @@ class steppable_browser_interface():
if not selector or not len(selector.strip()):
return
def select_operation():
self.page.select_option(selector, label=value, timeout=self.action_timeout)
self.safe_page_operation(select_operation)
self.page.select_option(selector, label=value, timeout=self.action_timeout)
def action_scroll_down(self, selector, value):
def scroll_operation():
# Some sites this doesnt work on for some reason
self.page.mouse.wheel(0, 600)
self.page.wait_for_timeout(1000)
self.safe_page_operation(scroll_operation)
# Some sites this doesnt work on for some reason
self.page.mouse.wheel(0, 600)
self.page.wait_for_timeout(1000)
def action_wait_for_seconds(self, selector, value):
try:
seconds = float(value.strip()) if value else 1.0
def wait_operation():
self.page.wait_for_timeout(seconds * 1000)
self.safe_page_operation(wait_operation)
self.page.wait_for_timeout(seconds * 1000)
except (ValueError, TypeError) as e:
logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
@@ -263,14 +206,11 @@ class steppable_browser_interface():
import json
v = json.dumps(value)
def wait_for_text_operation():
self.page.wait_for_function(
f'document.querySelector("body").innerText.includes({v});',
timeout=30000
)
self.page.wait_for_function(
f'document.querySelector("body").innerText.includes({v});',
timeout=30000
)
self.safe_page_operation(wait_for_text_operation)
def action_wait_for_text_in_element(self, selector, value):
if not selector or not value:
@@ -280,82 +220,60 @@ class steppable_browser_interface():
s = json.dumps(selector)
v = json.dumps(value)
def wait_for_text_in_element_operation():
self.page.wait_for_function(
f'document.querySelector({s}).innerText.includes({v});',
timeout=30000
)
self.safe_page_operation(wait_for_text_in_element_operation)
self.page.wait_for_function(
f'document.querySelector({s}).innerText.includes({v});',
timeout=30000
)
# @todo - in the future make some popout interface to capture what needs to be set
# https://playwright.dev/python/docs/api/class-keyboard
def action_press_enter(self, selector, value):
def press_operation():
self.page.keyboard.press("Enter", delay=randint(200, 500))
self.page.keyboard.press("Enter", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_press_page_up(self, selector, value):
def press_operation():
self.page.keyboard.press("PageUp", delay=randint(200, 500))
self.safe_page_operation(press_operation)
self.page.keyboard.press("PageUp", delay=randint(200, 500))
def action_press_page_down(self, selector, value):
def press_operation():
self.page.keyboard.press("PageDown", delay=randint(200, 500))
self.safe_page_operation(press_operation)
self.page.keyboard.press("PageDown", delay=randint(200, 500))
def action_check_checkbox(self, selector, value):
if not selector:
return
def check_operation():
self.page.locator(selector).check(timeout=self.action_timeout)
self.safe_page_operation(check_operation)
self.page.locator(selector).check(timeout=self.action_timeout)
def action_uncheck_checkbox(self, selector, value):
if not selector:
return
def uncheck_operation():
self.page.locator(selector).uncheck(timeout=self.action_timeout)
self.page.locator(selector).uncheck(timeout=self.action_timeout)
self.safe_page_operation(uncheck_operation)
def action_remove_elements(self, selector, value):
"""Removes all elements matching the given selector from the DOM."""
if not selector:
return
def remove_operation():
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
self.safe_page_operation(remove_operation)
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
def action_make_all_child_elements_visible(self, selector, value):
"""Recursively makes all child elements inside the given selector fully visible."""
if not selector:
return
def make_visible_operation():
self.page.locator(selector).locator("*").evaluate_all("""
els => els.forEach(el => {
el.style.display = 'block'; // Forces it to be displayed
el.style.visibility = 'visible'; // Ensures it's not hidden
el.style.opacity = '1'; // Fully opaque
el.style.position = 'relative'; // Avoids 'absolute' hiding
el.style.height = 'auto'; // Expands collapsed elements
el.style.width = 'auto'; // Ensures full visibility
el.removeAttribute('hidden'); // Removes hidden attribute
el.classList.remove('hidden', 'd-none'); // Removes common CSS hidden classes
})
""")
self.safe_page_operation(make_visible_operation)
self.page.locator(selector).locator("*").evaluate_all("""
els => els.forEach(el => {
el.style.display = 'block'; // Forces it to be displayed
el.style.visibility = 'visible'; // Ensures it's not hidden
el.style.opacity = '1'; // Fully opaque
el.style.position = 'relative'; // Avoids 'absolute' hiding
el.style.height = 'auto'; // Expands collapsed elements
el.style.width = 'auto'; // Ensures full visibility
el.removeAttribute('hidden'); // Removes hidden attribute
el.classList.remove('hidden', 'd-none'); // Removes common CSS hidden classes
})
""")
# Responsible for maintaining a live 'context' with the chrome CDP
# @todo - how long do contexts live for anyway?

View File

@@ -194,7 +194,6 @@ class fetcher(Fetcher):
browsersteps_interface.page = self.page
response = browsersteps_interface.action_goto_url(value=url)
self.headers = response.all_headers()
if response is None:
context.close()
@@ -202,6 +201,8 @@ class fetcher(Fetcher):
logger.debug("Content Fetcher > Response object from the browser communication was none")
raise EmptyReply(url=url, status_code=None)
self.headers = response.all_headers()
try:
if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None)

View File

@@ -28,6 +28,7 @@ class fetcher(Fetcher):
import chardet
import requests
from requests.exceptions import ProxyError, ConnectionError, RequestException
if self.browser_steps_get_valid_steps():
raise BrowserStepsInUnsupportedFetcher(url=url)
@@ -52,14 +53,19 @@ class fetcher(Fetcher):
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
from requests_file import FileAdapter
session.mount('file://', FileAdapter())
r = session.request(method=request_method,
data=request_body.encode('utf-8') if type(request_body) is str else request_body,
url=url,
headers=request_headers,
timeout=timeout,
proxies=proxies,
verify=False)
try:
r = session.request(method=request_method,
data=request_body.encode('utf-8') if type(request_body) is str else request_body,
url=url,
headers=request_headers,
timeout=timeout,
proxies=proxies,
verify=False)
except Exception as e:
msg = str(e)
if proxies and 'SOCKSHTTPSConnectionPool' in msg:
msg = f"Proxy connection failed? {msg}"
raise Exception(msg) from e
# If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks.
# For example - some sites don't tell us it's utf-8, but return utf-8 content

View File

@@ -10,16 +10,13 @@ class fetcher(Fetcher):
else:
fetcher_description = "WebDriver Chrome/Javascript"
# Configs for Proxy setup
# In the ENV vars, is prefixed with "webdriver_", so it is for example "webdriver_sslProxy"
selenium_proxy_settings_mappings = ['proxyType', 'ftpProxy', 'httpProxy', 'noProxy',
'proxyAutoconfigUrl', 'sslProxy', 'autodetect',
'socksProxy', 'socksVersion', 'socksUsername', 'socksPassword']
proxy = None
proxy_url = None
def __init__(self, proxy_override=None, custom_browser_connection_url=None):
super().__init__()
from selenium.webdriver.common.proxy import Proxy as SeleniumProxy
from urllib.parse import urlparse
from selenium.webdriver.common.proxy import Proxy
# .strip('"') is going to save someone a lot of time when they accidently wrap the env value
if not custom_browser_connection_url:
@@ -28,25 +25,27 @@ class fetcher(Fetcher):
self.browser_connection_is_custom = True
self.browser_connection_url = custom_browser_connection_url
# If any proxy settings are enabled, then we should setup the proxy object
proxy_args = {}
for k in self.selenium_proxy_settings_mappings:
v = os.getenv('webdriver_' + k, False)
if v:
proxy_args[k] = v.strip('"')
# Map back standard HTTP_ and HTTPS_PROXY to webDriver httpProxy/sslProxy
if not proxy_args.get('webdriver_httpProxy') and self.system_http_proxy:
proxy_args['httpProxy'] = self.system_http_proxy
if not proxy_args.get('webdriver_sslProxy') and self.system_https_proxy:
proxy_args['httpsProxy'] = self.system_https_proxy
##### PROXY SETUP #####
# Allows override the proxy on a per-request basis
if proxy_override is not None:
proxy_args['httpProxy'] = proxy_override
proxy_sources = [
self.system_http_proxy,
self.system_https_proxy,
os.getenv('webdriver_proxySocks'),
os.getenv('webdriver_socksProxy'),
os.getenv('webdriver_proxyHttp'),
os.getenv('webdriver_httpProxy'),
os.getenv('webdriver_proxyHttps'),
os.getenv('webdriver_httpsProxy'),
os.getenv('webdriver_sslProxy'),
proxy_override, # last one should override
]
# The built in selenium proxy handling is super unreliable!!! so we just grab which ever proxy setting we can find and throw it in --proxy-server=
for k in filter(None, proxy_sources):
if not k:
continue
self.proxy_url = k.strip()
if proxy_args:
self.proxy = SeleniumProxy(raw=proxy_args)
def run(self,
url,
@@ -59,9 +58,7 @@ class fetcher(Fetcher):
is_binary=False,
empty_pages_are_a_change=False):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
from selenium.common.exceptions import WebDriverException
# request_body, request_method unused for now, until some magic in the future happens.
options = ChromeOptions()
@@ -76,59 +73,62 @@ class fetcher(Fetcher):
for opt in CHROME_OPTIONS:
options.add_argument(opt)
if self.proxy:
options.proxy = self.proxy
# 1. proxy_config /Proxy(proxy_config) selenium object is REALLY unreliable
# 2. selenium-wire cant be used because the websocket version conflicts with pypeteer-ng
# 3. selenium only allows ONE runner at a time by default!
# 4. driver must use quit() or it will continue to block/hold the selenium process!!
self.driver = webdriver.Remote(
command_executor=self.browser_connection_url,
options=options)
if self.proxy_url:
options.add_argument(f'--proxy-server={self.proxy_url}')
from selenium.webdriver.remote.remote_connection import RemoteConnection
from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver
driver = None
try:
# Create the RemoteConnection and set timeout (e.g., 30 seconds)
remote_connection = RemoteConnection(
self.browser_connection_url,
)
remote_connection.set_timeout(30) # seconds
# Now create the driver with the RemoteConnection
driver = RemoteWebDriver(
command_executor=remote_connection,
options=options
)
driver.set_page_load_timeout(int(os.getenv("WEBDRIVER_PAGELOAD_TIMEOUT", 45)))
except Exception as e:
if driver:
driver.quit()
raise e
try:
self.driver.get(url)
except WebDriverException as e:
# Be sure we close the session window
self.quit()
raise
driver.get(url)
if not "--window-size" in os.getenv("CHROME_OPTIONS", ""):
self.driver.set_window_size(1280, 1024)
if not "--window-size" in os.getenv("CHROME_OPTIONS", ""):
driver.set_window_size(1280, 1024)
self.driver.implicitly_wait(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)))
driver.implicitly_wait(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)))
if self.webdriver_js_execute_code is not None:
self.driver.execute_script(self.webdriver_js_execute_code)
# Selenium doesn't automatically wait for actions as good as Playwright, so wait again
self.driver.implicitly_wait(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)))
if self.webdriver_js_execute_code is not None:
driver.execute_script(self.webdriver_js_execute_code)
# Selenium doesn't automatically wait for actions as good as Playwright, so wait again
driver.implicitly_wait(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)))
# @todo - how to check this? is it possible?
self.status_code = 200
# @todo somehow we should try to get this working for WebDriver
# raise EmptyReply(url=url, status_code=r.status_code)
# @todo - how to check this? is it possible?
self.status_code = 200
# @todo somehow we should try to get this working for WebDriver
# raise EmptyReply(url=url, status_code=r.status_code)
# @todo - dom wait loaded?
time.sleep(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay)
self.content = driver.page_source
self.headers = {}
self.screenshot = driver.get_screenshot_as_png()
except Exception as e:
driver.quit()
raise e
# @todo - dom wait loaded?
time.sleep(int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay)
self.content = self.driver.page_source
self.headers = {}
driver.quit()
self.screenshot = self.driver.get_screenshot_as_png()
# Does the connection to the webdriver work? run a test connection.
def is_ready(self):
from selenium import webdriver
from selenium.webdriver.chrome.options import Options as ChromeOptions
self.driver = webdriver.Remote(
command_executor=self.command_executor,
options=ChromeOptions())
# driver.quit() seems to cause better exceptions
self.quit()
return True
def quit(self, watch=None):
if self.driver:
try:
self.driver.quit()
except Exception as e:
logger.debug(f"Content Fetcher > Exception in chrome shutdown/quit {str(e)}")

View File

@@ -224,27 +224,37 @@ class StringDictKeyValue(StringField):
def _value(self):
if self.data:
output = u''
for k in self.data.keys():
output += "{}: {}\r\n".format(k, self.data[k])
output = ''
for k, v in self.data.items():
output += f"{k}: {v}\r\n"
return output
else:
return u''
return ''
# incoming
# incoming data processing + validation
def process_formdata(self, valuelist):
self.data = {}
errors = []
if valuelist:
self.data = {}
# Remove empty strings
cleaned = list(filter(None, valuelist[0].split("\n")))
for s in cleaned:
parts = s.strip().split(':', 1)
if len(parts) == 2:
self.data.update({parts[0].strip(): parts[1].strip()})
# Remove empty strings (blank lines)
cleaned = [line.strip() for line in valuelist[0].split("\n") if line.strip()]
for idx, s in enumerate(cleaned, start=1):
if ':' not in s:
errors.append(f"Line {idx} is missing a ':' separator.")
continue
parts = s.split(':', 1)
key = parts[0].strip()
value = parts[1].strip()
else:
self.data = {}
if not key:
errors.append(f"Line {idx} has an empty key.")
if not value:
errors.append(f"Line {idx} has an empty value.")
self.data[key] = value
if errors:
raise ValidationError("Invalid input:\n" + "\n".join(errors))
class ValidateContentFetcherIsReady(object):
"""

View File

@@ -82,3 +82,25 @@ done
docker kill squid-one squid-two squid-custom
# Test that the UI is returning the correct error message when a proxy is not available
# Requests
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && pytest tests/proxy_list/test_proxy_noconnect.py'
# Playwright
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
# Puppeteer fast
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && FAST_PUPPETEER_CHROME_FETCHER=1 PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
# Selenium
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && WEBDRIVER_URL=http://selenium:4444/wd/hub pytest tests/proxy_list/test_proxy_noconnect.py'

View File

@@ -0,0 +1,68 @@
#!/usr/bin/env python3
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks
import os
from ... import strtobool
# Just to be sure the UI outputs the right error message on proxy connection failed
# docker run -p 4444:4444 --rm --shm-size="2g" selenium/standalone-chrome:4
# PLAYWRIGHT_DRIVER_URL=ws://127.0.0.1:3000 pytest tests/proxy_list/test_proxy_noconnect.py
# FAST_PUPPETEER_CHROME_FETCHER=True PLAYWRIGHT_DRIVER_URL=ws://127.0.0.1:3000 pytest tests/proxy_list/test_proxy_noconnect.py
# WEBDRIVER_URL=http://127.0.0.1:4444/wd/hub pytest tests/proxy_list/test_proxy_noconnect.py
def test_proxy_noconnect_custom(client, live_server, measure_memory_usage):
live_server_setup(live_server)
# Goto settings, add our custom one
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_webdriver' if os.getenv('PLAYWRIGHT_DRIVER_URL') or os.getenv("WEBDRIVER_URL") else 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
# test:awesome is set in tests/proxy_list/squid-passwords.txt
"requests-extra_proxies-0-proxy_url": "http://127.0.0.1:3128",
},
follow_redirects=True
)
assert b"Settings updated." in res.data
test_url = "https://changedetection.io"
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
options = {
"url": test_url,
"fetch_backend": "html_webdriver" if os.getenv('PLAYWRIGHT_DRIVER_URL') or os.getenv("WEBDRIVER_URL") else "html_requests",
"proxy": "ui-0custom-test-proxy",
}
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first", unpause_on_save=1),
data=options,
follow_redirects=True
)
assert b"unpaused" in res.data
import time
wait_for_all_checks(client)
# Requests default
check_string = b'Cannot connect to proxy'
if os.getenv('PLAYWRIGHT_DRIVER_URL') or strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')) or os.getenv("WEBDRIVER_URL"):
check_string = b'ERR_PROXY_CONNECTION_FAILED'
res = client.get(url_for("watchlist.index"))
#with open("/tmp/debug.html", 'wb') as f:
# f.write(res.data)
assert check_string in res.data

View File

@@ -424,3 +424,27 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
# unlink headers.txt on start/stop
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_headers_validation(client, live_server):
#live_server_setup(live_server)
test_url = url_for('test_headers', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={
"url": test_url,
"fetch_backend": 'html_requests',
"headers": "User-AGent agent-from-watch\r\nsadfsadfsadfsdaf\r\n:foobar"},
follow_redirects=True
)
assert b"Line 1 is missing a ':' separator." in res.data
assert b"Line 3 has an empty key." in res.data

View File

@@ -136,7 +136,7 @@ def wait_for_all_checks(client):
res = client.get(url_for("watchlist.index"))
if not b'Checking now' in res.data:
break
logging.getLogger().info("Waiting for watch-list to not say 'Checking now'.. {}".format(attempt))
logging.getLogger().info(f"Waiting for watch-list to not say 'Checking now'.. {attempt}")
time.sleep(1)
attempt += 1

View File

@@ -53,7 +53,7 @@ lxml >=4.8.0,<6,!=5.2.0,!=5.2.1
# XPath 2.0-3.1 support - 4.2.0 broke something?
elementpath==4.1.5
selenium~=4.14.0
selenium~=4.31.0
# https://github.com/pallets/werkzeug/issues/2985
# Maybe related to pytest?
@@ -70,7 +70,7 @@ jq~=1.3; python_version >= "3.8" and sys_platform == "linux"
# playwright is installed at Dockerfile build time because it's not available on all platforms
pyppeteer-ng==2.0.0rc9
pyppeteer-ng==2.0.0rc10
pyppeteerstealth>=0.0.4