Compare commits

...

6 Commits

Author SHA1 Message Date
dgtlmoon
78f3f2b26a Merge branch 'master' into selenium-proxy-fix 2025-05-02 14:05:45 +02:00
dgtlmoon
f57bc10973 Update selenium library (#3170)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-05-02 14:05:23 +02:00
dgtlmoon
535ee97ef7 Selenium proxy fixes 2025-05-02 10:54:01 +02:00
dgtlmoon
b2923b8c3a Fixes to ensure proxy errors are handled correctly 2025-05-02 10:21:27 +02:00
dgtlmoon
d2e8f822d6 Restock detection - adding new string
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-05-01 17:58:36 +02:00
dgtlmoon
5fd8200fd9 Conditions - Levenshtein text similarity plugin - adding test, fixing import, fixing check for watches with 1 snapshot history (#3161)
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
ChangeDetection.io Container Build Test / test-container-build (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-04-30 16:47:23 +02:00
12 changed files with 283 additions and 167 deletions

View File

@@ -168,9 +168,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
step_optional_value = request.form.get('optional_value') step_optional_value = request.form.get('optional_value')
is_last_step = strtobool(request.form.get('is_last_step')) is_last_step = strtobool(request.form.get('is_last_step'))
# @todo try.. accept.. nice errors not popups..
try: try:
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation, browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation,
selector=step_selector, selector=step_selector,
optional_value=step_optional_value) optional_value=step_optional_value)

View File

@@ -62,23 +62,6 @@ class steppable_browser_interface():
def __init__(self, start_url): def __init__(self, start_url):
self.start_url = start_url self.start_url = start_url
def safe_page_operation(self, operation_fn, default_return=None):
"""Safely execute a page operation with error handling"""
if self.page is None:
logger.warning("Attempted operation on None page object")
return default_return
try:
return operation_fn()
except Exception as e:
logger.debug(f"Page operation failed: {str(e)}")
# Try to reclaim memory if possible
try:
self.page.request_gc()
except:
pass
return default_return
# Convert and perform "Click Button" for example # Convert and perform "Click Button" for example
def call_action(self, action_name, selector=None, optional_value=None): def call_action(self, action_name, selector=None, optional_value=None):
if self.page is None: if self.page is None:
@@ -109,20 +92,11 @@ class steppable_browser_interface():
if optional_value and ('{%' in optional_value or '{{' in optional_value): if optional_value and ('{%' in optional_value or '{{' in optional_value):
optional_value = jinja_render(template_str=optional_value) optional_value = jinja_render(template_str=optional_value)
try:
action_handler(selector, optional_value) action_handler(selector, optional_value)
# Safely wait for timeout # Safely wait for timeout
def wait_timeout():
self.page.wait_for_timeout(1.5 * 1000) self.page.wait_for_timeout(1.5 * 1000)
self.safe_page_operation(wait_timeout)
logger.debug(f"Call action done in {time.time()-now:.2f}s") logger.debug(f"Call action done in {time.time()-now:.2f}s")
except Exception as e:
logger.error(f"Error executing action '{call_action_name}': {str(e)}")
# Request garbage collection to free up resources after error
try:
self.page.request_gc()
except:
pass
def action_goto_url(self, selector=None, value=None): def action_goto_url(self, selector=None, value=None):
if not value: if not value:
@@ -130,11 +104,7 @@ class steppable_browser_interface():
return None return None
now = time.time() now = time.time()
response = self.page.goto(value, timeout=0, wait_until='load')
def goto_operation():
return self.page.goto(value, timeout=0, wait_until='load')
response = self.safe_page_operation(goto_operation)
logger.debug(f"Time to goto URL {time.time()-now:.2f}s") logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
return response return response
@@ -147,61 +117,47 @@ class steppable_browser_interface():
if not value or not len(value.strip()): if not value or not len(value.strip()):
return return
def click_operation():
elem = self.page.get_by_text(value) elem = self.page.get_by_text(value)
if elem.count(): if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
self.safe_page_operation(click_operation)
def action_click_element_containing_text_if_exists(self, selector=None, value=''): def action_click_element_containing_text_if_exists(self, selector=None, value=''):
logger.debug("Clicking element containing text if exists") logger.debug("Clicking element containing text if exists")
if not value or not len(value.strip()): if not value or not len(value.strip()):
return return
def click_if_exists_operation():
elem = self.page.get_by_text(value) elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found") logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count(): if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
self.safe_page_operation(click_if_exists_operation)
def action_enter_text_in_field(self, selector, value): def action_enter_text_in_field(self, selector, value):
if not selector or not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def fill_operation():
self.page.fill(selector, value, timeout=self.action_timeout) self.page.fill(selector, value, timeout=self.action_timeout)
self.safe_page_operation(fill_operation)
def action_execute_js(self, selector, value): def action_execute_js(self, selector, value):
if not value: if not value:
return None return None
def evaluate_operation():
return self.page.evaluate(value) return self.page.evaluate(value)
return self.safe_page_operation(evaluate_operation)
def action_click_element(self, selector, value): def action_click_element(self, selector, value):
logger.debug("Clicking element") logger.debug("Clicking element")
if not selector or not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def click_operation():
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500)) self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
self.safe_page_operation(click_operation)
def action_click_element_if_exists(self, selector, value): def action_click_element_if_exists(self, selector, value):
import playwright._impl._errors as _api_types import playwright._impl._errors as _api_types
logger.debug("Clicking element if exists") logger.debug("Clicking element if exists")
if not selector or not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def click_if_exists_operation():
try: try:
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500)) self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
except _api_types.TimeoutError: except _api_types.TimeoutError:
@@ -210,7 +166,6 @@ class steppable_browser_interface():
# Element was there, but page redrew and now its long long gone # Element was there, but page redrew and now its long long gone
return return
self.safe_page_operation(click_if_exists_operation)
def action_click_x_y(self, selector, value): def action_click_x_y(self, selector, value):
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value): if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
@@ -222,10 +177,8 @@ class steppable_browser_interface():
x = int(float(x.strip())) x = int(float(x.strip()))
y = int(float(y.strip())) y = int(float(y.strip()))
def click_xy_operation():
self.page.mouse.click(x=x, y=y, delay=randint(200, 500)) self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
self.safe_page_operation(click_xy_operation)
except Exception as e: except Exception as e:
logger.error(f"Error parsing x,y coordinates: {str(e)}") logger.error(f"Error parsing x,y coordinates: {str(e)}")
@@ -233,27 +186,17 @@ class steppable_browser_interface():
if not selector or not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def select_operation():
self.page.select_option(selector, label=value, timeout=self.action_timeout) self.page.select_option(selector, label=value, timeout=self.action_timeout)
self.safe_page_operation(select_operation)
def action_scroll_down(self, selector, value): def action_scroll_down(self, selector, value):
def scroll_operation():
# Some sites this doesnt work on for some reason # Some sites this doesnt work on for some reason
self.page.mouse.wheel(0, 600) self.page.mouse.wheel(0, 600)
self.page.wait_for_timeout(1000) self.page.wait_for_timeout(1000)
self.safe_page_operation(scroll_operation)
def action_wait_for_seconds(self, selector, value): def action_wait_for_seconds(self, selector, value):
try: try:
seconds = float(value.strip()) if value else 1.0 seconds = float(value.strip()) if value else 1.0
def wait_operation():
self.page.wait_for_timeout(seconds * 1000) self.page.wait_for_timeout(seconds * 1000)
self.safe_page_operation(wait_operation)
except (ValueError, TypeError) as e: except (ValueError, TypeError) as e:
logger.error(f"Invalid value for wait_for_seconds: {str(e)}") logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
@@ -263,14 +206,11 @@ class steppable_browser_interface():
import json import json
v = json.dumps(value) v = json.dumps(value)
def wait_for_text_operation():
self.page.wait_for_function( self.page.wait_for_function(
f'document.querySelector("body").innerText.includes({v});', f'document.querySelector("body").innerText.includes({v});',
timeout=30000 timeout=30000
) )
self.safe_page_operation(wait_for_text_operation)
def action_wait_for_text_in_element(self, selector, value): def action_wait_for_text_in_element(self, selector, value):
if not selector or not value: if not selector or not value:
@@ -280,68 +220,48 @@ class steppable_browser_interface():
s = json.dumps(selector) s = json.dumps(selector)
v = json.dumps(value) v = json.dumps(value)
def wait_for_text_in_element_operation():
self.page.wait_for_function( self.page.wait_for_function(
f'document.querySelector({s}).innerText.includes({v});', f'document.querySelector({s}).innerText.includes({v});',
timeout=30000 timeout=30000
) )
self.safe_page_operation(wait_for_text_in_element_operation)
# @todo - in the future make some popout interface to capture what needs to be set # @todo - in the future make some popout interface to capture what needs to be set
# https://playwright.dev/python/docs/api/class-keyboard # https://playwright.dev/python/docs/api/class-keyboard
def action_press_enter(self, selector, value): def action_press_enter(self, selector, value):
def press_operation():
self.page.keyboard.press("Enter", delay=randint(200, 500)) self.page.keyboard.press("Enter", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_press_page_up(self, selector, value): def action_press_page_up(self, selector, value):
def press_operation():
self.page.keyboard.press("PageUp", delay=randint(200, 500)) self.page.keyboard.press("PageUp", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_press_page_down(self, selector, value): def action_press_page_down(self, selector, value):
def press_operation():
self.page.keyboard.press("PageDown", delay=randint(200, 500)) self.page.keyboard.press("PageDown", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_check_checkbox(self, selector, value): def action_check_checkbox(self, selector, value):
if not selector: if not selector:
return return
def check_operation():
self.page.locator(selector).check(timeout=self.action_timeout) self.page.locator(selector).check(timeout=self.action_timeout)
self.safe_page_operation(check_operation)
def action_uncheck_checkbox(self, selector, value): def action_uncheck_checkbox(self, selector, value):
if not selector: if not selector:
return return
def uncheck_operation():
self.page.locator(selector).uncheck(timeout=self.action_timeout) self.page.locator(selector).uncheck(timeout=self.action_timeout)
self.safe_page_operation(uncheck_operation)
def action_remove_elements(self, selector, value): def action_remove_elements(self, selector, value):
"""Removes all elements matching the given selector from the DOM.""" """Removes all elements matching the given selector from the DOM."""
if not selector: if not selector:
return return
def remove_operation():
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())") self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
self.safe_page_operation(remove_operation)
def action_make_all_child_elements_visible(self, selector, value): def action_make_all_child_elements_visible(self, selector, value):
"""Recursively makes all child elements inside the given selector fully visible.""" """Recursively makes all child elements inside the given selector fully visible."""
if not selector: if not selector:
return return
def make_visible_operation():
self.page.locator(selector).locator("*").evaluate_all(""" self.page.locator(selector).locator("*").evaluate_all("""
els => els.forEach(el => { els => els.forEach(el => {
el.style.display = 'block'; // Forces it to be displayed el.style.display = 'block'; // Forces it to be displayed
@@ -355,8 +275,6 @@ class steppable_browser_interface():
}) })
""") """)
self.safe_page_operation(make_visible_operation)
# Responsible for maintaining a live 'context' with the chrome CDP # Responsible for maintaining a live 'context' with the chrome CDP
# @todo - how long do contexts live for anyway? # @todo - how long do contexts live for anyway?
class browsersteps_live_ui(steppable_browser_interface): class browsersteps_live_ui(steppable_browser_interface):

View File

@@ -5,7 +5,7 @@ from json_logic.builtins import BUILTINS
from .exceptions import EmptyConditionRuleRowNotUsable from .exceptions import EmptyConditionRuleRowNotUsable
from .pluggy_interface import plugin_manager # Import the pluggy plugin manager from .pluggy_interface import plugin_manager # Import the pluggy plugin manager
from . import default_plugin from . import default_plugin
from loguru import logger
# List of all supported JSON Logic operators # List of all supported JSON Logic operators
operator_choices = [ operator_choices = [
(None, "Choose one - Operator"), (None, "Choose one - Operator"),
@@ -113,12 +113,14 @@ def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_dat
application_datastruct=application_datastruct, application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data ephemeral_data=ephemeral_data
) )
logger.debug(f"Trying plugin {plugin}....")
# Set a timeout of 10 seconds # Set a timeout of 10 seconds
try: try:
new_execute_data = future.result(timeout=10) new_execute_data = future.result(timeout=10)
if new_execute_data and isinstance(new_execute_data, dict): if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data) EXECUTE_DATA.update(new_execute_data)
except concurrent.futures.TimeoutError: except concurrent.futures.TimeoutError:
# The plugin took too long, abort processing for this watch # The plugin took too long, abort processing for this watch
raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.") raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.")

View File

@@ -9,15 +9,20 @@ def levenshtein_ratio_recent_history(watch, incoming_text=None):
try: try:
from Levenshtein import ratio, distance from Levenshtein import ratio, distance
k = list(watch.history.keys()) k = list(watch.history.keys())
if len(k) >= 2: a = None
b = None
# When called from ui_edit_stats_extras, we don't have incoming_text # When called from ui_edit_stats_extras, we don't have incoming_text
if incoming_text is None: if incoming_text is None:
a = watch.get_history_snapshot(timestamp=k[-1]) # Latest snapshot a = watch.get_history_snapshot(timestamp=k[-1]) # Latest snapshot
b = watch.get_history_snapshot(timestamp=k[-2]) # Previous snapshot b = watch.get_history_snapshot(timestamp=k[-2]) # Previous snapshot
else:
a = watch.get_history_snapshot(timestamp=k[-2]) # Second newest, incoming_text will be "newest"
b = incoming_text
# Needs atleast one snapshot
elif len(k) >= 1: # Should be atleast one snapshot to compare against
a = watch.get_history_snapshot(timestamp=k[-1]) # Latest saved snapshot
b = incoming_text if incoming_text else k[-2]
if a and b:
distance_value = distance(a, b) distance_value = distance(a, b)
ratio_value = ratio(a, b) ratio_value = ratio(a, b)
return { return {
@@ -53,7 +58,7 @@ def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
# ephemeral_data['text'] will be the current text after filters, they may have edited filters but not saved them yet etc # ephemeral_data['text'] will be the current text after filters, they may have edited filters but not saved them yet etc
if watch and 'text' in ephemeral_data: if watch and 'text' in ephemeral_data:
lev_data = levenshtein_ratio_recent_history(watch, ephemeral_data['text']) lev_data = levenshtein_ratio_recent_history(watch, ephemeral_data.get('text',''))
if isinstance(lev_data, dict): if isinstance(lev_data, dict):
res['levenshtein_ratio'] = lev_data.get('ratio', 0) res['levenshtein_ratio'] = lev_data.get('ratio', 0)
res['levenshtein_similarity'] = lev_data.get('percent_similar', 0) res['levenshtein_similarity'] = lev_data.get('percent_similar', 0)

View File

@@ -194,7 +194,6 @@ class fetcher(Fetcher):
browsersteps_interface.page = self.page browsersteps_interface.page = self.page
response = browsersteps_interface.action_goto_url(value=url) response = browsersteps_interface.action_goto_url(value=url)
self.headers = response.all_headers()
if response is None: if response is None:
context.close() context.close()
@@ -202,6 +201,8 @@ class fetcher(Fetcher):
logger.debug("Content Fetcher > Response object from the browser communication was none") logger.debug("Content Fetcher > Response object from the browser communication was none")
raise EmptyReply(url=url, status_code=None) raise EmptyReply(url=url, status_code=None)
self.headers = response.all_headers()
try: try:
if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code): if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None) browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None)

View File

@@ -28,6 +28,7 @@ class fetcher(Fetcher):
import chardet import chardet
import requests import requests
from requests.exceptions import ProxyError, ConnectionError, RequestException
if self.browser_steps_get_valid_steps(): if self.browser_steps_get_valid_steps():
raise BrowserStepsInUnsupportedFetcher(url=url) raise BrowserStepsInUnsupportedFetcher(url=url)
@@ -52,7 +53,7 @@ class fetcher(Fetcher):
if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'): if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
from requests_file import FileAdapter from requests_file import FileAdapter
session.mount('file://', FileAdapter()) session.mount('file://', FileAdapter())
try:
r = session.request(method=request_method, r = session.request(method=request_method,
data=request_body.encode('utf-8') if type(request_body) is str else request_body, data=request_body.encode('utf-8') if type(request_body) is str else request_body,
url=url, url=url,
@@ -60,6 +61,11 @@ class fetcher(Fetcher):
timeout=timeout, timeout=timeout,
proxies=proxies, proxies=proxies,
verify=False) verify=False)
except Exception as e:
msg = str(e)
if proxies and 'SOCKSHTTPSConnectionPool' in msg:
msg = f"Proxy connection failed? {msg}"
raise Exception(msg) from e
# If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks. # If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks.
# For example - some sites don't tell us it's utf-8, but return utf-8 content # For example - some sites don't tell us it's utf-8, but return utf-8 content

View File

@@ -51,6 +51,7 @@ async () => {
'niet op voorraad', 'niet op voorraad',
'no disponible', 'no disponible',
'no featured offers available', 'no featured offers available',
'no longer available',
'no longer in stock', 'no longer in stock',
'no tickets available', 'no tickets available',
'non disponibile', 'non disponibile',

View File

@@ -76,8 +76,7 @@ class fetcher(Fetcher):
for opt in CHROME_OPTIONS: for opt in CHROME_OPTIONS:
options.add_argument(opt) options.add_argument(opt)
if self.proxy: options.add_argument(f"--proxy-server={self.proxy}")
options.proxy = self.proxy
self.driver = webdriver.Remote( self.driver = webdriver.Remote(
command_executor=self.browser_connection_url, command_executor=self.browser_connection_url,

View File

@@ -82,3 +82,26 @@ done
docker kill squid-one squid-two squid-custom docker kill squid-one squid-two squid-custom
# Test that the UI is returning the correct error message when a proxy is not available
# Requests
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && pytest tests/proxy_list/test_proxy_noconnect.py'
# Playwright
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
# Puppeteer fast
docker run --network changedet-network \
test-changedetectionio \
bash -c 'cd changedetectionio && FAST_PUPPETEER_CHROME_FETCHER=1 PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
# Selenium - todo - fix proxies
docker run --network changedet-network \
-e "WEBDRIVER_URL=http://selenium:4444/wd/hub" \
test-changedetectionio \
bash -c 'cd changedetectionio && pytest tests/proxy_list/test_proxy_noconnect.py'

View File

@@ -0,0 +1,53 @@
#!/usr/bin/env python3
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks
import os
from ... import strtobool
# Just to be sure the UI outputs the right error message on proxy connection failed
def test_proxy_noconnect_custom(client, live_server, measure_memory_usage):
live_server_setup(live_server)
# Goto settings, add our custom one
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_webdriver' if os.getenv('PLAYWRIGHT_DRIVER_URL') else 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
# test:awesome is set in tests/proxy_list/squid-passwords.txt
"requests-extra_proxies-0-proxy_url": "http://THISPROXYDOESNTEXIST:3128",
},
follow_redirects=True
)
assert b"Settings updated." in res.data
res = client.post(
url_for("imports.import_page"),
# Because a URL wont show in squid/proxy logs due it being SSLed
# Use plain HTTP or a specific domain-name here
data={"urls": "https://changedetection.io/CHANGELOG.txt"},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'Page.goto: net::ERR_PROXY_CONNECTION_FAILED' in res.data
# Requests
check_string = b'Proxy connection failed?'
if os.getenv('PLAYWRIGHT_DRIVER_URL') or strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')):
check_string = b'ERR_PROXY_CONNECTION_FAILED'
if os.getenv("WEBDRIVER_URL"):
check_string = b'ERR_PROXY_CONNECTION_FAILED'
assert check_string in res.data

View File

@@ -196,7 +196,11 @@ def test_condition_validate_rule_row(client, live_server):
) )
assert res.status_code == 200 assert res.status_code == 200
assert b'false' in res.data assert b'false' in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)
@@ -236,3 +240,106 @@ def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
# Assert the word count is counted correctly # Assert the word count is counted correctly
assert b'<td>13</td>' in res.data assert b'<td>13</td>' in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_lev_conditions_plugin(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("""<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
</body>
</html>
""")
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("ui.ui_views.form_quick_watch_add"),
data={"url": test_url, "tags": '', 'edit_and_watch_submit_button': 'Edit > Watch'},
follow_redirects=True
)
assert b"Watch added in Paused state, saving will unpause" in res.data
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
# Give the thread time to pick it up
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid=uuid, unpause_on_save=1),
data={
"url": test_url,
"fetch_backend": "html_requests",
"conditions_match_logic": "ALL", # ALL = AND logic
"conditions-0-field": "levenshtein_ratio",
"conditions-0-operator": "<",
"conditions-0-value": "0.8" # needs to be more of a diff to trigger a change
},
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'unviewed' not in res.data
# Check the content saved initially, even tho a condition was set - this is the first snapshot so shouldnt be affected by conditions
res = client.get(
url_for("ui.ui_views.preview_page", uuid=uuid),
follow_redirects=True
)
assert b'Which is across multiple lines' in res.data
############### Now change it a LITTLE bit...
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("""<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happenxxxxxxxxx. <br>
</body>
</html>
""")
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
assert b'Queued 1 watch for rechecking.' in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'unviewed' not in res.data #because this will be like 0.90 not 0.8 threshold
############### Now change it a MORE THAN 50%
test_return_data = """<html>
<body>
Some sxxxx<br>
<p>Which is across a lines</p>
<br>
ok. <br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
res = client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
assert b'Queued 1 watch for rechecking.' in res.data
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'unviewed' in res.data
# cleanup for the next
client.get(
url_for("ui.form_delete", uuid="all"),
follow_redirects=True
)

View File

@@ -53,7 +53,8 @@ lxml >=4.8.0,<6,!=5.2.0,!=5.2.1
# XPath 2.0-3.1 support - 4.2.0 broke something? # XPath 2.0-3.1 support - 4.2.0 broke something?
elementpath==4.1.5 elementpath==4.1.5
selenium~=4.14.0 selenium==4.31.0
# https://github.com/pallets/werkzeug/issues/2985 # https://github.com/pallets/werkzeug/issues/2985
# Maybe related to pytest? # Maybe related to pytest?
@@ -90,6 +91,8 @@ extruct
# For cleaning up unknown currency formats # For cleaning up unknown currency formats
babel babel
levenshtein
# Needed for > 3.10, https://github.com/microsoft/playwright-python/issues/2096 # Needed for > 3.10, https://github.com/microsoft/playwright-python/issues/2096
greenlet >= 3.0.3 greenlet >= 3.0.3