Compare commits

..

4 Commits

Author SHA1 Message Date
dgtlmoon
960c0510b3 Fix error handler 2025-04-22 11:25:30 +02:00
dgtlmoon
440847820f Revert "Better error reporting"
This reverts commit c9f0921b02.
2025-04-22 11:15:28 +02:00
dgtlmoon
c9f0921b02 Better error reporting 2025-04-22 11:11:57 +02:00
dgtlmoon
0d1366dfb9 Make browsersteps UI a little more resilient 2025-04-22 11:03:47 +02:00
18 changed files with 38 additions and 576 deletions

View File

@@ -1,98 +0,0 @@
# Creating Plugins for changedetection.io
This document describes how to create plugins for changedetection.io. Plugins can be used to extend the functionality of the application in various ways.
## Plugin Types
### UI Stats Tab Plugins
These plugins can add content to the Stats tab in the Edit page. This is useful for adding custom statistics or visualizations about a watch.
#### Creating a UI Stats Tab Plugin
1. Create a Python file in a directory that will be loaded by the plugin system.
2. Use the `global_hookimpl` decorator to implement the `ui_edit_stats_extras` hook:
```python
import pluggy
from loguru import logger
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add custom content to the stats tab"""
# Calculate or retrieve your stats
my_stat = calculate_something(watch)
# Return HTML content as a string
html = f"""
<div class="my-plugin-stats">
<h4>My Plugin Statistics</h4>
<p>My statistic: {my_stat}</p>
</div>
"""
return html
```
3. The HTML you return will be included in the Stats tab.
## Plugin Loading
Plugins can be loaded from:
1. Built-in plugin directories in the codebase
2. External packages using setuptools entry points
To add a new plugin directory, modify the `plugin_dirs` dictionary in `pluggy_interface.py`.
## Example Plugin
Here's a simple example of a plugin that adds a word count statistic to the Stats tab:
```python
import pluggy
from loguru import logger
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def count_words_in_history(watch):
"""Count words in the latest snapshot"""
try:
if not watch.history.keys():
return 0
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
return len(latest_content.split())
except Exception as e:
logger.error(f"Error counting words: {str(e)}")
return 0
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count to the Stats tab"""
word_count = count_words_in_history(watch)
html = f"""
<div class="word-count-stats">
<h4>Content Analysis</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Word count (latest snapshot)</td>
<td>{word_count}</td>
</tr>
</tbody>
</table>
</div>
"""
return html
```
## Testing Your Plugin
1. Place your plugin in one of the directories scanned by the plugin system
2. Restart changedetection.io
3. Go to the Edit page of a watch and check the Stats tab to see your content

View File

@@ -37,7 +37,6 @@ browser_step_ui_config = {'Choose one': '0 0',
'Make all child elements visible': '1 0',
'Press Enter': '0 0',
'Select by label': '1 1',
'<select> by option text': '1 1',
'Scroll down': '0 0',
'Uncheck checkbox': '1 0',
'Wait for seconds': '0 1',
@@ -229,15 +228,6 @@ class steppable_browser_interface():
except Exception as e:
logger.error(f"Error parsing x,y coordinates: {str(e)}")
def action__select_by_option_text(self, selector, value):
if not selector or not len(selector.strip()):
return
def select_operation():
self.page.select_option(selector, label=value, timeout=self.action_timeout)
self.safe_page_operation(select_operation)
def action_scroll_down(self, selector, value):
def scroll_operation():
# Some sites this doesnt work on for some reason

View File

@@ -233,9 +233,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# Only works reliably with Playwright
# Import the global plugin system
from changedetectionio.pluggy_interface import collect_ui_edit_stats_extras
template_args = {
'available_processors': processors.available_processors(),
'available_timezones': sorted(available_timezones()),
@@ -253,7 +250,6 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
'settings_application': datastore.data['settings']['application'],
'system_has_playwright_configured': os.getenv('PLAYWRIGHT_DRIVER_URL'),
'system_has_webdriver_configured': os.getenv('WEBDRIVER_URL'),
'ui_edit_stats_extras': collect_ui_edit_stats_extras(watch),
'visual_selector_data_ready': datastore.visualselector_data_is_ready(watch_uuid=uuid),
'timezone_default_config': datastore.data['settings']['application'].get('timezone'),
'using_global_webdriver_wait': not default['webdriver_delay'],

View File

@@ -102,31 +102,12 @@ def execute_ruleset_against_all_plugins(current_watch_uuid: str, application_dat
if complete_rules:
# Give all plugins a chance to update the data dict again (that we will test the conditions against)
for plugin in plugin_manager.get_plugins():
try:
import concurrent.futures
import time
with concurrent.futures.ThreadPoolExecutor() as executor:
future = executor.submit(
plugin.add_data,
current_watch_uuid=current_watch_uuid,
application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data
)
# Set a timeout of 10 seconds
try:
new_execute_data = future.result(timeout=10)
if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data)
except concurrent.futures.TimeoutError:
# The plugin took too long, abort processing for this watch
raise Exception(f"Plugin {plugin.__class__.__name__} took more than 10 seconds to run.")
except Exception as e:
# Log the error but continue with the next plugin
import logging
logging.error(f"Error executing plugin {plugin.__class__.__name__}: {str(e)}")
continue
new_execute_data = plugin.add_data(current_watch_uuid=current_watch_uuid,
application_datastruct=application_datastruct,
ephemeral_data=ephemeral_data)
if new_execute_data and isinstance(new_execute_data, dict):
EXECUTE_DATA.update(new_execute_data)
# Create the ruleset
ruleset = convert_to_jsonlogic(logic_operator=logic_operator, rule_dict=complete_rules)
@@ -151,18 +132,3 @@ for plugin in plugin_manager.get_plugins():
if isinstance(new_field_choices, list):
field_choices.extend(new_field_choices)
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
for plugin in plugin_manager.get_plugins():
try:
content = plugin.ui_edit_stats_extras(watch=watch)
if content:
extras_content.append(content)
except Exception as e:
# Skip plugins that don't implement the hook or have errors
pass
return "\n".join(extras_content) if extras_content else ""

View File

@@ -1,8 +1,5 @@
import pluggy
import os
import importlib
import sys
from . import default_plugin
from . import default_plugin # Import the default plugin
# ✅ Ensure that the namespace in HookspecMarker matches PluginManager
PLUGIN_NAMESPACE = "changedetectionio_conditions"
@@ -33,11 +30,6 @@ class ConditionsSpec:
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
"""Add to the datadict"""
pass
@hookspec
def ui_edit_stats_extras(watch):
"""Return HTML content to add to the stats tab in the edit view"""
pass
# ✅ Set up Pluggy Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
@@ -48,27 +40,5 @@ plugin_manager.add_hookspecs(ConditionsSpec)
# ✅ Register built-in plugins manually
plugin_manager.register(default_plugin, "default_plugin")
# ✅ Load plugins from the plugins directory
def load_plugins_from_directory():
plugins_dir = os.path.join(os.path.dirname(__file__), 'plugins')
if not os.path.exists(plugins_dir):
return
# Get all Python files (excluding __init__.py)
for filename in os.listdir(plugins_dir):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3] # Remove .py extension
module_path = f"changedetectionio.conditions.plugins.{module_name}"
try:
module = importlib.import_module(module_path)
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
print(f"Error loading plugin {module_name}: {e}")
# Load plugins from the plugins directory
load_plugins_from_directory()
# ✅ Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)

View File

@@ -1 +0,0 @@
# Import plugins package to make them discoverable

View File

@@ -1,102 +0,0 @@
import pluggy
from loguru import logger
# Support both plugin systems
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def levenshtein_ratio_recent_history(watch, incoming_text=None):
try:
from Levenshtein import ratio, distance
k = list(watch.history.keys())
if len(k) >= 2:
# When called from ui_edit_stats_extras, we don't have incoming_text
if incoming_text is None:
a = watch.get_history_snapshot(timestamp=k[-1]) # Latest snapshot
b = watch.get_history_snapshot(timestamp=k[-2]) # Previous snapshot
else:
a = watch.get_history_snapshot(timestamp=k[-2]) # Second newest, incoming_text will be "newest"
b = incoming_text
distance_value = distance(a, b)
ratio_value = ratio(a, b)
return {
'distance': distance_value,
'ratio': ratio_value,
'percent_similar': round(ratio_value * 100, 2)
}
except Exception as e:
logger.warning(f"Unable to calc similarity: {str(e)}")
return ''
@conditions_hookimpl
def register_operators():
pass
@conditions_hookimpl
def register_operator_choices():
pass
@conditions_hookimpl
def register_field_choices():
return [
("levenshtein_ratio", "Levenshtein - Text similarity ratio"),
("levenshtein_distance", "Levenshtein - Text change distance"),
]
@conditions_hookimpl
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
res = {}
watch = application_datastruct['watching'].get(current_watch_uuid)
# ephemeral_data['text'] will be the current text after filters, they may have edited filters but not saved them yet etc
if watch and 'text' in ephemeral_data:
lev_data = levenshtein_ratio_recent_history(watch, ephemeral_data['text'])
if isinstance(lev_data, dict):
res['levenshtein_ratio'] = lev_data.get('ratio', 0)
res['levenshtein_similarity'] = lev_data.get('percent_similar', 0)
res['levenshtein_distance'] = lev_data.get('distance', 0)
return res
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add Levenshtein stats to the UI using the global plugin system"""
"""Generate the HTML for Levenshtein stats - shared by both plugin systems"""
if len(watch.history.keys()) < 2:
return "<p>Not enough history to calculate Levenshtein metrics</p>"
try:
lev_data = levenshtein_ratio_recent_history(watch)
if not lev_data or not isinstance(lev_data, dict):
return "<p>Unable to calculate Levenshtein metrics</p>"
html = f"""
<div class="levenshtein-stats">
<h4>Levenshtein Text Similarity Details</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Raw distance (edits needed)</td>
<td>{lev_data['distance']}</td>
</tr>
<tr>
<td>Similarity ratio</td>
<td>{lev_data['ratio']:.4f}</td>
</tr>
<tr>
<td>Percent similar</td>
<td>{lev_data['percent_similar']}%</td>
</tr>
</tbody>
</table>
<p style="font-size: 80%;">Levenshtein metrics compare the last two snapshots, measuring how many character edits are needed to transform one into the other.</p>
</div>
"""
return html
except Exception as e:
logger.error(f"Error generating Levenshtein UI extras: {str(e)}")
return "<p>Error calculating Levenshtein metrics</p>"

View File

@@ -1,82 +0,0 @@
import pluggy
from loguru import logger
# Support both plugin systems
conditions_hookimpl = pluggy.HookimplMarker("changedetectionio_conditions")
global_hookimpl = pluggy.HookimplMarker("changedetectionio")
def count_words_in_history(watch, incoming_text=None):
"""Count words in snapshot text"""
try:
if incoming_text is not None:
# When called from add_data with incoming text
return len(incoming_text.split())
elif watch.history.keys():
# When called from UI extras to count latest snapshot
latest_key = list(watch.history.keys())[-1]
latest_content = watch.get_history_snapshot(latest_key)
return len(latest_content.split())
return 0
except Exception as e:
logger.error(f"Error counting words: {str(e)}")
return 0
# Implement condition plugin hooks
@conditions_hookimpl
def register_operators():
# No custom operators needed
return {}
@conditions_hookimpl
def register_operator_choices():
# No custom operator choices needed
return []
@conditions_hookimpl
def register_field_choices():
# Add a field that will be available in conditions
return [
("word_count", "Word count of content"),
]
@conditions_hookimpl
def add_data(current_watch_uuid, application_datastruct, ephemeral_data):
"""Add word count data for conditions"""
result = {}
watch = application_datastruct['watching'].get(current_watch_uuid)
if watch and 'text' in ephemeral_data:
word_count = count_words_in_history(watch, ephemeral_data['text'])
result['word_count'] = word_count
return result
def _generate_stats_html(watch):
"""Generate the HTML content for the stats tab"""
word_count = count_words_in_history(watch)
html = f"""
<div class="word-count-stats">
<h4>Content Analysis</h4>
<table class="pure-table">
<tbody>
<tr>
<td>Word count (latest snapshot)</td>
<td>{word_count}</td>
</tr>
</tbody>
</table>
<p style="font-size: 80%;">Word count is a simple measure of content length, calculated by splitting text on whitespace.</p>
</div>
"""
return html
@conditions_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count stats to the UI through conditions plugin system"""
return _generate_stats_html(watch)
@global_hookimpl
def ui_edit_stats_extras(watch):
"""Add word count stats to the UI using the global plugin system"""
return _generate_stats_html(watch)

View File

@@ -147,7 +147,7 @@ class fetcher(Fetcher):
is_binary,
empty_pages_are_a_change
):
import re
self.delete_browser_steps_screenshots()
extra_wait = int(os.getenv("WEBDRIVER_DELAY_BEFORE_CONTENT_READY", 5)) + self.render_extract_delay
@@ -172,17 +172,6 @@ class fetcher(Fetcher):
# headless - ask a new page
self.page = (pages := await browser.pages) and len(pages) or await browser.newPage()
if '--window-size' in self.browser_connection_url:
# Be sure the viewport is always the window-size, this is often not the same thing
match = re.search(r'--window-size=(\d+),(\d+)', self.browser_connection_url)
if match:
logger.debug(f"Setting viewport to same as --window-size in browser connection URL {int(match.group(1))},{int(match.group(2))}")
await self.page.setViewport({
"width": int(match.group(1)),
"height": int(match.group(2))
})
logger.debug(f"Puppeteer viewport size {self.page.viewport}")
try:
from pyppeteerstealth import inject_evasions_into_page
except ImportError:
@@ -229,6 +218,7 @@ class fetcher(Fetcher):
response = await self.page.goto(url, waitUntil="load")
if response is None:
await self.page.close()
await browser.close()

View File

@@ -125,20 +125,6 @@ async () => {
// so it's good to filter to just the 'above the fold' elements
// and it should be atleast 100px from the top to ignore items in the toolbar, sometimes menu items like "Coming soon" exist
function elementIsInEyeBallRange(element) {
// outside the 'fold' or some weird text in the heading area
// .getBoundingClientRect() was causing a crash in chrome 119, can only be run on contentVisibility != hidden
// Note: theres also an automated test that places the 'out of stock' text fairly low down
// Skip text that could be in the header area
if (element.getBoundingClientRect().bottom + window.scrollY <= 300 ) {
return false;
}
// Skip text that could be much further down (like a list of "you may like" products that have 'sold out' in there
if (element.getBoundingClientRect().bottom + window.scrollY >= 1300 ) {
return false;
}
return true;
}
// @todo - if it's SVG or IMG, go into image diff mode
@@ -175,7 +161,9 @@ async () => {
for (let i = elementsToScan.length - 1; i >= 0; i--) {
const element = elementsToScan[i];
if (!elementIsInEyeBallRange(element)) {
// outside the 'fold' or some weird text in the heading area
// .getBoundingClientRect() was causing a crash in chrome 119, can only be run on contentVisibility != hidden
if (element.getBoundingClientRect().top + window.scrollY >= vh || element.getBoundingClientRect().top + window.scrollY <= 100) {
continue
}
@@ -189,11 +177,11 @@ async () => {
} catch (e) {
console.warn('stock-not-in-stock.js scraper - handling element for gettext failed', e);
}
if (elementText.length) {
// try which ones could mean its in stock
if (negateOutOfStockRegex.test(elementText) && !elementText.includes('(0 products)')) {
console.log(`Negating/overriding 'Out of Stock' back to "Possibly in stock" found "${elementText}"`)
element.style.border = "2px solid green"; // highlight the element that was detected as in stock
return 'Possibly in stock';
}
}
@@ -202,8 +190,10 @@ async () => {
// OTHER STUFF THAT COULD BE THAT IT'S OUT OF STOCK
for (let i = elementsToScan.length - 1; i >= 0; i--) {
const element = elementsToScan[i];
if (!elementIsInEyeBallRange(element)) {
// outside the 'fold' or some weird text in the heading area
// .getBoundingClientRect() was causing a crash in chrome 119, can only be run on contentVisibility != hidden
// Note: theres also an automated test that places the 'out of stock' text fairly low down
if (element.getBoundingClientRect().top + window.scrollY >= vh + 250 || element.getBoundingClientRect().top + window.scrollY <= 100) {
continue
}
elementText = "";
@@ -218,7 +208,6 @@ async () => {
for (const outOfStockText of outOfStockTexts) {
if (elementText.includes(outOfStockText)) {
console.log(`Selected 'Out of Stock' - found text "${outOfStockText}" - "${elementText}" - offset top ${element.getBoundingClientRect().top}, page height is ${vh}`)
element.style.border = "2px solid red"; // highlight the element that was detected as out of stock
return outOfStockText; // item is out of stock
}
}

View File

@@ -202,6 +202,7 @@ async (options) => {
// Foreach filter, go and find it on the page and add it to the results so we can visualise it again
for (const f of include_filters) {
bbox = false;
q = false;
if (!f.length) {
console.log("xpath_element_scraper: Empty filter, skipping");
@@ -254,7 +255,7 @@ async (options) => {
console.log("xpath_element_scraper: Got filter by ownerElement element, scroll from top was " + scroll_y)
} catch (e) {
console.log(e)
console.log("xpath_element_scraper: error looking up node.ownerElement")
console.log("xpath_element_scraper: error looking up q.ownerElement")
}
}

View File

@@ -1,82 +0,0 @@
import pluggy
import os
import importlib
import sys
# Global plugin namespace for changedetection.io
PLUGIN_NAMESPACE = "changedetectionio"
hookspec = pluggy.HookspecMarker(PLUGIN_NAMESPACE)
hookimpl = pluggy.HookimplMarker(PLUGIN_NAMESPACE)
class ChangeDetectionSpec:
"""Hook specifications for extending changedetection.io functionality."""
@hookspec
def ui_edit_stats_extras(watch):
"""Return HTML content to add to the stats tab in the edit view.
Args:
watch: The watch object being edited
Returns:
str: HTML content to be inserted in the stats tab
"""
pass
# Set up Plugin Manager
plugin_manager = pluggy.PluginManager(PLUGIN_NAMESPACE)
# Register hookspecs
plugin_manager.add_hookspecs(ChangeDetectionSpec)
# Load plugins from subdirectories
def load_plugins_from_directories():
# Dictionary of directories to scan for plugins
plugin_dirs = {
'conditions': os.path.join(os.path.dirname(__file__), 'conditions', 'plugins'),
# Add more plugin directories here as needed
}
# Note: Removed the direct import of example_word_count_plugin as it's now in the conditions/plugins directory
for dir_name, dir_path in plugin_dirs.items():
if not os.path.exists(dir_path):
continue
# Get all Python files (excluding __init__.py)
for filename in os.listdir(dir_path):
if filename.endswith(".py") and filename != "__init__.py":
module_name = filename[:-3] # Remove .py extension
module_path = f"changedetectionio.{dir_name}.plugins.{module_name}"
try:
module = importlib.import_module(module_path)
# Register the plugin with pluggy
plugin_manager.register(module, module_name)
except (ImportError, AttributeError) as e:
print(f"Error loading plugin {module_name}: {e}")
# Load plugins
load_plugins_from_directories()
# Discover installed plugins from external packages (if any)
plugin_manager.load_setuptools_entrypoints(PLUGIN_NAMESPACE)
# Helper function to collect UI stats extras from all plugins
def collect_ui_edit_stats_extras(watch):
"""Collect and combine HTML content from all plugins that implement ui_edit_stats_extras"""
extras_content = []
# Get all plugins that implement the ui_edit_stats_extras hook
results = plugin_manager.hook.ui_edit_stats_extras(watch=watch)
# If we have results, add them to our content
if results:
for result in results:
if result: # Skip empty results
extras_content.append(result)
return "\n".join(extras_content) if extras_content else ""

View File

@@ -211,14 +211,7 @@ $(document).ready(function () {
$('input[type=text]', first_available).first().val(x['xpath']);
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
found_something = true;
}
else if (x['tagName'] === 'select') {
$('select', first_available).val('<select> by option text').change();
$('input[type=text]', first_available).first().val(x['xpath']);
$('input[placeholder="Value"]', first_available).addClass('ok').click().focus();
found_something = true;
}
else {
} else {
// There's no good way (that I know) to find if this
// see https://stackoverflow.com/questions/446892/how-to-find-event-listeners-on-a-dom-node-in-javascript-or-in-debugging
// https://codepen.io/azaslavsky/pen/DEJVWv

View File

@@ -450,13 +450,6 @@ Math: {{ 1 + 1 }}") }}
</tr>
</tbody>
</table>
{% if ui_edit_stats_extras %}
<div class="plugin-stats-extras"> <!-- from pluggy plugin -->
{{ ui_edit_stats_extras|safe }}
</div>
{% endif %}
{% if watch.history_n %}
<p>
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">Download latest HTML snapshot</a>

View File

@@ -14,8 +14,6 @@ from changedetectionio.notification import (
def set_original_response():
test_return_data = """<html>
<body>
<section id=header style="padding: 50px; height: 350px">This is the header which should be ignored always - <span>add to cart</span></section>
<!-- stock-not-in-stock.js will ignore text in the first 300px, see elementIsInEyeBallRange(), sometimes "add to cart" and other junk is here -->
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
@@ -54,6 +52,8 @@ def test_restock_detection(client, live_server, measure_memory_usage):
set_original_response()
#assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
time.sleep(1)
live_server_setup(live_server)
#####################
notification_url = url_for('test_notification_endpoint', _external=True).replace('http://localhost', 'http://changedet').replace('http', 'json')
@@ -84,8 +84,7 @@ def test_restock_detection(client, live_server, measure_memory_usage):
# Is it correctly show as NOT in stock?
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'processor-restock_diff' in res.data # Should have saved in restock mode
assert b'not-in-stock' in res.data # should be out of stock
assert b'not-in-stock' in res.data
# Is it correctly shown as in stock
set_back_in_stock_response()

View File

@@ -45,15 +45,11 @@ def set_number_out_of_range_response(number="150"):
f.write(test_return_data)
def test_setup(client, live_server):
"""Test that both text and number conditions work together with AND logic."""
live_server_setup(live_server)
def test_conditions_with_text_and_number(client, live_server):
"""Test that both text and number conditions work together with AND logic."""
set_original_response("50")
#live_server_setup(live_server)
live_server_setup(live_server)
test_url = url_for('test_endpoint', _external=True)
@@ -199,40 +195,3 @@ def test_condition_validate_rule_row(client, live_server):
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
</body>
</html>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
)
# Assert the word count is counted correctly
assert b'<td>13</td>' in res.data

View File

@@ -32,14 +32,13 @@ def test_strip_text_func():
stripped_content = html_tools.strip_ignore_text(test_content, ignore)
assert stripped_content == "Some initial text\n\nWhich is across multiple lines\n\n\n\nSo let's see what happens."
def set_original_ignore_response(ver_stamp="123"):
test_return_data = f"""<html>
def set_original_ignore_response():
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
</body>
</html>
@@ -49,14 +48,13 @@ def set_original_ignore_response(ver_stamp="123"):
f.write(test_return_data)
def set_modified_original_ignore_response(ver_stamp="123"):
test_return_data = f"""<html>
def set_modified_original_ignore_response():
test_return_data = """<html>
<body>
Some NEW nice initial text<br>
<p>Which is across multiple lines</p>
<br>
So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
<p>new ignore stuff</p>
<p>blah</p>
</body>
@@ -69,15 +67,14 @@ def set_modified_original_ignore_response(ver_stamp="123"):
# Is the same but includes ZZZZZ, 'ZZZZZ' is the last line in ignore_text
def set_modified_ignore_response(ver_stamp="123"):
test_return_data = f"""<html>
def set_modified_ignore_response():
test_return_data = """<html>
<body>
Some initial text<br>
<p>Which is across multiple lines</p>
<P>ZZZZz</P>
<br>
So let's see what happens. <br>
<link href="https://www.somesite/wp-content/themes/cooltheme/style2.css?v={ver_stamp}" rel="stylesheet"/>
</body>
</html>
@@ -168,9 +165,9 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
assert b'Deleted' in res.data
# When adding some ignore text, it should not trigger a change, even if something else on that line changes
def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ\r\n"+extra_ignore
def test_check_global_ignore_text_functionality(client, live_server, measure_memory_usage):
#live_server_setup(live_server)
ignore_text = "XXXXX\r\nYYYYY\r\nZZZZZ"
set_original_ignore_response()
# Goto the settings page, add our ignore text
@@ -189,10 +186,6 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
if as_source:
# Switch to source mode so we can test that too!
test_url = "source:"+test_url
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
@@ -210,15 +203,12 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
follow_redirects=True
)
assert b"Updated watch." in res.data
wait_for_all_checks(client)
# Check it saved
res = client.get(
url_for("settings.settings_page"),
)
for i in ignore_text.splitlines():
assert bytes(i.encode('utf-8')) in res.data
assert bytes(ignore_text.encode('utf-8')) in res.data
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
@@ -231,8 +221,7 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
# Make a change which includes the ignore text, it should be ignored and no 'change' triggered
# It adds text with "ZZZZzzzz" and "ZZZZ" is in the ignore list
# And tweaks the ver_stamp which should be picked up by global regex ignore
set_modified_ignore_response(ver_stamp=time.time())
set_modified_ignore_response()
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
@@ -254,11 +243,3 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
def test_check_global_ignore_text_functionality(client, live_server):
#live_server_setup(live_server)
_run_test_global_ignore(client, as_source=False)
def test_check_global_ignore_text_functionality_as_source(client, live_server):
#live_server_setup(live_server)
_run_test_global_ignore(client, as_source=True, extra_ignore='/\?v=\d/')

View File

@@ -72,7 +72,7 @@ services:
# Comment out ports: when using behind a reverse proxy , enable networks: etc.
ports:
- 127.0.0.1:5000:5000
- 5000:5000
restart: unless-stopped
# Used for fetching pages via WebDriver+Chrome where you need Javascript support.
@@ -82,7 +82,7 @@ services:
# If WEBDRIVER or PLAYWRIGHT are enabled, changedetection container depends on that
# and must wait before starting (substitute "browser-chrome" with "playwright-chrome" if last one is used)
# depends_on:
# browser-sockpuppet-chrome:
# sockpuppetbrowser:
# condition: service_started