Compare commits

...

7 Commits

Author SHA1 Message Date
dgtlmoon
c442a798a3 More clenaup 2026-02-16 18:17:38 +01:00
dgtlmoon
06fd43ee24 safer cleanup 2026-02-16 17:59:40 +01:00
dgtlmoon
111d424d23 Hmm 2 2026-02-16 17:58:23 +01:00
dgtlmoon
fbe245c1d7 hmm 2026-02-16 17:47:15 +01:00
dgtlmoon
1eeed8dd5b add cleanup 2026-02-16 17:36:53 +01:00
dgtlmoon
363dcf6ff0 Remove static sleeps 2026-02-16 17:21:52 +01:00
dgtlmoon
7a3c9cb391 Security - Adding small test and fixing overzealous filename cleaner 2026-02-16 16:56:16 +01:00
13 changed files with 88 additions and 37 deletions

View File

@@ -712,9 +712,10 @@ def changedetection_app(config=None, datastore_o=None):
def static_content(group, filename): def static_content(group, filename):
from flask import make_response from flask import make_response
import re import re
# Strict sanitization: only allow a-z, 0-9, and underscore (blocks .. and other traversal) # Strict sanitization: only allow a-z, 0-9, and underscore (blocks .. and other traversal)
group = re.sub(r'[^a-z0-9_]+', '', group.lower()) group = re.sub(r'[^a-z0-9_-]+', '', group.lower())
filename = re.sub(r'[^a-z0-9_]+', '', filename.lower()) filename = filename
# Additional safety: reject if sanitization resulted in empty strings # Additional safety: reject if sanitization resulted in empty strings
if not group or not filename: if not group or not filename:

View File

@@ -331,6 +331,7 @@ def prepare_test_function(live_server, datastore_path):
# Cleanup: Clear watches and queue after test # Cleanup: Clear watches and queue after test
try: try:
from changedetectionio.flask_app import update_q from changedetectionio.flask_app import update_q
from pathlib import Path
# Clear the queue to prevent leakage to next test # Clear the queue to prevent leakage to next test
while not update_q.empty(): while not update_q.empty():
@@ -340,6 +341,18 @@ def prepare_test_function(live_server, datastore_path):
break break
datastore.data['watching'] = {} datastore.data['watching'] = {}
# Delete any old watch metadata JSON files
base_path = Path(datastore.datastore_path).resolve()
max_depth = 2
for file in base_path.rglob("*.json"):
# Calculate depth relative to base path
depth = len(file.relative_to(base_path).parts) - 1
if depth <= max_depth and file.is_file():
file.unlink()
except Exception as e: except Exception as e:
logger.warning(f"Error during datastore cleanup: {e}") logger.warning(f"Error during datastore cleanup: {e}")

View File

@@ -9,7 +9,7 @@ by testing various scenarios that should trigger validation errors.
import time import time
import json import json
from flask import url_for from flask import url_for
from .util import live_server_setup, wait_for_all_checks from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def test_openapi_validation_invalid_content_type_on_create_watch(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_invalid_content_type_on_create_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -27,6 +27,7 @@ def test_openapi_validation_invalid_content_type_on_create_watch(client, live_se
# Should get 400 error due to OpenAPI validation failure # Should get 400 error due to OpenAPI validation failure
assert res.status_code == 400, f"Expected 400 but got {res.status_code}" assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message" assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_missing_required_field_create_watch(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_missing_required_field_create_watch(client, live_server, measure_memory_usage, datastore_path):
@@ -44,6 +45,7 @@ def test_openapi_validation_missing_required_field_create_watch(client, live_ser
# Should get 400 error due to missing required field # Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}" assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message" assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_invalid_field_in_request_body(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_invalid_field_in_request_body(client, live_server, measure_memory_usage, datastore_path):
@@ -83,6 +85,7 @@ def test_openapi_validation_invalid_field_in_request_body(client, live_server, m
# Backend validation now returns "Unknown field(s):" message # Backend validation now returns "Unknown field(s):" message
assert b"Unknown field" in res.data, \ assert b"Unknown field" in res.data, \
"Should contain validation error about unknown fields" "Should contain validation error about unknown fields"
delete_all_watches(client)
def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_import_wrong_content_type(client, live_server, measure_memory_usage, datastore_path):
@@ -100,6 +103,7 @@ def test_openapi_validation_import_wrong_content_type(client, live_server, measu
# Should get 400 error due to content-type mismatch # Should get 400 error due to content-type mismatch
assert res.status_code == 400, f"Expected 400 but got {res.status_code}" assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message" assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_import_correct_content_type_succeeds(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_import_correct_content_type_succeeds(client, live_server, measure_memory_usage, datastore_path):
@@ -117,6 +121,7 @@ def test_openapi_validation_import_correct_content_type_succeeds(client, live_se
# Should succeed # Should succeed
assert res.status_code == 200, f"Expected 200 but got {res.status_code}" assert res.status_code == 200, f"Expected 200 but got {res.status_code}"
assert len(res.json) == 2, "Should import 2 URLs" assert len(res.json) == 2, "Should import 2 URLs"
delete_all_watches(client)
def test_openapi_validation_get_requests_bypass_validation(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_get_requests_bypass_validation(client, live_server, measure_memory_usage, datastore_path):
@@ -141,6 +146,7 @@ def test_openapi_validation_get_requests_bypass_validation(client, live_server,
# Should return JSON with watch list (empty in this case) # Should return JSON with watch list (empty in this case)
assert isinstance(res.json, dict), "Should return JSON dictionary for watch list" assert isinstance(res.json, dict), "Should return JSON dictionary for watch list"
delete_all_watches(client)
def test_openapi_validation_create_tag_missing_required_title(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_create_tag_missing_required_title(client, live_server, measure_memory_usage, datastore_path):
@@ -158,10 +164,13 @@ def test_openapi_validation_create_tag_missing_required_title(client, live_serve
# Should get 400 error due to missing required field # Should get 400 error due to missing required field
assert res.status_code == 400, f"Expected 400 but got {res.status_code}" assert res.status_code == 400, f"Expected 400 but got {res.status_code}"
assert b"Validation failed" in res.data, "Should contain validation error message" assert b"Validation failed" in res.data, "Should contain validation error message"
delete_all_watches(client)
def test_openapi_validation_watch_update_allows_partial_updates(client, live_server, measure_memory_usage, datastore_path): def test_openapi_validation_watch_update_allows_partial_updates(client, live_server, measure_memory_usage, datastore_path):
"""Test that watch updates allow partial updates without requiring all fields (positive test).""" """Test that watch updates allow partial updates without requiring all fields (positive test)."""
#xxx
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token') api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')
# First create a valid watch # First create a valid watch
@@ -198,4 +207,5 @@ def test_openapi_validation_watch_update_allows_partial_updates(client, live_ser
) )
assert res.status_code == 200 assert res.status_code == 200
assert res.json.get('title') == 'Updated Title Only', "Title should be updated" assert res.json.get('title') == 'Updated Title Only', "Title should be updated"
assert res.json.get('url') == 'https://example.com', "URL should remain unchanged" assert res.json.get('url') == 'https://example.com', "URL should remain unchanged"
delete_all_watches(client)

View File

@@ -6,8 +6,6 @@ from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \ from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches extract_UUID_from_client, delete_all_watches
sleep_time_for_fetch_thread = 3
# Basic test to check inscriptus is not adding return line chars, basically works etc # Basic test to check inscriptus is not adding return line chars, basically works etc
def test_inscriptus(): def test_inscriptus():

View File

@@ -6,10 +6,6 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
import os import os
sleep_time_for_fetch_thread = 3
def test_check_extract_text_from_diff(client, live_server, measure_memory_usage, datastore_path): def test_check_extract_text_from_diff(client, live_server, measure_memory_usage, datastore_path):
import time import time
with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f: with open(os.path.join(datastore_path, "endpoint-content.txt"), "w") as f:

View File

@@ -41,7 +41,6 @@ def set_modified_ignore_response(datastore_path):
def test_render_anchor_tag_content_true(client, live_server, measure_memory_usage, datastore_path): def test_render_anchor_tag_content_true(client, live_server, measure_memory_usage, datastore_path):
"""Testing that the link changes are detected when """Testing that the link changes are detected when
render_anchor_tag_content setting is set to true""" render_anchor_tag_content setting is set to true"""
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up # Give the endpoint time to spin up
time.sleep(1) time.sleep(1)

View File

@@ -100,7 +100,6 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Tests the whole stack works with staus codes ignored # Tests the whole stack works with staus codes ignored
def test_403_page_check_works_with_ignore_status_code(client, live_server, measure_memory_usage, datastore_path): def test_403_page_check_works_with_ignore_status_code(client, live_server, measure_memory_usage, datastore_path):
sleep_time_for_fetch_thread = 3
set_original_response(datastore_path=datastore_path) set_original_response(datastore_path=datastore_path)
@@ -112,8 +111,7 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server, measu
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
# Goto the edit page, check our ignore option # Goto the edit page, check our ignore option
# Add our URL to the import page # Add our URL to the import page

View File

@@ -2,10 +2,9 @@
import time import time
from flask import url_for from flask import url_for
from . util import live_server_setup
import os import os
from .util import live_server_setup, delete_all_watches, wait_for_all_checks
# Should be the same as set_original_ignore_response(datastore_path=datastore_path) but with a little more whitespacing # Should be the same as set_original_ignore_response(datastore_path=datastore_path) but with a little more whitespacing
@@ -50,10 +49,7 @@ def set_original_ignore_response(datastore_path):
# If there was only a change in the whitespacing, then we shouldnt have a change detected # If there was only a change in the whitespacing, then we shouldnt have a change detected
def test_check_ignore_whitespace(client, live_server, measure_memory_usage, datastore_path): def test_check_ignore_whitespace(client, live_server, measure_memory_usage, datastore_path):
sleep_time_for_fetch_thread = 3
# Give the endpoint time to spin up
time.sleep(1)
set_original_ignore_response(datastore_path=datastore_path) set_original_ignore_response(datastore_path=datastore_path)
@@ -74,17 +70,17 @@ def test_check_ignore_whitespace(client, live_server, measure_memory_usage, data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread) wait_for_all_checks(client)
# Trigger a check # Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
set_original_ignore_response_but_with_whitespace(datastore_path) set_original_ignore_response_but_with_whitespace(datastore_path)
time.sleep(sleep_time_for_fetch_thread) wait_for_all_checks(client)
# Trigger a check # Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up # Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread) wait_for_all_checks(client)
# It should report nothing found (no new 'has-unread-changes' class) # It should report nothing found (no new 'has-unread-changes' class)
res = client.get(url_for("watchlist.index")) res = client.get(url_for("watchlist.index"))

View File

@@ -24,6 +24,29 @@ def set_original_response(datastore_path):
f.write(test_return_data) f.write(test_return_data)
return None return None
def test_favicon(client, live_server, measure_memory_usage, datastore_path):
# Attempt to fetch it, make sure that works
SVG_BASE64 = 'PHN2ZyB4bWxucz0iaHR0cDovL3d3dy53My5vcmcvMjAwMC9zdmciIHZpZXdCb3g9IjAgMCAxIDEiLz4='
uuid = client.application.config.get('DATASTORE').add_watch(url='https://localhost')
live_server.app.config['DATASTORE'].data['watching'][uuid].bump_favicon(url="favicon-set-type.svg",
favicon_base_64=SVG_BASE64
)
res = client.get(url_for('static_content', group='favicon', filename=uuid))
assert res.status_code == 200
assert len(res.data) > 10
res = client.get(url_for('static_content', group='..', filename='__init__.py'))
assert res.status_code != 200
res = client.get(url_for('static_content', group='.', filename='../__init__.py'))
assert res.status_code != 200
# Traverse by filename protection
res = client.get(url_for('static_content', group='js', filename='../styles/styles.css'))
assert res.status_code != 200
def test_bad_access(client, live_server, measure_memory_usage, datastore_path): def test_bad_access(client, live_server, measure_memory_usage, datastore_path):
res = client.post( res = client.post(

View File

@@ -6,9 +6,6 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, delete_all_watches from .util import set_original_response, set_modified_response, live_server_setup, delete_all_watches
import re import re
sleep_time_for_fetch_thread = 3
def test_share_watch(client, live_server, measure_memory_usage, datastore_path): def test_share_watch(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path) set_original_response(datastore_path=datastore_path)

View File

@@ -6,7 +6,6 @@ from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from ..diff import ADDED_STYLE from ..diff import ADDED_STYLE
sleep_time_for_fetch_thread = 3
def test_check_basic_change_detection_functionality_source(client, live_server, measure_memory_usage, datastore_path): def test_check_basic_change_detection_functionality_source(client, live_server, measure_memory_usage, datastore_path):
set_original_response(datastore_path=datastore_path) set_original_response(datastore_path=datastore_path)
@@ -72,7 +71,10 @@ def test_check_ignore_elements(client, live_server, measure_memory_usage, datast
follow_redirects=True follow_redirects=True
) )
time.sleep(sleep_time_for_fetch_thread) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get( res = client.get(
url_for("ui.ui_preview.preview_page", uuid="first"), url_for("ui.ui_preview.preview_page", uuid="first"),

View File

@@ -2,7 +2,8 @@
import time import time
from flask import url_for from flask import url_for
from . util import live_server_setup, delete_all_watches
from .util import live_server_setup, delete_all_watches, wait_for_all_checks
import os import os
@@ -25,9 +26,6 @@ def set_original_ignore_response(datastore_path):
def test_trigger_regex_functionality_with_filter(client, live_server, measure_memory_usage, datastore_path): def test_trigger_regex_functionality_with_filter(client, live_server, measure_memory_usage, datastore_path):
# live_server_setup(live_server) # Setup on conftest per function
sleep_time_for_fetch_thread = 3
set_original_ignore_response(datastore_path=datastore_path) set_original_ignore_response(datastore_path=datastore_path)
# Give the endpoint time to spin up # Give the endpoint time to spin up
@@ -38,8 +36,7 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url) uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# it needs time to save the original version wait_for_all_checks(client)
time.sleep(sleep_time_for_fetch_thread)
### test regex with filter ### test regex with filter
res = client.post( res = client.post(
@@ -52,8 +49,9 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
follow_redirects=True follow_redirects=True
) )
# Give the thread time to pick it up client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
wait_for_all_checks(client)
client.get(url_for("ui.ui_diff.diff_history_page", uuid="first")) client.get(url_for("ui.ui_diff.diff_history_page", uuid="first"))
@@ -62,7 +60,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
f.write("<html>some new noise with cool stuff2 ok</html>") f.write("<html>some new noise with cool stuff2 ok</html>")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
wait_for_all_checks(client)
# It should report nothing found (nothing should match the regex and filter) # It should report nothing found (nothing should match the regex and filter)
res = client.get(url_for("watchlist.index")) res = client.get(url_for("watchlist.index"))
@@ -73,7 +72,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>") f.write("<html>some new noise with <span id=in-here>cool stuff6</span> ok</html>")
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True) client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index")) res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data assert b'has-unread-changes' in res.data

View File

@@ -160,6 +160,7 @@ def extract_UUID_from_client(client):
return uuid.strip() return uuid.strip()
def delete_all_watches(client=None): def delete_all_watches(client=None):
wait_for_all_checks(client)
uuids = list(client.application.config.get('DATASTORE').data['watching']) uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids: for uuid in uuids:
@@ -180,6 +181,23 @@ def delete_all_watches(client=None):
time.sleep(0.2) time.sleep(0.2)
# Delete any old watch metadata
from pathlib import Path
base_path = Path(
client.application.config.get('DATASTORE').datastore_path
).resolve()
max_depth = 2
for file in base_path.rglob("*.json"):
# Calculate depth relative to base path
depth = len(file.relative_to(base_path).parts) - 1
if depth <= max_depth and file.is_file():
file.unlink()
def wait_for_all_checks(client=None): def wait_for_all_checks(client=None):
""" """
Waits until the queue is empty and workers are idle. Waits until the queue is empty and workers are idle.