Compare commits

...

6 Commits

Author SHA1 Message Date
dgtlmoon
eb4cd35317 bump test
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
2025-04-25 18:45:21 +02:00
dgtlmoon
dc75043562 Improve test coverage for source: 2025-04-22 18:55:24 +02:00
dgtlmoon
38fffda890 Global ignore check - add regex 2025-04-22 18:42:11 +02:00
dgtlmoon
af568d064c Plugins for conditions (and include Similarity / Levenshtein, wordcount conditions) Re #3108
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built 📦 package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
2025-04-22 18:19:56 +02:00
dgtlmoon
a75f57de43 Browser Steps - <Select> by Option Text - #1224, #1228 (#3138) 2025-04-22 14:33:35 +02:00
dgtlmoon
72a1c3dda1 Browser Steps - error reporting and session shutdown improvements (#3137) 2025-04-22 12:18:51 +02:00
14 changed files with 853 additions and 128 deletions

View File

@@ -0,0 +1,98 @@
# Creating Plugins for changedetection.io
This document describes how to create plugins for changedetection.io. Plugins can be used to extend the functionality of the application in various ways.
## Plugin Types
### UI Stats Tab Plugins
These plugins can add content to the Stats tab in the Edit page. This is useful for adding custom statistics or visualizations about a watch.
#### Creating a UI Stats Tab Plugin
1. Create a Python file in a directory that will be loaded by the plugin system.
2. Use the `global_hookimpl` decorator to implement the `ui_edit_stats_extras` hook:
```python
import pluggy
from loguru import logger
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add custom content to the stats tab"""
# Calculate or retrieve your stats
my_stat = calculate_something(watch)
# Return HTML content as a string
html = f"""
<div class="my-plugin-stats">
<h4>My Plugin Statistics</h4>
<p>My statistic: {my_stat}</p>
</div>
"""
return html
```
3. The HTML you return will be included in the Stats tab.
## Plugin Loading
Plugins can be loaded from:
1. Built-in plugin directories in the codebase
2. External packages using setuptools entry points
To add a new plugin directory, modify the `plugin_dirs` dictionary in `pluggy_interface.py`.
## Example Plugin
Here's a simple example of a plugin that adds a word count statistic to the Stats tab:
```python
import pluggy
from loguru import logger
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def count_words_in_history(watch):
"""Count words in the latest snapshot"""
try:
if not watch.history.keys():
return 0
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
return len(latest_content.split())
except Exception as e:
logger.error(f"Error counting words: {str(e)}")
return 0
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count to the Stats tab"""
word_count = count_words_in_history(watch)
html = f"""
<div class="word-count-stats">
<h4>Content Analysis</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Word count (latest snapshot)</td>
<td>{word_count}</td>
</tr>
</tbody>
</table>
</div>
"""
return html
```
## Testing Your Plugin
1. Place your plugin in one of the directories scanned by the plugin system
2. Restart changedetection.io
3. Go to the Edit page of a watch and check the Stats tab to see your content

View File

@@ -53,14 +53,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
a = "?" if not '?' in base_url else '&' a = "?" if not '?' in base_url else '&'
base_url += a + f"timeout={keepalive_ms}" base_url += a + f"timeout={keepalive_ms}"
try:
browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url) browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url)
except Exception as e:
if 'ECONNREFUSED' in str(e):
return make_response('Unable to start the Playwright Browser session, is it running?', 401)
else:
# Other errors, bad URL syntax, bad reply etc
return make_response(str(e), 401)
proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid) proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
proxy = None proxy = None
@@ -109,7 +102,16 @@ def construct_blueprint(datastore: ChangeDetectionStore):
logger.debug("Starting connection with playwright") logger.debug("Starting connection with playwright")
logger.debug("browser_steps.py connecting") logger.debug("browser_steps.py connecting")
try:
browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid) browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid)
except Exception as e:
if 'ECONNREFUSED' in str(e):
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
else:
# Other errors, bad URL syntax, bad reply etc
return make_response(str(e), 401)
logger.debug("Starting connection with playwright - done") logger.debug("Starting connection with playwright - done")
return {'browsersteps_session_id': browsersteps_session_id} return {'browsersteps_session_id': browsersteps_session_id}

View File

@@ -1,6 +1,8 @@
import os import os
import time import time
import re import re
import sys
import traceback
from random import randint from random import randint
from loguru import logger from loguru import logger
@@ -35,6 +37,7 @@ browser_step_ui_config = {'Choose one': '0 0',
'Make all child elements visible': '1 0', 'Make all child elements visible': '1 0',
'Press Enter': '0 0', 'Press Enter': '0 0',
'Select by label': '1 1', 'Select by label': '1 1',
'<select> by option text': '1 1',
'Scroll down': '0 0', 'Scroll down': '0 0',
'Uncheck checkbox': '1 0', 'Uncheck checkbox': '1 0',
'Wait for seconds': '0 1', 'Wait for seconds': '0 1',
@@ -54,14 +57,34 @@ browser_step_ui_config = {'Choose one': '0 0',
class steppable_browser_interface(): class steppable_browser_interface():
page = None page = None
start_url = None start_url = None
action_timeout = 10 * 1000 action_timeout = 10 * 1000
def __init__(self, start_url): def __init__(self, start_url):
self.start_url = start_url self.start_url = start_url
def safe_page_operation(self, operation_fn, default_return=None):
"""Safely execute a page operation with error handling"""
if self.page is None:
logger.warning("Attempted operation on None page object")
return default_return
try:
return operation_fn()
except Exception as e:
logger.debug(f"Page operation failed: {str(e)}")
# Try to reclaim memory if possible
try:
self.page.request_gc()
except:
pass
return default_return
# Convert and perform "Click Button" for example # Convert and perform "Click Button" for example
def call_action(self, action_name, selector=None, optional_value=None): def call_action(self, action_name, selector=None, optional_value=None):
if self.page is None:
logger.warning("Cannot call action on None page object")
return
now = time.time() now = time.time()
call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower()) call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower())
if call_action_name == 'choose_one': if call_action_name == 'choose_one':
@@ -72,28 +95,46 @@ class steppable_browser_interface():
if selector and selector.startswith('/') and not selector.startswith('//'): if selector and selector.startswith('/') and not selector.startswith('//'):
selector = "xpath=" + selector selector = "xpath=" + selector
# Check if action handler exists
if not hasattr(self, "action_" + call_action_name):
logger.warning(f"Action handler for '{call_action_name}' not found")
return
action_handler = getattr(self, "action_" + call_action_name) action_handler = getattr(self, "action_" + call_action_name)
# Support for Jinja2 variables in the value and selector # Support for Jinja2 variables in the value and selector
if selector and ('{%' in selector or '{{' in selector): if selector and ('{%' in selector or '{{' in selector):
selector = jinja_render(template_str=selector) selector = jinja_render(template_str=selector)
if optional_value and ('{%' in optional_value or '{{' in optional_value): if optional_value and ('{%' in optional_value or '{{' in optional_value):
optional_value = jinja_render(template_str=optional_value) optional_value = jinja_render(template_str=optional_value)
try:
action_handler(selector, optional_value) action_handler(selector, optional_value)
# Safely wait for timeout
def wait_timeout():
self.page.wait_for_timeout(1.5 * 1000) self.page.wait_for_timeout(1.5 * 1000)
self.safe_page_operation(wait_timeout)
logger.debug(f"Call action done in {time.time()-now:.2f}s") logger.debug(f"Call action done in {time.time()-now:.2f}s")
except Exception as e:
logger.error(f"Error executing action '{call_action_name}': {str(e)}")
# Request garbage collection to free up resources after error
try:
self.page.request_gc()
except:
pass
def action_goto_url(self, selector=None, value=None): def action_goto_url(self, selector=None, value=None):
# self.page.set_viewport_size({"width": 1280, "height": 5000}) if not value:
logger.warning("No URL provided for goto_url action")
return None
now = time.time() now = time.time()
response = self.page.goto(value, timeout=0, wait_until='load')
# Should be the same as the puppeteer_fetch.js methods, means, load with no timeout set (skip timeout) def goto_operation():
#and also wait for seconds ? return self.page.goto(value, timeout=0, wait_until='load')
#await page.waitForTimeout(1000);
#await page.waitForTimeout(extra_wait_ms); response = self.safe_page_operation(goto_operation)
logger.debug(f"Time to goto URL {time.time()-now:.2f}s") logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
return response return response
@@ -103,104 +144,204 @@ class steppable_browser_interface():
def action_click_element_containing_text(self, selector=None, value=''): def action_click_element_containing_text(self, selector=None, value=''):
logger.debug("Clicking element containing text") logger.debug("Clicking element containing text")
if not len(value.strip()): if not value or not len(value.strip()):
return return
def click_operation():
elem = self.page.get_by_text(value) elem = self.page.get_by_text(value)
if elem.count(): if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
self.safe_page_operation(click_operation)
def action_click_element_containing_text_if_exists(self, selector=None, value=''): def action_click_element_containing_text_if_exists(self, selector=None, value=''):
logger.debug("Clicking element containing text if exists") logger.debug("Clicking element containing text if exists")
if not len(value.strip()): if not value or not len(value.strip()):
return return
def click_if_exists_operation():
elem = self.page.get_by_text(value) elem = self.page.get_by_text(value)
logger.debug(f"Clicking element containing text - {elem.count()} elements found") logger.debug(f"Clicking element containing text - {elem.count()} elements found")
if elem.count(): if elem.count():
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout) elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
else:
return self.safe_page_operation(click_if_exists_operation)
def action_enter_text_in_field(self, selector, value): def action_enter_text_in_field(self, selector, value):
if not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def fill_operation():
self.page.fill(selector, value, timeout=self.action_timeout) self.page.fill(selector, value, timeout=self.action_timeout)
self.safe_page_operation(fill_operation)
def action_execute_js(self, selector, value): def action_execute_js(self, selector, value):
response = self.page.evaluate(value) if not value:
return response return None
def evaluate_operation():
return self.page.evaluate(value)
return self.safe_page_operation(evaluate_operation)
def action_click_element(self, selector, value): def action_click_element(self, selector, value):
logger.debug("Clicking element") logger.debug("Clicking element")
if not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def click_operation():
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500)) self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
self.safe_page_operation(click_operation)
def action_click_element_if_exists(self, selector, value): def action_click_element_if_exists(self, selector, value):
import playwright._impl._errors as _api_types import playwright._impl._errors as _api_types
logger.debug("Clicking element if exists") logger.debug("Clicking element if exists")
if not len(selector.strip()): if not selector or not len(selector.strip()):
return return
def click_if_exists_operation():
try: try:
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500)) self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
except _api_types.TimeoutError as e: except _api_types.TimeoutError:
return return
except _api_types.Error as e: except _api_types.Error:
# Element was there, but page redrew and now its long long gone # Element was there, but page redrew and now its long long gone
return return
def action_click_x_y(self, selector, value): self.safe_page_operation(click_if_exists_operation)
if not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
raise Exception("'Click X,Y' step should be in the format of '100 , 90'")
def action_click_x_y(self, selector, value):
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
logger.warning("'Click X,Y' step should be in the format of '100 , 90'")
return
try:
x, y = value.strip().split(',') x, y = value.strip().split(',')
x = int(float(x.strip())) x = int(float(x.strip()))
y = int(float(y.strip())) y = int(float(y.strip()))
def click_xy_operation():
self.page.mouse.click(x=x, y=y, delay=randint(200, 500)) self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
self.safe_page_operation(click_xy_operation)
except Exception as e:
logger.error(f"Error parsing x,y coordinates: {str(e)}")
def action__select_by_option_text(self, selector, value):
if not selector or not len(selector.strip()):
return
def select_operation():
self.page.select_option(selector, label=value, timeout=self.action_timeout)
self.safe_page_operation(select_operation)
def action_scroll_down(self, selector, value): def action_scroll_down(self, selector, value):
def scroll_operation():
# Some sites this doesnt work on for some reason # Some sites this doesnt work on for some reason
self.page.mouse.wheel(0, 600) self.page.mouse.wheel(0, 600)
self.page.wait_for_timeout(1000) self.page.wait_for_timeout(1000)
self.safe_page_operation(scroll_operation)
def action_wait_for_seconds(self, selector, value): def action_wait_for_seconds(self, selector, value):
self.page.wait_for_timeout(float(value.strip()) * 1000) try:
seconds = float(value.strip()) if value else 1.0
def wait_operation():
self.page.wait_for_timeout(seconds * 1000)
self.safe_page_operation(wait_operation)
except (ValueError, TypeError) as e:
logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
def action_wait_for_text(self, selector, value): def action_wait_for_text(self, selector, value):
if not value:
return
import json import json
v = json.dumps(value) v = json.dumps(value)
self.page.wait_for_function(f'document.querySelector("body").innerText.includes({v});', timeout=30000)
def wait_for_text_operation():
self.page.wait_for_function(
f'document.querySelector("body").innerText.includes({v});',
timeout=30000
)
self.safe_page_operation(wait_for_text_operation)
def action_wait_for_text_in_element(self, selector, value): def action_wait_for_text_in_element(self, selector, value):
if not selector or not value:
return
import json import json
s = json.dumps(selector) s = json.dumps(selector)
v = json.dumps(value) v = json.dumps(value)
self.page.wait_for_function(f'document.querySelector({s}).innerText.includes({v});', timeout=30000)
def wait_for_text_in_element_operation():
self.page.wait_for_function(
f'document.querySelector({s}).innerText.includes({v});',
timeout=30000
)
self.safe_page_operation(wait_for_text_in_element_operation)
# @todo - in the future make some popout interface to capture what needs to be set # @todo - in the future make some popout interface to capture what needs to be set
# https://playwright.dev/python/docs/api/class-keyboard # https://playwright.dev/python/docs/api/class-keyboard
def action_press_enter(self, selector, value): def action_press_enter(self, selector, value):
def press_operation():
self.page.keyboard.press("Enter", delay=randint(200, 500)) self.page.keyboard.press("Enter", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_press_page_up(self, selector, value): def action_press_page_up(self, selector, value):
def press_operation():
self.page.keyboard.press("PageUp", delay=randint(200, 500)) self.page.keyboard.press("PageUp", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_press_page_down(self, selector, value): def action_press_page_down(self, selector, value):
def press_operation():
self.page.keyboard.press("PageDown", delay=randint(200, 500)) self.page.keyboard.press("PageDown", delay=randint(200, 500))
self.safe_page_operation(press_operation)
def action_check_checkbox(self, selector, value): def action_check_checkbox(self, selector, value):
if not selector:
return
def check_operation():
self.page.locator(selector).check(timeout=self.action_timeout) self.page.locator(selector).check(timeout=self.action_timeout)
self.safe_page_operation(check_operation)
def action_uncheck_checkbox(self, selector, value): def action_uncheck_checkbox(self, selector, value):
if not selector:
return
def uncheck_operation():
self.page.locator(selector).uncheck(timeout=self.action_timeout) self.page.locator(selector).uncheck(timeout=self.action_timeout)
self.safe_page_operation(uncheck_operation)
def action_remove_elements(self, selector, value): def action_remove_elements(self, selector, value):
"""Removes all elements matching the given selector from the DOM.""" """Removes all elements matching the given selector from the DOM."""
if not selector:
return
def remove_operation():
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())") self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
self.safe_page_operation(remove_operation)
def action_make_all_child_elements_visible(self, selector, value): def action_make_all_child_elements_visible(self, selector, value):
"""Recursively makes all child elements inside the given selector fully visible.""" """Recursively makes all child elements inside the given selector fully visible."""
if not selector:
return
def make_visible_operation():
self.page.locator(selector).locator("*").evaluate_all(""" self.page.locator(selector).locator("*").evaluate_all("""
els => els.forEach(el => { els => els.forEach(el => {
el.style.display = 'block'; // Forces it to be displayed el.style.display = 'block'; // Forces it to be displayed
@@ -214,6 +355,8 @@ class steppable_browser_interface():
}) })
""") """)
self.safe_page_operation(make_visible_operation)
# Responsible for maintaining a live 'context' with the chrome CDP # Responsible for maintaining a live 'context' with the chrome CDP
# @todo - how long do contexts live for anyway? # @todo - how long do contexts live for anyway?
class browsersteps_live_ui(steppable_browser_interface): class browsersteps_live_ui(steppable_browser_interface):
@@ -224,6 +367,8 @@ class browsersteps_live_ui(steppable_browser_interface):
# bump and kill this if idle after X sec # bump and kill this if idle after X sec
age_start = 0 age_start = 0
headers = {} headers = {}
# Track if resources are properly cleaned up
_is_cleaned_up = False
# use a special driver, maybe locally etc # use a special driver, maybe locally etc
command_executor = os.getenv( command_executor = os.getenv(
@@ -243,9 +388,14 @@ class browsersteps_live_ui(steppable_browser_interface):
self.age_start = time.time() self.age_start = time.time()
self.playwright_browser = playwright_browser self.playwright_browser = playwright_browser
self.start_url = start_url self.start_url = start_url
self._is_cleaned_up = False
if self.context is None: if self.context is None:
self.connect(proxy=proxy) self.connect(proxy=proxy)
def __del__(self):
# Ensure cleanup happens if object is garbage collected
self.cleanup()
# Connect and setup a new context # Connect and setup a new context
def connect(self, proxy=None): def connect(self, proxy=None):
# Should only get called once - test that # Should only get called once - test that
@@ -264,31 +414,74 @@ class browsersteps_live_ui(steppable_browser_interface):
user_agent=manage_user_agent(headers=self.headers), user_agent=manage_user_agent(headers=self.headers),
) )
self.page = self.context.new_page() self.page = self.context.new_page()
# self.page.set_default_navigation_timeout(keep_open) # self.page.set_default_navigation_timeout(keep_open)
self.page.set_default_timeout(keep_open) self.page.set_default_timeout(keep_open)
# @todo probably this doesnt work # Set event handlers
self.page.on( self.page.on("close", self.mark_as_closed)
"close",
self.mark_as_closed,
)
# Listen for all console events and handle errors # Listen for all console events and handle errors
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}")) self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
logger.debug(f"Time to browser setup {time.time()-now:.2f}s") logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
self.page.wait_for_timeout(1 * 1000) self.page.wait_for_timeout(1 * 1000)
def mark_as_closed(self): def mark_as_closed(self):
logger.debug("Page closed, cleaning up..") logger.debug("Page closed, cleaning up..")
self.cleanup()
def cleanup(self):
"""Properly clean up all resources to prevent memory leaks"""
if self._is_cleaned_up:
return
logger.debug("Cleaning up browser steps resources")
# Clean up page
if hasattr(self, 'page') and self.page is not None:
try:
# Force garbage collection before closing
self.page.request_gc()
except Exception as e:
logger.debug(f"Error during page garbage collection: {str(e)}")
try:
# Remove event listeners before closing
self.page.remove_listener("close", self.mark_as_closed)
except Exception as e:
logger.debug(f"Error removing event listeners: {str(e)}")
try:
self.page.close()
except Exception as e:
logger.debug(f"Error closing page: {str(e)}")
self.page = None
# Clean up context
if hasattr(self, 'context') and self.context is not None:
try:
self.context.close()
except Exception as e:
logger.debug(f"Error closing context: {str(e)}")
self.context = None
self._is_cleaned_up = True
logger.debug("Browser steps resources cleanup complete")
@property @property
def has_expired(self): def has_expired(self):
if not self.page: if not self.page or self._is_cleaned_up:
return True return True
# Check if session has expired based on age
max_age_seconds = int(os.getenv("BROWSER_STEPS_MAX_AGE_SECONDS", 60 * 10)) # Default 10 minutes
if (time.time() - self.age_start) > max_age_seconds:
logger.debug(f"Browser steps session expired after {max_age_seconds} seconds")
return True
return False
def get_current_state(self): def get_current_state(self):
"""Return the screenshot and interactive elements mapping, generally always called after action_()""" """Return the screenshot and interactive elements mapping, generally always called after action_()"""
@@ -297,20 +490,27 @@ class browsersteps_live_ui(steppable_browser_interface):
# because we for now only run browser steps in playwright mode (not puppeteer mode) # because we for now only run browser steps in playwright mode (not puppeteer mode)
from changedetectionio.content_fetchers.playwright import capture_full_page from changedetectionio.content_fetchers.playwright import capture_full_page
# Safety check - don't proceed if resources are cleaned up
if self._is_cleaned_up or self.page is None:
logger.warning("Attempted to get current state after cleanup")
return (None, None)
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text() xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
now = time.time() now = time.time()
self.page.wait_for_timeout(1 * 1000) self.page.wait_for_timeout(1 * 1000)
screenshot = capture_full_page(page=self.page) screenshot = None
xpath_data = None
try:
# Get screenshot first
screenshot = capture_full_page(page=self.page)
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s") logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
# Then get interactive elements
now = time.time() now = time.time()
self.page.evaluate("var include_filters=''") self.page.evaluate("var include_filters=''")
# Go find the interactive elements
# @todo in the future, something smarter that can scan for elements with .click/focus etc event handlers?
self.page.request_gc() self.page.request_gc()
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span' scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
@@ -322,11 +522,23 @@ class browsersteps_live_ui(steppable_browser_interface):
})) }))
self.page.request_gc() self.page.request_gc()
# So the JS will find the smallest one first # Sort elements by size
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True) xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s") logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s")
# playwright._impl._api_types.Error: Browser closed. except Exception as e:
# @todo show some countdown timer? logger.error(f"Error getting current state: {str(e)}")
# Attempt recovery - force garbage collection
try:
self.page.request_gc()
except:
pass
# Request garbage collection one final time
try:
self.page.request_gc()
except:
pass
return (screenshot, xpath_data) return (screenshot, xpath_data)

View File

@@ -233,6 +233,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Only works reliably with Playwright # Only works reliably with Playwright
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
template_args = { template_args = {
'available_processors': processors.available_processors(), 'available_processors': processors.available_processors(),
'available_timezones': sorted(available_timezones()), 'available_timezones': sorted(available_timezones()),
@@ -250,6 +253,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'settings_application': datastore.data['settings']['application'], 'settings_application': datastore.data['settings']['application'],
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'), 'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'), 'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid), 'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
'timezone_default_config': datastore.data['settings']['application'].get('timezone'), 'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
'using_global_webdriver_wait': not default['webdriver_delay'], 'using_global_webdriver_wait': not default['webdriver_delay'],

View File

@@ -102,12 +102,31 @@ def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_dat
if complete_rules: if complete_rules:
# Give all plugins a chance to update the data dict again (that we will test the conditions against) # Give all plugins a chance to update the data dict again (that we will test the conditions against)
for plugin in plugin_manager.get_plugins(): for plugin in plugin_manager.get_plugins():
new_execute_data = plugin.add_data(current_watch_uuid=current_watch_uuid, try:
application_datastruct=application_datastruct, import concurrent.futures
ephemeral_data=ephemeral_data) import time
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
plugin.add_data,
current_watch_uuid=current_watch_uuid,
application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data
)
# Set a timeout of 10 seconds
try:
new_execute_data = future.result(timeout=10)
if new_execute_data and isinstance(new_execute_data, dict): if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data) EXECUTE_DATA.update(new_execute_data)
except concurrent.futures.TimeoutError:
# The plugin took too long, abort processing for this watch
raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.")
except Exception as e:
# Log the error but continue with the next plugin
import logging
logging.error(f"Error executing plugin {plugin.__class__.__name__}: {str(e)}")
continue
# Create the ruleset # Create the ruleset
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules) ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
@@ -132,3 +151,18 @@ for plugin in plugin_manager.get_plugins():
if isinstance(new_field_choices, list): if isinstance(new_field_choices, list):
field_choices.extend(new_field_choices) field_choices.extend(new_field_choices)
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
for plugin in plugin_manager.get_plugins():
try:
content = plugin.ui_edit_stats_extras(watch=watch)
if content:
extras_content.append(content)
except Exception as e:
# Skip plugins that don't implement the hook or have errors
pass
return "\n".join(extras_content) if extras_content else ""

View File

@@ -1,5 +1,8 @@
import pluggy import pluggy
from . import default_plugin # Import the default plugin import os
import importlib
import sys
from . import default_plugin
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager # ✅ Ensure that the namespace in HookspecMarker matches PluginManager
PLUGIN_NAMESPACE = "changedetectionio_conditions" PLUGIN_NAMESPACE = "changedetectionio_conditions"
@@ -31,6 +34,11 @@ class ConditionsSpec:
"""Add to the datadict""" """Add to the datadict"""
pass pass
@hookspec
def ui_edit_stats_extras(watch):
"""Return HTML content to add to the stats tab in the edit view"""
pass
# ✅ Set up Pluggy Plugin Manager # ✅ Set up Pluggy Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE) plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
@@ -40,5 +48,27 @@ plugin_manager.add_hookspecs(ConditionsSpec)
# ✅ Register built-in plugins manually # ✅ Register built-in plugins manually
plugin_manager.register(default_plugin, "default_plugin") plugin_manager.register(default_plugin, "default_plugin")
# ✅ Load plugins from the plugins directory
def load_plugins_from_directory():
plugins_dir = os.path.join(os.path.dirname(__file__), 'plugins')
if not os.path.exists(plugins_dir):
return
# Get all Python files (excluding __init__.py)
for filename in os.listdir(plugins_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3] # Remove .py extension
module_path = f"changedetectionio.conditions.plugins.{module_name}"
try:
module = importlib.import_module(module_path)
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
print(f"Error loading plugin {module_name}: {e}")
# Load plugins from the plugins directory
load_plugins_from_directory()
# ✅ Discover installed plugins from external packages (if any) # ✅ Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE) plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)

View File

@@ -0,0 +1 @@
# Import plugins package to make them discoverable

View File

@@ -0,0 +1,102 @@
import pluggy
from loguru import logger
# Support both plugin systems
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def levenshtein_ratio_recent_history(watch, incoming_text=None):
try:
from Levenshtein import ratio, distance
k = list(watch.history.keys())
if len(k) >= 2:
# When called from ui_edit_stats_extras, we don't have incoming_text
if incoming_text is None:
a = watch.get_history_snapshot(timestamp=k[-1]) # Latest snapshot
b = watch.get_history_snapshot(timestamp=k[-2]) # Previous snapshot
else:
a = watch.get_history_snapshot(timestamp=k[-2]) # Second newest, incoming_text will be "newest"
b = incoming_text
distance_value = distance(a, b)
ratio_value = ratio(a, b)
return {
'distance': distance_value,
'ratio': ratio_value,
'percent_similar': round(ratio_value * 100, 2)
}
except Exception as e:
logger.warning(f"Unable to calc similarity: {str(e)}")
return ''
@conditions_hookimpl
def register_operators():
pass
@conditions_hookimpl
def register_operator_choices():
pass
@conditions_hookimpl
def register_field_choices():
return [
("levenshtein_ratio", "Levenshtein - Text similarity ratio"),
("levenshtein_distance", "Levenshtein - Text change distance"),
]
@conditions_hookimpl
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
res = {}
watch = application_datastruct['watching'].get(current_watch_uuid)
# ephemeral_data['text'] will be the current text after filters, they may have edited filters but not saved them yet etc
if watch and 'text' in ephemeral_data:
lev_data = levenshtein_ratio_recent_history(watch, ephemeral_data['text'])
if isinstance(lev_data, dict):
res['levenshtein_ratio'] = lev_data.get('ratio', 0)
res['levenshtein_similarity'] = lev_data.get('percent_similar', 0)
res['levenshtein_distance'] = lev_data.get('distance', 0)
return res
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add Levenshtein stats to the UI using the global plugin system"""
"""Generate the HTML for Levenshtein stats - shared by both plugin systems"""
if len(watch.history.keys()) < 2:
return "<p>Not enough history to calculate Levenshtein metrics</p>"
try:
lev_data = levenshtein_ratio_recent_history(watch)
if not lev_data or not isinstance(lev_data, dict):
return "<p>Unable to calculate Levenshtein metrics</p>"
html = f"""
<div class="levenshtein-stats">
<h4>Levenshtein Text Similarity Details</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Raw distance (edits needed)</td>
<td>{lev_data['distance']}</td>
</tr>
<tr>
<td>Similarity ratio</td>
<td>{lev_data['ratio']:.4f}</td>
</tr>
<tr>
<td>Percent similar</td>
<td>{lev_data['percent_similar']}%</td>
</tr>
</tbody>
</table>
<p style="font-size: 80%;">Levenshtein metrics compare the last two snapshots, measuring how many character edits are needed to transform one into the other.</p>
</div>
"""
return html
except Exception as e:
logger.error(f"Error generating Levenshtein UI extras: {str(e)}")
return "<p>Error calculating Levenshtein metrics</p>"

View File

@@ -0,0 +1,82 @@
import pluggy
from loguru import logger
# Support both plugin systems
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def count_words_in_history(watch, incoming_text=None):
"""Count words in snapshot text"""
try:
if incoming_text is not None:
# When called from add_data with incoming text
return len(incoming_text.split())
elif watch.history.keys():
# When called from UI extras to count latest snapshot
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
return len(latest_content.split())
return 0
except Exception as e:
logger.error(f"Error counting words: {str(e)}")
return 0
# Implement condition plugin hooks
@conditions_hookimpl
def register_operators():
# No custom operators needed
return {}
@conditions_hookimpl
def register_operator_choices():
# No custom operator choices needed
return []
@conditions_hookimpl
def register_field_choices():
# Add a field that will be available in conditions
return [
("word_count", "Word count of content"),
]
@conditions_hookimpl
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
"""Add word count data for conditions"""
result = {}
watch = application_datastruct['watching'].get(current_watch_uuid)
if watch and 'text' in ephemeral_data:
word_count = count_words_in_history(watch, ephemeral_data['text'])
result['word_count'] = word_count
return result
def _generate_stats_html(watch):
"""Generate the HTML content for the stats tab"""
word_count = count_words_in_history(watch)
html = f"""
<div class="word-count-stats">
<h4>Content Analysis</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Word count (latest snapshot)</td>
<td>{word_count}</td>
</tr>
</tbody>
</table>
<p style="font-size: 80%;">Word count is a simple measure of content length, calculated by splitting text on whitespace.</p>
</div>
"""
return html
@conditions_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count stats to the UI through conditions plugin system"""
return _generate_stats_html(watch)
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count stats to the UI using the global plugin system"""
return _generate_stats_html(watch)

View File

@@ -0,0 +1,82 @@
import pluggy
import os
import importlib
import sys
# Global plugin namespace for changedetection.io
PLUGIN_NAMESPACE = "changedetectionio"
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
class ChangeDetectionSpec:
"""Hook specifications for extending changedetection.io functionality."""
@hookspec
def ui_edit_stats_extras(watch):
"""Return HTML content to add to the stats tab in the edit view.
Args:
watch: The watch object being edited
Returns:
str: HTML content to be inserted in the stats tab
"""
pass
# Set up Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
# Register hookspecs
plugin_manager.add_hookspecs(ChangeDetectionSpec)
# Load plugins from subdirectories
def load_plugins_from_directories():
# Dictionary of directories to scan for plugins
plugin_dirs = {
'conditions': os.path.join(os.path.dirname(__file__), 'conditions', 'plugins'),
# Add more plugin directories here as needed
}
# Note: Removed the direct import of example_word_count_plugin as it's now in the conditions/plugins directory
for dir_name, dir_path in plugin_dirs.items():
if not os.path.exists(dir_path):
continue
# Get all Python files (excluding __init__.py)
for filename in os.listdir(dir_path):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3] # Remove .py extension
module_path = f"changedetectionio.{dir_name}.plugins.{module_name}"
try:
module = importlib.import_module(module_path)
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
print(f"Error loading plugin {module_name}: {e}")
# Load plugins
load_plugins_from_directories()
# Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
# Helper function to collect UI stats extras from all plugins
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
# Get all plugins that implement the ui_edit_stats_extras hook
results = plugin_manager.hook.ui_edit_stats_extras(watch=watch)
# If we have results, add them to our content
if results:
for result in results:
if result: # Skip empty results
extras_content.append(result)
return "\n".join(extras_content) if extras_content else ""

View File

@@ -211,7 +211,14 @@ $(document).ready(function () {
$('input[type=text]', first_available).first().val(x['xpath']); $('input[type=text]', first_available).first().val(x['xpath']);
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus(); $('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
found_something = true; found_something = true;
} else { }
else if (x['tagName'] === 'select') {
$('select', first_available).val('<select> by option text').change();
$('input[type=text]', first_available).first().val(x['xpath']);
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
found_something = true;
}
else {
// There's no good way (that I know) to find if this // There's no good way (that I know) to find if this
// see https://stackoverflow.com/questions/446892/how-to-find-event-listeners-on-a-dom-node-in-javascript-or-in-debugging // see https://stackoverflow.com/questions/446892/how-to-find-event-listeners-on-a-dom-node-in-javascript-or-in-debugging
// https://codepen.io/azaslavsky/pen/DEJVWv // https://codepen.io/azaslavsky/pen/DEJVWv
@@ -251,6 +258,10 @@ $(document).ready(function () {
400: function () { 400: function () {
// More than likely the CSRF token was lost when the server restarted // More than likely the CSRF token was lost when the server restarted
alert("There was a problem processing the request, please reload the page."); alert("There was a problem processing the request, please reload the page.");
},
401: function (err) {
// This will be a custom error
alert(err.responseText);
} }
} }
}).done(function (data) { }).done(function (data) {

View File

@@ -450,6 +450,13 @@ Math: {{ 1 + 1 }}") }}
</tr> </tr>
</tbody> </tbody>
</table> </table>
{% if ui_edit_stats_extras %}
<div class="plugin-stats-extras"> <!-- from pluggy plugin -->
{{ ui_edit_stats_extras|safe }}
</div>
{% endif %}
{% if watch.history_n %} {% if watch.history_n %}
<p> <p>
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">Download latest HTML snapshot</a> <a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">Download latest HTML snapshot</a>

View File

@@ -45,11 +45,15 @@ def set_number_out_of_range_response(number="150"):
f.write(test_return_data) f.write(test_return_data)
def test_setup(client, live_server):
"""Test that both text and number conditions work together with AND logic."""
live_server_setup(live_server)
def test_conditions_with_text_and_number(client, live_server): def test_conditions_with_text_and_number(client, live_server):
"""Test that both text and number conditions work together with AND logic.""" """Test that both text and number conditions work together with AND logic."""
set_original_response("50") set_original_response("50")
live_server_setup(live_server) #live_server_setup(live_server)
test_url = url_for('test_endpoint', _external=True) test_url = url_for('test_endpoint', _external=True)
@@ -195,3 +199,40 @@ def test_condition_validate_rule_row(client, live_server):
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
)
# Assert the word count is counted correctly
assert b'<td>13</td>' in res.data

View File

@@ -32,13 +32,14 @@ def test_strip_text_func():
stripped_content = html_tools.strip_ignore_text(test_content, ignore) stripped_content = html_tools.strip_ignore_text(test_content, ignore)
assert stripped_content == "Some initial text\n\nWhich is across multiple lines\n\n\n\nSo let's see what happens." assert stripped_content == "Some initial text\n\nWhich is across multiple lines\n\n\n\nSo let's see what happens."
def set_original_ignore_response(): def set_original_ignore_response(ver_stamp="123"):
test_return_data = """<html> test_return_data = f"""<html>
<body> <body>
Some initial text<br> Some initial text<br>
<p>Which is across multiple lines</p> <p>Which is across multiple lines</p>
<br> <br>
So let's see what happens. <br> So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
</body> </body>
</html> </html>
@@ -48,13 +49,14 @@ def set_original_ignore_response():
f.write(test_return_data) f.write(test_return_data)
def set_modified_original_ignore_response(): def set_modified_original_ignore_response(ver_stamp="123"):
test_return_data = """<html> test_return_data = f"""<html>
<body> <body>
Some NEW nice initial text<br> Some NEW nice initial text<br>
<p>Which is across multiple lines</p> <p>Which is across multiple lines</p>
<br> <br>
So let's see what happens. <br> So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
<p>new ignore stuff</p> <p>new ignore stuff</p>
<p>blah</p> <p>blah</p>
</body> </body>
@@ -67,14 +69,15 @@ def set_modified_original_ignore_response():
# Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text # Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
def set_modified_ignore_response(): def set_modified_ignore_response(ver_stamp="123"):
test_return_data = """<html> test_return_data = f"""<html>
<body> <body>
Some initial text<br> Some initial text<br>
<p>Which is across multiple lines</p> <p>Which is across multiple lines</p>
<P>ZZZZz</P> <P>ZZZZz</P>
<br> <br>
So let's see what happens. <br> So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
</body> </body>
</html> </html>
@@ -165,9 +168,9 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
assert b'Deleted' in res.data assert b'Deleted' in res.data
# When adding some ignore text, it should not trigger a change, even if something else on that line changes # When adding some ignore text, it should not trigger a change, even if something else on that line changes
def test_check_global_ignore_text_functionality(client, live_server, measure_memory_usage): def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
#live_server_setup(live_server) ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ\r\n"+extra_ignore
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ"
set_original_ignore_response() set_original_ignore_response()
# Goto the settings page, add our ignore text # Goto the settings page, add our ignore text
@@ -186,6 +189,10 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
# Add our URL to the import page # Add our URL to the import page
test_url = url_for('test_endpoint', _external=True) test_url = url_for('test_endpoint', _external=True)
if as_source:
# Switch to source mode so we can test that too!
test_url = "source:"+test_url
res = client.post( res = client.post(
url_for("imports.import_page"), url_for("imports.import_page"),
data={"urls": test_url}, data={"urls": test_url},
@@ -203,12 +210,15 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
follow_redirects=True follow_redirects=True
) )
assert b"Updated watch." in res.data assert b"Updated watch." in res.data
wait_for_all_checks(client)
# Check it saved # Check it saved
res = client.get( res = client.get(
url_for("settings.settings_page"), url_for("settings.settings_page"),
) )
assert bytes(ignore_text.encode('utf-8')) in res.data
for i in ignore_text.splitlines():
assert bytes(i.encode('utf-8')) in res.data
# Trigger a check # Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
@@ -221,7 +231,8 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
# Make a change which includes the ignore text, it should be ignored and no 'change' triggered # Make a change which includes the ignore text, it should be ignored and no 'change' triggered
# It adds text with "ZZZZzzzz" and "ZZZZ" is in the ignore list # It adds text with "ZZZZzzzz" and "ZZZZ" is in the ignore list
set_modified_ignore_response() # And tweaks the ver_stamp which should be picked up by global regex ignore
set_modified_ignore_response(ver_stamp=time.time())
# Trigger a check # Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
@@ -243,3 +254,11 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data assert b'Deleted' in res.data
def test_check_global_ignore_text_functionality(client, live_server):
#live_server_setup(live_server)
_run_test_global_ignore(client, as_source=False)
def test_check_global_ignore_text_functionality_as_source(client, live_server):
#live_server_setup(live_server)
_run_test_global_ignore(client, as_source=True, extra_ignore='/\?v=\d/')