Compare commits

..

1 Commits

Author SHA1 Message Date
dgtlmoon
cdabdfeef1 UI - Ability to download a complete data package (.zip) of a watch 2026-02-15 10:39:14 +01:00
30 changed files with 211 additions and 313 deletions

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.53.1'
__version__ = '0.52.9'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -174,7 +174,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
browser_steps_blueprint = Blueprint('browser_steps', __name__, template_folder="templates")
async def start_browsersteps_session(watch_uuid):
from changedetectionio.browser_steps import browser_steps
from . import browser_steps
import time
from playwright.async_api import async_playwright
@@ -238,6 +238,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@browser_steps_blueprint.route("/browsersteps_start_session", methods=['GET'])
def browsersteps_start_session():
# A new session was requested, return sessionID
import asyncio
import uuid
browsersteps_session_id = str(uuid.uuid4())
watch_uuid = request.args.get('uuid')
@@ -300,10 +301,11 @@ def construct_blueprint(datastore: ChangeDetectionStore):
@browser_steps_blueprint.route("/browsersteps_update", methods=['POST'])
def browsersteps_ui_update():
import base64
import playwright._impl._errors
from changedetectionio.blueprint.browser_steps import browser_steps
remaining = 0
remaining =0
uuid = request.args.get('uuid')
goto_website_url_first_step = request.args.get('goto_website_url_first_step')
browsersteps_session_id = request.args.get('browsersteps_session_id')
@@ -314,33 +316,33 @@ def construct_blueprint(datastore: ChangeDetectionStore):
return make_response('No session exists under that ID', 500)
is_last_step = False
# @todo - should always be an existing session
if goto_website_url_first_step:
logger.debug("Going to site (requested automatically before stepping)..")
step_operation = "Goto site"
step_selector = None
step_optional_value = None
else:
# Actions - step/apply/etc, do the thing and return state
if request.method == 'POST':
# @todo - should always be an existing session
step_operation = request.form.get('operation')
step_selector = request.form.get('selector')
step_optional_value = request.form.get('optional_value')
is_last_step = strtobool(request.form.get('is_last_step'))
try:
# Run the async call_action method in the dedicated browser steps event loop
run_async_in_browser_loop(
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(
action_name=step_operation,
selector=step_selector,
optional_value=step_optional_value
try:
# Run the async call_action method in the dedicated browser steps event loop
run_async_in_browser_loop(
browsersteps_sessions[browsersteps_session_id]['browserstepper'].call_action(
action_name=step_operation,
selector=step_selector,
optional_value=step_optional_value
)
)
)
except Exception as e:
logger.error(f"Exception when calling step operation {step_operation} {str(e)}")
# Try to find something of value to give back to the user
return make_response(str(e).splitlines()[0], 401)
except Exception as e:
logger.error(f"Exception when calling step operation {step_operation} {str(e)}")
# Try to find something of value to give back to the user
return make_response(str(e).splitlines()[0], 401)
# if not this_session.page:
# cleanup_playwright_session()
# return make_response('Browser session ran out of time :( Please reload this page.', 401)
# Screenshots and other info only needed on requesting a step (POST)
try:
@@ -348,7 +350,7 @@ def construct_blueprint(datastore: ChangeDetectionStore):
(screenshot, xpath_data) = run_async_in_browser_loop(
browsersteps_sessions[browsersteps_session_id]['browserstepper'].get_current_state()
)
if is_last_step:
watch = datastore.data['watching'].get(uuid)
u = browsersteps_sessions[browsersteps_session_id]['browserstepper'].page.url

View File

@@ -8,17 +8,6 @@ from changedetectionio.content_fetchers import SCREENSHOT_MAX_HEIGHT_DEFAULT
from changedetectionio.content_fetchers.base import manage_user_agent
from changedetectionio.jinja2_custom import render as jinja_render
def browser_steps_get_valid_steps(browser_steps: list):
if browser_steps is not None and len(browser_steps):
valid_steps = list(filter(
lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one'),browser_steps))
# Just incase they selected Goto site by accident with older JS
if valid_steps and valid_steps[0]['operation'] == 'Goto site':
del(valid_steps[0])
return valid_steps
return []
# Two flags, tell the JS which of the "Selector" or "Value" field should be enabled in the front end

View File

@@ -26,7 +26,7 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
# https://wtforms.readthedocs.io/en/3.0.x/forms/#wtforms.form.Form.populate_obj ?
def edit_page(uuid):
from changedetectionio import forms
from changedetectionio.browser_steps.browser_steps import browser_step_ui_config
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
from changedetectionio import processors
import importlib
@@ -364,6 +364,9 @@ def construct_blueprint(datastore: ChangeDetectionStore, update_q, queuedWatchMe
from pathlib import Path
import datetime
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
watch = datastore.data['watching'].get(uuid)
if not watch:
abort(404)

View File

@@ -488,7 +488,7 @@ Math: {{ 1 + 1 }}") }}
{% if watch.history_n %}
<p>
<a href="{{url_for('ui.ui_edit.watch_get_latest_html', uuid=uuid)}}" class="pure-button button-small">{{ _('Download latest HTML snapshot') }}</a>
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Download watch data package') }}</a>
<a href="{{url_for('ui.ui_edit.watch_get_data_package', uuid=uuid)}}" class="pure-button button-small">{{ _('Export watch data') }}</a>
</p>
{% endif %}

View File

@@ -38,6 +38,7 @@ def manage_user_agent(headers, current_ua=''):
return None
class Fetcher():
browser_connection_is_custom = None
browser_connection_url = None
@@ -162,16 +163,30 @@ class Fetcher():
"""
return {k.lower(): v for k, v in self.headers.items()}
def browser_steps_get_valid_steps(self):
if self.browser_steps is not None and len(self.browser_steps):
valid_steps = list(filter(
lambda s: (s['operation'] and len(s['operation']) and s['operation'] != 'Choose one'),
self.browser_steps))
# Just incase they selected Goto site by accident with older JS
if valid_steps and valid_steps[0]['operation'] == 'Goto site':
del(valid_steps[0])
return valid_steps
return None
async def iterate_browser_steps(self, start_url=None):
from changedetectionio.browser_steps.browser_steps import steppable_browser_interface, browser_steps_get_valid_steps
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
from playwright._impl._errors import TimeoutError, Error
from changedetectionio.jinja2_custom import render as jinja_render
step_n = 0
if self.browser_steps:
if self.browser_steps is not None and len(self.browser_steps):
interface = steppable_browser_interface(start_url=start_url)
interface.page = self.page
valid_steps = browser_steps_get_valid_steps(self.browser_steps)
valid_steps = self.browser_steps_get_valid_steps()
for step in valid_steps:
step_n += 1

View File

@@ -295,7 +295,7 @@ class fetcher(Fetcher):
self.page.on("console", lambda msg: logger.debug(f"Playwright console: Watch URL: {url} {msg.type}: {msg.text} {msg.args}"))
# Re-use as much code from browser steps as possible so its the same
from changedetectionio.browser_steps.browser_steps import steppable_browser_interface
from changedetectionio.blueprint.browser_steps.browser_steps import steppable_browser_interface
browsersteps_interface = steppable_browser_interface(start_url=url)
browsersteps_interface.page = self.page
@@ -362,7 +362,7 @@ class fetcher(Fetcher):
# Wrap remaining operations in try/finally to ensure cleanup
try:
# Run Browser Steps here
if self.browser_steps:
if self.browser_steps_get_valid_steps():
try:
await self.iterate_browser_steps(start_url=url)
except BrowserStepsStepException:

View File

@@ -456,7 +456,7 @@ class fetcher(Fetcher):
# Run Browser Steps here
# @todo not yet supported, we switch to playwright in this case
# if self.browser_steps:
# if self.browser_steps_get_valid_steps():
# self.iterate_browser_steps()

View File

@@ -3,7 +3,7 @@ import hashlib
import os
import re
import asyncio
from functools import partial
from changedetectionio import strtobool
from changedetectionio.content_fetchers.exceptions import BrowserStepsInUnsupportedFetcher, EmptyReply, Non200ErrorCodeReceived
from changedetectionio.content_fetchers.base import Fetcher
@@ -36,7 +36,7 @@ class fetcher(Fetcher):
import requests
from requests.exceptions import ProxyError, ConnectionError, RequestException
if self.browser_steps:
if self.browser_steps_get_valid_steps():
raise BrowserStepsInUnsupportedFetcher(url=url)
proxies = {}
@@ -184,6 +184,7 @@ class fetcher(Fetcher):
)
async def quit(self, watch=None):
# In case they switched to `requests` fetcher from something else
# Then the screenshot could be old, in any case, it's not used here.
# REMOVE_REQUESTS_OLD_SCREENSHOTS - Mainly used for testing

View File

@@ -712,14 +712,8 @@ def changedetection_app(config=None, datastore_o=None):
def static_content(group, filename):
from flask import make_response
import re
# Strict sanitization: only allow a-z, 0-9, and underscore (blocks .. and other traversal)
group = re.sub(r'[^a-z0-9_-]+', '', group.lower())
filename = filename
# Additional safety: reject if sanitization resulted in empty strings
if not group or not filename:
abort(404)
group = re.sub(r'[^\w.-]+', '', group.lower())
filename = re.sub(r'[^\w.-]+', '', filename.lower())
if group == 'screenshot':
# Could be sensitive, follow password requirements

View File

@@ -7,6 +7,8 @@ from flask_babel import lazy_gettext as _l, gettext
from changedetectionio.blueprint.rss import RSS_FORMAT_TYPES, RSS_TEMPLATE_TYPE_OPTIONS, RSS_TEMPLATE_HTML_DEFAULT
from changedetectionio.conditions.form import ConditionFormRow
from changedetectionio.notification_service import NotificationContextData
from changedetectionio.processors.image_ssim_diff import SCREENSHOT_COMPARISON_THRESHOLD_OPTIONS, \
SCREENSHOT_COMPARISON_THRESHOLD_OPTIONS_DEFAULT
from changedetectionio.strtobool import strtobool
from changedetectionio import processors
@@ -35,7 +37,7 @@ from changedetectionio.widgets import TernaryNoneBooleanField
# default
# each select <option data-enabled="enabled-0-0"
from changedetectionio.browser_steps.browser_steps import browser_step_ui_config
from changedetectionio.blueprint.browser_steps.browser_steps import browser_step_ui_config
from changedetectionio import html_tools, content_fetchers
@@ -492,6 +494,7 @@ class ValidateJinja2Template(object):
Validates that a {token} is from a valid set
"""
def __call__(self, form, field):
from changedetectionio import notification
from changedetectionio.jinja2_custom import create_jinja_env
from jinja2 import BaseLoader, TemplateSyntaxError, UndefinedError
from jinja2.meta import find_undeclared_variables
@@ -817,7 +820,8 @@ class processor_text_json_diff_form(commonSettingsForm):
filter_text_removed = BooleanField(_l('Removed lines'), default=True)
trigger_text = StringListField(_l('Keyword triggers - Trigger/wait for text'), [validators.Optional(), ValidateListRegex()])
browser_steps = FieldList(FormField(SingleBrowserStep), min_entries=10)
if os.getenv("PLAYWRIGHT_DRIVER_URL"):
browser_steps = FieldList(FormField(SingleBrowserStep), min_entries=10)
text_should_not_be_present = StringListField(_l('Block change-detection while text matches'), [validators.Optional(), ValidateListRegex()])
webdriver_js_execute_code = TextAreaField(_l('Execute JavaScript before change detection'), render_kw={"rows": "5"}, validators=[validators.Optional()])

View File

@@ -6,8 +6,6 @@ from .persistence import EntityPersistenceMixin, _determine_entity_type
__all__ = ['EntityPersistenceMixin', 'watch_base']
from ..browser_steps.browser_steps import browser_steps_get_valid_steps
USE_SYSTEM_DEFAULT_NOTIFICATION_FORMAT_FOR_WATCH = 'System default'
CONDITIONS_MATCH_LOGIC_DEFAULT = 'ALL'
@@ -366,10 +364,6 @@ class watch_base(dict):
self._mark_field_as_edited(key)
def update(self, *args, **kwargs):
if args and args[0].get('browser_steps'):
args[0]['browser_steps'] = browser_steps_get_valid_steps(args[0].get('browser_steps'))
"""Override dict.update() to track modifications to writable fields."""
# Call parent update first
super().update(*args, **kwargs)
@@ -382,7 +376,6 @@ class watch_base(dict):
for key in kwargs.keys():
self._mark_field_as_edited(key)
def pop(self, key, *args):
"""Override dict.pop() to track removal of writable fields."""
result = super().pop(key, *args)

View File

@@ -1,7 +1,5 @@
import re
import hashlib
from changedetectionio.browser_steps.browser_steps import browser_steps_get_valid_steps
from changedetectionio.content_fetchers.base import Fetcher
from changedetectionio.strtobool import strtobool
from copy import deepcopy
@@ -171,7 +169,7 @@ class difference_detection_processor():
)
if self.watch.has_browser_steps:
self.fetcher.browser_steps = browser_steps_get_valid_steps(self.watch.get('browser_steps', []))
self.fetcher.browser_steps = self.watch.get('browser_steps', [])
self.fetcher.browser_steps_screenshot_path = os.path.join(self.datastore.datastore_path, self.watch.get('uuid'))
# Tweak the base config with the per-watch ones

View File

@@ -17,6 +17,8 @@ $(document).ready(function () {
set_scale();
});
// Should always be disabled
$('#browser_steps-0-operation option[value="Goto site"]').prop("selected", "selected");
$('#browser_steps-0-operation').attr('disabled', 'disabled');
$('#browsersteps-click-start').click(function () {
$("#browsersteps-click-start").fadeOut();
@@ -43,6 +45,12 @@ $(document).ready(function () {
browsersteps_session_id = false;
apply_buttons_disabled = false;
ctx.clearRect(0, 0, c.width, c.height);
set_first_gotosite_disabled();
}
function set_first_gotosite_disabled() {
$('#browser_steps >li:first-child select').val('Goto site').attr('disabled', 'disabled');
$('#browser_steps >li:first-child').css('opacity', '0.5');
}
// Show seconds remaining until the browser interface needs to restart the session
@@ -235,54 +243,14 @@ $(document).ready(function () {
ctx.fill();
}
// Reusable AJAX function for browser step operations
function executeBrowserStep(url, data = {}) {
$('#browser-steps-ui .loader .spinner').fadeIn();
apply_buttons_disabled = true;
$('ul#browser_steps li .control .apply').css('opacity', 0.5);
$("#browsersteps-img").css('opacity', 0.65);
return $.ajax({
method: "POST",
url: url,
data: data,
statusCode: {
400: function () {
alert("There was a problem processing the request, please reload the page.");
$("#loading-status-text").hide();
$('#browser-steps-ui .loader .spinner').fadeOut();
},
401: function (data) {
alert(data.responseText);
$("#loading-status-text").hide();
$('#browser-steps-ui .loader .spinner').fadeOut();
}
}
}).done(function (data) {
xpath_data = data.xpath_data;
$('#browsersteps-img').attr('src', data.screenshot);
$('#browser-steps-ui .loader .spinner').fadeOut();
apply_buttons_disabled = false;
$("#browsersteps-img").css('opacity', 1);
$('ul#browser_steps li .control .apply').css('opacity', 1);
$("#loading-status-text").hide();
}).fail(function (data) {
console.log(data);
if (data.responseText && data.responseText.includes("Browser session expired")) {
disable_browsersteps_ui();
}
apply_buttons_disabled = false;
$("#loading-status-text").hide();
$('ul#browser_steps li .control .apply').css('opacity', 1);
$("#browsersteps-img").css('opacity', 1);
});
}
function start() {
console.log("Starting browser-steps UI");
browsersteps_session_id = false;
// @todo This setting of the first one should be done at the datalayer but wtforms doesnt wanna play nice
$('#browser_steps >li:first-child').removeClass('empty');
set_first_gotosite_disabled();
$('#browser-steps-ui .loader .spinner').show();
// Request a new session
$('.clear,.remove', $('#browser_steps >li:first-child')).hide();
$.ajax({
type: "GET",
url: browser_steps_start_url,
@@ -299,12 +267,11 @@ $(document).ready(function () {
}).done(function (data) {
$("#loading-status-text").fadeIn();
browsersteps_session_id = data.browsersteps_session_id;
// This should trigger 'Goto site'
console.log("Got startup response, requesting Goto-Site (first) step fake click");
$('#browser_steps >li:first-child .apply').click();
browser_interface_seconds_remaining = 500;
// Request goto_site operation
executeBrowserStep(
browser_steps_sync_url + "&browsersteps_session_id=" + browsersteps_session_id + "&goto_website_url_first_step=true"
);
set_first_gotosite_disabled();
}).fail(function (data) {
console.log(data);
alert('There was an error communicating with the server.');
@@ -313,6 +280,7 @@ $(document).ready(function () {
}
function disable_browsersteps_ui() {
set_first_gotosite_disabled();
$("#browser-steps-ui").css('opacity', '0.3');
$('#browsersteps-selector-canvas').off("mousemove mousedown click");
}
@@ -360,13 +328,16 @@ $(document).ready(function () {
// Add the extra buttons to the steps
$('ul#browser_steps li').each(function (i) {
var s = '<div class="control">' + '<a data-step-index=' + i + ' class="pure-button button-secondary button-green button-xsmall apply" >Apply</a>&nbsp;';
s += `<a data-step-index="${i}" class="pure-button button-secondary button-xsmall clear" >Clear</a>&nbsp;` +
`<a data-step-index="${i}" class="pure-button button-secondary button-red button-xsmall remove" >Remove</a>`;
if (i > 0) {
// The first step never gets these (Goto-site)
s += `<a data-step-index="${i}" class="pure-button button-secondary button-xsmall clear" >Clear</a>&nbsp;` +
`<a data-step-index="${i}" class="pure-button button-secondary button-red button-xsmall remove" >Remove</a>`;
// if a screenshot is available
if (browser_steps_available_screenshots.includes(i.toString())) {
var d = (browser_steps_last_error_step === i+1) ? 'before' : 'after';
s += `&nbsp;<a data-step-index="${i}" class="pure-button button-secondary button-xsmall show-screenshot" title="Show screenshot from last run" data-type="${d}">Pic</a>&nbsp;`;
// if a screenshot is available
if (browser_steps_available_screenshots.includes(i.toString())) {
var d = (browser_steps_last_error_step === i+1) ? 'before' : 'after';
s += `&nbsp;<a data-step-index="${i}" class="pure-button button-secondary button-xsmall show-screenshot" title="Show screenshot from last run" data-type="${d}">Pic</a>&nbsp;`;
}
}
s += '</div>';
$(this).append(s)
@@ -405,35 +376,80 @@ $(document).ready(function () {
});
$('ul#browser_steps li .control .apply').click(function (event) {
// sequential requests @todo refactor
if (apply_buttons_disabled) {
return;
}
var current_data = $(event.currentTarget).closest('li');
$('#browser-steps-ui .loader .spinner').fadeIn();
apply_buttons_disabled = true;
$('ul#browser_steps li .control .apply').css('opacity', 0.5);
$("#browsersteps-img").css('opacity', 0.65);
var is_last_step = 0;
var step_n = $(event.currentTarget).data('step-index');
// Determine if this is the last configured step
var is_last_step = 0;
// On the last step, we should also be getting data ready for the visual selector
$('ul#browser_steps li select').each(function (i) {
if ($(this).val() !== 'Choose one') {
is_last_step += 1;
}
});
is_last_step = (is_last_step == (step_n + 1));
if (is_last_step == (step_n + 1)) {
is_last_step = true;
} else {
is_last_step = false;
}
console.log("Requesting step via POST " + $("select[id$='operation']", current_data).first().val());
// Execute the browser step
executeBrowserStep(
browser_steps_sync_url + "&browsersteps_session_id=" + browsersteps_session_id,
{
// POST the currently clicked step form widget back and await response, redraw
$.ajax({
method: "POST",
url: browser_steps_sync_url + "&browsersteps_session_id=" + browsersteps_session_id,
data: {
'operation': $("select[id$='operation']", current_data).first().val(),
'selector': $("input[id$='selector']", current_data).first().val(),
'optional_value': $("input[id$='optional_value']", current_data).first().val(),
'step_n': step_n,
'is_last_step': is_last_step
},
statusCode: {
400: function () {
// More than likely the CSRF token was lost when the server restarted
alert("There was a problem processing the request, please reload the page.");
$("#loading-status-text").hide();
$('#browser-steps-ui .loader .spinner').fadeOut();
},
401: function (data) {
// More than likely the CSRF token was lost when the server restarted
alert(data.responseText);
$("#loading-status-text").hide();
$('#browser-steps-ui .loader .spinner').fadeOut();
}
}
);
}).done(function (data) {
// it should return the new state (selectors available and screenshot)
xpath_data = data.xpath_data;
$('#browsersteps-img').attr('src', data.screenshot);
$('#browser-steps-ui .loader .spinner').fadeOut();
apply_buttons_disabled = false;
$("#browsersteps-img").css('opacity', 1);
$('ul#browser_steps li .control .apply').css('opacity', 1);
$("#loading-status-text").hide();
set_first_gotosite_disabled();
}).fail(function (data) {
console.log(data);
if (data.responseText.includes("Browser session expired")) {
disable_browsersteps_ui();
}
apply_buttons_disabled = false;
$("#loading-status-text").hide();
$('ul#browser_steps li .control .apply').css('opacity', 1);
$("#browsersteps-img").css('opacity', 1);
});
});
$('ul#browser_steps li .control .show-screenshot').click(function (element) {

View File

@@ -235,8 +235,6 @@ class ChangeDetectionStore(DatastoreUpdatesMixin, FileSavingDataStore):
# No datastore yet - check if this is a fresh install or legacy migration
self.init_fresh_install(include_default_watches=include_default_watches,
version_tag=version_tag)
# Maybe they copied a bunch of watch subdirs across too
self._load_state()
def init_fresh_install(self, include_default_watches, version_tag):
# Generate app_guid FIRST (required for all operations)

View File

@@ -331,7 +331,6 @@ def prepare_test_function(live_server, datastore_path):
# Cleanup: Clear watches and queue after test
try:
from changedetectionio.flask_app import update_q
from pathlib import Path
# Clear the queue to prevent leakage to next test
while not update_q.empty():
@@ -341,18 +340,6 @@ def prepare_test_function(live_server, datastore_path):
break
datastore.data['watching'] = {}
# Delete any old watch metadata JSON files
base_path = Path(datastore.datastore_path).resolve()
max_depth = 2
for file in base_path.rglob("*.json"):
# Calculate depth relative to base path
depth = len(file.relative_to(base_path).parts) - 1
if depth <= max_depth and file.is_file():
file.unlink()
except Exception as e:
logger.warning(f"Error during datastore cleanup: {e}")

View File

@@ -9,7 +9,7 @@ by testing various scenarios that should trigger validation errors.
import time
import json
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .util import live_server_setup, wait_for_all_checks
def test_openapi_validation_invalid_content_type_on_create_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -27,7 +27,6 @@ def test_openapi_validation_invalid_content_type_on_create_watch(client, live_se
# Should get 400 error due to OpenAPI validation failure
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_missing_required_field_create_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -45,7 +44,6 @@ def test_openapi_validation_missing_required_field_create_watch(client, live_ser
# Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_invalid_field_in_request_body(client, live_server, measure_memory_usage, datastore_path):
@@ -85,7 +83,6 @@ def test_openapi_validation_invalid_field_in_request_body(client, live_server, m
# Backend validation now returns "Unknown field(s):" message
assert b"Unknown field" in res.data, \
"Should contain validation error about unknown fields"
delete_all_watches(client)
def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage, datastore_path):
@@ -103,7 +100,6 @@ def test_openapi_validation_import_wrong_content_type(client, live_server, measu
# Should get 400 error due to content-type mismatch
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_import_correct_content_type_succeeds(client, live_server, measure_memory_usage, datastore_path):
@@ -121,7 +117,6 @@ def test_openapi_validation_import_correct_content_type_succeeds(client, live_se
# Should succeed
assert res.status_code == 200, f"Expected 200 but got {res.status_code}"
assert len(res.json) == 2, "Should import 2 URLs"
delete_all_watches(client)
def test_openapi_validation_get_requests_bypass_validation(client, live_server, measure_memory_usage, datastore_path):
@@ -146,7 +141,6 @@ def test_openapi_validation_get_requests_bypass_validation(client, live_server,
# Should return JSON with watch list (empty in this case)
assert isinstance(res.json, dict), "Should return JSON dictionary for watch list"
delete_all_watches(client)
def test_openapi_validation_create_tag_missing_required_title(client, live_server, measure_memory_usage, datastore_path):
@@ -164,13 +158,10 @@ def test_openapi_validation_create_tag_missing_required_title(client, live_serve
# Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_watch_update_allows_partial_updates(client, live_server, measure_memory_usage, datastore_path):
"""Test that watch updates allow partial updates without requiring all fields (positive test)."""
#xxx
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# First create a valid watch
@@ -207,5 +198,4 @@ def test_openapi_validation_watch_update_allows_partial_updates(client, live_ser
)
assert res.status_code == 200
assert res.json.get('title') == 'Updated Title Only', "Title should be updated"
assert res.json.get('url') == 'https://example.com', "URL should remain unchanged"
delete_all_watches(client)
assert res.json.get('url') == 'https://example.com', "URL should remain unchanged"

View File

@@ -6,6 +6,8 @@ from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
sleep_time_for_fetch_thread = 3
# Basic test to check inscriptus is not adding return line chars, basically works etc
def test_inscriptus():

View File

@@ -81,16 +81,36 @@ def test_backup(client, live_server, measure_memory_usage, datastore_path):
def test_watch_data_package_download(client, live_server, measure_memory_usage, datastore_path):
"""Test downloading a single watch's data as a zip package"""
import os
import json
set_original_response(datastore_path=datastore_path)
uuid = client.application.config.get('DATASTORE').add_watch(url=url_for('test_endpoint', _external=True))
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Add a watch
res = client.post(
url_for("imports.import_page"),
data={"urls": url_for('test_endpoint', _external=True)},
follow_redirects=True
)
assert b"1 Imported" in res.data
wait_for_all_checks(client)
# Get the UUID directly from the datastore
# Find the watch directories
uuid = None
for item in os.listdir(datastore_path):
item_path = os.path.join(datastore_path, item)
if os.path.isdir(item_path) and len(item) == 36: # UUID format
uuid = item
break
assert uuid is not None, "Could not find watch UUID in datastore"
# Download the watch data package
res = client.get(url_for("ui.ui_edit.watch_get_data_package", uuid=uuid))
res = client.get(
url_for("ui.ui_edit.watch_get_data_package", uuid=uuid),
follow_redirects=True
)
# Should get the right zip content type
assert res.content_type == "application/zip"

View File

@@ -6,6 +6,10 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
import os
sleep_time_for_fetch_thread = 3
def test_check_extract_text_from_diff(client, live_server, measure_memory_usage, datastore_path):
import time
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:

View File

@@ -41,6 +41,7 @@ def set_modified_ignore_response(datastore_path):
def test_render_anchor_tag_content_true(client, live_server, measure_memory_usage, datastore_path):
"""Testing that the link changes are detected when
render_anchor_tag_content setting is set to true"""
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)

View File

@@ -100,6 +100,7 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Tests the whole stack works with staus codes ignored
def test_403_page_check_works_with_ignore_status_code(client, live_server, measure_memory_usage, datastore_path):
sleep_time_for_fetch_thread = 3
set_original_response(datastore_path=datastore_path)
@@ -111,7 +112,8 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server, measu
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, check our ignore option
# Add our URL to the import page

View File

@@ -2,9 +2,10 @@
import time
from flask import url_for
from . util import live_server_setup
import os
from .util import live_server_setup, delete_all_watches, wait_for_all_checks
# Should be the same as set_original_ignore_response(datastore_path=datastore_path) but with a little more whitespacing
@@ -49,7 +50,10 @@ def set_original_ignore_response(datastore_path):
# If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_check_ignore_whitespace(client, live_server, measure_memory_usage, datastore_path):
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)
set_original_ignore_response(datastore_path=datastore_path)
@@ -70,17 +74,17 @@ def test_check_ignore_whitespace(client, live_server, measure_memory_usage, data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
set_original_ignore_response_but_with_whitespace(datastore_path)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index"))

View File

@@ -24,29 +24,6 @@ def set_original_response(datastore_path):
f.write(test_return_data)
return None
def test_favicon(client, live_server, measure_memory_usage, datastore_path):
# Attempt to fetch it, make sure that works
SVG_BASE64 = 'PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAxIDEiLz4='
uuid = client.application.config.get('DATASTORE').add_watch(url='https://localhost')
live_server.app.config['DATASTORE'].data['watching'][uuid].bump_favicon(url="favicon-set-type.svg",
favicon_base_64=SVG_BASE64
)
res = client.get(url_for('static_content', group='favicon', filename=uuid))
assert res.status_code == 200
assert len(res.data) > 10
res = client.get(url_for('static_content', group='..', filename='__init__.py'))
assert res.status_code != 200
res = client.get(url_for('static_content', group='.', filename='../__init__.py'))
assert res.status_code != 200
# Traverse by filename protection
res = client.get(url_for('static_content', group='js', filename='../styles/styles.css'))
assert res.status_code != 200
def test_bad_access(client, live_server, measure_memory_usage, datastore_path):
res = client.post(
@@ -501,80 +478,3 @@ def test_logout_with_redirect(client, live_server, measure_memory_usage, datasto
# Cleanup
del client.application.config['DATASTORE'].data['settings']['application']['password']
def test_static_directory_traversal(client, live_server, measure_memory_usage, datastore_path):
"""
Test that the static file serving route properly blocks directory traversal attempts.
This tests the fix for GHSA-9jj8-v89v-xjvw (CVE pending).
The vulnerability was in /static/<group>/<filename> where the sanitization regex
allowed dots, enabling "../" traversal to read application source files.
The fix changed the regex from r'[^\w.-]+' to r'[^a-z0-9_]+' which blocks dots.
"""
# Test 1: Direct .. traversal attempt (URL-encoded)
res = client.get(
"/static/%2e%2e/flask_app.py",
follow_redirects=False
)
# Should be blocked (404 or 403)
assert res.status_code in [404, 403], f"Expected 404/403, got {res.status_code}"
# Should NOT contain application source code
assert b"def static_content" not in res.data
assert b"changedetection_app" not in res.data
# Test 2: Direct .. traversal attempt (unencoded)
res = client.get(
"/static/../flask_app.py",
follow_redirects=False
)
assert res.status_code in [404, 403], f"Expected 404/403, got {res.status_code}"
assert b"def static_content" not in res.data
# Test 3: Multiple dots traversal
res = client.get(
"/static/..../flask_app.py",
follow_redirects=False
)
assert res.status_code in [404, 403], f"Expected 404/403, got {res.status_code}"
assert b"def static_content" not in res.data
# Test 4: Try to access other application files
for filename in ["__init__.py", "datastore.py", "store.py"]:
res = client.get(
f"/static/%2e%2e/{filename}",
follow_redirects=False
)
assert res.status_code in [404, 403], f"File {filename} should be blocked"
# Should not contain Python code indicators
assert b"import" not in res.data or b"# Test" in res.data # Allow "1 Imported" etc
# Test 5: Verify legitimate static files still work
# Note: We can't test actual files without knowing what exists,
# but we can verify the sanitization doesn't break valid groups
res = client.get(
"/static/images/test.png", # Will 404 if file doesn't exist, but won't traverse
follow_redirects=False
)
# Should get 404 (file not found) not 403 (blocked)
# This confirms the group name "images" is valid
assert res.status_code == 404
# Test 6: Ensure hyphens and dots are blocked in group names
res = client.get(
"/static/../../../etc/passwd",
follow_redirects=False
)
assert res.status_code in [404, 403]
assert b"root:" not in res.data
# Test 7: Test that underscores still work (they're allowed)
res = client.get(
"/static/visual_selector_data/test.json",
follow_redirects=False
)
# visual_selector_data is a real group, but requires auth
# Should get 403 (not authenticated) or 404 (file not found), not a path traversal
assert res.status_code in [403, 404]

View File

@@ -6,6 +6,9 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, delete_all_watches
import re
sleep_time_for_fetch_thread = 3
def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path)

View File

@@ -6,6 +6,7 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from ..diff import ADDED_STYLE
sleep_time_for_fetch_thread = 3
def test_check_basic_change_detection_functionality_source(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path)
@@ -71,10 +72,7 @@ def test_check_ignore_elements(client, live_server, measure_memory_usage, datast
follow_redirects=True
)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"),

View File

@@ -2,8 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, delete_all_watches, wait_for_all_checks
from . util import live_server_setup, delete_all_watches
import os
@@ -26,6 +25,9 @@ def set_original_ignore_response(datastore_path):
def test_trigger_regex_functionality_with_filter(client, live_server, measure_memory_usage, datastore_path):
# live_server_setup(live_server) # Setup on conftest per function
sleep_time_for_fetch_thread = 3
set_original_ignore_response(datastore_path=datastore_path)
# Give the endpoint time to spin up
@@ -36,7 +38,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# it needs time to save the original version
time.sleep(sleep_time_for_fetch_thread)
### test regex with filter
res = client.post(
@@ -49,9 +52,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
follow_redirects=True
)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
client.get(url_for("ui.ui_diff.diff_history_page", uuid="first"))
@@ -60,8 +62,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
f.write("<html>some new noise with cool stuff2 ok</html>")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# It should report nothing found (nothing should match the regex and filter)
res = client.get(url_for("watchlist.index"))
@@ -72,8 +73,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data

View File

@@ -160,7 +160,6 @@ def extract_UUID_from_client(client):
return uuid.strip()
def delete_all_watches(client=None):
wait_for_all_checks(client)
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
@@ -181,23 +180,6 @@ def delete_all_watches(client=None):
time.sleep(0.2)
# Delete any old watch metadata
from pathlib import Path
base_path = Path(
client.application.config.get('DATASTORE').datastore_path
).resolve()
max_depth = 2
for file in base_path.rglob("*.json"):
# Calculate depth relative to base path
depth = len(file.relative_to(base_path).parts) - 1
if depth <= max_depth and file.is_file():
file.unlink()
def wait_for_all_checks(client=None):
"""
Waits until the queue is empty and workers are idle.

View File

@@ -88,6 +88,7 @@ def test_visual_selector_content_ready(client, live_server, measure_memory_usage
def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_path):
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
test_url = url_for('test_interactive_html_endpoint', _external=True)
test_url = test_url.replace('localhost.localdomain', 'cdio')
@@ -107,13 +108,13 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
"url": test_url,
"tags": "",
'fetch_backend': "html_webdriver",
'browser_steps-5-operation': 'Enter text in field',
'browser_steps-5-selector': '#test-input-text',
'browser_steps-0-operation': 'Enter text in field',
'browser_steps-0-selector': '#test-input-text',
# Should get set to the actual text (jinja2 rendered)
'browser_steps-5-optional_value': "Hello-Jinja2-{% now 'Europe/Berlin', '%Y-%m-%d' %}",
'browser_steps-8-operation': 'Click element',
'browser_steps-8-selector': 'button[name=test-button]',
'browser_steps-8-optional_value': '',
'browser_steps-0-optional_value': "Hello-Jinja2-{% now 'Europe/Berlin', '%Y-%m-%d' %}",
'browser_steps-1-operation': 'Click element',
'browser_steps-1-selector': 'button[name=test-button]',
'browser_steps-1-optional_value': '',
# For now, cookies doesnt work in headers because it must be a full cookiejar object
'headers': "testheader: yes\buser-agent: MyCustomAgent",
"time_between_check_use_default": "y",
@@ -121,18 +122,9 @@ def test_basic_browserstep(client, live_server, measure_memory_usage, datastore_
follow_redirects=True
)
assert b"unpaused" in res.data
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
# 3874 - should have tidied up any blanks
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
assert watch['browser_steps'][0].get('operation') == 'Enter text in field'
assert watch['browser_steps'][1].get('selector') == 'button[name=test-button]'
# This part actually needs the browser, before this we are just testing data
assert os.getenv('PLAYWRIGHT_DRIVER_URL'), "Needs PLAYWRIGHT_DRIVER_URL set for this test"
assert live_server.app.config['DATASTORE'].data['watching'][uuid].history_n >= 1, "Watch history had atleast 1 (everything fetched OK)"
assert b"This text should be removed" not in res.data