mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-10 11:36:55 +00:00
Compare commits
6 Commits
browserste
...
improved-g
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
eb4cd35317 | ||
|
|
dc75043562 | ||
|
|
38fffda890 | ||
|
|
af568d064c | ||
|
|
a75f57de43 | ||
|
|
72a1c3dda1 |
98
changedetectionio/PLUGIN_README.md
Normal file
98
changedetectionio/PLUGIN_README.md
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
# Creating Plugins for changedetection.io
|
||||||
|
|
||||||
|
This document describes how to create plugins for changedetection.io. Plugins can be used to extend the functionality of the application in various ways.
|
||||||
|
|
||||||
|
## Plugin Types
|
||||||
|
|
||||||
|
### UI Stats Tab Plugins
|
||||||
|
|
||||||
|
These plugins can add content to the Stats tab in the Edit page. This is useful for adding custom statistics or visualizations about a watch.
|
||||||
|
|
||||||
|
#### Creating a UI Stats Tab Plugin
|
||||||
|
|
||||||
|
1. Create a Python file in a directory that will be loaded by the plugin system.
|
||||||
|
|
||||||
|
2. Use the `global_hookimpl` decorator to implement the `ui_edit_stats_extras` hook:
|
||||||
|
|
||||||
|
```python
|
||||||
|
import pluggy
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
|
||||||
|
|
||||||
|
@global_hookimpl
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Add custom content to the stats tab"""
|
||||||
|
# Calculate or retrieve your stats
|
||||||
|
my_stat = calculate_something(watch)
|
||||||
|
|
||||||
|
# Return HTML content as a string
|
||||||
|
html = f"""
|
||||||
|
<div class="my-plugin-stats">
|
||||||
|
<h4>My Plugin Statistics</h4>
|
||||||
|
<p>My statistic: {my_stat}</p>
|
||||||
|
</div>
|
||||||
|
"""
|
||||||
|
return html
|
||||||
|
```
|
||||||
|
|
||||||
|
3. The HTML you return will be included in the Stats tab.
|
||||||
|
|
||||||
|
## Plugin Loading
|
||||||
|
|
||||||
|
Plugins can be loaded from:
|
||||||
|
|
||||||
|
1. Built-in plugin directories in the codebase
|
||||||
|
2. External packages using setuptools entry points
|
||||||
|
|
||||||
|
To add a new plugin directory, modify the `plugin_dirs` dictionary in `pluggy_interface.py`.
|
||||||
|
|
||||||
|
## Example Plugin
|
||||||
|
|
||||||
|
Here's a simple example of a plugin that adds a word count statistic to the Stats tab:
|
||||||
|
|
||||||
|
```python
|
||||||
|
import pluggy
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
|
||||||
|
|
||||||
|
def count_words_in_history(watch):
|
||||||
|
"""Count words in the latest snapshot"""
|
||||||
|
try:
|
||||||
|
if not watch.history.keys():
|
||||||
|
return 0
|
||||||
|
|
||||||
|
latest_key = list(watch.history.keys())[-1]
|
||||||
|
latest_content = watch.get_history_snapshot(latest_key)
|
||||||
|
return len(latest_content.split())
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error counting words: {str(e)}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
@global_hookimpl
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Add word count to the Stats tab"""
|
||||||
|
word_count = count_words_in_history(watch)
|
||||||
|
|
||||||
|
html = f"""
|
||||||
|
<div class="word-count-stats">
|
||||||
|
<h4>Content Analysis</h4>
|
||||||
|
<table class="pure-table">
|
||||||
|
<tbody>
|
||||||
|
<tr>
|
||||||
|
<td>Word count (latest snapshot)</td>
|
||||||
|
<td>{word_count}</td>
|
||||||
|
</tr>
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
</div>
|
||||||
|
"""
|
||||||
|
return html
|
||||||
|
```
|
||||||
|
|
||||||
|
## Testing Your Plugin
|
||||||
|
|
||||||
|
1. Place your plugin in one of the directories scanned by the plugin system
|
||||||
|
2. Restart changedetection.io
|
||||||
|
3. Go to the Edit page of a watch and check the Stats tab to see your content
|
||||||
@@ -53,14 +53,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
a = "?" if not '?' in base_url else '&'
|
a = "?" if not '?' in base_url else '&'
|
||||||
base_url += a + f"timeout={keepalive_ms}"
|
base_url += a + f"timeout={keepalive_ms}"
|
||||||
|
|
||||||
try:
|
|
||||||
browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url)
|
browsersteps_start_session['browser'] = io_interface_context.chromium.connect_over_cdp(base_url)
|
||||||
except Exception as e:
|
|
||||||
if 'ECONNREFUSED' in str(e):
|
|
||||||
return make_response('Unable to start the Playwright Browser session, is it running?', 401)
|
|
||||||
else:
|
|
||||||
# Other errors, bad URL syntax, bad reply etc
|
|
||||||
return make_response(str(e), 401)
|
|
||||||
|
|
||||||
proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
|
proxy_id = datastore.get_preferred_proxy_for_watch(uuid=watch_uuid)
|
||||||
proxy = None
|
proxy = None
|
||||||
@@ -109,7 +102,16 @@ def construct_blueprint(datastore: ChangeDetectionStore):
|
|||||||
|
|
||||||
logger.debug("Starting connection with playwright")
|
logger.debug("Starting connection with playwright")
|
||||||
logger.debug("browser_steps.py connecting")
|
logger.debug("browser_steps.py connecting")
|
||||||
|
|
||||||
|
try:
|
||||||
browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid)
|
browsersteps_sessions[browsersteps_session_id] = start_browsersteps_session(watch_uuid)
|
||||||
|
except Exception as e:
|
||||||
|
if 'ECONNREFUSED' in str(e):
|
||||||
|
return make_response('Unable to start the Playwright Browser session, is sockpuppetbrowser running? Network configuration is OK?', 401)
|
||||||
|
else:
|
||||||
|
# Other errors, bad URL syntax, bad reply etc
|
||||||
|
return make_response(str(e), 401)
|
||||||
|
|
||||||
logger.debug("Starting connection with playwright - done")
|
logger.debug("Starting connection with playwright - done")
|
||||||
return {'browsersteps_session_id': browsersteps_session_id}
|
return {'browsersteps_session_id': browsersteps_session_id}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,8 @@
|
|||||||
import os
|
import os
|
||||||
import time
|
import time
|
||||||
import re
|
import re
|
||||||
|
import sys
|
||||||
|
import traceback
|
||||||
from random import randint
|
from random import randint
|
||||||
from loguru import logger
|
from loguru import logger
|
||||||
|
|
||||||
@@ -35,6 +37,7 @@ browser_step_ui_config = {'Choose one': '0 0',
|
|||||||
'Make all child elements visible': '1 0',
|
'Make all child elements visible': '1 0',
|
||||||
'Press Enter': '0 0',
|
'Press Enter': '0 0',
|
||||||
'Select by label': '1 1',
|
'Select by label': '1 1',
|
||||||
|
'<select> by option text': '1 1',
|
||||||
'Scroll down': '0 0',
|
'Scroll down': '0 0',
|
||||||
'Uncheck checkbox': '1 0',
|
'Uncheck checkbox': '1 0',
|
||||||
'Wait for seconds': '0 1',
|
'Wait for seconds': '0 1',
|
||||||
@@ -54,14 +57,34 @@ browser_step_ui_config = {'Choose one': '0 0',
|
|||||||
class steppable_browser_interface():
|
class steppable_browser_interface():
|
||||||
page = None
|
page = None
|
||||||
start_url = None
|
start_url = None
|
||||||
|
|
||||||
action_timeout = 10 * 1000
|
action_timeout = 10 * 1000
|
||||||
|
|
||||||
def __init__(self, start_url):
|
def __init__(self, start_url):
|
||||||
self.start_url = start_url
|
self.start_url = start_url
|
||||||
|
|
||||||
|
def safe_page_operation(self, operation_fn, default_return=None):
|
||||||
|
"""Safely execute a page operation with error handling"""
|
||||||
|
if self.page is None:
|
||||||
|
logger.warning("Attempted operation on None page object")
|
||||||
|
return default_return
|
||||||
|
|
||||||
|
try:
|
||||||
|
return operation_fn()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Page operation failed: {str(e)}")
|
||||||
|
# Try to reclaim memory if possible
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
return default_return
|
||||||
|
|
||||||
# Convert and perform "Click Button" for example
|
# Convert and perform "Click Button" for example
|
||||||
def call_action(self, action_name, selector=None, optional_value=None):
|
def call_action(self, action_name, selector=None, optional_value=None):
|
||||||
|
if self.page is None:
|
||||||
|
logger.warning("Cannot call action on None page object")
|
||||||
|
return
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower())
|
call_action_name = re.sub('[^0-9a-zA-Z]+', '_', action_name.lower())
|
||||||
if call_action_name == 'choose_one':
|
if call_action_name == 'choose_one':
|
||||||
@@ -72,28 +95,46 @@ class steppable_browser_interface():
|
|||||||
if selector and selector.startswith('/') and not selector.startswith('//'):
|
if selector and selector.startswith('/') and not selector.startswith('//'):
|
||||||
selector = "xpath=" + selector
|
selector = "xpath=" + selector
|
||||||
|
|
||||||
|
# Check if action handler exists
|
||||||
|
if not hasattr(self, "action_" + call_action_name):
|
||||||
|
logger.warning(f"Action handler for '{call_action_name}' not found")
|
||||||
|
return
|
||||||
|
|
||||||
action_handler = getattr(self, "action_" + call_action_name)
|
action_handler = getattr(self, "action_" + call_action_name)
|
||||||
|
|
||||||
# Support for Jinja2 variables in the value and selector
|
# Support for Jinja2 variables in the value and selector
|
||||||
|
|
||||||
if selector and ('{%' in selector or '{{' in selector):
|
if selector and ('{%' in selector or '{{' in selector):
|
||||||
selector = jinja_render(template_str=selector)
|
selector = jinja_render(template_str=selector)
|
||||||
|
|
||||||
if optional_value and ('{%' in optional_value or '{{' in optional_value):
|
if optional_value and ('{%' in optional_value or '{{' in optional_value):
|
||||||
optional_value = jinja_render(template_str=optional_value)
|
optional_value = jinja_render(template_str=optional_value)
|
||||||
|
|
||||||
|
try:
|
||||||
action_handler(selector, optional_value)
|
action_handler(selector, optional_value)
|
||||||
|
# Safely wait for timeout
|
||||||
|
def wait_timeout():
|
||||||
self.page.wait_for_timeout(1.5 * 1000)
|
self.page.wait_for_timeout(1.5 * 1000)
|
||||||
|
self.safe_page_operation(wait_timeout)
|
||||||
logger.debug(f"Call action done in {time.time()-now:.2f}s")
|
logger.debug(f"Call action done in {time.time()-now:.2f}s")
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error executing action '{call_action_name}': {str(e)}")
|
||||||
|
# Request garbage collection to free up resources after error
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
def action_goto_url(self, selector=None, value=None):
|
def action_goto_url(self, selector=None, value=None):
|
||||||
# self.page.set_viewport_size({"width": 1280, "height": 5000})
|
if not value:
|
||||||
|
logger.warning("No URL provided for goto_url action")
|
||||||
|
return None
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
response = self.page.goto(value, timeout=0, wait_until='load')
|
|
||||||
# Should be the same as the puppeteer_fetch.js methods, means, load with no timeout set (skip timeout)
|
def goto_operation():
|
||||||
#and also wait for seconds ?
|
return self.page.goto(value, timeout=0, wait_until='load')
|
||||||
#await page.waitForTimeout(1000);
|
|
||||||
#await page.waitForTimeout(extra_wait_ms);
|
response = self.safe_page_operation(goto_operation)
|
||||||
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
|
logger.debug(f"Time to goto URL {time.time()-now:.2f}s")
|
||||||
return response
|
return response
|
||||||
|
|
||||||
@@ -103,104 +144,204 @@ class steppable_browser_interface():
|
|||||||
|
|
||||||
def action_click_element_containing_text(self, selector=None, value=''):
|
def action_click_element_containing_text(self, selector=None, value=''):
|
||||||
logger.debug("Clicking element containing text")
|
logger.debug("Clicking element containing text")
|
||||||
if not len(value.strip()):
|
if not value or not len(value.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_operation():
|
||||||
elem = self.page.get_by_text(value)
|
elem = self.page.get_by_text(value)
|
||||||
if elem.count():
|
if elem.count():
|
||||||
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(click_operation)
|
||||||
|
|
||||||
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
|
def action_click_element_containing_text_if_exists(self, selector=None, value=''):
|
||||||
logger.debug("Clicking element containing text if exists")
|
logger.debug("Clicking element containing text if exists")
|
||||||
if not len(value.strip()):
|
if not value or not len(value.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_if_exists_operation():
|
||||||
elem = self.page.get_by_text(value)
|
elem = self.page.get_by_text(value)
|
||||||
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
|
logger.debug(f"Clicking element containing text - {elem.count()} elements found")
|
||||||
if elem.count():
|
if elem.count():
|
||||||
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
elem.first.click(delay=randint(200, 500), timeout=self.action_timeout)
|
||||||
else:
|
|
||||||
return
|
self.safe_page_operation(click_if_exists_operation)
|
||||||
|
|
||||||
def action_enter_text_in_field(self, selector, value):
|
def action_enter_text_in_field(self, selector, value):
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def fill_operation():
|
||||||
self.page.fill(selector, value, timeout=self.action_timeout)
|
self.page.fill(selector, value, timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(fill_operation)
|
||||||
|
|
||||||
def action_execute_js(self, selector, value):
|
def action_execute_js(self, selector, value):
|
||||||
response = self.page.evaluate(value)
|
if not value:
|
||||||
return response
|
return None
|
||||||
|
|
||||||
|
def evaluate_operation():
|
||||||
|
return self.page.evaluate(value)
|
||||||
|
|
||||||
|
return self.safe_page_operation(evaluate_operation)
|
||||||
|
|
||||||
def action_click_element(self, selector, value):
|
def action_click_element(self, selector, value):
|
||||||
logger.debug("Clicking element")
|
logger.debug("Clicking element")
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_operation():
|
||||||
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
|
self.page.click(selector=selector, timeout=self.action_timeout + 20 * 1000, delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(click_operation)
|
||||||
|
|
||||||
def action_click_element_if_exists(self, selector, value):
|
def action_click_element_if_exists(self, selector, value):
|
||||||
import playwright._impl._errors as _api_types
|
import playwright._impl._errors as _api_types
|
||||||
logger.debug("Clicking element if exists")
|
logger.debug("Clicking element if exists")
|
||||||
if not len(selector.strip()):
|
if not selector or not len(selector.strip()):
|
||||||
return
|
return
|
||||||
|
|
||||||
|
def click_if_exists_operation():
|
||||||
try:
|
try:
|
||||||
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
|
self.page.click(selector, timeout=self.action_timeout, delay=randint(200, 500))
|
||||||
except _api_types.TimeoutError as e:
|
except _api_types.TimeoutError:
|
||||||
return
|
return
|
||||||
except _api_types.Error as e:
|
except _api_types.Error:
|
||||||
# Element was there, but page redrew and now its long long gone
|
# Element was there, but page redrew and now its long long gone
|
||||||
return
|
return
|
||||||
|
|
||||||
def action_click_x_y(self, selector, value):
|
self.safe_page_operation(click_if_exists_operation)
|
||||||
if not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
|
|
||||||
raise Exception("'Click X,Y' step should be in the format of '100 , 90'")
|
|
||||||
|
|
||||||
|
def action_click_x_y(self, selector, value):
|
||||||
|
if not value or not re.match(r'^\s?\d+\s?,\s?\d+\s?$', value):
|
||||||
|
logger.warning("'Click X,Y' step should be in the format of '100 , 90'")
|
||||||
|
return
|
||||||
|
|
||||||
|
try:
|
||||||
x, y = value.strip().split(',')
|
x, y = value.strip().split(',')
|
||||||
x = int(float(x.strip()))
|
x = int(float(x.strip()))
|
||||||
y = int(float(y.strip()))
|
y = int(float(y.strip()))
|
||||||
|
|
||||||
|
def click_xy_operation():
|
||||||
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
|
self.page.mouse.click(x=x, y=y, delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(click_xy_operation)
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error parsing x,y coordinates: {str(e)}")
|
||||||
|
|
||||||
|
def action__select_by_option_text(self, selector, value):
|
||||||
|
if not selector or not len(selector.strip()):
|
||||||
|
return
|
||||||
|
|
||||||
|
def select_operation():
|
||||||
|
self.page.select_option(selector, label=value, timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(select_operation)
|
||||||
|
|
||||||
def action_scroll_down(self, selector, value):
|
def action_scroll_down(self, selector, value):
|
||||||
|
def scroll_operation():
|
||||||
# Some sites this doesnt work on for some reason
|
# Some sites this doesnt work on for some reason
|
||||||
self.page.mouse.wheel(0, 600)
|
self.page.mouse.wheel(0, 600)
|
||||||
self.page.wait_for_timeout(1000)
|
self.page.wait_for_timeout(1000)
|
||||||
|
|
||||||
|
self.safe_page_operation(scroll_operation)
|
||||||
|
|
||||||
def action_wait_for_seconds(self, selector, value):
|
def action_wait_for_seconds(self, selector, value):
|
||||||
self.page.wait_for_timeout(float(value.strip()) * 1000)
|
try:
|
||||||
|
seconds = float(value.strip()) if value else 1.0
|
||||||
|
|
||||||
|
def wait_operation():
|
||||||
|
self.page.wait_for_timeout(seconds * 1000)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_operation)
|
||||||
|
except (ValueError, TypeError) as e:
|
||||||
|
logger.error(f"Invalid value for wait_for_seconds: {str(e)}")
|
||||||
|
|
||||||
def action_wait_for_text(self, selector, value):
|
def action_wait_for_text(self, selector, value):
|
||||||
|
if not value:
|
||||||
|
return
|
||||||
|
|
||||||
import json
|
import json
|
||||||
v = json.dumps(value)
|
v = json.dumps(value)
|
||||||
self.page.wait_for_function(f'document.querySelector("body").innerText.includes({v});', timeout=30000)
|
|
||||||
|
def wait_for_text_operation():
|
||||||
|
self.page.wait_for_function(
|
||||||
|
f'document.querySelector("body").innerText.includes({v});',
|
||||||
|
timeout=30000
|
||||||
|
)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_for_text_operation)
|
||||||
|
|
||||||
def action_wait_for_text_in_element(self, selector, value):
|
def action_wait_for_text_in_element(self, selector, value):
|
||||||
|
if not selector or not value:
|
||||||
|
return
|
||||||
|
|
||||||
import json
|
import json
|
||||||
s = json.dumps(selector)
|
s = json.dumps(selector)
|
||||||
v = json.dumps(value)
|
v = json.dumps(value)
|
||||||
self.page.wait_for_function(f'document.querySelector({s}).innerText.includes({v});', timeout=30000)
|
|
||||||
|
def wait_for_text_in_element_operation():
|
||||||
|
self.page.wait_for_function(
|
||||||
|
f'document.querySelector({s}).innerText.includes({v});',
|
||||||
|
timeout=30000
|
||||||
|
)
|
||||||
|
|
||||||
|
self.safe_page_operation(wait_for_text_in_element_operation)
|
||||||
|
|
||||||
# @todo - in the future make some popout interface to capture what needs to be set
|
# @todo - in the future make some popout interface to capture what needs to be set
|
||||||
# https://playwright.dev/python/docs/api/class-keyboard
|
# https://playwright.dev/python/docs/api/class-keyboard
|
||||||
def action_press_enter(self, selector, value):
|
def action_press_enter(self, selector, value):
|
||||||
|
def press_operation():
|
||||||
self.page.keyboard.press("Enter", delay=randint(200, 500))
|
self.page.keyboard.press("Enter", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_press_page_up(self, selector, value):
|
def action_press_page_up(self, selector, value):
|
||||||
|
def press_operation():
|
||||||
self.page.keyboard.press("PageUp", delay=randint(200, 500))
|
self.page.keyboard.press("PageUp", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_press_page_down(self, selector, value):
|
def action_press_page_down(self, selector, value):
|
||||||
|
def press_operation():
|
||||||
self.page.keyboard.press("PageDown", delay=randint(200, 500))
|
self.page.keyboard.press("PageDown", delay=randint(200, 500))
|
||||||
|
|
||||||
|
self.safe_page_operation(press_operation)
|
||||||
|
|
||||||
def action_check_checkbox(self, selector, value):
|
def action_check_checkbox(self, selector, value):
|
||||||
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def check_operation():
|
||||||
self.page.locator(selector).check(timeout=self.action_timeout)
|
self.page.locator(selector).check(timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(check_operation)
|
||||||
|
|
||||||
def action_uncheck_checkbox(self, selector, value):
|
def action_uncheck_checkbox(self, selector, value):
|
||||||
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def uncheck_operation():
|
||||||
self.page.locator(selector).uncheck(timeout=self.action_timeout)
|
self.page.locator(selector).uncheck(timeout=self.action_timeout)
|
||||||
|
|
||||||
|
self.safe_page_operation(uncheck_operation)
|
||||||
|
|
||||||
def action_remove_elements(self, selector, value):
|
def action_remove_elements(self, selector, value):
|
||||||
"""Removes all elements matching the given selector from the DOM."""
|
"""Removes all elements matching the given selector from the DOM."""
|
||||||
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def remove_operation():
|
||||||
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
|
self.page.locator(selector).evaluate_all("els => els.forEach(el => el.remove())")
|
||||||
|
|
||||||
|
self.safe_page_operation(remove_operation)
|
||||||
|
|
||||||
def action_make_all_child_elements_visible(self, selector, value):
|
def action_make_all_child_elements_visible(self, selector, value):
|
||||||
"""Recursively makes all child elements inside the given selector fully visible."""
|
"""Recursively makes all child elements inside the given selector fully visible."""
|
||||||
|
if not selector:
|
||||||
|
return
|
||||||
|
|
||||||
|
def make_visible_operation():
|
||||||
self.page.locator(selector).locator("*").evaluate_all("""
|
self.page.locator(selector).locator("*").evaluate_all("""
|
||||||
els => els.forEach(el => {
|
els => els.forEach(el => {
|
||||||
el.style.display = 'block'; // Forces it to be displayed
|
el.style.display = 'block'; // Forces it to be displayed
|
||||||
@@ -214,6 +355,8 @@ class steppable_browser_interface():
|
|||||||
})
|
})
|
||||||
""")
|
""")
|
||||||
|
|
||||||
|
self.safe_page_operation(make_visible_operation)
|
||||||
|
|
||||||
# Responsible for maintaining a live 'context' with the chrome CDP
|
# Responsible for maintaining a live 'context' with the chrome CDP
|
||||||
# @todo - how long do contexts live for anyway?
|
# @todo - how long do contexts live for anyway?
|
||||||
class browsersteps_live_ui(steppable_browser_interface):
|
class browsersteps_live_ui(steppable_browser_interface):
|
||||||
@@ -224,6 +367,8 @@ class browsersteps_live_ui(steppable_browser_interface):
|
|||||||
# bump and kill this if idle after X sec
|
# bump and kill this if idle after X sec
|
||||||
age_start = 0
|
age_start = 0
|
||||||
headers = {}
|
headers = {}
|
||||||
|
# Track if resources are properly cleaned up
|
||||||
|
_is_cleaned_up = False
|
||||||
|
|
||||||
# use a special driver, maybe locally etc
|
# use a special driver, maybe locally etc
|
||||||
command_executor = os.getenv(
|
command_executor = os.getenv(
|
||||||
@@ -243,9 +388,14 @@ class browsersteps_live_ui(steppable_browser_interface):
|
|||||||
self.age_start = time.time()
|
self.age_start = time.time()
|
||||||
self.playwright_browser = playwright_browser
|
self.playwright_browser = playwright_browser
|
||||||
self.start_url = start_url
|
self.start_url = start_url
|
||||||
|
self._is_cleaned_up = False
|
||||||
if self.context is None:
|
if self.context is None:
|
||||||
self.connect(proxy=proxy)
|
self.connect(proxy=proxy)
|
||||||
|
|
||||||
|
def __del__(self):
|
||||||
|
# Ensure cleanup happens if object is garbage collected
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
# Connect and setup a new context
|
# Connect and setup a new context
|
||||||
def connect(self, proxy=None):
|
def connect(self, proxy=None):
|
||||||
# Should only get called once - test that
|
# Should only get called once - test that
|
||||||
@@ -264,31 +414,74 @@ class browsersteps_live_ui(steppable_browser_interface):
|
|||||||
user_agent=manage_user_agent(headers=self.headers),
|
user_agent=manage_user_agent(headers=self.headers),
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
self.page = self.context.new_page()
|
self.page = self.context.new_page()
|
||||||
|
|
||||||
# self.page.set_default_navigation_timeout(keep_open)
|
# self.page.set_default_navigation_timeout(keep_open)
|
||||||
self.page.set_default_timeout(keep_open)
|
self.page.set_default_timeout(keep_open)
|
||||||
# @todo probably this doesnt work
|
# Set event handlers
|
||||||
self.page.on(
|
self.page.on("close", self.mark_as_closed)
|
||||||
"close",
|
|
||||||
self.mark_as_closed,
|
|
||||||
)
|
|
||||||
# Listen for all console events and handle errors
|
# Listen for all console events and handle errors
|
||||||
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
|
self.page.on("console", lambda msg: print(f"Browser steps console - {msg.type}: {msg.text} {msg.args}"))
|
||||||
|
|
||||||
logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
|
logger.debug(f"Time to browser setup {time.time()-now:.2f}s")
|
||||||
self.page.wait_for_timeout(1 * 1000)
|
self.page.wait_for_timeout(1 * 1000)
|
||||||
|
|
||||||
|
|
||||||
def mark_as_closed(self):
|
def mark_as_closed(self):
|
||||||
logger.debug("Page closed, cleaning up..")
|
logger.debug("Page closed, cleaning up..")
|
||||||
|
self.cleanup()
|
||||||
|
|
||||||
|
def cleanup(self):
|
||||||
|
"""Properly clean up all resources to prevent memory leaks"""
|
||||||
|
if self._is_cleaned_up:
|
||||||
|
return
|
||||||
|
|
||||||
|
logger.debug("Cleaning up browser steps resources")
|
||||||
|
|
||||||
|
# Clean up page
|
||||||
|
if hasattr(self, 'page') and self.page is not None:
|
||||||
|
try:
|
||||||
|
# Force garbage collection before closing
|
||||||
|
self.page.request_gc()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error during page garbage collection: {str(e)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Remove event listeners before closing
|
||||||
|
self.page.remove_listener("close", self.mark_as_closed)
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error removing event listeners: {str(e)}")
|
||||||
|
|
||||||
|
try:
|
||||||
|
self.page.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error closing page: {str(e)}")
|
||||||
|
|
||||||
|
self.page = None
|
||||||
|
|
||||||
|
# Clean up context
|
||||||
|
if hasattr(self, 'context') and self.context is not None:
|
||||||
|
try:
|
||||||
|
self.context.close()
|
||||||
|
except Exception as e:
|
||||||
|
logger.debug(f"Error closing context: {str(e)}")
|
||||||
|
|
||||||
|
self.context = None
|
||||||
|
|
||||||
|
self._is_cleaned_up = True
|
||||||
|
logger.debug("Browser steps resources cleanup complete")
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def has_expired(self):
|
def has_expired(self):
|
||||||
if not self.page:
|
if not self.page or self._is_cleaned_up:
|
||||||
return True
|
return True
|
||||||
|
|
||||||
|
# Check if session has expired based on age
|
||||||
|
max_age_seconds = int(os.getenv("BROWSER_STEPS_MAX_AGE_SECONDS", 60 * 10)) # Default 10 minutes
|
||||||
|
if (time.time() - self.age_start) > max_age_seconds:
|
||||||
|
logger.debug(f"Browser steps session expired after {max_age_seconds} seconds")
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
def get_current_state(self):
|
def get_current_state(self):
|
||||||
"""Return the screenshot and interactive elements mapping, generally always called after action_()"""
|
"""Return the screenshot and interactive elements mapping, generally always called after action_()"""
|
||||||
@@ -297,20 +490,27 @@ class browsersteps_live_ui(steppable_browser_interface):
|
|||||||
# because we for now only run browser steps in playwright mode (not puppeteer mode)
|
# because we for now only run browser steps in playwright mode (not puppeteer mode)
|
||||||
from changedetectionio.content_fetchers.playwright import capture_full_page
|
from changedetectionio.content_fetchers.playwright import capture_full_page
|
||||||
|
|
||||||
|
# Safety check - don't proceed if resources are cleaned up
|
||||||
|
if self._is_cleaned_up or self.page is None:
|
||||||
|
logger.warning("Attempted to get current state after cleanup")
|
||||||
|
return (None, None)
|
||||||
|
|
||||||
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
|
xpath_element_js = importlib.resources.files("changedetectionio.content_fetchers.res").joinpath('xpath_element_scraper.js').read_text()
|
||||||
|
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self.page.wait_for_timeout(1 * 1000)
|
self.page.wait_for_timeout(1 * 1000)
|
||||||
|
|
||||||
screenshot = capture_full_page(page=self.page)
|
screenshot = None
|
||||||
|
xpath_data = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Get screenshot first
|
||||||
|
screenshot = capture_full_page(page=self.page)
|
||||||
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
|
logger.debug(f"Time to get screenshot from browser {time.time() - now:.2f}s")
|
||||||
|
|
||||||
|
# Then get interactive elements
|
||||||
now = time.time()
|
now = time.time()
|
||||||
self.page.evaluate("var include_filters=''")
|
self.page.evaluate("var include_filters=''")
|
||||||
# Go find the interactive elements
|
|
||||||
# @todo in the future, something smarter that can scan for elements with .click/focus etc event handlers?
|
|
||||||
|
|
||||||
self.page.request_gc()
|
self.page.request_gc()
|
||||||
|
|
||||||
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
|
scan_elements = 'a,button,input,select,textarea,i,th,td,p,li,h1,h2,h3,h4,div,span'
|
||||||
@@ -322,11 +522,23 @@ class browsersteps_live_ui(steppable_browser_interface):
|
|||||||
}))
|
}))
|
||||||
self.page.request_gc()
|
self.page.request_gc()
|
||||||
|
|
||||||
# So the JS will find the smallest one first
|
# Sort elements by size
|
||||||
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
|
xpath_data['size_pos'] = sorted(xpath_data['size_pos'], key=lambda k: k['width'] * k['height'], reverse=True)
|
||||||
logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s")
|
logger.debug(f"Time to scrape xPath element data in browser {time.time()-now:.2f}s")
|
||||||
|
|
||||||
# playwright._impl._api_types.Error: Browser closed.
|
except Exception as e:
|
||||||
# @todo show some countdown timer?
|
logger.error(f"Error getting current state: {str(e)}")
|
||||||
|
# Attempt recovery - force garbage collection
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Request garbage collection one final time
|
||||||
|
try:
|
||||||
|
self.page.request_gc()
|
||||||
|
except:
|
||||||
|
pass
|
||||||
|
|
||||||
return (screenshot, xpath_data)
|
return (screenshot, xpath_data)
|
||||||
|
|
||||||
|
|||||||
@@ -233,6 +233,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
|
|
||||||
# Only works reliably with Playwright
|
# Only works reliably with Playwright
|
||||||
|
|
||||||
|
# Import the global plugin system
|
||||||
|
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
|
||||||
|
|
||||||
template_args = {
|
template_args = {
|
||||||
'available_processors': processors.available_processors(),
|
'available_processors': processors.available_processors(),
|
||||||
'available_timezones': sorted(available_timezones()),
|
'available_timezones': sorted(available_timezones()),
|
||||||
@@ -250,6 +253,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
|
|||||||
'settings_application': datastore.data['settings']['application'],
|
'settings_application': datastore.data['settings']['application'],
|
||||||
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
|
||||||
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
|
||||||
|
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
|
||||||
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
|
||||||
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
|
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
|
||||||
'using_global_webdriver_wait': not default['webdriver_delay'],
|
'using_global_webdriver_wait': not default['webdriver_delay'],
|
||||||
|
|||||||
@@ -102,12 +102,31 @@ def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_dat
|
|||||||
if complete_rules:
|
if complete_rules:
|
||||||
# Give all plugins a chance to update the data dict again (that we will test the conditions against)
|
# Give all plugins a chance to update the data dict again (that we will test the conditions against)
|
||||||
for plugin in plugin_manager.get_plugins():
|
for plugin in plugin_manager.get_plugins():
|
||||||
new_execute_data = plugin.add_data(current_watch_uuid=current_watch_uuid,
|
try:
|
||||||
application_datastruct=application_datastruct,
|
import concurrent.futures
|
||||||
ephemeral_data=ephemeral_data)
|
import time
|
||||||
|
|
||||||
|
with concurrent.futures.ThreadPoolExecutor() as executor:
|
||||||
|
future = executor.submit(
|
||||||
|
plugin.add_data,
|
||||||
|
current_watch_uuid=current_watch_uuid,
|
||||||
|
application_datastruct=application_datastruct,
|
||||||
|
ephemeral_data=ephemeral_data
|
||||||
|
)
|
||||||
|
|
||||||
|
# Set a timeout of 10 seconds
|
||||||
|
try:
|
||||||
|
new_execute_data = future.result(timeout=10)
|
||||||
if new_execute_data and isinstance(new_execute_data, dict):
|
if new_execute_data and isinstance(new_execute_data, dict):
|
||||||
EXECUTE_DATA.update(new_execute_data)
|
EXECUTE_DATA.update(new_execute_data)
|
||||||
|
except concurrent.futures.TimeoutError:
|
||||||
|
# The plugin took too long, abort processing for this watch
|
||||||
|
raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.")
|
||||||
|
except Exception as e:
|
||||||
|
# Log the error but continue with the next plugin
|
||||||
|
import logging
|
||||||
|
logging.error(f"Error executing plugin {plugin.__class__.__name__}: {str(e)}")
|
||||||
|
continue
|
||||||
|
|
||||||
# Create the ruleset
|
# Create the ruleset
|
||||||
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
|
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
|
||||||
@@ -132,3 +151,18 @@ for plugin in plugin_manager.get_plugins():
|
|||||||
if isinstance(new_field_choices, list):
|
if isinstance(new_field_choices, list):
|
||||||
field_choices.extend(new_field_choices)
|
field_choices.extend(new_field_choices)
|
||||||
|
|
||||||
|
def collect_ui_edit_stats_extras(watch):
|
||||||
|
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
|
||||||
|
extras_content = []
|
||||||
|
|
||||||
|
for plugin in plugin_manager.get_plugins():
|
||||||
|
try:
|
||||||
|
content = plugin.ui_edit_stats_extras(watch=watch)
|
||||||
|
if content:
|
||||||
|
extras_content.append(content)
|
||||||
|
except Exception as e:
|
||||||
|
# Skip plugins that don't implement the hook or have errors
|
||||||
|
pass
|
||||||
|
|
||||||
|
return "\n".join(extras_content) if extras_content else ""
|
||||||
|
|
||||||
|
|||||||
@@ -1,5 +1,8 @@
|
|||||||
import pluggy
|
import pluggy
|
||||||
from . import default_plugin # Import the default plugin
|
import os
|
||||||
|
import importlib
|
||||||
|
import sys
|
||||||
|
from . import default_plugin
|
||||||
|
|
||||||
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager
|
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager
|
||||||
PLUGIN_NAMESPACE = "changedetectionio_conditions"
|
PLUGIN_NAMESPACE = "changedetectionio_conditions"
|
||||||
@@ -31,6 +34,11 @@ class ConditionsSpec:
|
|||||||
"""Add to the datadict"""
|
"""Add to the datadict"""
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Return HTML content to add to the stats tab in the edit view"""
|
||||||
|
pass
|
||||||
|
|
||||||
# ✅ Set up Pluggy Plugin Manager
|
# ✅ Set up Pluggy Plugin Manager
|
||||||
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
@@ -40,5 +48,27 @@ plugin_manager.add_hookspecs(ConditionsSpec)
|
|||||||
# ✅ Register built-in plugins manually
|
# ✅ Register built-in plugins manually
|
||||||
plugin_manager.register(default_plugin, "default_plugin")
|
plugin_manager.register(default_plugin, "default_plugin")
|
||||||
|
|
||||||
|
# ✅ Load plugins from the plugins directory
|
||||||
|
def load_plugins_from_directory():
|
||||||
|
plugins_dir = os.path.join(os.path.dirname(__file__), 'plugins')
|
||||||
|
if not os.path.exists(plugins_dir):
|
||||||
|
return
|
||||||
|
|
||||||
|
# Get all Python files (excluding __init__.py)
|
||||||
|
for filename in os.listdir(plugins_dir):
|
||||||
|
if filename.endswith(".py") and filename != "__init__.py":
|
||||||
|
module_name = filename[:-3] # Remove .py extension
|
||||||
|
module_path = f"changedetectionio.conditions.plugins.{module_name}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
module = importlib.import_module(module_path)
|
||||||
|
# Register the plugin with pluggy
|
||||||
|
plugin_manager.register(module, module_name)
|
||||||
|
except (ImportError, AttributeError) as e:
|
||||||
|
print(f"Error loading plugin {module_name}: {e}")
|
||||||
|
|
||||||
|
# Load plugins from the plugins directory
|
||||||
|
load_plugins_from_directory()
|
||||||
|
|
||||||
# ✅ Discover installed plugins from external packages (if any)
|
# ✅ Discover installed plugins from external packages (if any)
|
||||||
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
|
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
|
||||||
|
|||||||
1
changedetectionio/conditions/plugins/__init__.py
Normal file
1
changedetectionio/conditions/plugins/__init__.py
Normal file
@@ -0,0 +1 @@
|
|||||||
|
# Import plugins package to make them discoverable
|
||||||
102
changedetectionio/conditions/plugins/levenshtein_plugin.py
Normal file
102
changedetectionio/conditions/plugins/levenshtein_plugin.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
import pluggy
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
# Support both plugin systems
|
||||||
|
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
|
||||||
|
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
|
||||||
|
|
||||||
|
def levenshtein_ratio_recent_history(watch, incoming_text=None):
|
||||||
|
try:
|
||||||
|
from Levenshtein import ratio, distance
|
||||||
|
k = list(watch.history.keys())
|
||||||
|
if len(k) >= 2:
|
||||||
|
# When called from ui_edit_stats_extras, we don't have incoming_text
|
||||||
|
if incoming_text is None:
|
||||||
|
a = watch.get_history_snapshot(timestamp=k[-1]) # Latest snapshot
|
||||||
|
b = watch.get_history_snapshot(timestamp=k[-2]) # Previous snapshot
|
||||||
|
else:
|
||||||
|
a = watch.get_history_snapshot(timestamp=k[-2]) # Second newest, incoming_text will be "newest"
|
||||||
|
b = incoming_text
|
||||||
|
|
||||||
|
distance_value = distance(a, b)
|
||||||
|
ratio_value = ratio(a, b)
|
||||||
|
return {
|
||||||
|
'distance': distance_value,
|
||||||
|
'ratio': ratio_value,
|
||||||
|
'percent_similar': round(ratio_value * 100, 2)
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning(f"Unable to calc similarity: {str(e)}")
|
||||||
|
|
||||||
|
return ''
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_operators():
|
||||||
|
pass
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_operator_choices():
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_field_choices():
|
||||||
|
return [
|
||||||
|
("levenshtein_ratio", "Levenshtein - Text similarity ratio"),
|
||||||
|
("levenshtein_distance", "Levenshtein - Text change distance"),
|
||||||
|
]
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
|
||||||
|
res = {}
|
||||||
|
watch = application_datastruct['watching'].get(current_watch_uuid)
|
||||||
|
# ephemeral_data['text'] will be the current text after filters, they may have edited filters but not saved them yet etc
|
||||||
|
|
||||||
|
if watch and 'text' in ephemeral_data:
|
||||||
|
lev_data = levenshtein_ratio_recent_history(watch, ephemeral_data['text'])
|
||||||
|
if isinstance(lev_data, dict):
|
||||||
|
res['levenshtein_ratio'] = lev_data.get('ratio', 0)
|
||||||
|
res['levenshtein_similarity'] = lev_data.get('percent_similar', 0)
|
||||||
|
res['levenshtein_distance'] = lev_data.get('distance', 0)
|
||||||
|
|
||||||
|
return res
|
||||||
|
|
||||||
|
@global_hookimpl
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Add Levenshtein stats to the UI using the global plugin system"""
|
||||||
|
"""Generate the HTML for Levenshtein stats - shared by both plugin systems"""
|
||||||
|
if len(watch.history.keys()) < 2:
|
||||||
|
return "<p>Not enough history to calculate Levenshtein metrics</p>"
|
||||||
|
|
||||||
|
try:
|
||||||
|
lev_data = levenshtein_ratio_recent_history(watch)
|
||||||
|
if not lev_data or not isinstance(lev_data, dict):
|
||||||
|
return "<p>Unable to calculate Levenshtein metrics</p>"
|
||||||
|
|
||||||
|
html = f"""
|
||||||
|
<div class="levenshtein-stats">
|
||||||
|
<h4>Levenshtein Text Similarity Details</h4>
|
||||||
|
<table class="pure-table">
|
||||||
|
<tbody>
|
||||||
|
<tr>
|
||||||
|
<td>Raw distance (edits needed)</td>
|
||||||
|
<td>{lev_data['distance']}</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Similarity ratio</td>
|
||||||
|
<td>{lev_data['ratio']:.4f}</td>
|
||||||
|
</tr>
|
||||||
|
<tr>
|
||||||
|
<td>Percent similar</td>
|
||||||
|
<td>{lev_data['percent_similar']}%</td>
|
||||||
|
</tr>
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
<p style="font-size: 80%;">Levenshtein metrics compare the last two snapshots, measuring how many character edits are needed to transform one into the other.</p>
|
||||||
|
</div>
|
||||||
|
"""
|
||||||
|
return html
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error generating Levenshtein UI extras: {str(e)}")
|
||||||
|
return "<p>Error calculating Levenshtein metrics</p>"
|
||||||
|
|
||||||
82
changedetectionio/conditions/plugins/wordcount_plugin.py
Normal file
82
changedetectionio/conditions/plugins/wordcount_plugin.py
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
import pluggy
|
||||||
|
from loguru import logger
|
||||||
|
|
||||||
|
# Support both plugin systems
|
||||||
|
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
|
||||||
|
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
|
||||||
|
|
||||||
|
def count_words_in_history(watch, incoming_text=None):
|
||||||
|
"""Count words in snapshot text"""
|
||||||
|
try:
|
||||||
|
if incoming_text is not None:
|
||||||
|
# When called from add_data with incoming text
|
||||||
|
return len(incoming_text.split())
|
||||||
|
elif watch.history.keys():
|
||||||
|
# When called from UI extras to count latest snapshot
|
||||||
|
latest_key = list(watch.history.keys())[-1]
|
||||||
|
latest_content = watch.get_history_snapshot(latest_key)
|
||||||
|
return len(latest_content.split())
|
||||||
|
return 0
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error counting words: {str(e)}")
|
||||||
|
return 0
|
||||||
|
|
||||||
|
# Implement condition plugin hooks
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_operators():
|
||||||
|
# No custom operators needed
|
||||||
|
return {}
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_operator_choices():
|
||||||
|
# No custom operator choices needed
|
||||||
|
return []
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def register_field_choices():
|
||||||
|
# Add a field that will be available in conditions
|
||||||
|
return [
|
||||||
|
("word_count", "Word count of content"),
|
||||||
|
]
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
|
||||||
|
"""Add word count data for conditions"""
|
||||||
|
result = {}
|
||||||
|
watch = application_datastruct['watching'].get(current_watch_uuid)
|
||||||
|
|
||||||
|
if watch and 'text' in ephemeral_data:
|
||||||
|
word_count = count_words_in_history(watch, ephemeral_data['text'])
|
||||||
|
result['word_count'] = word_count
|
||||||
|
|
||||||
|
return result
|
||||||
|
|
||||||
|
def _generate_stats_html(watch):
|
||||||
|
"""Generate the HTML content for the stats tab"""
|
||||||
|
word_count = count_words_in_history(watch)
|
||||||
|
|
||||||
|
html = f"""
|
||||||
|
<div class="word-count-stats">
|
||||||
|
<h4>Content Analysis</h4>
|
||||||
|
<table class="pure-table">
|
||||||
|
<tbody>
|
||||||
|
<tr>
|
||||||
|
<td>Word count (latest snapshot)</td>
|
||||||
|
<td>{word_count}</td>
|
||||||
|
</tr>
|
||||||
|
</tbody>
|
||||||
|
</table>
|
||||||
|
<p style="font-size: 80%;">Word count is a simple measure of content length, calculated by splitting text on whitespace.</p>
|
||||||
|
</div>
|
||||||
|
"""
|
||||||
|
return html
|
||||||
|
|
||||||
|
@conditions_hookimpl
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Add word count stats to the UI through conditions plugin system"""
|
||||||
|
return _generate_stats_html(watch)
|
||||||
|
|
||||||
|
@global_hookimpl
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Add word count stats to the UI using the global plugin system"""
|
||||||
|
return _generate_stats_html(watch)
|
||||||
82
changedetectionio/pluggy_interface.py
Normal file
82
changedetectionio/pluggy_interface.py
Normal file
@@ -0,0 +1,82 @@
|
|||||||
|
import pluggy
|
||||||
|
import os
|
||||||
|
import importlib
|
||||||
|
import sys
|
||||||
|
|
||||||
|
# Global plugin namespace for changedetection.io
|
||||||
|
PLUGIN_NAMESPACE = "changedetectionio"
|
||||||
|
|
||||||
|
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
|
||||||
|
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
|
|
||||||
|
class ChangeDetectionSpec:
|
||||||
|
"""Hook specifications for extending changedetection.io functionality."""
|
||||||
|
|
||||||
|
@hookspec
|
||||||
|
def ui_edit_stats_extras(watch):
|
||||||
|
"""Return HTML content to add to the stats tab in the edit view.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
watch: The watch object being edited
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: HTML content to be inserted in the stats tab
|
||||||
|
"""
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
# Set up Plugin Manager
|
||||||
|
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
|
# Register hookspecs
|
||||||
|
plugin_manager.add_hookspecs(ChangeDetectionSpec)
|
||||||
|
|
||||||
|
# Load plugins from subdirectories
|
||||||
|
def load_plugins_from_directories():
|
||||||
|
# Dictionary of directories to scan for plugins
|
||||||
|
plugin_dirs = {
|
||||||
|
'conditions': os.path.join(os.path.dirname(__file__), 'conditions', 'plugins'),
|
||||||
|
# Add more plugin directories here as needed
|
||||||
|
}
|
||||||
|
|
||||||
|
# Note: Removed the direct import of example_word_count_plugin as it's now in the conditions/plugins directory
|
||||||
|
|
||||||
|
for dir_name, dir_path in plugin_dirs.items():
|
||||||
|
if not os.path.exists(dir_path):
|
||||||
|
continue
|
||||||
|
|
||||||
|
# Get all Python files (excluding __init__.py)
|
||||||
|
for filename in os.listdir(dir_path):
|
||||||
|
if filename.endswith(".py") and filename != "__init__.py":
|
||||||
|
module_name = filename[:-3] # Remove .py extension
|
||||||
|
module_path = f"changedetectionio.{dir_name}.plugins.{module_name}"
|
||||||
|
|
||||||
|
try:
|
||||||
|
module = importlib.import_module(module_path)
|
||||||
|
# Register the plugin with pluggy
|
||||||
|
plugin_manager.register(module, module_name)
|
||||||
|
except (ImportError, AttributeError) as e:
|
||||||
|
print(f"Error loading plugin {module_name}: {e}")
|
||||||
|
|
||||||
|
# Load plugins
|
||||||
|
load_plugins_from_directories()
|
||||||
|
|
||||||
|
# Discover installed plugins from external packages (if any)
|
||||||
|
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
|
||||||
|
|
||||||
|
# Helper function to collect UI stats extras from all plugins
|
||||||
|
def collect_ui_edit_stats_extras(watch):
|
||||||
|
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
|
||||||
|
extras_content = []
|
||||||
|
|
||||||
|
# Get all plugins that implement the ui_edit_stats_extras hook
|
||||||
|
results = plugin_manager.hook.ui_edit_stats_extras(watch=watch)
|
||||||
|
|
||||||
|
# If we have results, add them to our content
|
||||||
|
if results:
|
||||||
|
for result in results:
|
||||||
|
if result: # Skip empty results
|
||||||
|
extras_content.append(result)
|
||||||
|
|
||||||
|
return "\n".join(extras_content) if extras_content else ""
|
||||||
@@ -211,7 +211,14 @@ $(document).ready(function () {
|
|||||||
$('input[type=text]', first_available).first().val(x['xpath']);
|
$('input[type=text]', first_available).first().val(x['xpath']);
|
||||||
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
|
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
|
||||||
found_something = true;
|
found_something = true;
|
||||||
} else {
|
}
|
||||||
|
else if (x['tagName'] === 'select') {
|
||||||
|
$('select', first_available).val('<select> by option text').change();
|
||||||
|
$('input[type=text]', first_available).first().val(x['xpath']);
|
||||||
|
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
|
||||||
|
found_something = true;
|
||||||
|
}
|
||||||
|
else {
|
||||||
// There's no good way (that I know) to find if this
|
// There's no good way (that I know) to find if this
|
||||||
// see https://stackoverflow.com/questions/446892/how-to-find-event-listeners-on-a-dom-node-in-javascript-or-in-debugging
|
// see https://stackoverflow.com/questions/446892/how-to-find-event-listeners-on-a-dom-node-in-javascript-or-in-debugging
|
||||||
// https://codepen.io/azaslavsky/pen/DEJVWv
|
// https://codepen.io/azaslavsky/pen/DEJVWv
|
||||||
@@ -251,6 +258,10 @@ $(document).ready(function () {
|
|||||||
400: function () {
|
400: function () {
|
||||||
// More than likely the CSRF token was lost when the server restarted
|
// More than likely the CSRF token was lost when the server restarted
|
||||||
alert("There was a problem processing the request, please reload the page.");
|
alert("There was a problem processing the request, please reload the page.");
|
||||||
|
},
|
||||||
|
401: function (err) {
|
||||||
|
// This will be a custom error
|
||||||
|
alert(err.responseText);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}).done(function (data) {
|
}).done(function (data) {
|
||||||
|
|||||||
@@ -450,6 +450,13 @@ Math: {{ 1 + 1 }}") }}
|
|||||||
</tr>
|
</tr>
|
||||||
</tbody>
|
</tbody>
|
||||||
</table>
|
</table>
|
||||||
|
|
||||||
|
{% if ui_edit_stats_extras %}
|
||||||
|
<div class="plugin-stats-extras"> <!-- from pluggy plugin -->
|
||||||
|
{{ ui_edit_stats_extras|safe }}
|
||||||
|
</div>
|
||||||
|
{% endif %}
|
||||||
|
|
||||||
{% if watch.history_n %}
|
{% if watch.history_n %}
|
||||||
<p>
|
<p>
|
||||||
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">Download latest HTML snapshot</a>
|
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">Download latest HTML snapshot</a>
|
||||||
|
|||||||
@@ -45,11 +45,15 @@ def set_number_out_of_range_response(number="150"):
|
|||||||
f.write(test_return_data)
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
|
||||||
|
def test_setup(client, live_server):
|
||||||
|
"""Test that both text and number conditions work together with AND logic."""
|
||||||
|
live_server_setup(live_server)
|
||||||
|
|
||||||
def test_conditions_with_text_and_number(client, live_server):
|
def test_conditions_with_text_and_number(client, live_server):
|
||||||
"""Test that both text and number conditions work together with AND logic."""
|
"""Test that both text and number conditions work together with AND logic."""
|
||||||
|
|
||||||
set_original_response("50")
|
set_original_response("50")
|
||||||
live_server_setup(live_server)
|
#live_server_setup(live_server)
|
||||||
|
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
|
||||||
@@ -195,3 +199,40 @@ def test_condition_validate_rule_row(client, live_server):
|
|||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
# If there was only a change in the whitespacing, then we shouldnt have a change detected
|
||||||
|
def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
|
||||||
|
test_return_data = """<html>
|
||||||
|
<body>
|
||||||
|
Some initial text<br>
|
||||||
|
<p>Which is across multiple lines</p>
|
||||||
|
<br>
|
||||||
|
So let's see what happens. <br>
|
||||||
|
</body>
|
||||||
|
</html>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
res = client.post(
|
||||||
|
url_for("imports.import_page"),
|
||||||
|
data={"urls": test_url},
|
||||||
|
follow_redirects=True
|
||||||
|
)
|
||||||
|
assert b"1 Imported" in res.data
|
||||||
|
|
||||||
|
# Give the thread time to pick it up
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
# Check it saved
|
||||||
|
res = client.get(
|
||||||
|
url_for("ui.ui_edit.edit_page", uuid="first"),
|
||||||
|
)
|
||||||
|
|
||||||
|
# Assert the word count is counted correctly
|
||||||
|
assert b'<td>13</td>' in res.data
|
||||||
@@ -32,13 +32,14 @@ def test_strip_text_func():
|
|||||||
stripped_content = html_tools.strip_ignore_text(test_content, ignore)
|
stripped_content = html_tools.strip_ignore_text(test_content, ignore)
|
||||||
assert stripped_content == "Some initial text\n\nWhich is across multiple lines\n\n\n\nSo let's see what happens."
|
assert stripped_content == "Some initial text\n\nWhich is across multiple lines\n\n\n\nSo let's see what happens."
|
||||||
|
|
||||||
def set_original_ignore_response():
|
def set_original_ignore_response(ver_stamp="123"):
|
||||||
test_return_data = """<html>
|
test_return_data = f"""<html>
|
||||||
<body>
|
<body>
|
||||||
Some initial text<br>
|
Some initial text<br>
|
||||||
<p>Which is across multiple lines</p>
|
<p>Which is across multiple lines</p>
|
||||||
<br>
|
<br>
|
||||||
So let's see what happens. <br>
|
So let's see what happens. <br>
|
||||||
|
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|
||||||
@@ -48,13 +49,14 @@ def set_original_ignore_response():
|
|||||||
f.write(test_return_data)
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
|
||||||
def set_modified_original_ignore_response():
|
def set_modified_original_ignore_response(ver_stamp="123"):
|
||||||
test_return_data = """<html>
|
test_return_data = f"""<html>
|
||||||
<body>
|
<body>
|
||||||
Some NEW nice initial text<br>
|
Some NEW nice initial text<br>
|
||||||
<p>Which is across multiple lines</p>
|
<p>Which is across multiple lines</p>
|
||||||
<br>
|
<br>
|
||||||
So let's see what happens. <br>
|
So let's see what happens. <br>
|
||||||
|
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
|
||||||
<p>new ignore stuff</p>
|
<p>new ignore stuff</p>
|
||||||
<p>blah</p>
|
<p>blah</p>
|
||||||
</body>
|
</body>
|
||||||
@@ -67,14 +69,15 @@ def set_modified_original_ignore_response():
|
|||||||
|
|
||||||
|
|
||||||
# Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
|
# Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
|
||||||
def set_modified_ignore_response():
|
def set_modified_ignore_response(ver_stamp="123"):
|
||||||
test_return_data = """<html>
|
test_return_data = f"""<html>
|
||||||
<body>
|
<body>
|
||||||
Some initial text<br>
|
Some initial text<br>
|
||||||
<p>Which is across multiple lines</p>
|
<p>Which is across multiple lines</p>
|
||||||
<P>ZZZZz</P>
|
<P>ZZZZz</P>
|
||||||
<br>
|
<br>
|
||||||
So let's see what happens. <br>
|
So let's see what happens. <br>
|
||||||
|
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
|
||||||
</body>
|
</body>
|
||||||
</html>
|
</html>
|
||||||
|
|
||||||
@@ -165,9 +168,9 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
|
|||||||
assert b'Deleted' in res.data
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
# When adding some ignore text, it should not trigger a change, even if something else on that line changes
|
# When adding some ignore text, it should not trigger a change, even if something else on that line changes
|
||||||
def test_check_global_ignore_text_functionality(client, live_server, measure_memory_usage):
|
def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
|
||||||
#live_server_setup(live_server)
|
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ\r\n"+extra_ignore
|
||||||
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ"
|
|
||||||
set_original_ignore_response()
|
set_original_ignore_response()
|
||||||
|
|
||||||
# Goto the settings page, add our ignore text
|
# Goto the settings page, add our ignore text
|
||||||
@@ -186,6 +189,10 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
|
|||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', _external=True)
|
test_url = url_for('test_endpoint', _external=True)
|
||||||
|
if as_source:
|
||||||
|
# Switch to source mode so we can test that too!
|
||||||
|
test_url = "source:"+test_url
|
||||||
|
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("imports.import_page"),
|
url_for("imports.import_page"),
|
||||||
data={"urls": test_url},
|
data={"urls": test_url},
|
||||||
@@ -203,12 +210,15 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
assert b"Updated watch." in res.data
|
assert b"Updated watch." in res.data
|
||||||
|
wait_for_all_checks(client)
|
||||||
# Check it saved
|
# Check it saved
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("settings.settings_page"),
|
url_for("settings.settings_page"),
|
||||||
)
|
)
|
||||||
assert bytes(ignore_text.encode('utf-8')) in res.data
|
|
||||||
|
for i in ignore_text.splitlines():
|
||||||
|
assert bytes(i.encode('utf-8')) in res.data
|
||||||
|
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
@@ -221,7 +231,8 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
|
|||||||
|
|
||||||
# Make a change which includes the ignore text, it should be ignored and no 'change' triggered
|
# Make a change which includes the ignore text, it should be ignored and no 'change' triggered
|
||||||
# It adds text with "ZZZZzzzz" and "ZZZZ" is in the ignore list
|
# It adds text with "ZZZZzzzz" and "ZZZZ" is in the ignore list
|
||||||
set_modified_ignore_response()
|
# And tweaks the ver_stamp which should be picked up by global regex ignore
|
||||||
|
set_modified_ignore_response(ver_stamp=time.time())
|
||||||
|
|
||||||
# Trigger a check
|
# Trigger a check
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
@@ -243,3 +254,11 @@ def test_check_global_ignore_text_functionality(client, live_server, measure_mem
|
|||||||
|
|
||||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||||
assert b'Deleted' in res.data
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
|
def test_check_global_ignore_text_functionality(client, live_server):
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
_run_test_global_ignore(client, as_source=False)
|
||||||
|
|
||||||
|
def test_check_global_ignore_text_functionality_as_source(client, live_server):
|
||||||
|
#live_server_setup(live_server)
|
||||||
|
_run_test_global_ignore(client, as_source=True, extra_ignore='/\?v=\d/')
|
||||||
|
|||||||
Reference in New Issue
Block a user