Compare commits

...

17 Commits

Author SHA1 Message Date
dgtlmoon
9144b67610 Adding test 2025-10-12 19:40:44 +02:00
dgtlmoon
b20fe70b49 adding test 2025-10-12 19:39:15 +02:00
dgtlmoon
080acced85 Merge branch 'master' into proxy-url-validation 2025-10-12 19:26:49 +02:00
dgtlmoon
66ddd87ee4 Move proxy default selection to proxy tab 2025-10-12 19:26:04 +02:00
dgtlmoon
233189e4f7 Build - Splitting memory report (#3493) 2025-10-12 19:20:28 +02:00
dgtlmoon
6b895ae972 Refactoring fieldlist to accept correct validation 2025-10-12 19:19:49 +02:00
dgtlmoon
7efd4c2a99 WIP - proxy/browser settings URL validation 2025-10-12 19:05:47 +02:00
dgtlmoon
b237fd7201 Replace stream/filetype detection library with puremagic, 20Mb less RAM usage (#3491) 2025-10-12 18:40:37 +02:00
dgtlmoon
3c81efe2f4 0.50.21 2025-10-10 18:17:56 +02:00
dgtlmoon
0fcfb94690 Adding 'RSS reader mode' (see main Settings) (#3488) 2025-10-10 18:17:30 +02:00
dgtlmoon
bb6d4c2756 Re #3486 - Fixing and adding test for RSS/Atom not being converted to text when server sends "text/xml" instead of the "application/atom+xml" header (#3487) 2025-10-10 16:29:02 +02:00
dgtlmoon
b59ce190ac Ensure JSON is always correctly reformatted with padding (#3485 #3482) 2025-10-10 16:00:32 +02:00
dgtlmoon
80be1a30f2 No need to reformat/reprocess content in the case that no filters were found (#3484, #3483) 2025-10-10 13:44:49 +02:00
dgtlmoon
93b4f79006 0.50.20 2025-10-10 10:40:04 +02:00
dgtlmoon
3009e46617 PDF - Will trigger a change - Fixing output, also reported original size of document was incorrect (it was the size of the HTML output after conversion from PDF), Improving tests (#3481) 2025-10-10 10:38:34 +02:00
dgtlmoon
8f040a1a84 0.50.19 2025-10-10 01:17:57 +02:00
dgtlmoon
4dbab8d77a Test speedup - remove common calls for function calls (#3477) 2025-10-10 01:16:03 +02:00
69 changed files with 1048 additions and 917 deletions

View File

@@ -253,6 +253,30 @@ jobs:
docker logs test-cdio-basic-tests > output-logs/test-cdio-basic-tests-stdout-${{ env.PYTHON_VERSION }}.txt
docker logs test-cdio-basic-tests 2> output-logs/test-cdio-basic-tests-stderr-${{ env.PYTHON_VERSION }}.txt
- name: Extract and display memory test report
if: always()
run: |
# Extract test-memory.log from the container
echo "Extracting test-memory.log from container..."
docker cp test-cdio-basic-tests:/app/changedetectionio/test-memory.log output-logs/test-memory-${{ env.PYTHON_VERSION }}.log || echo "test-memory.log not found in container"
# Display the memory log contents for immediate visibility in workflow output
echo "=== Top 10 Highest Peak Memory Tests ==="
if [ -f output-logs/test-memory-${{ env.PYTHON_VERSION }}.log ]; then
# Sort by peak memory value (extract number before MB and sort numerically, reverse order)
grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log | \
sed 's/.*Peak memory: //' | \
paste -d'|' - <(grep "Peak memory:" output-logs/test-memory-${{ env.PYTHON_VERSION }}.log) | \
sort -t'|' -k1 -nr | \
cut -d'|' -f2 | \
head -10
echo ""
echo "=== Full Memory Test Report ==="
cat output-logs/test-memory-${{ env.PYTHON_VERSION }}.log
else
echo "No memory log available"
fi
- name: Store everything including test-datastore
if: always()
uses: actions/upload-artifact@v4
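
The step above prints a ranked summary of per-test peak memory straight into the workflow log. The shell pipeline is fairly dense; below is a rough Python equivalent of the same "top 10 peaks" step, assuming each line of test-memory.log ends with a human-readable size such as "Peak memory: 12.34 MB" (the format written by conftest.py later in this diff). This is an illustration only, not part of the PR.

# memory_report.py - hedged sketch of the sorting step in the workflow above.
import re
import sys

UNIT = {"B": 1, "KB": 1024, "MB": 1024 ** 2, "GB": 1024 ** 3}

def peak_bytes(line):
    # Pull "<number> <unit>" out of "... Peak memory: 12.34 MB" and convert to bytes.
    m = re.search(r"Peak memory:\s*([\d.]+)\s*(B|KB|MB|GB)", line)
    return float(m.group(1)) * UNIT[m.group(2)] if m else 0.0

with open(sys.argv[1]) as f:
    peak_lines = [line.rstrip() for line in f if "Peak memory:" in line]

# Highest peak first, top 10 - the same ordering the workflow step echoes.
for line in sorted(peak_lines, key=peak_bytes, reverse=True)[:10]:
    print(line)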

View File

@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.18'
__version__ = '0.50.21'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

View File

@@ -334,6 +334,10 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
if update_handler.fetcher.content or (not update_handler.fetcher.content and empty_pages_are_a_change):
watch.save_last_fetched_html(contents=update_handler.fetcher.content, timestamp=int(fetch_start_time))
# Explicitly delete large content variables to free memory IMMEDIATELY after saving
# These are no longer needed after being saved to history
del contents
# Send notifications on second+ check
if watch.history_n >= 2:
logger.info(f"Change detected in UUID {uuid} - {watch['url']}")
@@ -372,6 +376,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
datastore.update_watch(uuid=uuid, update_obj={'fetch_time': round(time.time() - fetch_start_time, 3),
'check_count': count})
# NOW clear fetcher content - after all processing is complete
# This is the last point where we need the fetcher data
if update_handler and hasattr(update_handler, 'fetcher') and update_handler.fetcher:
update_handler.fetcher.clear_content()
logger.debug(f"Cleared fetcher content for UUID {uuid}")
except Exception as e:
logger.error(f"Worker {worker_id} unexpected error processing {uuid}: {e}")
logger.error(f"Worker {worker_id} traceback:", exc_info=True)
@@ -392,7 +402,28 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore):
#logger.info(f"Worker {worker_id} sending completion signal for UUID {watch['uuid']}")
watch_check_update.send(watch_uuid=watch['uuid'])
update_handler = None
# Explicitly clean up update_handler and all its references
if update_handler:
# Clear fetcher content using the proper method
if hasattr(update_handler, 'fetcher') and update_handler.fetcher:
update_handler.fetcher.clear_content()
# Clear processor references
if hasattr(update_handler, 'content_processor'):
update_handler.content_processor = None
update_handler = None
# Clear local contents variable if it still exists
if 'contents' in locals():
del contents
# Note: We don't set watch = None here because:
# 1. watch is just a local reference to datastore.data['watching'][uuid]
# 2. Setting it to None doesn't affect the datastore
# 3. GC can't collect the object anyway (still referenced by datastore)
# 4. It would just cause confusion
logger.debug(f"Worker {worker_id} completed watch {uuid} in {time.time()-fetch_start_time:.2f}s")
except Exception as cleanup_error:
logger.error(f"Worker {worker_id} error during cleanup: {cleanup_error}")

View File

@@ -1,7 +1,7 @@
{% extends 'base.html' %}
{% block content %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field %}
{% from '_helpers.html' import render_field, render_checkbox_field, render_button, render_time_schedule_form, render_ternary_field, render_fieldlist_with_inline_errors %}
{% from '_common_fields.html' import render_common_settings_form %}
<script>
const notification_base_url="{{url_for('ui.ui_notification.ajax_callback_send_notification_test', mode="global-settings")}}";
@@ -72,25 +72,23 @@
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
{% if form.requests.proxy %}
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">
Choose a default proxy for all watches
</span>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
</div>
</div>
{% endif %}
</fieldset>
</div>
@@ -314,17 +312,27 @@ nav
<p><strong>Tip</strong>: "Residential" and "Mobile" proxy type can be more successfull than "Data Center" for blocked websites.
<div class="pure-control-group" id="extra-proxies-setting">
{{ render_field(form.requests.form.extra_proxies) }}
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_proxies) }}
<span class="pure-form-message-inline">"Name" will be used for selecting the proxy in the Watch Edit settings</span><br>
<span class="pure-form-message-inline">SOCKS5 proxies with authentication are only supported with 'plain requests' fetcher, for other fetchers you should whitelist the IP access instead</span>
{% if form.requests.proxy %}
<div>
<br>
<div class="inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
<span class="pure-form-message-inline">Choose a default proxy for all watches</span>
</div>
</div>
{% endif %}
</div>
<div class="pure-control-group" id="extra-browsers-setting">
<p>
<span class="pure-form-message-inline"><i>Extra Browsers</i> can be attached to further defeat CAPTCHA's on websites that are particularly hard to scrape.</span><br>
<span class="pure-form-message-inline">Simply paste the connection address into the box, <a href="https://changedetection.io/tutorial/using-bright-datas-scraping-browser-pass-captchas-and-other-protection-when-monitoring">More instructions and examples here</a> </span>
</p>
{{ render_field(form.requests.form.extra_browsers) }}
{{ render_fieldlist_with_inline_errors(form.requests.form.extra_browsers) }}
</div>
</div>
<div id="actions">
<div class="pure-control-group">

View File

@@ -64,6 +64,19 @@ class Fetcher():
# Time ONTOP of the system defined env minimum time
render_extract_delay = 0
def clear_content(self):
"""
Explicitly clear all content from memory to free up heap space.
Call this after content has been saved to disk.
"""
self.content = None
if hasattr(self, 'raw_content'):
self.raw_content = None
self.screenshot = None
self.xpath_data = None
# Keep headers and status_code as they're small
logger.trace("Fetcher content cleared from memory")
@abstractmethod
def get_error(self):
return self.error
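
clear_content() drops the big payloads (body, raw content, screenshot, xpath data) while intentionally keeping the small headers and status code, so callers can hold on to the fetcher object for status reporting without holding the page body. A self-contained illustration of that contract, using a stand-in class that mirrors the method added above (FetcherStub is not real project code):

class FetcherStub:
    def __init__(self):
        self.content = "<html>...big body...</html>"
        self.raw_content = b"...bytes..."
        self.screenshot = b"...png..."
        self.xpath_data = {"//div": []}
        self.headers = {"content-type": "text/html"}
        self.status_code = 200

    def clear_content(self):
        # Same behaviour as the Fetcher.clear_content() shown in this diff.
        self.content = None
        if hasattr(self, 'raw_content'):
            self.raw_content = None
        self.screenshot = None
        self.xpath_data = None

f = FetcherStub()
f.clear_content()
assert f.content is None and f.screenshot is None
assert f.status_code == 200 and f.headers      # small metadata survives the clear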

View File

@@ -678,6 +678,51 @@ class ValidateCSSJSONXPATHInput(object):
except:
raise ValidationError("A system-error occurred when validating your jq expression")
class ValidateSimpleURL:
"""Validate that the value can be parsed by urllib.parse.urlparse() and has a scheme/netloc."""
def __init__(self, message=None):
self.message = message or "Invalid URL."
def __call__(self, form, field):
data = (field.data or "").strip()
if not data:
return # empty is OK — pair with validators.Optional()
from urllib.parse import urlparse
parsed = urlparse(data)
if not parsed.scheme or not parsed.netloc:
raise ValidationError(self.message)
class ValidateStartsWithRegex(object):
def __init__(self, regex, *, flags=0, message=None, allow_empty=True, split_lines=True):
# compile with given flags (we'll pass re.IGNORECASE below)
self.pattern = re.compile(regex, flags) if isinstance(regex, str) else regex
self.message = message
self.allow_empty = allow_empty
self.split_lines = split_lines
def __call__(self, form, field):
data = field.data
if not data:
return
# normalize into list of lines
if isinstance(data, str) and self.split_lines:
lines = data.splitlines()
elif isinstance(data, (list, tuple)):
lines = data
else:
lines = [data]
for line in lines:
stripped = line.strip()
if not stripped:
if self.allow_empty:
continue
raise ValidationError(self.message or "Empty value not allowed.")
if not self.pattern.match(stripped):
raise ValidationError(self.message or "Invalid value.")
class quickWatchForm(Form):
from . import processors
@@ -865,16 +910,29 @@ class processor_text_json_diff_form(commonSettingsForm):
class SingleExtraProxy(Form):
# maybe better to set some <script>var..
proxy_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
proxy_url = StringField('Proxy URL', [validators.Optional()], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
# @todo do the validation here instead
proxy_url = StringField('Proxy URL', [
validators.Optional(),
ValidateStartsWithRegex(
regex=r'^(https?|socks5)://', # ✅ main pattern
flags=re.IGNORECASE, # ✅ makes it case-insensitive
message='Proxy URLs must start with http://, https:// or socks5://',
),
ValidateSimpleURL()
], render_kw={"placeholder": "socks5:// or regular proxy http://user:pass@...:3128", "size":50})
class SingleExtraBrowser(Form):
browser_name = StringField('Name', [validators.Optional()], render_kw={"placeholder": "Name"})
browser_connection_url = StringField('Browser connection URL', [validators.Optional()], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
# @todo do the validation here instead
browser_connection_url = StringField('Browser connection URL', [
validators.Optional(),
ValidateStartsWithRegex(
regex=r'^(wss?|ws)://',
flags=re.IGNORECASE,
message='Browser URLs must start with wss:// or ws://'
),
ValidateSimpleURL()
], render_kw={"placeholder": "wss://brightdata... wss://oxylabs etc", "size":50})
class DefaultUAInputForm(Form):
html_requests = StringField('Plaintext requests', validators=[validators.Optional()], render_kw={"placeholder": "<default>"})
@@ -885,7 +943,7 @@ class DefaultUAInputForm(Form):
class globalSettingsRequestForm(Form):
time_between_check = RequiredFormField(TimeBetweenCheckForm)
time_schedule_limit = FormField(ScheduleLimitForm)
proxy = RadioField('Proxy')
proxy = RadioField('Default proxy')
jitter_seconds = IntegerField('Random jitter seconds ± check',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0, message="Should contain zero or more seconds")])
@@ -940,6 +998,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
strip_ignored_lines = BooleanField('Strip ignored lines')
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0,
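
Taken together, the two new validators accept well-formed proxy URLs and reject everything else: ValidateStartsWithRegex checks the scheme prefix, ValidateSimpleURL checks that urlparse() yields both a scheme and a netloc. A standalone sketch of the same checks outside WTForms, using the proxy pattern shown above:

import re
from urllib.parse import urlparse

PROXY_PREFIX = re.compile(r'^(https?|socks5)://', re.IGNORECASE)

def is_valid_proxy_url(value: str) -> bool:
    value = (value or "").strip()
    if not value:
        return True                   # empty is allowed, mirroring validators.Optional()
    if not PROXY_PREFIX.match(value):
        return False                  # "Proxy URLs must start with http://, https:// or socks5://"
    parsed = urlparse(value)
    return bool(parsed.scheme and parsed.netloc)

assert is_valid_proxy_url("socks5://user:pass@squid-custom:3128")
assert not is_valid_proxy_url("xxxxhtt/333??p://test:awesome@squid-custom:3128")
assert not is_valid_proxy_url("https://")    # scheme but no host -> "Invalid URL."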

View File

@@ -1,5 +1,4 @@
from loguru import logger
from lxml import etree
from typing import List
import html
import json
@@ -58,13 +57,17 @@ def include_filters(include_filters, html_content, append_pretty_line_formatting
return html_block
def subtractive_css_selector(css_selector, html_content):
def subtractive_css_selector(css_selector, content):
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
soup = BeautifulSoup(content, "html.parser")
# So that the elements dont shift their index, build a list of elements here which will be pointers to their place in the DOM
elements_to_remove = soup.select(css_selector)
if not elements_to_remove:
# Better to return the original that rebuild with BeautifulSoup
return content
# Then, remove them in a separate loop
for item in elements_to_remove:
item.decompose()
@@ -72,6 +75,7 @@ def subtractive_css_selector(css_selector, html_content):
return str(soup)
def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
from lxml import etree
# Parse the HTML content using lxml
html_tree = etree.HTML(html_content)
@@ -83,6 +87,10 @@ def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
# Collect elements for each selector
elements_to_remove.extend(html_tree.xpath(selector))
# If no elements were found, return the original HTML content
if not elements_to_remove:
return html_content
# Then, remove them in a separate loop
for element in elements_to_remove:
if element.getparent() is not None: # Ensure the element has a parent before removing
@@ -295,70 +303,92 @@ def _get_stripped_text_from_json_match(match):
return stripped_text_from_html
def extract_json_blob_from_html(content, ensure_is_ldjson_info_type, json_filter):
from bs4 import BeautifulSoup
stripped_text_from_html = ''
# Foreach <script json></script> blob.. just return the first that matches json_filter
# As a last resort, try to parse the whole <body>
soup = BeautifulSoup(content, 'html.parser')
if ensure_is_ldjson_info_type:
bs_result = soup.find_all('script', {"type": "application/ld+json"})
else:
bs_result = soup.find_all('script')
bs_result += soup.find_all('body')
bs_jsons = []
for result in bs_result:
# result.text is how bs4 magically strips JSON from the body
content_start = result.text.lstrip("\ufeff").strip()[:100] if result.text else ''
# Skip empty tags, and things that dont even look like JSON
if not result.text or not (content_start[0] == '{' or content_start[0] == '['):
continue
try:
json_data = json.loads(result.text)
bs_jsons.append(json_data)
except json.JSONDecodeError:
# Skip objects which cannot be parsed
continue
if not bs_jsons:
raise JSONNotFound("No parsable JSON found in this document")
for json_data in bs_jsons:
stripped_text_from_html = _parse_json(json_data, json_filter)
if ensure_is_ldjson_info_type:
# Could sometimes be list, string or something else random
if isinstance(json_data, dict):
# If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
# (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
# @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
# LD_JSON auto-extract also requires some content PLUS the ldjson to be present
# 1833 - could be either str or dict, should not be anything else
t = json_data.get('@type')
if t and stripped_text_from_html:
if isinstance(t, str) and t.lower() == ensure_is_ldjson_info_type.lower():
break
# The non-standard part, some have a list
elif isinstance(t, list):
if ensure_is_ldjson_info_type.lower() in [x.lower().strip() for x in t]:
break
elif stripped_text_from_html:
break
return stripped_text_from_html
# content - json
# json_filter - ie json:$..price
# ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I dont know how to do that as a json selector)
def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None):
from bs4 import BeautifulSoup
stripped_text_from_html = False
# https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w
# Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags
try:
# .lstrip("\ufeff") strings ByteOrderMark from UTF8 and still lets the UTF work
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff") ), json_filter)
except json.JSONDecodeError as e:
logger.warning(str(e))
# Foreach <script json></script> blob.. just return the first that matches json_filter
# As a last resort, try to parse the whole <body>
soup = BeautifulSoup(content, 'html.parser')
# Looks like clean JSON, dont bother extracting from HTML
if ensure_is_ldjson_info_type:
bs_result = soup.find_all('script', {"type": "application/ld+json"})
else:
bs_result = soup.find_all('script')
bs_result += soup.find_all('body')
content_start = content.lstrip("\ufeff").strip()[:100]
bs_jsons = []
for result in bs_result:
# Skip empty tags, and things that dont even look like JSON
if not result.text or '{' not in result.text:
continue
try:
json_data = json.loads(result.text)
bs_jsons.append(json_data)
except json.JSONDecodeError:
# Skip objects which cannot be parsed
continue
if not bs_jsons:
raise JSONNotFound("No parsable JSON found in this document")
for json_data in bs_jsons:
stripped_text_from_html = _parse_json(json_data, json_filter)
if ensure_is_ldjson_info_type:
# Could sometimes be list, string or something else random
if isinstance(json_data, dict):
# If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
# (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
# @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
# LD_JSON auto-extract also requires some content PLUS the ldjson to be present
# 1833 - could be either str or dict, should not be anything else
t = json_data.get('@type')
if t and stripped_text_from_html:
if isinstance(t, str) and t.lower() == ensure_is_ldjson_info_type.lower():
break
# The non-standard part, some have a list
elif isinstance(t, list):
if ensure_is_ldjson_info_type.lower() in [x.lower().strip() for x in t]:
break
elif stripped_text_from_html:
break
if content_start[0] == '{' or content_start[0] == '[':
try:
# .lstrip("\ufeff") strings ByteOrderMark from UTF8 and still lets the UTF work
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff")), json_filter)
except json.JSONDecodeError as e:
logger.warning(f"Error processing JSON {content[:20]}...{str(e)})")
else:
# Probably something else, go fish inside for it
try:
stripped_text_from_html = extract_json_blob_from_html(content=content,
ensure_is_ldjson_info_type=ensure_is_ldjson_info_type,
json_filter=json_filter )
except json.JSONDecodeError as e:
logger.warning(f"Error processing JSON while extracting JSON from HTML blob {content[:20]}...{str(e)})")
if not stripped_text_from_html:
# Re 265 - Just return an empty string when filter not found
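
The early return added to subtractive_css_selector() avoids re-serializing the document through BeautifulSoup when the selector matches nothing, which keeps the original markup byte-for-byte intact. A quick illustration of the two paths, assuming changedetection.io (and bs4) is importable in the environment:

from changedetectionio import html_tools

html = "<div><p class='ad'>buy now</p><p>real content</p></div>"

# Selector matches: the <p class="ad"> element is decomposed and the soup re-serialized.
cleaned = html_tools.subtractive_css_selector("p.ad", html)
assert "buy now" not in cleaned

# Selector matches nothing: the original string is returned unchanged, no re-serialization.
untouched = html_tools.subtractive_css_selector("p.banner", html)
assert untouched == html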

View File

@@ -55,6 +55,7 @@ class model(dict):
'rss_access_token': None,
'rss_content_format': RSS_FORMAT_TYPES[0][0],
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'schema_version' : 0,
'shared_diff_access': False,
'strip_ignored_lines': False,

View File

@@ -64,24 +64,31 @@ class guess_stream_type():
# Remove whitespace between < and tag name for robust detection (handles '< html', '<\nhtml', etc.)
test_content_normalized = re.sub(r'<\s+', '<', test_content)
# Magic will sometimes call text/plain as text/html!
# Use puremagic for lightweight MIME detection (saves ~14MB vs python-magic)
magic_result = None
try:
import magic
import puremagic
mime = magic.from_buffer(content[:200], mime=True) # Send the original content
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
if mime and "/" in mime:
magic_result = mime
# Ignore generic/fallback mime types from magic
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
logger.debug(f"Ignoring generic mime type '{mime}' from magic library")
# Trust magic for non-text types immediately
elif mime not in ['text/html', 'text/plain']:
magic_content_header = mime
# puremagic needs bytes, so encode if we have a string
content_bytes = content[:200].encode('utf-8') if isinstance(content, str) else content[:200]
# puremagic returns a list of PureMagic objects with confidence scores
detections = puremagic.magic_string(content_bytes)
if detections:
# Get the highest confidence detection
mime = detections[0].mime_type
logger.debug(f"Guessing mime type, original content_type '{http_content_header}', mime type detected '{mime}'")
if mime and "/" in mime:
magic_result = mime
# Ignore generic/fallback mime types
if mime in ['application/octet-stream', 'application/x-empty', 'binary']:
logger.debug(f"Ignoring generic mime type '{mime}' from puremagic library")
# Trust puremagic for non-text types immediately
elif mime not in ['text/html', 'text/plain']:
magic_content_header = mime
except Exception as e:
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}), using content-based detection")
logger.error(f"Error getting a more precise mime type from 'puremagic' library ({str(e)}), using content-based detection")
# Content-based detection (most reliable for text formats)
# Check for HTML patterns first - if found, override magic's text/plain
@@ -94,24 +101,21 @@ class guess_stream_type():
self.is_rss = True
elif any(s in http_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
# magic will call a rss document 'xml'
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
self.is_rss = True
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
# Only mark as generic XML if not already detected as RSS
if not self.is_rss:
self.is_xml = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
###
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
# If magic says text/plain and we found no HTML patterns, trust it
elif magic_result == 'text/plain':
self.is_plaintext = True
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
# magic will call a rss document 'xml'
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
self.is_rss = True
elif test_content_normalized.startswith('<?xml') or any(s in magic_content_header for s in XML_CONTENT_TYPES):
# Generic XML that's not RSS/Atom (RSS/Atom checked above)
self.is_xml = True
@@ -122,4 +126,8 @@ class guess_stream_type():
# Only trust magic for 'text' if no other patterns matched
elif 'text' in magic_content_header:
self.is_plaintext = True
# If magic says text/plain and we found no HTML patterns, trust it
elif magic_result == 'text/plain':
self.is_plaintext = True
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")

View File

@@ -20,7 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
name = 'Webpage Text/HTML, JSON and PDF changes'
description = 'Detects all text changes where possible'
json_filter_prefixes = ['json:', 'jq:', 'jqraw:']
JSON_FILTER_PREFIXES = ['json:', 'jq:', 'jqraw:']
# Assume it's this type if the server says nothing on content-type
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
@@ -99,6 +99,10 @@ class FilterConfig:
def has_include_filters(self):
return bool(self.include_filters) and bool(self.include_filters[0].strip())
@property
def has_include_json_filters(self):
return any(f.strip().startswith(prefix) for f in self.include_filters for prefix in JSON_FILTER_PREFIXES)
@property
def has_subtractive_selectors(self):
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
@@ -224,10 +228,23 @@ class ContentProcessor:
self.datastore = datastore
def preprocess_rss(self, content):
"""Convert CDATA/comments in RSS to usable text."""
return cdata_in_document_to_text(html_content=content)
"""
Convert CDATA/comments in RSS to usable text.
def preprocess_pdf(self, content, raw_content):
Supports two RSS processing modes:
- 'default': Inline CDATA replacement (original behavior)
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
"""
from changedetectionio import rss_tools
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
if rss_mode:
# Format RSS items nicely with CDATA content unmarked and converted to text
return rss_tools.format_rss_items(content)
else:
# Default: Original inline CDATA replacement
return cdata_in_document_to_text(html_content=content)
def preprocess_pdf(self, raw_content):
"""Convert PDF to HTML using external tool."""
from shutil import which
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
@@ -251,19 +268,18 @@ class ContentProcessor:
metadata = (
f"<p>Added by changedetection.io: Document checksum - "
f"{hashlib.md5(raw_content).hexdigest().upper()} "
f"Filesize - {len(html_content)} bytes</p>"
f"Original file size - {len(raw_content)} bytes</p>"
)
return html_content.replace('</body>', metadata + '</body>')
def preprocess_json(self, content, has_filters):
def preprocess_json(self, raw_content):
"""Format and sort JSON content."""
# Force reformat if no filters specified
if not has_filters:
content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
# Then we re-format it, else it does have filters (later on) which will reformat it anyway
content = html_tools.extract_json_as_string(content=raw_content, json_filter="json:$")
# Sort JSON to avoid false alerts from reordering
try:
content = json.dumps(json.loads(content), sort_keys=True)
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
except Exception:
# Might be malformed JSON, continue anyway
pass
@@ -294,7 +310,7 @@ class ContentProcessor:
)
# JSON filters
elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
elif any(filter_rule.startswith(prefix) for prefix in JSON_FILTER_PREFIXES):
filtered_content += html_tools.extract_json_as_string(
content=content,
json_filter=filter_rule
@@ -381,14 +397,23 @@ class perform_site_check(difference_detection_processor):
# RSS preprocessing
if stream_content_type.is_rss:
content = content_processor.preprocess_rss(content)
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
stream_content_type.is_rss = False
stream_content_type.is_html = True
self.fetcher.content = content
# PDF preprocessing
if watch.is_pdf or stream_content_type.is_pdf:
content = content_processor.preprocess_pdf(content, self.fetcher.raw_content)
content = content_processor.preprocess_pdf(raw_content=self.fetcher.raw_content)
stream_content_type.is_html = True
# JSON - Always reformat it nicely for consistency.
# JSON preprocessing
if stream_content_type.is_json:
content = content_processor.preprocess_json(content, filter_config.has_include_filters)
if not filter_config.has_include_json_filters:
content = content_processor.preprocess_json(raw_content=content)
#else, otherwise it gets sorted/formatted in the filter stage anyway
# HTML obfuscation workarounds
if stream_content_type.is_html:
@@ -403,6 +428,8 @@ class perform_site_check(difference_detection_processor):
html_content = content
# Apply include filters (CSS, XPath, JSON)
# Except for plaintext (incase they tried to confuse the system, it will HTML escape
#if not stream_content_type.is_plaintext:
if filter_config.has_include_filters:
html_content = content_processor.apply_include_filters(content, stream_content_type)
@@ -414,6 +441,9 @@ class perform_site_check(difference_detection_processor):
if watch.is_source_type_url:
# For source URLs, keep raw content
stripped_text = html_content
elif stream_content_type.is_plaintext:
# For plaintext, keep as-is without HTML-to-text conversion
stripped_text = html_content
else:
# Extract text from HTML/RSS content (not generic XML)
if stream_content_type.is_html or stream_content_type.is_rss:
@@ -526,6 +556,20 @@ class perform_site_check(difference_detection_processor):
else:
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
# Note: Explicit cleanup is only needed here because text_json_diff handles
# large strings (100KB-300KB for RSS/HTML). The other processors work with
# small strings and don't need this.
#
# Python would clean these up automatically, but explicit `del` frees memory
# immediately rather than waiting for function return, reducing peak memory usage.
del content
if 'html_content' in locals() and html_content is not stripped_text:
del html_content
if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text:
del text_content_before_ignored_filter
if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text:
del text_for_checksuming
return changed_detected, update_obj, stripped_text
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
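
The JSON preprocessing step exists so that key re-ordering and whitespace changes in the server's response do not register as content changes. A short sketch of that normalization, matching the json.dumps arguments used above (sort_keys=True, indent=4):

import json

a = '{"price": 10, "name": "Widget"}'
b = '{"name": "Widget", "price": 10}'     # same data, different key order

def normalise(s):
    return json.dumps(json.loads(s), sort_keys=True, indent=4)

# After sorting keys and fixing indentation the two payloads compare equal,
# so a pure re-ordering on the server side never triggers a change alert.
assert normalise(a) == normalise(b)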

View File

@@ -0,0 +1,130 @@
"""
RSS/Atom feed processing tools for changedetection.io
"""
from loguru import logger
import re
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
"""
Process CDATA sections in HTML/XML content - inline replacement.
Args:
html_content: The HTML/XML content to process
render_anchor_tag_content: Whether to render anchor tag content
Returns:
Processed HTML/XML content with CDATA sections replaced inline
"""
from xml.sax.saxutils import escape as xml_escape
from .html_tools import html_to_text
pattern = '<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'
def repl(m):
text = m.group(1)
return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip()
return re.sub(pattern, repl, html_content)
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
Format RSS/Atom feed items in a readable text format using feedparser.
Converts RSS <item> or Atom <entry> elements to formatted text with:
- <title> → <h1>Title</h1>
- <link> → Link: [url]
- <guid> → Guid: [id]
- <pubDate> → PubDate: [date]
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
Args:
rss_content: The RSS/Atom feed content
render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility)
Returns:
Formatted HTML content ready for html_to_text conversion
"""
try:
import feedparser
from xml.sax.saxutils import escape as xml_escape
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
feed = feedparser.parse(rss_content)
formatted_items = []
# Determine feed type for appropriate labels when fields are missing
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
is_atom = feed.version and 'atom' in feed.version
for entry in feed.entries:
item_parts = []
# Title - feedparser handles CDATA and entity unescaping automatically
if hasattr(entry, 'title') and entry.title:
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
# Link
if hasattr(entry, 'link') and entry.link:
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
# GUID/ID
if hasattr(entry, 'id') and entry.id:
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
# Date - feedparser normalizes all date field names to 'published'
if hasattr(entry, 'published') and entry.published:
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
# Description/Content - feedparser handles CDATA and entity unescaping automatically
# Only add "Summary:" label for Atom <summary> tags
content = None
add_label = False
if hasattr(entry, 'content') and entry.content:
# Atom <content> - no label, just content
content = entry.content[0].value if entry.content[0].value else None
elif hasattr(entry, 'summary'):
# Could be RSS <description> or Atom <summary>
# feedparser maps both to entry.summary
content = entry.summary if entry.summary else None
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
if is_atom:
add_label = True
# Add content with or without label
if content:
if add_label:
item_parts.append(f'Summary:<br>{content}')
else:
item_parts.append(content)
else:
# No content - just show <none>
item_parts.append('&lt;none&gt;')
# Join all parts of this item
if item_parts:
formatted_items.append('\n'.join(item_parts))
# Wrap each item in a div with classes (first, last, item-N)
items_html = []
total_items = len(formatted_items)
for idx, item in enumerate(formatted_items):
classes = ['rss-item']
if idx == 0:
classes.append('first')
if idx == total_items - 1:
classes.append('last')
classes.append(f'item-{idx + 1}')
class_str = ' '.join(classes)
items_html.append(f'<div class="{class_str}">{item}</div>')
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
except Exception as e:
logger.warning(f"Error formatting RSS items: {str(e)}")
# Fall back to original content
return rss_content
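
A quick usage sketch for the new helper, with a tiny hand-written feed; feedparser handles the CDATA and entity unescaping. This assumes feedparser is installed and changedetection.io is importable:

from changedetectionio import rss_tools

feed = """<?xml version="1.0"?>
<rss version="2.0"><channel>
  <item>
    <title>Price drop</title>
    <link>https://example.com/item/1</link>
    <guid>item-1</guid>
    <pubDate>Mon, 06 Oct 2025 10:00:00 +0200</pubDate>
    <description><![CDATA[<b>Now 20% off</b>]]></description>
  </item>
</channel></rss>"""

html = rss_tools.format_rss_items(feed)
# Each item becomes <div class="rss-item first last item-1"> containing an <h1> title,
# Link/Guid/PubDate lines and the raw description HTML, ready for html_to_text().
assert '<h1>Price drop</h1>' in html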

View File

@@ -344,7 +344,7 @@ label {
}
}
#notification-customisation {
.grey-form-border {
border: 1px solid var(--color-border-notification);
padding: 0.5rem;
border-radius: 5px;

File diff suppressed because one or more lines are too long

View File

@@ -33,7 +33,7 @@
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
</div>
</div>
<div id="notification-customisation" class="pure-control-group">
<div class="pure-control-group grey-form-border">
<div class="pure-control-group">
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
<span class="pure-form-message-inline">Title for all notifications</span>

View File

@@ -14,13 +14,31 @@
{% if field.errors is mapping and 'form' in field.errors %}
{# and subfield form errors, such as used in RequiredFormField() for TimeBetweenCheckForm sub form #}
{% set errors = field.errors['form'] %}
{% for error in errors %}
<li>{{ error }}</li>
{% endfor %}
{% elif field.type == 'FieldList' %}
{# Handle FieldList of FormFields - errors is a list of dicts, one per entry #}
{% for idx, entry_errors in field.errors|enumerate %}
{% if entry_errors is mapping and entry_errors %}
{# Only show entries that have actual errors #}
<li><strong>Entry {{ idx + 1 }}:</strong>
<ul>
{% for field_name, messages in entry_errors.items() %}
{% for message in messages %}
<li>{{ field_name }}: {{ message }}</li>
{% endfor %}
{% endfor %}
</ul>
</li>
{% endif %}
{% endfor %}
{% else %}
{# regular list of errors with this field #}
{% set errors = field.errors %}
{% for error in field.errors %}
<li>{{ error }}</li>
{% endfor %}
{% endif %}
{% for error in errors %}
<li>{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</div>
@@ -93,6 +111,39 @@
{{ field(**kwargs)|safe }}
{% endmacro %}
{% macro render_fieldlist_with_inline_errors(fieldlist) %}
{# Specialized macro for FieldList(FormField(...)) that renders errors inline with each field #}
<div {% if fieldlist.errors %} class="error" {% endif %}>{{ fieldlist.label }}</div>
<div {% if fieldlist.errors %} class="error" {% endif %}>
<ul id="{{ fieldlist.id }}">
{% for entry in fieldlist %}
<li {% if entry.errors %} class="error" {% endif %}>
<label for="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>{{ fieldlist.label.text }}-{{ loop.index0 }}</label>
<table id="{{ entry.id }}" {% if entry.errors %} class="error" {% endif %}>
<tbody>
{% for subfield in entry %}
<tr {% if subfield.errors %} class="error" {% endif %}>
<th {% if subfield.errors %} class="error" {% endif %}><label for="{{ subfield.id }}" {% if subfield.errors %} class="error" {% endif %}>{{ subfield.label.text }}</label></th>
<td {% if subfield.errors %} class="error" {% endif %}>
{{ subfield(**kwargs)|safe }}
{% if subfield.errors %}
<ul class="errors">
{% for error in subfield.errors %}
<li class="error">{{ error }}</li>
{% endfor %}
</ul>
{% endif %}
</td>
</tr>
{% endfor %}
</tbody>
</table>
</li>
{% endfor %}
</ul>
</div>
{% endmacro %}
{% macro render_conditions_fieldlist_of_formfields_as_table(fieldlist, table_id="rulesTable") %}
<div class="fieldlist_formfields" id="{{ table_id }}">
<div class="fieldlist-header">

View File

@@ -29,16 +29,28 @@ def reportlog(pytestconfig):
logger.remove(handler_id)
def format_memory_human(bytes_value):
"""Format memory in human-readable units (KB, MB, GB)"""
if bytes_value < 1024:
return f"{bytes_value} B"
elif bytes_value < 1024 ** 2:
return f"{bytes_value / 1024:.2f} KB"
elif bytes_value < 1024 ** 3:
return f"{bytes_value / (1024 ** 2):.2f} MB"
else:
return f"{bytes_value / (1024 ** 3):.2f} GB"
def track_memory(memory_usage, ):
process = psutil.Process(os.getpid())
while not memory_usage["stop"]:
current_rss = process.memory_info().rss
memory_usage["peak"] = max(memory_usage["peak"], current_rss)
memory_usage["current"] = current_rss # Keep updating current
time.sleep(0.01) # Adjust the sleep time as needed
@pytest.fixture(scope='function')
def measure_memory_usage(request):
memory_usage = {"peak": 0, "stop": False}
memory_usage = {"peak": 0, "current": 0, "stop": False}
tracker_thread = Thread(target=track_memory, args=(memory_usage,))
tracker_thread.start()
@@ -47,16 +59,17 @@ def measure_memory_usage(request):
memory_usage["stop"] = True
tracker_thread.join()
# Note: ru_maxrss is in kilobytes on Unix-based systems
max_memory_used = memory_usage["peak"] / 1024 # Convert to MB
s = f"{time.time()} Peak memory used by the test {request.node.fspath} - '{request.node.name}': {max_memory_used:.2f} MB"
# Note: psutil returns RSS memory in bytes
peak_human = format_memory_human(memory_usage["peak"])
s = f"{time.time()} {request.node.fspath} - '{request.node.name}' - Peak memory: {peak_human}"
logger.debug(s)
with open("test-memory.log", 'a') as f:
f.write(f"{s}\n")
# Assert that the memory usage is less than 200MB
# assert max_memory_used < 150, f"Memory usage exceeded 200MB: {max_memory_used:.2f} MB"
# assert peak_memory_kb < 150 * 1024, f"Memory usage exceeded 150MB: {peak_human}"
def cleanup(datastore_path):

View File

@@ -29,13 +29,8 @@ def do_test(client, live_server, make_test_use_extra_browser=False):
assert b"Settings updated." in res.data
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
if make_test_use_extra_browser:

View File

@@ -49,3 +49,39 @@ def test_select_custom(client, live_server, measure_memory_usage):
#
# Now we should see the request in the container logs for "squid-squid-custom" because it will be the only default
def test_custom_proxy_validation(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
# Goto settings, add our custom one
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
"requests-extra_proxies-0-proxy_url": "xxxxhtt/333??p://test:awesome@squid-custom:3128",
},
follow_redirects=True
)
assert b"Settings updated." not in res.data
assert b'Proxy URLs must start with' in res.data
res = client.post(
url_for("settings.settings_page"),
data={
"requests-time_between_check-minutes": 180,
"application-ignore_whitespace": "y",
"application-fetch_backend": 'html_requests',
"requests-extra_proxies-0-proxy_name": "custom-test-proxy",
"requests-extra_proxies-0-proxy_url": "https://",
},
follow_redirects=True
)
assert b"Settings updated." not in res.data
assert b"Invalid URL." in res.data

View File

@@ -2,7 +2,7 @@
import json
import os
from flask import url_for
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
def set_response():
@@ -98,6 +98,5 @@ def test_socks5(client, live_server, measure_memory_usage):
)
assert b"OK" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
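
delete_all_watches() is a new test helper that replaces the repeated two-line delete-and-assert pattern shown above. Its definition is not part of this excerpt, but based on the code it replaces it is presumably something close to the following (hypothetical reconstruction; the real helper lives in changedetectionio/tests/util.py):

from flask import url_for

def delete_all_watches(client):
    # Delete every watch via the UI endpoint and confirm the flash message.
    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
    assert b'Deleted' in res.data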

View File

@@ -5,7 +5,7 @@ import re
from flask import url_for
from changedetectionio.tests.util import set_original_response, set_modified_response, set_more_modified_response, live_server_setup, \
wait_for_all_checks, \
set_longer_modified_response
set_longer_modified_response, delete_all_watches
from changedetectionio.tests.util import extract_UUID_from_client
import logging
import base64
@@ -85,8 +85,7 @@ def test_check_notification_email_formats_default_HTML(client, live_server, meas
assert '(added) So let\'s see what happens.\r\n' in msg # The plaintext part with \r\n
assert 'Content-Type: text/html' in msg
assert '(added) So let\'s see what happens.<br>' in msg # the html part
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_notification_email_formats_default_Text_override_HTML(client, live_server, measure_memory_usage):
@@ -179,5 +178,4 @@ def test_check_notification_email_formats_default_Text_override_HTML(client, liv
assert '&lt;' not in msg
assert 'Content-Type: text/html' in msg
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@ from .util import live_server_setup, wait_for_all_checks
from flask import url_for
import time
def test_check_access_control(app, client, live_server):
def test_check_access_control(app, client, live_server, measure_memory_usage):
# Still doesnt work, but this is closer.
# live_server_setup(live_server) # Setup on conftest per function

View File

@@ -3,7 +3,7 @@
import os.path
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, delete_all_watches
import time
def set_original(excluding=None, add_line=None):
@@ -44,12 +44,8 @@ def test_check_removed_line_contains_trigger(client, live_server, measure_memory
set_original()
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -107,14 +103,12 @@ def test_check_removed_line_contains_trigger(client, live_server, measure_memory
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_add_line_contains_trigger(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
time.sleep(1)
# Give the endpoint time to spin up
@@ -137,12 +131,8 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
set_original()
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -187,5 +177,4 @@ def test_check_add_line_contains_trigger(client, live_server, measure_memory_usa
assert b'-Oh yes please' in response
assert '网站监测 内容更新了'.encode('utf-8') in response
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
import json
import uuid
@@ -276,8 +276,7 @@ def test_access_denied(client, live_server, measure_memory_usage):
assert res.status_code == 200
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.post(
url_for("settings.settings_page"),
@@ -385,8 +384,7 @@ def test_api_watch_PUT_update(client, live_server, measure_memory_usage):
assert b'Additional properties are not allowed' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_api_import(client, live_server, measure_memory_usage):

View File

@@ -4,7 +4,7 @@ from flask import url_for
from .util import live_server_setup
import json
def test_api_notifications_crud(client, live_server):
def test_api_notifications_crud(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

View File

@@ -6,7 +6,7 @@ import time
from .util import live_server_setup, wait_for_all_checks
def test_api_search(client, live_server):
def test_api_search(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token')

View File

@@ -12,12 +12,8 @@ def test_basic_auth(client, live_server, measure_memory_usage):
# This page will echo back any auth info
test_url = url_for('test_basicauth_method', _external=True).replace("//","//myuser:mypass@")
time.sleep(1)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
time.sleep(1)
# Check form validation

View File

@@ -86,12 +86,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Should get a notice that it's available
@@ -129,12 +125,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'ldjson-price-track-offer' not in res.data
@@ -146,12 +138,8 @@ def test_check_ldjson_price_autodetect(client, live_server, measure_memory_usage
def _test_runner_check_bad_format_ignored(live_server, client, has_ldjson_price_data):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
for k,v in client.application.config.get('DATASTORE').data['watching'].items():

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client
extract_UUID_from_client, delete_all_watches
sleep_time_for_fetch_thread = 3
@@ -163,8 +163,7 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
#
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
"""
@@ -193,13 +192,8 @@ got it\r\n
test_url = url_for('test_endpoint', content_type="application/octet-stream", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -227,7 +221,7 @@ got it\r\n
assert b"some random text that should be split by line\n" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_standard_text_plain(client, live_server, measure_memory_usage):
@@ -258,13 +252,8 @@ got it\r\n
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -293,7 +282,7 @@ got it\r\n
assert b"some random text that should be split by line\n" in res.data
assert b"<title>Even this title should stay because we are just plain text</title>" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Server says its plaintext, we should always treat it as plaintext
def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage):
@@ -309,13 +298,8 @@ def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage
test_url = url_for('test_endpoint', content_type="text/plain", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -326,7 +310,7 @@ def test_plaintext_even_if_xml_content(client, live_server, measure_memory_usage
assert b'&lt;string name=&#34;feed_update_receiver_name&#34;' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Server says its plaintext, we should always treat it as plaintext, and then if they have a filter, try to apply that
def test_plaintext_even_if_xml_content_and_can_apply_filters(client, live_server, measure_memory_usage):

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from changedetectionio import html_tools
def set_original_ignore_response():
@@ -70,12 +70,8 @@ def test_check_block_changedetection_text_NOT_present(client, live_server, measu
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -144,5 +140,4 @@ def test_check_block_changedetection_text_NOT_present(client, live_server, measu
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
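The cleanup boilerplate gets the same treatment: the GET to ui.form_delete with uuid="all" plus its 'Deleted' assertion is folded into the new delete_all_watches(client) helper imported from .util. Its definition is not part of this diff; a minimal sketch of what it presumably wraps, based only on the two lines it replaces everywhere (an assumption, not the project's actual code):

    # Hypothetical sketch of the tests/util.py helper -- assumed to wrap exactly
    # the calls it replaces in each test; the real implementation may differ.
    from flask import url_for

    def delete_all_watches(client):
        res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
        assert b'Deleted' in res.data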

View File

@@ -14,12 +14,8 @@ def test_clone_functionality(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# So that we can be sure the same history doesnt carry over

View File

@@ -3,7 +3,7 @@ import json
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from ..model import CONDITIONS_MATCH_LOGIC_DEFAULT
@@ -47,11 +47,11 @@ def set_number_out_of_range_response(number="150"):
f.write(test_return_data)
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
"""Test that both text and number conditions work together with AND logic."""
# live_server_setup(live_server) # Setup on conftest per function
def test_conditions_with_text_and_number(client, live_server):
def test_conditions_with_text_and_number(client, live_server, measure_memory_usage):
"""Test that both text and number conditions work together with AND logic."""
set_original_response("50")
@@ -60,12 +60,8 @@ def test_conditions_with_text_and_number(client, live_server):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Configure the watch with two conditions connected with AND:
@@ -143,23 +139,18 @@ def test_conditions_with_text_and_number(client, live_server):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# The 'validate' button next to each rule row
def test_condition_validate_rule_row(client, live_server):
def test_condition_validate_rule_row(client, live_server, measure_memory_usage):
set_original_response("50")
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -230,12 +221,8 @@ def test_wordcount_conditions_plugin(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
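Several test signatures in this file also gain the measure_memory_usage fixture, presumably so these tests are covered by the suite's per-test memory tracking. The fixture itself lives in conftest and is not shown in this diff; purely as an illustration of the idea (psutil, the fixture body, and the reporting are assumptions, not the project's code):

    # Illustration only -- the real fixture is defined in conftest.py and may differ.
    import os
    import psutil
    import pytest

    @pytest.fixture
    def measure_memory_usage(request):
        proc = psutil.Process(os.getpid())
        before = proc.memory_info().rss
        yield
        grown = proc.memory_info().rss - before
        print(f"{request.node.name}: RSS grew by {grown / 1024:.0f} KiB")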

View File

@@ -81,12 +81,8 @@ def test_check_markup_include_filters_restriction(client, live_server, measure_m
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)
@@ -138,12 +134,8 @@ def test_check_multiple_filters(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
@@ -193,12 +185,8 @@ def test_filter_is_empty_help_suggestion(client, live_server, measure_memory_usa
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text

View File

@@ -5,7 +5,7 @@ import time
from flask import url_for
from ..html_tools import *
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
@@ -231,8 +231,7 @@ body > table > tr:nth-child(3) > td:nth-child(3)""",
for selector_list in subtractive_selectors_data:
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"subtractive_selectors": selector_list.splitlines()})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)

View File

@@ -28,11 +28,8 @@ def test_check_encoding_detection(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="text/html", _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -59,11 +56,8 @@ def test_check_encoding_detection_missing_content_type_header(client, live_serve
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
@@ -19,12 +19,8 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
status_code=http_code,
_external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -47,8 +43,7 @@ def _runner_test_http_errors(client, live_server, http_code, expected_text):
#assert b'Error Screenshot' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_http_error_handler(client, live_server, measure_memory_usage):
@@ -56,8 +51,7 @@ def test_http_error_handler(client, live_server, measure_memory_usage):
_runner_test_http_errors(client, live_server, 404, 'Page not found')
_runner_test_http_errors(client, live_server, 500, '(Internal server error) received')
_runner_test_http_errors(client, live_server, 400, 'Error - Request returned a HTTP error code 400')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Just to be sure error text is properly handled
def test_DNS_errors(client, live_server, measure_memory_usage):
@@ -87,8 +81,7 @@ def test_DNS_errors(client, live_server, measure_memory_usage):
assert found_name_resolution_error
# Should always record that we tried
assert bytes("just now".encode('utf-8')) in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Re 1513
def test_low_level_errors_clear_correctly(client, live_server, measure_memory_usage):
@@ -145,5 +138,4 @@ def test_low_level_errors_clear_correctly(client, live_server, measure_memory_us
)
assert not found_name_resolution_error
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from ..html_tools import *
@@ -76,12 +76,8 @@ def test_check_filter_multiline(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -131,12 +127,8 @@ def test_check_filter_and_regex_extract(client, live_server, measure_memory_usag
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -212,12 +204,8 @@ def test_regex_error_handling(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
### test regex error handling
res = client.post(
@@ -231,5 +219,4 @@ def test_regex_error_handling(client, live_server, measure_memory_usage):
assert b'is not a valid regular expression.' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -42,13 +42,8 @@ def run_filter_test(client, live_server, content_filter):
if os.path.isfile("test-datastore/notification.txt"):
os.unlink("test-datastore/notification.txt")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, get_UUID_for_tag_name, extract_UUID_from_client, delete_all_watches
import os
@@ -127,8 +127,7 @@ def test_setup_group_tag(client, live_server, measure_memory_usage):
assert b"should-be-excluded" not in res.data
assert res.status_code == 200
assert b"first-imported=1" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_tag_import_singular(client, live_server, measure_memory_usage):
@@ -147,8 +146,7 @@ def test_tag_import_singular(client, live_server, measure_memory_usage):
)
# Should be only 1 tag because they both had the same
assert res.data.count(b'test-tag') == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_tag_add_in_ui(client, live_server, measure_memory_usage):
@@ -164,8 +162,7 @@ def test_tag_add_in_ui(client, live_server, measure_memory_usage):
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_group_tag_notification(client, live_server, measure_memory_usage):
@@ -232,8 +229,7 @@ def test_group_tag_notification(client, live_server, measure_memory_usage):
#@todo Test that multiple notifications fired
#@todo Test that each of multiple notifications with different settings
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_limit_tag_ui(client, live_server, measure_memory_usage):
@@ -269,8 +265,7 @@ def test_limit_tag_ui(client, live_server, measure_memory_usage):
assert res.data.count(b' unviewed ') == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
@@ -297,8 +292,7 @@ def test_clone_tag_on_import(client, live_server, measure_memory_usage):
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usage):
@@ -325,8 +319,7 @@ def test_clone_tag_on_quickwatchform_add(client, live_server, measure_memory_usa
# 2 times plus the top link to tag
assert res.data.count(b'test-tag') == 3
assert res.data.count(b'another-tag') == 3
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
res = client.get(url_for("tags.delete_all"), follow_redirects=True)
assert b'All tags deleted' in res.data
@@ -389,12 +382,8 @@ def test_order_of_filters_tag_filter_and_watch_filter(client, live_server, measu
f.write(d)
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
filters = [
@@ -480,5 +469,4 @@ the {test} appeared before. {test in res.data[:n]=}
"""
n += t_index + len(test)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -4,7 +4,7 @@ import time
import os
import json
from flask import url_for
from .util import wait_for_all_checks
from .util import wait_for_all_checks, delete_all_watches
from urllib.parse import urlparse, parse_qs
def test_consistent_history(client, live_server, measure_memory_usage):
@@ -80,19 +80,15 @@ def test_consistent_history(client, live_server, measure_memory_usage):
assert '"default"' not in f.read(), "'default' probably shouldnt be here, it came from when the 'default' Watch vars were accidently being saved"
def test_check_text_history_view(client, live_server):
def test_check_text_history_view(client, live_server, measure_memory_usage):
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write("<html>test-one</html>")
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -121,5 +117,4 @@ def test_check_text_history_view(client, live_server):
assert b'test-two' in res.data
assert b'test-one' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -27,12 +27,8 @@ def test_ignore(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
set_original_ignore_response()
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -79,12 +75,8 @@ def test_strip_ignore_lines(client, live_server, measure_memory_usage):
assert b"Settings updated." in res.data
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from changedetectionio import html_tools
@@ -97,12 +97,8 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -163,8 +159,7 @@ def test_check_ignore_text_functionality(client, live_server, measure_memory_usa
# it is only ignored, it is not removed (it will be highlighted too)
assert b'new ignore stuff' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# When adding some ignore text, it should not trigger a change, even if something else on that line changes
def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
@@ -192,12 +187,8 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
# Switch to source mode so we can test that too!
test_url = "source:"+test_url
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -251,13 +242,12 @@ def _run_test_global_ignore(client, as_source=False, extra_ignore=""):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_global_ignore_text_functionality(client, live_server):
def test_check_global_ignore_text_functionality(client, live_server, measure_memory_usage):
_run_test_global_ignore(client, as_source=False)
def test_check_global_ignore_text_functionality_as_source(client, live_server):
def test_check_global_ignore_text_functionality_as_source(client, live_server, measure_memory_usage):
_run_test_global_ignore(client, as_source=True, extra_ignore='/\?v=\d/')

View File

@@ -3,9 +3,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -117,7 +115,5 @@ def test_render_anchor_tag_content_true(client, live_server, measure_memory_usag
assert b"/test-endpoint" in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"),
follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -60,12 +60,8 @@ def test_normal_page_check_works_with_ignore_status_code(client, live_server, me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -94,12 +90,8 @@ def test_403_page_check_works_with_ignore_status_code(client, live_server, measu
# Add our URL to the import page
test_url = url_for('test_endpoint', status_code=403, _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(sleep_time_for_fetch_thread)

View File

@@ -70,12 +70,8 @@ def test_check_ignore_whitespace(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
# Trigger a check

View File

@@ -5,7 +5,7 @@ import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
# def test_setup(client, live_server, measure_memory_usage):
@@ -28,7 +28,7 @@ https://example.com tag1, other tag"""
assert b"3 Imported" in res.data
assert b"tag1" in res.data
assert b"other tag" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get( url_for("watchlist.index"))
@@ -53,7 +53,7 @@ def xtest_import_skip_url(client, live_server, measure_memory_usage):
assert b"1 Imported" in res.data
assert b"ht000000broken" in res.data
assert b"1 Skipped" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get( url_for("watchlist.index"))
@@ -119,7 +119,7 @@ def test_import_distillio(client, live_server, measure_memory_usage):
assert b"nice stuff" in res.data
assert b"nerd-news" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
# Clear flask alerts
res = client.get(url_for("watchlist.index"))
@@ -169,8 +169,7 @@ def test_import_custom_xlsx(client, live_server, measure_memory_usage):
assert filters[0] == '/html[1]/body[1]/div[4]/div[1]/div[1]/div[1]||//*[@id=\'content\']/div[3]/div[1]/div[1]||//*[@id=\'content\']/div[1]'
assert watch.get('time_between_check') == {'weeks': 0, 'days': 1, 'hours': 6, 'minutes': 24, 'seconds': 0}
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_import_watchete_xlsx(client, live_server, measure_memory_usage):
"""Test can upload a excel spreadsheet and the watches are created correctly"""
@@ -214,5 +213,4 @@ def test_import_watchete_xlsx(client, live_server, measure_memory_usage):
if watch.get('title') == 'system default website':
assert watch.get('fetch_backend') == 'system' # uses default if blank
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for, escape
from . util import live_server_setup, wait_for_all_checks
from . util import live_server_setup, wait_for_all_checks, delete_all_watches
import pytest
jq_support = True
@@ -113,14 +113,8 @@ def set_original_ext_response():
return None
def set_modified_ext_response():
data = """
[
{
"isPriceLowered": false,
"status": "Sold",
"statusOrig": "sold"
},
{
# This should get reformatted
data = """ [ { "isPriceLowered": false, "status": "Sold", "statusOrig": "sold" }, {
"_id": "5e7b3e1fb3262d306323ff1e",
"listingsType": "consumer",
"isPriceLowered": false,
@@ -207,11 +201,8 @@ def test_check_json_without_filter(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -225,8 +216,7 @@ def test_check_json_without_filter(client, live_server, measure_memory_usage):
assert b'&#34;html&#34;: &#34;&lt;b&gt;&#34;' in res.data
assert res.data.count(b'{') >= 2
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def check_json_filter(json_filter, client, live_server):
set_original_response()
@@ -234,34 +224,15 @@ def check_json_filter(json_filter, client, live_server):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(escape(json_filter).encode('utf-8')) in res.data
@@ -280,14 +251,13 @@ def check_json_filter(json_filter, client, live_server):
assert b'has-unread-changes' in res.data
# Should not see this, because its not in the JSONPath we entered
res = client.get(url_for("ui.ui_views.diff_history_page", uuid="first"))
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
# And #462 - check we see the proper utf-8 string there
assert "Örnsköldsvik".encode('utf-8') in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_jsonpath_filter(client, live_server, measure_memory_usage):
check_json_filter('json:boss.name', client, live_server)
@@ -303,36 +273,12 @@ def test_check_jqraw_filter(client, live_server, measure_memory_usage):
def check_json_filter_bool_val(json_filter, client, live_server):
set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": [json_filter]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
# Make a change
set_modified_response()
@@ -345,8 +291,7 @@ def check_json_filter_bool_val(json_filter, client, live_server):
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
assert b'false' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_jsonpath_filter_bool_val(client, live_server, measure_memory_usage):
check_json_filter_bool_val("json:$['available']", client, live_server)
@@ -367,25 +312,16 @@ def test_check_jqraw_filter_bool_val(client, live_server, measure_memory_usage):
def check_json_ext_filter(json_filter, client, live_server):
set_original_ext_response()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
# Give the thread time to pick it up
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
@@ -399,7 +335,7 @@ def check_json_ext_filter(json_filter, client, live_server):
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(escape(json_filter).encode('utf-8')) in res.data
@@ -413,6 +349,12 @@ def check_json_ext_filter(json_filter, client, live_server):
# Give the thread time to pick it up
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert snapshot_contents[0] == '['
# It should have 'has-unread-changes'
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
@@ -431,8 +373,7 @@ def check_json_ext_filter(json_filter, client, live_server):
assert b'ForSale' in res.data
assert b'Sold' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_ignore_json_order(client, live_server, measure_memory_usage):
# A change in order shouldn't trigger a notification
@@ -443,12 +384,8 @@ def test_ignore_json_order(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -473,24 +410,19 @@ def test_ignore_json_order(client, live_server, measure_memory_usage):
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_correct_header_detect(client, live_server, measure_memory_usage):
# Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
# Specify extra html that JSON is sometimes wrapped in - when using SockpuppetBrowser / Puppeteer / Playwrightetc
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write('<html><body>{"hello" : 123, "world": 123}')
f.write('<html><body>{ "world": 123, "hello" : 123}')
# Add our URL to the import page
# Check weird casing is cleaned up and detected also
test_url = url_for('test_endpoint', content_type="aPPlication/JSon", uppercase_headers=True, _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
@@ -502,11 +434,20 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'&#34;hello&#34;: 123,' in res.data
assert b'&#34;world&#34;: 123' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert b'&#34;hello&#34;: 123,' in res.data # properly html escaped in the front end
# Should be correctly formatted and sorted, ("world" goes to end)
assert snapshot_contents == """{
"hello": 123,
"world": 123
}"""
delete_all_watches(client)
def test_check_jsonpath_ext_filter(client, live_server, measure_memory_usage):
check_json_ext_filter('json:$[?(@.status==Sold)]', client, live_server)
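The new snapshot assertions in this file pin down how JSON content is stored: an array payload must still begin with '[', and the test_correct_header_detect snapshot must come back pretty-printed with sorted keys (so "world" is reordered after "hello") even when the body arrives wrapped in stray HTML. A small sketch of the normalisation those assertions imply; json.dumps here is an illustration of the expected output, not necessarily the code path the application uses:

    # Reproduces the snapshot the assertion above expects, under the assumption
    # that the reformatting is equivalent to json.dumps(..., indent=4, sort_keys=True).
    import json

    raw = '<html><body>{ "world": 123, "hello" : 123}'
    payload = raw[raw.find('{'):]          # drop the stray HTML wrapper
    print(json.dumps(json.loads(payload), indent=4, sort_keys=True))
    # {
    #     "hello": 123,
    #     "world": 123
    # }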

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from flask import url_for
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from changedetectionio.tests.util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
def set_response():
@@ -75,5 +75,4 @@ def test_content_filter_live_preview(client, live_server, measure_memory_usage):
assert reply.get('ignore_line_numbers') == [2] # Ignored - "socks" on line 2
assert reply.get('trigger_line_numbers') == [1] # Triggers "Awesome" in line 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -1,7 +1,7 @@
#!/usr/bin/env python3
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
import time
@@ -113,6 +113,5 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
#
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -24,12 +24,8 @@ def test_obfuscations(client, live_server, measure_memory_usage):
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
time.sleep(3)

View File

@@ -8,30 +8,30 @@ from .util import set_original_response, set_modified_response, live_server_setu
# `subtractive_selectors` should still work in `source:` type requests
def test_fetch_pdf(client, live_server, measure_memory_usage):
import shutil
import os
shutil.copy("tests/test.pdf", "test-datastore/endpoint-test.pdf")
first_version_size = os.path.getsize("test-datastore/endpoint-test.pdf")
# live_server_setup(live_server) # Setup on conftest per function
test_url = url_for('test_pdf_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
# PDF header should not be there (it was converted to text)
assert b'PDF' not in res.data[:10]
assert b'hello world' in res.data
assert 'PDF' not in snapshot_contents
# Was converted away from HTML
assert 'pdftohtml' not in snapshot_contents.lower() # Generator tag shouldnt be there
assert f'Original file size - {first_version_size}' in snapshot_contents
assert 'html' not in snapshot_contents.lower() # is converted from html
assert 'body' not in snapshot_contents.lower() # is converted from html
# And our text content was there
assert 'hello world' in snapshot_contents
# So we know if the file changes in other ways
import hashlib
@@ -39,8 +39,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
# We should have one
assert len(original_md5) >0
# And it's going to be in the document
assert b'Document checksum - '+bytes(str(original_md5).encode('utf-8')) in res.data
assert f'Document checksum - {original_md5}' in snapshot_contents
shutil.copy("tests/test2.pdf", "test-datastore/endpoint-test.pdf")
changed_md5 = hashlib.md5(open("test-datastore/endpoint-test.pdf", 'rb').read()).hexdigest().upper()
@@ -63,7 +62,6 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
assert original_md5.encode('utf-8') not in res.data
assert changed_md5.encode('utf-8') in res.data
res = client.get(
url_for("ui.ui_views.diff_history_page", uuid="first"),
follow_redirects=True
@@ -71,6 +69,16 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
assert original_md5.encode('utf-8') in res.data
assert changed_md5.encode('utf-8') in res.data
assert b'here is a change' in res.data
dates = list(watch.history.keys())
# new snapshot was also OK, no HTML
snapshot_contents = watch.get_history_snapshot(dates[1])
assert 'html' not in snapshot_contents.lower()
assert f'Original file size - {os.path.getsize("test-datastore/endpoint-test.pdf")}' in snapshot_contents
assert f'here is a change' in snapshot_contents
assert os.path.getsize("test-datastore/endpoint-test.pdf") != first_version_size # And the disk change worked
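The reworked PDF test reads the stored snapshot via watch.get_history_snapshot() instead of scraping the preview page, and expects the converted text to carry the source file's size and an upper-cased MD5 checksum. Those two expected lines can be recomputed exactly the way the test itself does (same fixture path, same os/hashlib calls):

    # Mirrors the os.path.getsize()/hashlib.md5() calls already used in the test
    # to build the strings the snapshot assertions search for.
    import hashlib
    import os

    pdf_path = "test-datastore/endpoint-test.pdf"
    size = os.path.getsize(pdf_path)
    with open(pdf_path, 'rb') as f:
        checksum = hashlib.md5(f.read()).hexdigest().upper()

    expected = [f"Original file size - {size}", f"Document checksum - {checksum}"]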

View File

@@ -13,13 +13,8 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
test_url = url_for('test_pdf_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -2,7 +2,7 @@ import json
import os
import time
from flask import url_for
from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client
from . util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
@@ -17,21 +17,13 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
test_url = test_url.replace('localhost', 'changedet')
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
cookie_header = '_ga=GA1.2.1022228332; cookie-preferences=analytics:accepted;'
@@ -82,8 +74,7 @@ def test_headers_in_request(client, live_server, measure_memory_usage):
for k, watch in client.application.config.get('DATASTORE').data.get('watching').items():
assert 'custom' in watch.get('remote_server_reply') # added in util.py
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_body_in_request(client, live_server, measure_memory_usage):
@@ -93,12 +84,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
# Because its no longer calling back to localhost but from the browser container, set in test-only.yml
test_url = test_url.replace('localhost', 'cdio')
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -150,12 +137,8 @@ def test_body_in_request(client, live_server, measure_memory_usage):
####### data sanity checks
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watches_with_body = 0
with open('test-datastore/url-watches.json') as f:
@@ -180,8 +163,7 @@ def test_body_in_request(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"Body must be empty when Request Method is set to GET" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_method_in_request(client, live_server, measure_memory_usage):
# Add our URL to the import page
@@ -191,20 +173,12 @@ def test_method_in_request(client, live_server, measure_memory_usage):
test_url = test_url.replace('localhost', 'cdio')
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -258,8 +232,7 @@ def test_method_in_request(client, live_server, measure_memory_usage):
# Should be only one with method set to PATCH
assert watches_with_method == 1
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Re #2408 - user-agent override test, also should handle case-insensitive header deduplication
def test_ua_global_override(client, live_server, measure_memory_usage):
@@ -277,12 +250,8 @@ def test_ua_global_override(client, live_server, measure_memory_usage):
)
assert b'Settings updated' in res.data
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
@@ -315,8 +284,7 @@ def test_ua_global_override(client, live_server, measure_memory_usage):
)
assert b"agent-from-watch" in res.data
assert b"html-requests-user-agent" not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
@@ -356,12 +324,8 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
assert b"requests-default_ua-html_requests" in res.data
# Add the test URL twice, we will check
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -429,19 +393,14 @@ def test_headers_textfile_in_request(client, live_server, measure_memory_usage):
assert "User-Agent:".encode('utf-8') + requests_ua.encode('utf-8') in res.data
# unlink headers.txt on start/stop
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_headers_validation(client, live_server):
def test_headers_validation(client, live_server, measure_memory_usage):
test_url = url_for('test_headers', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),

View File

@@ -3,7 +3,7 @@ import os
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, wait_for_notification_endpoint_output, extract_UUID_from_client, delete_all_watches
from ..notification import default_notification_format
instock_props = [
@@ -44,11 +44,11 @@ def set_original_response(props_markup='', price="121.95"):
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
def test_restock_itemprop_basic(client, live_server):
def test_restock_itemprop_basic(client, live_server, measure_memory_usage):
@@ -69,8 +69,7 @@ def test_restock_itemprop_basic(client, live_server):
assert b'has-restock-info' in res.data
assert b' in-stock' in res.data
assert b' not-in-stock' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
for p in out_of_stock_props:
@@ -85,10 +84,9 @@ def test_restock_itemprop_basic(client, live_server):
assert b'has-restock-info not-in-stock' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_itemprop_price_change(client, live_server):
def test_itemprop_price_change(client, live_server, measure_memory_usage):
# Out of the box 'Follow price changes' should be ON
@@ -132,13 +130,11 @@ def test_itemprop_price_change(client, live_server):
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def _run_test_minmax_limit(client, extra_watch_edit_form):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
@@ -212,11 +208,10 @@ def _run_test_minmax_limit(client, extra_watch_edit_form):
assert b'1,890.45' in res.data or b'1890.45' in res.data
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_restock_itemprop_minmax(client, live_server):
def test_restock_itemprop_minmax(client, live_server, measure_memory_usage):
extras = {
"restock_settings-follow_price_changes": "y",
@@ -225,7 +220,7 @@ def test_restock_itemprop_minmax(client, live_server):
}
_run_test_minmax_limit(client, extra_watch_edit_form=extras)
def test_restock_itemprop_with_tag(client, live_server):
def test_restock_itemprop_with_tag(client, live_server, measure_memory_usage):
res = client.post(
@@ -254,11 +249,10 @@ def test_restock_itemprop_with_tag(client, live_server):
def test_itemprop_percent_threshold(client, live_server):
def test_itemprop_percent_threshold(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
@@ -317,12 +311,11 @@ def test_itemprop_percent_threshold(client, live_server):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_change_with_notification_values(client, live_server):
def test_change_with_notification_values(client, live_server, measure_memory_usage):
if os.path.isfile("test-datastore/notification.txt"):
@@ -390,11 +383,10 @@ def test_change_with_notification_values(client, live_server):
assert os.path.isfile("test-datastore/notification.txt"), "Notification received"
def test_data_sanity(client, live_server):
def test_data_sanity(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
test_url = url_for('test_endpoint', _external=True)
test_url2 = url_for('test_endpoint2', _external=True)
@@ -421,8 +413,7 @@ def test_data_sanity(client, live_server):
assert str(res.data.decode()).count("950.95") == 1, "Price should only show once (for the watch added, no other watches yet)"
## different test, check the edit page works on an empty request result
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
client.post(
url_for("ui.ui_views.form_quick_watch_add"),
@@ -435,11 +426,10 @@ def test_data_sanity(client, live_server):
url_for("ui.ui_edit.edit_page", uuid="first"))
assert test_url2.encode('utf-8') in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# All examples should give a prive of 666.66
def test_special_prop_examples(client, live_server):
def test_special_prop_examples(client, live_server, measure_memory_usage):
import glob

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client
extract_UUID_from_client, delete_all_watches
def set_original_cdata_xml():
@@ -110,17 +110,13 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
set_original_cdata_xml()
test_url = url_for('test_endpoint', content_type="application/atom+xml; charset=UTF-8", _external=True)
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
@@ -132,7 +128,7 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
assert b'<![' not in res.data
assert b'Hackers can access your computer' in res.data
assert b'The days of Terminator' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
@@ -180,10 +176,10 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
assert b'The days of Terminator' not in res.data # Should NOT be selected by the xpath
assert b'Some other description' not in res.data # Should NOT be selected by the xpath
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
delete_all_watches(client)
def test_rss_bad_chars_breaking(client, live_server):
def test_rss_bad_chars_breaking(client, live_server, measure_memory_usage):
"""This should absolutely trigger the RSS builder to go into worst state mode
- source: prefix means no html conversion (which kinda filters out the bad stuff)

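Across these test diffs the same setup refactor repeats: instead of POSTing the URL to the import page and asserting on "1 Imported", the watch is registered directly on the datastore and a recheck is queued through the UI. A minimal consolidated sketch of that pattern, using the same fixtures and helpers the updated tests use (the test name here is illustrative only):

from flask import url_for
from .util import wait_for_all_checks, delete_all_watches

def test_example_watch_setup(client, live_server, measure_memory_usage):
    # Register the watch directly on the datastore instead of driving the import page
    test_url = url_for('test_endpoint', _external=True)
    uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)

    # Queue a recheck via the UI and wait for the workers to finish
    client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
    wait_for_all_checks(client)

    # ... assertions against the watch / rendered pages go here ...

    # Cleanup now goes through the shared helper rather than the ui.form_delete endpoint
    delete_all_watches(client)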
View File

@@ -0,0 +1,98 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
def set_original_cdata_xml():
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>Security Bulletins on wetscale</title>
<link>https://wetscale.com/security-bulletins/</link>
<description>Recent security bulletins from wetscale</description>
<lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate>
<docs>https://validator.w3.org/feed/docs/rss2.html</docs>
<generator>wetscale.com</generator>
<language>en-US</language>
<copyright>© 2025 wetscale Inc. All rights reserved.</copyright>
<atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/>
<item>
<title>TS-2025-005</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid>
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
<description>&lt;p&gt;Wet noodles escape&lt;br&gt;&lt;p&gt;they also found themselves outside&lt;/p&gt; </description>
</item>
<item>
<title>TS-2025-004</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-004</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid>
<pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate>
<description>
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
</description>
</item>
</channel>
</rss>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def test_rss_reader_mode(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we also check for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our watch directly via the datastore
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents
delete_all_watches(client)
def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we also check for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our watch directly via the datastore
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' not in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
delete_all_watches(client)

View File

@@ -5,11 +5,11 @@ from copy import copy
from datetime import datetime, timezone
from zoneinfo import ZoneInfo
from flask import url_for
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client
from .util import live_server_setup, wait_for_all_checks, extract_UUID_from_client, delete_all_watches
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
def test_check_basic_scheduler_functionality(client, live_server, measure_memory_usage):
@@ -34,13 +34,8 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
res = client.get(url_for("settings.settings_page"))
assert b'Pacific/Kiritimati' in res.data
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -92,8 +87,7 @@ def test_check_basic_scheduler_functionality(client, live_server, measure_memory
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] != last_check
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_basic_global_scheduler_functionality(client, live_server, measure_memory_usage):
@@ -101,13 +95,8 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
days = ['monday', 'tuesday', 'wednesday', 'thursday', 'friday', 'saturday', 'sunday']
test_url = url_for('test_random_content_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
uuid = next(iter(live_server.app.config['DATASTORE'].data['watching']))
@@ -180,18 +169,13 @@ def test_check_basic_global_scheduler_functionality(client, live_server, measure
assert live_server.app.config['DATASTORE'].data['watching'][uuid]['last_checked'] != last_check
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_validation_time_interval_field(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(

View File

@@ -1,7 +1,7 @@
import os
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
from .. import strtobool
@@ -100,8 +100,7 @@ def _runner_test_various_file_slash(client, file_uri):
# This will give some error from requests or if it went to chrome, will give some other error :-)
assert any(s in res.data for s in substrings)
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_file_slash_access(client, live_server, measure_memory_usage):

View File

@@ -3,7 +3,7 @@
import time
from flask import url_for
from urllib.request import urlopen
from .util import set_original_response, set_modified_response, live_server_setup
from .util import set_original_response, set_modified_response, live_server_setup, delete_all_watches
import re
sleep_time_for_fetch_thread = 3
@@ -17,13 +17,8 @@ def test_share_watch(client, live_server, measure_memory_usage):
include_filters = ".nice-filter"
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Goto the edit page, add our ignore text
# Add our URL to the import page
@@ -54,8 +49,7 @@ def test_share_watch(client, live_server, measure_memory_usage):
# Now delete what we have, we will try to re-import it
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Add our URL to the import page
res = client.post(

View File

@@ -13,13 +13,8 @@ def test_check_basic_change_detection_functionality_source(client, live_server,
set_original_response()
test_url = 'source:'+url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
time.sleep(sleep_time_for_fetch_thread)
@@ -62,13 +57,8 @@ def test_check_ignore_elements(client, live_server, measure_memory_usage):
time.sleep(1)
test_url = 'source:'+url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)

View File

@@ -65,12 +65,8 @@ def test_trigger_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Trigger a check
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -30,12 +30,8 @@ def test_trigger_regex_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -76,5 +72,4 @@ def test_trigger_regex_functionality(client, live_server, measure_memory_usage):
assert b'has-unread-changes' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from . util import live_server_setup
from . util import live_server_setup, delete_all_watches
def set_original_ignore_response():
@@ -34,12 +34,8 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# it needs time to save the original version
time.sleep(sleep_time_for_fetch_thread)
@@ -81,5 +77,4 @@ def test_trigger_regex_functionality_with_filter(client, live_server, measure_me
assert b'has-unread-changes' in res.data
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -1,11 +1,11 @@
#!/usr/bin/env python3
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, delete_all_watches
from ..forms import REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT, REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT
def test_recheck_time_field_validation_global_settings(client, live_server):
def test_recheck_time_field_validation_global_settings(client, live_server, measure_memory_usage):
"""
Tests that the global settings time field has at least one value entered for weeks/days/hours/minutes/seconds etc
class globalSettingsRequestForm(Form):
@@ -27,7 +27,7 @@ def test_recheck_time_field_validation_global_settings(client, live_server):
assert REQUIRE_ATLEAST_ONE_TIME_PART_MESSAGE_DEFAULT.encode('utf-8') in res.data
def test_recheck_time_field_validation_single_watch(client, live_server):
def test_recheck_time_field_validation_single_watch(client, live_server, measure_memory_usage):
"""
Tests that the global settings time field has at least one value entered for weeks/days/hours/minutes/seconds etc
class globalSettingsRequestForm(Form):
@@ -36,13 +36,8 @@ def test_recheck_time_field_validation_single_watch(client, live_server):
test_url = url_for('test_endpoint', _external=True)
# Add our URL to the import page
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -100,7 +95,7 @@ def test_recheck_time_field_validation_single_watch(client, live_server):
assert b"Updated watch." in res.data
assert REQUIRE_ATLEAST_ONE_TIME_PART_WHEN_NOT_GLOBAL_DEFAULT.encode('utf-8') not in res.data
def test_checkbox_open_diff_in_new_tab(client, live_server):
def test_checkbox_open_diff_in_new_tab(client, live_server, measure_memory_usage):
set_original_response()
# Add our URL to the import page
@@ -171,10 +166,9 @@ def test_checkbox_open_diff_in_new_tab(client, live_server):
assert 'target=' not in target_line
# Cleanup everything
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_page_title_listing_behaviour(client, live_server):
def test_page_title_listing_behaviour(client, live_server, measure_memory_usage):
set_original_response(extra_title="custom html")
@@ -249,7 +243,7 @@ def test_page_title_listing_behaviour(client, live_server):
assert b"head titlecustom html" in res.data
def test_ui_viewed_unread_flag(client, live_server):
def test_ui_viewed_unread_flag(client, live_server, measure_memory_usage):
import time

View File

@@ -2,7 +2,7 @@
import time
from flask import url_for
from .util import live_server_setup, wait_for_all_checks
from .util import live_server_setup, wait_for_all_checks, delete_all_watches
def set_original_ignore_response():
@@ -79,12 +79,8 @@ def test_unique_lines_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -118,8 +114,7 @@ def test_unique_lines_functionality(client, live_server, measure_memory_usage):
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_sort_lines_functionality(client, live_server, measure_memory_usage):
@@ -128,12 +123,8 @@ def test_sort_lines_functionality(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -168,8 +159,7 @@ def test_sort_lines_functionality(client, live_server, measure_memory_usage):
assert res.data.find(b'A uppercase') < res.data.find(b'Z last')
assert res.data.find(b'Some initial text') < res.data.find(b'Which is across multiple lines')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_extra_filters(client, live_server, measure_memory_usage):
@@ -179,12 +169,8 @@ def test_extra_filters(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Add our URL to the import page
@@ -216,5 +202,4 @@ def test_extra_filters(client, live_server, measure_memory_usage):
# should still remain unsorted ('A - sortable line' stays at the end)
assert res.data.find(b'A - sortable line') > res.data.find(b'Which is across multiple lines')
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)

View File

@@ -10,12 +10,8 @@ def test_check_watch_field_storage(client, live_server, measure_memory_usage):
test_url = "http://somerandomsitewewatch.com"
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
res = client.post(

View File

@@ -2,7 +2,7 @@
from flask import url_for
from .util import wait_for_all_checks
from .util import wait_for_all_checks, delete_all_watches
from ..processors.magic import RSS_XML_CONTENT_TYPES
@@ -113,12 +113,8 @@ def test_check_xpath_filter_utf8(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True, content_type="application/rss+xml;charset=UTF-8")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -129,8 +125,7 @@ def test_check_xpath_filter_utf8(client, live_server, measure_memory_usage):
wait_for_all_checks(client)
res = client.get(url_for("watchlist.index"))
assert b'Unicode strings with encoding declaration are not supported.' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# Handle utf-8 charset replies https://github.com/dgtlmoon/changedetection.io/pull/613
@@ -167,12 +162,8 @@ def test_check_xpath_text_function_utf8(client, live_server, measure_memory_usag
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True, content_type="application/rss+xml;charset=UTF-8")
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
@@ -193,8 +184,7 @@ def test_check_xpath_text_function_utf8(client, live_server, measure_memory_usag
assert b'Stock Alert (UK): RPi CM4' in res.data
assert b'Stock Alert (UK): Big monitor' in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_check_markup_xpath_filter_restriction(client, live_server, measure_memory_usage):
@@ -204,12 +194,8 @@ def test_check_markup_xpath_filter_restriction(client, live_server, measure_memo
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
@@ -239,19 +225,14 @@ def test_check_markup_xpath_filter_restriction(client, live_server, measure_memo
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' not in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -260,19 +241,14 @@ def test_xpath_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath23_prefix_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -281,8 +257,7 @@ def test_xpath23_prefix_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath1_lxml(client, live_server, measure_memory_usage):
@@ -317,12 +292,8 @@ def test_xpath1_lxml(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -351,12 +322,8 @@ def test_xpath1_lxml(client, live_server, measure_memory_usage):
def test_xpath1_validation(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -365,25 +332,19 @@ def test_xpath1_validation(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b"is not a valid XPath expression" in res.data
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
# actually only really used by the distill.io importer, but could be handy too
def test_check_with_prefix_include_filters(client, live_server, measure_memory_usage):
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
set_original_response()
wait_for_all_checks(client)
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -428,12 +389,8 @@ def test_various_rules(client, live_server, measure_memory_usage):
""")
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
for r in ['//div', '//a', 'xpath://div', 'xpath://a']:
@@ -452,18 +409,13 @@ def test_various_rules(client, live_server, measure_memory_usage):
res = client.get(url_for("watchlist.index"))
assert b'fetch-error' not in res.data, f"Should not see errors after '{r}' filter"
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
assert b'Deleted' in res.data
delete_all_watches(client)
def test_xpath_20(client, live_server, measure_memory_usage):
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
set_original_response()
@@ -499,12 +451,8 @@ def test_xpath_20_function_count(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -536,12 +484,8 @@ def test_xpath_20_function_count2(client, live_server, measure_memory_usage):
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
@@ -573,16 +517,12 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
# Add our URL to the import page
test_url = url_for('test_endpoint', _external=True)
res = client.post(
url_for("imports.import_page"),
data={"urls": test_url},
follow_redirects=True
)
assert b"1 Imported" in res.data
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={
"include_filters": "xpath:string-join(//*[contains(@class, 'sametext')]|//*[matches(@class, 'changetext')], 'specialconjunction')",
"url": test_url,
@@ -597,7 +537,7 @@ def test_xpath_20_function_string_join_matches(client, live_server, measure_memo
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
url_for("ui.ui_views.preview_page", uuid=uuid),
follow_redirects=True
)
@@ -644,7 +584,7 @@ def _subtest_xpath_rss(client, content_type='text/html'):
client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
# Be sure all-in-the-wild types of RSS feeds work with xpath
def test_rss_xpath(client, live_server):
def test_rss_xpath(client, live_server, measure_memory_usage):
for feed_header in ['', '<?xml version="1.0" encoding="utf-8"?>']:
set_rss_atom_feed_response(header=feed_header)
for content_type in RSS_XML_CONTENT_TYPES:

View File

@@ -127,6 +127,11 @@ def extract_UUID_from_client(client):
uuid = m.group(1)
return uuid.strip()
def delete_all_watches(client=None):
uuids = list(client.application.config.get('DATASTORE').data['watching'])
for uuid in uuids:
client.application.config.get('DATASTORE').delete(uuid)
def wait_for_all_checks(client=None):
"""
@@ -135,8 +140,6 @@ def wait_for_all_checks(client=None):
"""
from changedetectionio.flask_app import update_q as global_update_q
from changedetectionio import worker_handler
logger = logging.getLogger()
empty_since = None
attempt = 0
max_attempts = 150 # Still reasonable upper bound
@@ -144,9 +147,9 @@ def wait_for_all_checks(client=None):
while attempt < max_attempts:
# Start with fast checks, slow down if needed
if attempt < 10:
time.sleep(0.1) # Very fast initial checks
time.sleep(0.2) # Very fast initial checks
elif attempt < 30:
time.sleep(0.3) # Medium speed
time.sleep(0.4) # Medium speed
else:
time.sleep(0.8) # Slower for persistent issues
@@ -322,4 +325,3 @@ def new_live_server_setup(live_server):
return resp
live_server.start()

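The new delete_all_watches helper removes watches straight through the datastore rather than the ui.form_delete endpoint, which is what makes the one-line cleanup in the tests above possible. An annotated copy of the helper (the comments are editorial and not part of the change):

def delete_all_watches(client=None):
    # Snapshot the keys first: delete() mutates the 'watching' dict, and iterating
    # it directly while deleting would raise "dictionary changed size during iteration".
    uuids = list(client.application.config.get('DATASTORE').data['watching'])
    for uuid in uuids:
        # Drop each watch from the datastore
        client.application.config.get('DATASTORE').delete(uuid)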
View File

@@ -4,7 +4,7 @@ import os
from flask import url_for
from ..util import live_server_setup, wait_for_all_checks
# def test_setup(client, live_server):
# def test_setup(client, live_server, measure_memory_usage):
# live_server_setup(live_server) # Setup on conftest per function
@@ -142,7 +142,7 @@ def test_basic_browserstep(client, live_server, measure_memory_usage):
assert b"testheader: yes" in res.data
assert b"user-agent: mycustomagent" in res.data
def test_non_200_errors_report_browsersteps(client, live_server):
def test_non_200_errors_report_browsersteps(client, live_server, measure_memory_usage):
four_o_four_url = url_for('test_endpoint', status_code=404, _external=True)

View File

@@ -1,5 +1,6 @@
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
feedgen~=0.9
feedparser~=6.0 # For parsing RSS/Atom feeds
flask-compress
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
flask-login>=0.6.3
@@ -124,8 +125,9 @@ price-parser
# flask_socket_io - incorrect package name, already have flask-socketio above
# So far for detecting correct favicon type, but for other things in the future
python-magic
# Lightweight MIME type detection (saves ~14MB memory vs python-magic/libmagic)
# Used for detecting the correct favicon type and for general content-type detection
puremagic
# Scheduler - Windows seemed to miss a lot of default timezone info (even "UTC" !)
tzdata
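puremagic stands in for python-magic/libmagic for MIME sniffing. A minimal sketch of the kind of call involved, assuming puremagic's from_string()/PureError API (this snippet is illustrative and not part of the changeset):

import puremagic

def guess_mime(body: bytes) -> str:
    # Guess a MIME type from the leading bytes of a response body;
    # puremagic raises PureError when nothing matches, so fall back to a generic type.
    try:
        return puremagic.from_string(body, mime=True)
    except puremagic.PureError:
        return "application/octet-stream"

# e.g. guess_mime(b'\x89PNG\r\n\x1a\n' + b'\x00' * 16) would be expected to return 'image/png'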