mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-11-04 08:34:57 +00:00 
			
		
		
		
	Compare commits
	
		
			3 Commits
		
	
	
		
			favicon-de
			...
			selenium-p
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 
						 | 
					78f3f2b26a | ||
| 
						 | 
					535ee97ef7 | ||
| 
						 | 
					b2923b8c3a | 
@@ -168,9 +168,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
 | 
			
		||||
            step_optional_value = request.form.get('optional_value')
 | 
			
		||||
            is_last_step = strtobool(request.form.get('is_last_step'))
 | 
			
		||||
 | 
			
		||||
            # @todo try.. accept.. nice errors not popups..
 | 
			
		||||
            try:
 | 
			
		||||
 | 
			
		||||
                browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(action_name=step_operation,
 | 
			
		||||
                                         selector=step_selector,
 | 
			
		||||
                                         optional_value=step_optional_value)
 | 
			
		||||
 
 | 
			
		||||
@@ -61,23 +61,6 @@ class steppable_browser_interface():
 | 
			
		||||
 | 
			
		||||
    def __init__(self, start_url):
 | 
			
		||||
        self.start_url = start_url
 | 
			
		||||
        
 | 
			
		||||
    def safe_page_operation(self, operation_fn, default_return=None):
 | 
			
		||||
        """Safely execute a page operation with error handling"""
 | 
			
		||||
        if self.page is None:
 | 
			
		||||
            logger.warning("Attempted operation on None page object")
 | 
			
		||||
            return default_return
 | 
			
		||||
            
 | 
			
		||||
        try:
 | 
			
		||||
            return operation_fn()
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.debug(f"Page operation failed: {str(e)}")
 | 
			
		||||
            # Try to reclaim memory if possible
 | 
			
		||||
            try:
 | 
			
		||||
                self.page.request_gc()
 | 
			
		||||
            except:
 | 
			
		||||
                pass
 | 
			
		||||
            return default_return
 | 
			
		||||
 | 
			
		||||
    # Convert and perform "Click Button" for example
 | 
			
		||||
    def call_action(self, action_name, selector=None, optional_value=None):
 | 
			
		||||
@@ -109,20 +92,11 @@ class steppable_browser_interface():
 | 
			
		||||
        if optional_value and ('{%' in optional_value or '{{' in optional_value):
 | 
			
		||||
            optional_value = jinja_render(template_str=optional_value)
 | 
			
		||||
 | 
			
		||||
        try:
 | 
			
		||||
            action_handler(selector, optional_value)
 | 
			
		||||
            # Safely wait for timeout
 | 
			
		||||
            def wait_timeout():
 | 
			
		||||
                self.page.wait_for_timeout(1.5 * 1000)
 | 
			
		||||
            self.safe_page_operation(wait_timeout)
 | 
			
		||||
            logger.debug(f"Call action done in {time.time()-now:.2f}s")
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error executing action '{call_action_name}': {str(e)}")
 | 
			
		||||
            # Request garbage collection to free up resources after error
 | 
			
		||||
            try:
 | 
			
		||||
                self.page.request_gc()
 | 
			
		||||
            except:
 | 
			
		||||
                pass
 | 
			
		||||
 | 
			
		||||
        action_handler(selector, optional_value)
 | 
			
		||||
        # Safely wait for timeout
 | 
			
		||||
        self.page.wait_for_timeout(1.5 * 1000)
 | 
			
		||||
        logger.debug(f"Call action done in {time.time()-now:.2f}s")
 | 
			
		||||
 | 
			
		||||
    def action_goto_url(self, selector=None, value=None):
 | 
			
		||||
        if not value:
 | 
			
		||||
@@ -130,11 +104,7 @@ class steppable_browser_interface():
 | 
			
		||||
            return None
 | 
			
		||||
            
 | 
			
		||||
        now = time.time()
 | 
			
		||||
        
 | 
			
		||||
        def goto_operation():
 | 
			
		||||
            return self.page.goto(value, timeout=0, wait_until='load')
 | 
			
		||||
            
 | 
			
		||||
        response = self.safe_page_operation(goto_operation)
 | 
			
		||||
        response = self.page.goto(value, timeout=0, wait_until='load')
 | 
			
		||||
        logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
 | 
			
		||||
        return response
 | 
			
		||||
 | 
			
		||||
@@ -147,53 +117,40 @@ class steppable_browser_interface():
 | 
			
		||||
        if not value or not len(value.strip()):
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def click_operation():
 | 
			
		||||
            elem = self.page.get_by_text(value)
 | 
			
		||||
            if elem.count():
 | 
			
		||||
                elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
 | 
			
		||||
                
 | 
			
		||||
        self.safe_page_operation(click_operation)
 | 
			
		||||
        elem = self.page.get_by_text(value)
 | 
			
		||||
        if elem.count():
 | 
			
		||||
            elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
    def action_click_element_containing_text_if_exists(self, selector=None, value=''):
 | 
			
		||||
        logger.debug("Clicking element containing text if exists")
 | 
			
		||||
        if not value or not len(value.strip()):
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def click_if_exists_operation():
 | 
			
		||||
            elem = self.page.get_by_text(value)
 | 
			
		||||
            logger.debug(f"Clicking element containing text - {elem.count()} elements found")
 | 
			
		||||
            if elem.count():
 | 
			
		||||
                elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
 | 
			
		||||
        elem = self.page.get_by_text(value)
 | 
			
		||||
        logger.debug(f"Clicking element containing text - {elem.count()} elements found")
 | 
			
		||||
        if elem.count():
 | 
			
		||||
            elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
 | 
			
		||||
                
 | 
			
		||||
        self.safe_page_operation(click_if_exists_operation)
 | 
			
		||||
 | 
			
		||||
    def action_enter_text_in_field(self, selector, value):
 | 
			
		||||
        if not selector or not len(selector.strip()):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        def fill_operation():
 | 
			
		||||
            self.page.fill(selector, value, timeout=self.action_timeout)
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(fill_operation)
 | 
			
		||||
        self.page.fill(selector, value, timeout=self.action_timeout)
 | 
			
		||||
 | 
			
		||||
    def action_execute_js(self, selector, value):
 | 
			
		||||
        if not value:
 | 
			
		||||
            return None
 | 
			
		||||
            
 | 
			
		||||
        def evaluate_operation():
 | 
			
		||||
            return self.page.evaluate(value)
 | 
			
		||||
            
 | 
			
		||||
        return self.safe_page_operation(evaluate_operation)
 | 
			
		||||
        return self.page.evaluate(value)
 | 
			
		||||
 | 
			
		||||
    def action_click_element(self, selector, value):
 | 
			
		||||
        logger.debug("Clicking element")
 | 
			
		||||
        if not selector or not len(selector.strip()):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        def click_operation():
 | 
			
		||||
            self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(click_operation)
 | 
			
		||||
        self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
 | 
			
		||||
 | 
			
		||||
    def action_click_element_if_exists(self, selector, value):
 | 
			
		||||
        import playwright._impl._errors as _api_types
 | 
			
		||||
@@ -201,16 +158,14 @@ class steppable_browser_interface():
 | 
			
		||||
        if not selector or not len(selector.strip()):
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def click_if_exists_operation():
 | 
			
		||||
            try:
 | 
			
		||||
                self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
 | 
			
		||||
            except _api_types.TimeoutError:
 | 
			
		||||
                return
 | 
			
		||||
            except _api_types.Error:
 | 
			
		||||
                # Element was there, but page redrew and now its long long gone
 | 
			
		||||
                return
 | 
			
		||||
        try:
 | 
			
		||||
            self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
 | 
			
		||||
        except _api_types.TimeoutError:
 | 
			
		||||
            return
 | 
			
		||||
        except _api_types.Error:
 | 
			
		||||
            # Element was there, but page redrew and now its long long gone
 | 
			
		||||
            return
 | 
			
		||||
                
 | 
			
		||||
        self.safe_page_operation(click_if_exists_operation)
 | 
			
		||||
 | 
			
		||||
    def action_click_x_y(self, selector, value):
 | 
			
		||||
        if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
 | 
			
		||||
@@ -222,10 +177,8 @@ class steppable_browser_interface():
 | 
			
		||||
            x = int(float(x.strip()))
 | 
			
		||||
            y = int(float(y.strip()))
 | 
			
		||||
            
 | 
			
		||||
            def click_xy_operation():
 | 
			
		||||
                self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
 | 
			
		||||
            self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
 | 
			
		||||
                
 | 
			
		||||
            self.safe_page_operation(click_xy_operation)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            logger.error(f"Error parsing x,y coordinates: {str(e)}")
 | 
			
		||||
 | 
			
		||||
@@ -233,27 +186,17 @@ class steppable_browser_interface():
 | 
			
		||||
        if not selector or not len(selector.strip()):
 | 
			
		||||
            return
 | 
			
		||||
 | 
			
		||||
        def select_operation():
 | 
			
		||||
            self.page.select_option(selector, label=value, timeout=self.action_timeout)
 | 
			
		||||
 | 
			
		||||
        self.safe_page_operation(select_operation)
 | 
			
		||||
        self.page.select_option(selector, label=value, timeout=self.action_timeout)
 | 
			
		||||
 | 
			
		||||
    def action_scroll_down(self, selector, value):
 | 
			
		||||
        def scroll_operation():
 | 
			
		||||
            # Some sites this doesnt work on for some reason
 | 
			
		||||
            self.page.mouse.wheel(0, 600)
 | 
			
		||||
            self.page.wait_for_timeout(1000)
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(scroll_operation)
 | 
			
		||||
        # Some sites this doesnt work on for some reason
 | 
			
		||||
        self.page.mouse.wheel(0, 600)
 | 
			
		||||
        self.page.wait_for_timeout(1000)
 | 
			
		||||
 | 
			
		||||
    def action_wait_for_seconds(self, selector, value):
 | 
			
		||||
        try:
 | 
			
		||||
            seconds = float(value.strip()) if value else 1.0
 | 
			
		||||
            
 | 
			
		||||
            def wait_operation():
 | 
			
		||||
                self.page.wait_for_timeout(seconds * 1000)
 | 
			
		||||
                
 | 
			
		||||
            self.safe_page_operation(wait_operation)
 | 
			
		||||
            self.page.wait_for_timeout(seconds * 1000)
 | 
			
		||||
        except (ValueError, TypeError) as e:
 | 
			
		||||
            logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
 | 
			
		||||
 | 
			
		||||
@@ -263,14 +206,11 @@ class steppable_browser_interface():
 | 
			
		||||
            
 | 
			
		||||
        import json
 | 
			
		||||
        v = json.dumps(value)
 | 
			
		||||
        
 | 
			
		||||
        def wait_for_text_operation():
 | 
			
		||||
            self.page.wait_for_function(
 | 
			
		||||
                f'document.querySelector("body").innerText.includes({v});', 
 | 
			
		||||
                timeout=30000
 | 
			
		||||
            )
 | 
			
		||||
        self.page.wait_for_function(
 | 
			
		||||
            f'document.querySelector("body").innerText.includes({v});',
 | 
			
		||||
            timeout=30000
 | 
			
		||||
        )
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(wait_for_text_operation)
 | 
			
		||||
 | 
			
		||||
    def action_wait_for_text_in_element(self, selector, value):
 | 
			
		||||
        if not selector or not value:
 | 
			
		||||
@@ -280,82 +220,60 @@ class steppable_browser_interface():
 | 
			
		||||
        s = json.dumps(selector)
 | 
			
		||||
        v = json.dumps(value)
 | 
			
		||||
        
 | 
			
		||||
        def wait_for_text_in_element_operation():
 | 
			
		||||
            self.page.wait_for_function(
 | 
			
		||||
                f'document.querySelector({s}).innerText.includes({v});', 
 | 
			
		||||
                timeout=30000
 | 
			
		||||
            )
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(wait_for_text_in_element_operation)
 | 
			
		||||
        self.page.wait_for_function(
 | 
			
		||||
            f'document.querySelector({s}).innerText.includes({v});',
 | 
			
		||||
            timeout=30000
 | 
			
		||||
        )
 | 
			
		||||
 | 
			
		||||
    # @todo - in the future make some popout interface to capture what needs to be set
 | 
			
		||||
    # https://playwright.dev/python/docs/api/class-keyboard
 | 
			
		||||
    def action_press_enter(self, selector, value):
 | 
			
		||||
        def press_operation():
 | 
			
		||||
            self.page.keyboard.press("Enter", delay=randint(200, 500))
 | 
			
		||||
        self.page.keyboard.press("Enter", delay=randint(200, 500))
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(press_operation)
 | 
			
		||||
 | 
			
		||||
    def action_press_page_up(self, selector, value):
 | 
			
		||||
        def press_operation():
 | 
			
		||||
            self.page.keyboard.press("PageUp", delay=randint(200, 500))
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(press_operation)
 | 
			
		||||
        self.page.keyboard.press("PageUp", delay=randint(200, 500))
 | 
			
		||||
 | 
			
		||||
    def action_press_page_down(self, selector, value):
 | 
			
		||||
        def press_operation():
 | 
			
		||||
            self.page.keyboard.press("PageDown", delay=randint(200, 500))
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(press_operation)
 | 
			
		||||
        self.page.keyboard.press("PageDown", delay=randint(200, 500))
 | 
			
		||||
 | 
			
		||||
    def action_check_checkbox(self, selector, value):
 | 
			
		||||
        if not selector:
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def check_operation():
 | 
			
		||||
            self.page.locator(selector).check(timeout=self.action_timeout)
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(check_operation)
 | 
			
		||||
 | 
			
		||||
        self.page.locator(selector).check(timeout=self.action_timeout)
 | 
			
		||||
 | 
			
		||||
    def action_uncheck_checkbox(self, selector, value):
 | 
			
		||||
        if not selector:
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def uncheck_operation():
 | 
			
		||||
            self.page.locator(selector).uncheck(timeout=self.action_timeout)
 | 
			
		||||
        self.page.locator(selector).uncheck(timeout=self.action_timeout)
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(uncheck_operation)
 | 
			
		||||
 | 
			
		||||
    def action_remove_elements(self, selector, value):
 | 
			
		||||
        """Removes all elements matching the given selector from the DOM."""
 | 
			
		||||
        if not selector:
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def remove_operation():
 | 
			
		||||
            self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(remove_operation)
 | 
			
		||||
        self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
 | 
			
		||||
 | 
			
		||||
    def action_make_all_child_elements_visible(self, selector, value):
 | 
			
		||||
        """Recursively makes all child elements inside the given selector fully visible."""
 | 
			
		||||
        if not selector:
 | 
			
		||||
            return
 | 
			
		||||
            
 | 
			
		||||
        def make_visible_operation():
 | 
			
		||||
            self.page.locator(selector).locator("*").evaluate_all("""
 | 
			
		||||
                els => els.forEach(el => {
 | 
			
		||||
                    el.style.display = 'block';   // Forces it to be displayed
 | 
			
		||||
                    el.style.visibility = 'visible';   // Ensures it's not hidden
 | 
			
		||||
                    el.style.opacity = '1';   // Fully opaque
 | 
			
		||||
                    el.style.position = 'relative';   // Avoids 'absolute' hiding
 | 
			
		||||
                    el.style.height = 'auto';   // Expands collapsed elements
 | 
			
		||||
                    el.style.width = 'auto';   // Ensures full visibility
 | 
			
		||||
                    el.removeAttribute('hidden');   // Removes hidden attribute
 | 
			
		||||
                    el.classList.remove('hidden', 'd-none');  // Removes common CSS hidden classes
 | 
			
		||||
                })
 | 
			
		||||
            """)
 | 
			
		||||
            
 | 
			
		||||
        self.safe_page_operation(make_visible_operation)
 | 
			
		||||
        self.page.locator(selector).locator("*").evaluate_all("""
 | 
			
		||||
            els => els.forEach(el => {
 | 
			
		||||
                el.style.display = 'block';   // Forces it to be displayed
 | 
			
		||||
                el.style.visibility = 'visible';   // Ensures it's not hidden
 | 
			
		||||
                el.style.opacity = '1';   // Fully opaque
 | 
			
		||||
                el.style.position = 'relative';   // Avoids 'absolute' hiding
 | 
			
		||||
                el.style.height = 'auto';   // Expands collapsed elements
 | 
			
		||||
                el.style.width = 'auto';   // Ensures full visibility
 | 
			
		||||
                el.removeAttribute('hidden');   // Removes hidden attribute
 | 
			
		||||
                el.classList.remove('hidden', 'd-none');  // Removes common CSS hidden classes
 | 
			
		||||
            })
 | 
			
		||||
        """)
 | 
			
		||||
 | 
			
		||||
# Responsible for maintaining a live 'context' with the chrome CDP
 | 
			
		||||
# @todo - how long do contexts live for anyway?
 | 
			
		||||
 
 | 
			
		||||
@@ -194,7 +194,6 @@ class fetcher(Fetcher):
 | 
			
		||||
            browsersteps_interface.page = self.page
 | 
			
		||||
 | 
			
		||||
            response = browsersteps_interface.action_goto_url(value=url)
 | 
			
		||||
            self.headers = response.all_headers()
 | 
			
		||||
 | 
			
		||||
            if response is None:
 | 
			
		||||
                context.close()
 | 
			
		||||
@@ -202,6 +201,8 @@ class fetcher(Fetcher):
 | 
			
		||||
                logger.debug("Content Fetcher > Response object from the browser communication was none")
 | 
			
		||||
                raise EmptyReply(url=url, status_code=None)
 | 
			
		||||
 | 
			
		||||
            self.headers = response.all_headers()
 | 
			
		||||
 | 
			
		||||
            try:
 | 
			
		||||
                if self.webdriver_js_execute_code is not None and len(self.webdriver_js_execute_code):
 | 
			
		||||
                    browsersteps_interface.action_execute_js(value=self.webdriver_js_execute_code, selector=None)
 | 
			
		||||
 
 | 
			
		||||
@@ -28,6 +28,7 @@ class fetcher(Fetcher):
 | 
			
		||||
 | 
			
		||||
        import chardet
 | 
			
		||||
        import requests
 | 
			
		||||
        from requests.exceptions import ProxyError, ConnectionError, RequestException
 | 
			
		||||
 | 
			
		||||
        if self.browser_steps_get_valid_steps():
 | 
			
		||||
            raise BrowserStepsInUnsupportedFetcher(url=url)
 | 
			
		||||
@@ -52,14 +53,19 @@ class fetcher(Fetcher):
 | 
			
		||||
        if strtobool(os.getenv('ALLOW_FILE_URI', 'false')) and url.startswith('file://'):
 | 
			
		||||
            from requests_file import FileAdapter
 | 
			
		||||
            session.mount('file://', FileAdapter())
 | 
			
		||||
 | 
			
		||||
        r = session.request(method=request_method,
 | 
			
		||||
                            data=request_body.encode('utf-8') if type(request_body) is str else request_body,
 | 
			
		||||
                            url=url,
 | 
			
		||||
                            headers=request_headers,
 | 
			
		||||
                            timeout=timeout,
 | 
			
		||||
                            proxies=proxies,
 | 
			
		||||
                            verify=False)
 | 
			
		||||
        try:
 | 
			
		||||
            r = session.request(method=request_method,
 | 
			
		||||
                                data=request_body.encode('utf-8') if type(request_body) is str else request_body,
 | 
			
		||||
                                url=url,
 | 
			
		||||
                                headers=request_headers,
 | 
			
		||||
                                timeout=timeout,
 | 
			
		||||
                                proxies=proxies,
 | 
			
		||||
                                verify=False)
 | 
			
		||||
        except Exception as e:
 | 
			
		||||
            msg = str(e)
 | 
			
		||||
            if proxies and 'SOCKSHTTPSConnectionPool' in msg:
 | 
			
		||||
                msg = f"Proxy connection failed? {msg}"
 | 
			
		||||
            raise Exception(msg) from e
 | 
			
		||||
 | 
			
		||||
        # If the response did not tell us what encoding format to expect, Then use chardet to override what `requests` thinks.
 | 
			
		||||
        # For example - some sites don't tell us it's utf-8, but return utf-8 content
 | 
			
		||||
 
 | 
			
		||||
@@ -76,8 +76,7 @@ class fetcher(Fetcher):
 | 
			
		||||
        for opt in CHROME_OPTIONS:
 | 
			
		||||
            options.add_argument(opt)
 | 
			
		||||
 | 
			
		||||
        if self.proxy:
 | 
			
		||||
            options.proxy = self.proxy
 | 
			
		||||
        options.add_argument(f"--proxy-server={self.proxy}")
 | 
			
		||||
 | 
			
		||||
        self.driver = webdriver.Remote(
 | 
			
		||||
            command_executor=self.browser_connection_url,
 | 
			
		||||
 
 | 
			
		||||
@@ -82,3 +82,26 @@ done
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
docker kill squid-one squid-two squid-custom
 | 
			
		||||
 | 
			
		||||
# Test that the UI is returning the correct error message when a proxy is not available
 | 
			
		||||
 | 
			
		||||
# Requests
 | 
			
		||||
docker run --network changedet-network \
 | 
			
		||||
  test-changedetectionio \
 | 
			
		||||
  bash -c 'cd changedetectionio && pytest tests/proxy_list/test_proxy_noconnect.py'
 | 
			
		||||
 | 
			
		||||
# Playwright
 | 
			
		||||
docker run --network changedet-network \
 | 
			
		||||
  test-changedetectionio \
 | 
			
		||||
  bash -c 'cd changedetectionio && PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
 | 
			
		||||
 | 
			
		||||
# Puppeteer fast
 | 
			
		||||
docker run --network changedet-network \
 | 
			
		||||
  test-changedetectionio \
 | 
			
		||||
  bash -c 'cd changedetectionio && FAST_PUPPETEER_CHROME_FETCHER=1 PLAYWRIGHT_DRIVER_URL=ws://sockpuppetbrowser:3000 pytest tests/proxy_list/test_proxy_noconnect.py'
 | 
			
		||||
 | 
			
		||||
# Selenium - todo - fix proxies
 | 
			
		||||
docker run --network changedet-network \
 | 
			
		||||
  -e "WEBDRIVER_URL=http://selenium:4444/wd/hub" \
 | 
			
		||||
  test-changedetectionio \
 | 
			
		||||
  bash -c 'cd changedetectionio && pytest tests/proxy_list/test_proxy_noconnect.py'
 | 
			
		||||
 
 | 
			
		||||
							
								
								
									
										53
									
								
								changedetectionio/tests/proxy_list/test_proxy_noconnect.py
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										53
									
								
								changedetectionio/tests/proxy_list/test_proxy_noconnect.py
									
									
									
									
									
										Normal file
									
								
							@@ -0,0 +1,53 @@
 | 
			
		||||
#!/usr/bin/env python3
 | 
			
		||||
 | 
			
		||||
from flask import url_for
 | 
			
		||||
from ..util import live_server_setup, wait_for_all_checks
 | 
			
		||||
import os
 | 
			
		||||
from ... import strtobool
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# Just to be sure the UI outputs the right error message on proxy connection failed
 | 
			
		||||
def test_proxy_noconnect_custom(client, live_server, measure_memory_usage):
 | 
			
		||||
    live_server_setup(live_server)
 | 
			
		||||
 | 
			
		||||
    # Goto settings, add our custom one
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("settings.settings_page"),
 | 
			
		||||
        data={
 | 
			
		||||
            "requests-time_between_check-minutes": 180,
 | 
			
		||||
            "application-ignore_whitespace": "y",
 | 
			
		||||
            "application-fetch_backend": 'html_webdriver' if os.getenv('PLAYWRIGHT_DRIVER_URL') else 'html_requests',
 | 
			
		||||
            "requests-extra_proxies-0-proxy_name": "custom-test-proxy",
 | 
			
		||||
            # test:awesome is set in tests/proxy_list/squid-passwords.txt
 | 
			
		||||
            "requests-extra_proxies-0-proxy_url": "http://THISPROXYDOESNTEXIST:3128",
 | 
			
		||||
        },
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"Settings updated." in res.data
 | 
			
		||||
 | 
			
		||||
    res = client.post(
 | 
			
		||||
        url_for("imports.import_page"),
 | 
			
		||||
        # Because a URL wont show in squid/proxy logs due it being SSLed
 | 
			
		||||
        # Use plain HTTP or a specific domain-name here
 | 
			
		||||
        data={"urls": "https://changedetection.io/CHANGELOG.txt"},
 | 
			
		||||
        follow_redirects=True
 | 
			
		||||
    )
 | 
			
		||||
 | 
			
		||||
    assert b"1 Imported" in res.data
 | 
			
		||||
    wait_for_all_checks(client)
 | 
			
		||||
 | 
			
		||||
    res = client.get(url_for("watchlist.index"))
 | 
			
		||||
    assert b'Page.goto: net::ERR_PROXY_CONNECTION_FAILED' in res.data
 | 
			
		||||
 | 
			
		||||
    # Requests
 | 
			
		||||
    check_string = b'Proxy connection failed?'
 | 
			
		||||
 | 
			
		||||
    if os.getenv('PLAYWRIGHT_DRIVER_URL') or strtobool(os.getenv('FAST_PUPPETEER_CHROME_FETCHER', 'False')):
 | 
			
		||||
        check_string = b'ERR_PROXY_CONNECTION_FAILED'
 | 
			
		||||
 | 
			
		||||
    if os.getenv("WEBDRIVER_URL"):
 | 
			
		||||
        check_string = b'ERR_PROXY_CONNECTION_FAILED'
 | 
			
		||||
 | 
			
		||||
    assert check_string in res.data
 | 
			
		||||
 | 
			
		||||
@@ -53,7 +53,8 @@ lxml >=4.8.0,<6,!=5.2.0,!=5.2.1
 | 
			
		||||
# XPath 2.0-3.1 support - 4.2.0 broke something?
 | 
			
		||||
elementpath==4.1.5
 | 
			
		||||
 | 
			
		||||
selenium~=4.31.0
 | 
			
		||||
selenium==4.31.0
 | 
			
		||||
 | 
			
		||||
 | 
			
		||||
# https://github.com/pallets/werkzeug/issues/2985
 | 
			
		||||
# Maybe related to pytest?
 | 
			
		||||
 
 | 
			
		||||
		Reference in New Issue
	
	Block a user