mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-22 01:16:12 +00:00
Compare commits
9 Commits
3482-JSON-
...
rss-reader
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0d2df7685d | ||
|
|
1f0811e54d | ||
|
|
bb35310b07 | ||
|
|
709dadc492 | ||
|
|
f02fb7406d | ||
|
|
d3725da2dc | ||
|
|
bb6d4c2756 | ||
|
|
a72b13964d | ||
|
|
b59ce190ac |
@@ -72,17 +72,24 @@
|
|||||||
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
|
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
|
||||||
</span>
|
</span>
|
||||||
</div>
|
</div>
|
||||||
<div class="pure-control-group">
|
|
||||||
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
|
||||||
</div>
|
|
||||||
<div class="pure-control-group">
|
|
||||||
{{ render_field(form.application.form.rss_content_format) }}
|
|
||||||
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
|
||||||
</div>
|
|
||||||
<div class="pure-control-group">
|
<div class="pure-control-group">
|
||||||
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
|
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
|
||||||
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
|
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
|
||||||
</div>
|
</div>
|
||||||
|
<div class="grey-form-border">
|
||||||
|
<div class="pure-control-group">
|
||||||
|
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
|
||||||
|
</div>
|
||||||
|
<div class="pure-control-group">
|
||||||
|
{{ render_field(form.application.form.rss_content_format) }}
|
||||||
|
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
|
||||||
|
</div>
|
||||||
|
<div class="pure-control-group">
|
||||||
|
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
|
||||||
|
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
|
||||||
|
</div>
|
||||||
|
</div>
|
||||||
|
|
||||||
{% if form.requests.proxy %}
|
{% if form.requests.proxy %}
|
||||||
<div class="pure-control-group inline-radio">
|
<div class="pure-control-group inline-radio">
|
||||||
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}
|
||||||
|
|||||||
@@ -940,6 +940,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
|
|||||||
strip_ignored_lines = BooleanField('Strip ignored lines')
|
strip_ignored_lines = BooleanField('Strip ignored lines')
|
||||||
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
|
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
|
||||||
validators=[validators.Optional()])
|
validators=[validators.Optional()])
|
||||||
|
|
||||||
|
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
|
||||||
|
validators=[validators.Optional()])
|
||||||
|
|
||||||
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
|
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
|
||||||
render_kw={"style": "width: 5em;"},
|
render_kw={"style": "width: 5em;"},
|
||||||
validators=[validators.NumberRange(min=0,
|
validators=[validators.NumberRange(min=0,
|
||||||
|
|||||||
@@ -303,70 +303,92 @@ def _get_stripped_text_from_json_match(match):
|
|||||||
|
|
||||||
return stripped_text_from_html
|
return stripped_text_from_html
|
||||||
|
|
||||||
|
def extract_json_blob_from_html(content, ensure_is_ldjson_info_type, json_filter):
    """
    Find JSON embedded in an HTML document and apply json_filter to it.

    Every <script> tag is tried first (only <script type="application/ld+json"> when
    ensure_is_ldjson_info_type is set), then the whole <body> as a last resort.

    Args:
        content: HTML text to search.
        ensure_is_ldjson_info_type: Optional LD+JSON '@type' (for example "product") that a
            blob must declare, compared case-insensitively. Falsy accepts any blob.
        json_filter: Filter expression handed unchanged to _parse_json().

    Returns:
        The _parse_json() output of the first blob that satisfies the conditions. When no
        blob satisfies them, the output for the last blob that was tried.

    Raises:
        JSONNotFound: When none of the candidate tags contain parsable JSON.
    """
    from bs4 import BeautifulSoup
    stripped_text_from_html = ''

    # Foreach <script json></script> blob.. just return the first that matches json_filter
    # As a last resort, try to parse the whole <body>
    soup = BeautifulSoup(content, 'html.parser')

    if ensure_is_ldjson_info_type:
        bs_result = soup.find_all('script', {"type": "application/ld+json"})
    else:
        bs_result = soup.find_all('script')
    bs_result += soup.find_all('body')

    bs_jsons = []

    for result in bs_result:
        # result.text is how bs4 magically strips JSON from the body.
        # Drop the UTF-8 byte order mark as well: json.loads() refuses text that starts with one.
        candidate = result.text.lstrip("\ufeff").strip() if result.text else ''
        # Skip empty or whitespace-only tags, and things that dont even look like JSON.
        # startswith() is safe on an empty string, indexing [0] is not.
        if not candidate.startswith(('{', '[')):
            continue
        try:
            bs_jsons.append(json.loads(candidate))
        except json.JSONDecodeError:
            # Skip objects which cannot be parsed
            continue

    if not bs_jsons:
        raise JSONNotFound("No parsable JSON found in this document")

    for json_data in bs_jsons:
        stripped_text_from_html = _parse_json(json_data, json_filter)

        if ensure_is_ldjson_info_type:
            # Could sometimes be list, string or something else random
            if isinstance(json_data, dict):
                # If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
                # (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
                # @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
                # LD_JSON auto-extract also requires some content PLUS the ldjson to be present
                # 1833 - could be either str or dict, should not be anything else

                t = json_data.get('@type')
                if t and stripped_text_from_html:
                    wanted_type = ensure_is_ldjson_info_type.lower()

                    if isinstance(t, str) and t.lower() == wanted_type:
                        break
                    # The non-standard part, some have a list
                    elif isinstance(t, list):
                        # Ignore non-string members instead of crashing on .lower()
                        if wanted_type in [x.lower().strip() for x in t if isinstance(x, str)]:
                            break

        elif stripped_text_from_html:
            break

    return stripped_text_from_html
|
||||||
|
|
||||||
# content - json
|
# content - json
|
||||||
# json_filter - ie json:$..price
|
# json_filter - ie json:$..price
|
||||||
# ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I dont know how to do that as a json selector)
|
# ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I dont know how to do that as a json selector)
|
||||||
def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None):
|
def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None):
|
||||||
from bs4 import BeautifulSoup
|
|
||||||
|
|
||||||
stripped_text_from_html = False
|
stripped_text_from_html = False
|
||||||
# https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w
|
# https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w
|
||||||
# Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags
|
# Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags
|
||||||
try:
|
|
||||||
# .lstrip("\ufeff") strings ByteOrderMark from UTF8 and still lets the UTF work
|
|
||||||
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff") ), json_filter)
|
|
||||||
except json.JSONDecodeError as e:
|
|
||||||
logger.warning(str(e))
|
|
||||||
|
|
||||||
# Foreach <script json></script> blob.. just return the first that matches json_filter
|
# Looks like clean JSON, dont bother extracting from HTML
|
||||||
# As a last resort, try to parse the whole <body>
|
|
||||||
soup = BeautifulSoup(content, 'html.parser')
|
|
||||||
|
|
||||||
if ensure_is_ldjson_info_type:
|
content_start = content.lstrip("\ufeff").strip()[:100]
|
||||||
bs_result = soup.find_all('script', {"type": "application/ld+json"})
|
|
||||||
else:
|
|
||||||
bs_result = soup.find_all('script')
|
|
||||||
bs_result += soup.find_all('body')
|
|
||||||
|
|
||||||
bs_jsons = []
|
if content_start[0] == '{' or content_start[0] == '[':
|
||||||
for result in bs_result:
|
try:
|
||||||
# Skip empty tags, and things that dont even look like JSON
|
# .lstrip("\ufeff") strips the ByteOrderMark from UTF8 and still lets the UTF work
|
||||||
if not result.text or '{' not in result.text:
|
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff")), json_filter)
|
||||||
continue
|
except json.JSONDecodeError as e:
|
||||||
try:
|
logger.warning(f"Error processing JSON {content[:20]}...{str(e)})")
|
||||||
json_data = json.loads(result.text)
|
else:
|
||||||
bs_jsons.append(json_data)
|
# Probably something else, go fish inside for it
|
||||||
except json.JSONDecodeError:
|
try:
|
||||||
# Skip objects which cannot be parsed
|
stripped_text_from_html = extract_json_blob_from_html(content=content,
|
||||||
continue
|
ensure_is_ldjson_info_type=ensure_is_ldjson_info_type,
|
||||||
|
json_filter=json_filter )
|
||||||
if not bs_jsons:
|
except json.JSONDecodeError as e:
|
||||||
raise JSONNotFound("No parsable JSON found in this document")
|
logger.warning(f"Error processing JSON while extracting JSON from HTML blob {content[:20]}...{str(e)})")
|
||||||
|
|
||||||
for json_data in bs_jsons:
|
|
||||||
stripped_text_from_html = _parse_json(json_data, json_filter)
|
|
||||||
|
|
||||||
if ensure_is_ldjson_info_type:
|
|
||||||
# Could sometimes be list, string or something else random
|
|
||||||
if isinstance(json_data, dict):
|
|
||||||
# If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
|
|
||||||
# (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
|
|
||||||
# @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
|
|
||||||
# LD_JSON auto-extract also requires some content PLUS the ldjson to be present
|
|
||||||
# 1833 - could be either str or dict, should not be anything else
|
|
||||||
|
|
||||||
t = json_data.get('@type')
|
|
||||||
if t and stripped_text_from_html:
|
|
||||||
|
|
||||||
if isinstance(t, str) and t.lower() == ensure_is_ldjson_info_type.lower():
|
|
||||||
break
|
|
||||||
# The non-standard part, some have a list
|
|
||||||
elif isinstance(t, list):
|
|
||||||
if ensure_is_ldjson_info_type.lower() in [x.lower().strip() for x in t]:
|
|
||||||
break
|
|
||||||
|
|
||||||
elif stripped_text_from_html:
|
|
||||||
break
|
|
||||||
|
|
||||||
if not stripped_text_from_html:
|
if not stripped_text_from_html:
|
||||||
# Re 265 - Just return an empty string when filter not found
|
# Re 265 - Just return an empty string when filter not found
|
||||||
|
|||||||
@@ -55,6 +55,7 @@ class model(dict):
|
|||||||
'rss_access_token': None,
|
'rss_access_token': None,
|
||||||
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
'rss_content_format': RSS_FORMAT_TYPES[0][0],
|
||||||
'rss_hide_muted_watches': True,
|
'rss_hide_muted_watches': True,
|
||||||
|
'rss_reader_mode': False,
|
||||||
'schema_version' : 0,
|
'schema_version' : 0,
|
||||||
'shared_diff_access': False,
|
'shared_diff_access': False,
|
||||||
'strip_ignored_lines': False,
|
'strip_ignored_lines': False,
|
||||||
|
|||||||
@@ -94,24 +94,21 @@ class guess_stream_type():
|
|||||||
self.is_rss = True
|
self.is_rss = True
|
||||||
elif any(s in http_content_header for s in JSON_CONTENT_TYPES):
|
elif any(s in http_content_header for s in JSON_CONTENT_TYPES):
|
||||||
self.is_json = True
|
self.is_json = True
|
||||||
|
elif 'pdf' in magic_content_header:
|
||||||
|
self.is_pdf = True
|
||||||
|
elif has_html_patterns or http_content_header == 'text/html':
|
||||||
|
self.is_html = True
|
||||||
|
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
|
||||||
|
self.is_json = True
|
||||||
|
# magic will call a rss document 'xml'
|
||||||
|
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||||
|
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||||
|
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
|
||||||
|
self.is_rss = True
|
||||||
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
|
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
|
||||||
# Only mark as generic XML if not already detected as RSS
|
# Only mark as generic XML if not already detected as RSS
|
||||||
if not self.is_rss:
|
if not self.is_rss:
|
||||||
self.is_xml = True
|
self.is_xml = True
|
||||||
elif 'pdf' in magic_content_header:
|
|
||||||
self.is_pdf = True
|
|
||||||
###
|
|
||||||
elif has_html_patterns or http_content_header == 'text/html':
|
|
||||||
self.is_html = True
|
|
||||||
# If magic says text/plain and we found no HTML patterns, trust it
|
|
||||||
elif magic_result == 'text/plain':
|
|
||||||
self.is_plaintext = True
|
|
||||||
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
|
|
||||||
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
|
|
||||||
self.is_json = True
|
|
||||||
# magic will call a rss document 'xml'
|
|
||||||
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
|
|
||||||
self.is_rss = True
|
|
||||||
elif test_content_normalized.startswith('<?xml') or any(s in magic_content_header for s in XML_CONTENT_TYPES):
|
elif test_content_normalized.startswith('<?xml') or any(s in magic_content_header for s in XML_CONTENT_TYPES):
|
||||||
# Generic XML that's not RSS/Atom (RSS/Atom checked above)
|
# Generic XML that's not RSS/Atom (RSS/Atom checked above)
|
||||||
self.is_xml = True
|
self.is_xml = True
|
||||||
@@ -122,4 +119,8 @@ class guess_stream_type():
|
|||||||
# Only trust magic for 'text' if no other patterns matched
|
# Only trust magic for 'text' if no other patterns matched
|
||||||
elif 'text' in magic_content_header:
|
elif 'text' in magic_content_header:
|
||||||
self.is_plaintext = True
|
self.is_plaintext = True
|
||||||
|
# If magic says text/plain and we found no HTML patterns, trust it
|
||||||
|
elif magic_result == 'text/plain':
|
||||||
|
self.is_plaintext = True
|
||||||
|
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
|
||||||
|
|
||||||
|
|||||||
@@ -20,7 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|||||||
name = 'Webpage Text/HTML, JSON and PDF changes'
|
name = 'Webpage Text/HTML, JSON and PDF changes'
|
||||||
description = 'Detects all text changes where possible'
|
description = 'Detects all text changes where possible'
|
||||||
|
|
||||||
json_filter_prefixes = ['json:', 'jq:', 'jqraw:']
|
JSON_FILTER_PREFIXES = ['json:', 'jq:', 'jqraw:']
|
||||||
|
|
||||||
# Assume it's this type if the server says nothing on content-type
|
# Assume it's this type if the server says nothing on content-type
|
||||||
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
|
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
|
||||||
@@ -99,6 +99,10 @@ class FilterConfig:
|
|||||||
def has_include_filters(self):
|
def has_include_filters(self):
|
||||||
return bool(self.include_filters) and bool(self.include_filters[0].strip())
|
return bool(self.include_filters) and bool(self.include_filters[0].strip())
|
||||||
|
|
||||||
|
@property
def has_include_json_filters(self):
    """Tell whether any include filter, ignoring surrounding whitespace, starts with a JSON_FILTER_PREFIXES entry."""
    # str.startswith() accepts a tuple, so every prefix is tested in a single call
    json_prefixes = tuple(JSON_FILTER_PREFIXES)
    for include_filter in self.include_filters:
        if include_filter.strip().startswith(json_prefixes):
            return True
    return False
|
||||||
|
|
||||||
@property
|
@property
|
||||||
def has_subtractive_selectors(self):
|
def has_subtractive_selectors(self):
|
||||||
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
|
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
|
||||||
@@ -224,8 +228,21 @@ class ContentProcessor:
|
|||||||
self.datastore = datastore
|
self.datastore = datastore
|
||||||
|
|
||||||
def preprocess_rss(self, content):
|
def preprocess_rss(self, content):
|
||||||
"""Convert CDATA/comments in RSS to usable text."""
|
"""
|
||||||
return cdata_in_document_to_text(html_content=content)
|
Convert CDATA/comments in RSS to usable text.
|
||||||
|
|
||||||
|
Supports two RSS processing modes:
|
||||||
|
- 'default': Inline CDATA replacement (original behavior)
|
||||||
|
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
|
||||||
|
"""
|
||||||
|
from changedetectionio import rss_tools
|
||||||
|
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
|
||||||
|
if rss_mode:
|
||||||
|
# Format RSS items nicely with CDATA content unmarked and converted to text
|
||||||
|
return rss_tools.format_rss_items(content)
|
||||||
|
else:
|
||||||
|
# Default: Original inline CDATA replacement
|
||||||
|
return cdata_in_document_to_text(html_content=content)
|
||||||
|
|
||||||
def preprocess_pdf(self, raw_content):
|
def preprocess_pdf(self, raw_content):
|
||||||
"""Convert PDF to HTML using external tool."""
|
"""Convert PDF to HTML using external tool."""
|
||||||
@@ -255,15 +272,14 @@ class ContentProcessor:
|
|||||||
)
|
)
|
||||||
return html_content.replace('</body>', metadata + '</body>')
|
return html_content.replace('</body>', metadata + '</body>')
|
||||||
|
|
||||||
def preprocess_json(self, content, has_filters):
|
def preprocess_json(self, raw_content):
|
||||||
"""Format and sort JSON content."""
|
"""Format and sort JSON content."""
|
||||||
# Force reformat if no filters specified
|
# Then we re-format it, else it does have filters (later on) which will reformat it anyway
|
||||||
if not has_filters:
|
content = html_tools.extract_json_as_string(content=raw_content, json_filter="json:$")
|
||||||
content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
|
|
||||||
|
|
||||||
# Sort JSON to avoid false alerts from reordering
|
# Sort JSON to avoid false alerts from reordering
|
||||||
try:
|
try:
|
||||||
content = json.dumps(json.loads(content), sort_keys=True)
|
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
|
||||||
except Exception:
|
except Exception:
|
||||||
# Might be malformed JSON, continue anyway
|
# Might be malformed JSON, continue anyway
|
||||||
pass
|
pass
|
||||||
@@ -294,7 +310,7 @@ class ContentProcessor:
|
|||||||
)
|
)
|
||||||
|
|
||||||
# JSON filters
|
# JSON filters
|
||||||
elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
|
elif any(filter_rule.startswith(prefix) for prefix in JSON_FILTER_PREFIXES):
|
||||||
filtered_content += html_tools.extract_json_as_string(
|
filtered_content += html_tools.extract_json_as_string(
|
||||||
content=content,
|
content=content,
|
||||||
json_filter=filter_rule
|
json_filter=filter_rule
|
||||||
@@ -381,15 +397,23 @@ class perform_site_check(difference_detection_processor):
|
|||||||
# RSS preprocessing
|
# RSS preprocessing
|
||||||
if stream_content_type.is_rss:
|
if stream_content_type.is_rss:
|
||||||
content = content_processor.preprocess_rss(content)
|
content = content_processor.preprocess_rss(content)
|
||||||
|
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
|
||||||
|
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
|
||||||
|
stream_content_type.is_rss = False
|
||||||
|
stream_content_type.is_html = True
|
||||||
|
self.fetcher.content = content
|
||||||
|
|
||||||
# PDF preprocessing
|
# PDF preprocessing
|
||||||
if watch.is_pdf or stream_content_type.is_pdf:
|
if watch.is_pdf or stream_content_type.is_pdf:
|
||||||
content = content_processor.preprocess_pdf(raw_content=self.fetcher.raw_content)
|
content = content_processor.preprocess_pdf(raw_content=self.fetcher.raw_content)
|
||||||
stream_content_type.is_html = True
|
stream_content_type.is_html = True
|
||||||
|
|
||||||
# JSON preprocessing
|
# JSON - Always reformat it nicely for consistency.
|
||||||
|
|
||||||
if stream_content_type.is_json:
|
if stream_content_type.is_json:
|
||||||
content = content_processor.preprocess_json(content, filter_config.has_include_filters)
|
if not filter_config.has_include_json_filters:
|
||||||
|
content = content_processor.preprocess_json(raw_content=content)
|
||||||
|
#else, otherwise it gets sorted/formatted in the filter stage anyway
|
||||||
|
|
||||||
# HTML obfuscation workarounds
|
# HTML obfuscation workarounds
|
||||||
if stream_content_type.is_html:
|
if stream_content_type.is_html:
|
||||||
|
|||||||
130
changedetectionio/rss_tools.py
Normal file
130
changedetectionio/rss_tools.py
Normal file
@@ -0,0 +1,130 @@
|
|||||||
|
"""
|
||||||
|
RSS/Atom feed processing tools for changedetection.io
|
||||||
|
"""
|
||||||
|
|
||||||
|
from loguru import logger
|
||||||
|
import re
|
||||||
|
|
||||||
|
|
||||||
|
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
    """
    Process CDATA sections in HTML/XML content - inline replacement.

    Each <![CDATA[...]]> section is replaced where it stands by the XML-escaped,
    stripped output of html_to_text() for its body; everything else is left untouched.

    Args:
        html_content: The HTML/XML content to process
        render_anchor_tag_content: Whether to render anchor tag content (passed through to html_to_text())

    Returns:
        Processed HTML/XML content with CDATA sections replaced inline
    """
    from xml.sax.saxutils import escape as xml_escape
    from .html_tools import html_to_text

    # Raw string: sequences such as \[ and \s are regex escapes, not string escapes.
    # In a normal string literal they raise a SyntaxWarning on recent Python versions.
    pattern = r'<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'

    def repl(m):
        # group(1) is the CDATA body without the <![CDATA[ ... ]]> wrapper
        text = m.group(1)
        return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip()

    return re.sub(pattern, repl, html_content)
|
||||||
|
|
||||||
|
|
||||||
|
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
    """
    Format RSS/Atom feed items in a readable text format using feedparser.

    Converts RSS <item> or Atom <entry> elements to formatted text with:
    - <title> → <h1>Title</h1>
    - <link> → Link: [url]
    - <guid> → Guid: [id]
    - <pubDate> → PubDate: [date]
    - <description> or <content> → Raw HTML content (CDATA and entities automatically handled)

    Each item is wrapped in <div class="rss-item item-N"> (plus "first" / "last" on the
    outermost items) and the whole list in <html><body>.

    Args:
        rss_content: The RSS/Atom feed content
        render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility)

    Returns:
        Formatted HTML content ready for html_to_text conversion. If anything raises while
        formatting, a warning is logged and rss_content is returned unchanged.
    """
    try:
        import feedparser
        from xml.sax.saxutils import escape as xml_escape

        # Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
        feed = feedparser.parse(rss_content)

        formatted_items = []

        # Determine feed type for appropriate labels when fields are missing
        # feedparser sets feed.version to things like 'rss20', 'atom10', etc.
        is_atom = bool(feed.version) and 'atom' in feed.version

        for entry in feed.entries:
            item_parts = []

            # Title - feedparser handles CDATA and entity unescaping automatically
            if hasattr(entry, 'title') and entry.title:
                item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')

            # Link
            if hasattr(entry, 'link') and entry.link:
                item_parts.append(f'Link: {xml_escape(entry.link)}<br>')

            # GUID/ID
            if hasattr(entry, 'id') and entry.id:
                item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')

            # Date - feedparser normalizes all date field names to 'published'
            if hasattr(entry, 'published') and entry.published:
                item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')

            # Description/Content - feedparser handles CDATA and entity unescaping automatically
            # Only add "Summary:" label for Atom <summary> tags
            content = None
            add_label = False

            if hasattr(entry, 'content') and entry.content:
                # Atom <content> - no label, just content
                content = entry.content[0].value if entry.content[0].value else None
            elif hasattr(entry, 'summary'):
                # Could be RSS <description> or Atom <summary>
                # feedparser maps both to entry.summary
                content = entry.summary if entry.summary else None
                # Only add "Summary:" label for Atom feeds (which use <summary> tag)
                add_label = is_atom

            # Add content with or without label
            if content:
                item_parts.append(f'Summary:<br>{content}' if add_label else content)
            else:
                # No content - show the text "<none>". It has to be entity-escaped: a literal
                # <none> in this HTML output would be parsed as an unknown tag and show nothing.
                item_parts.append('&lt;none&gt;')

            # Join all parts of this item (never empty: content or the placeholder is always added)
            formatted_items.append('\n'.join(item_parts))

        # Wrap each item in a div with classes (first, last, item-N)
        items_html = []
        total_items = len(formatted_items)
        for idx, item in enumerate(formatted_items):
            classes = ['rss-item']
            if idx == 0:
                classes.append('first')
            if idx == total_items - 1:
                classes.append('last')
            classes.append(f'item-{idx + 1}')

            class_str = ' '.join(classes)
            items_html.append(f'<div class="{class_str}">{item}</div>')
        return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'

    except Exception as e:
        # Deliberate best-effort: a feed that cannot be formatted is passed through untouched
        logger.warning(f"Error formatting RSS items: {str(e)}")
        # Fall back to original content
        return rss_content
|
||||||
@@ -344,7 +344,7 @@ label {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#notification-customisation {
|
.grey-form-border {
|
||||||
border: 1px solid var(--color-border-notification);
|
border: 1px solid var(--color-border-notification);
|
||||||
padding: 0.5rem;
|
padding: 0.5rem;
|
||||||
border-radius: 5px;
|
border-radius: 5px;
|
||||||
|
|||||||
File diff suppressed because one or more lines are too long
@@ -33,7 +33,7 @@
|
|||||||
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
|
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
|
||||||
</div>
|
</div>
|
||||||
</div>
|
</div>
|
||||||
<div id="notification-customisation" class="pure-control-group">
|
<div class="pure-control-group grey-form-border">
|
||||||
<div class="pure-control-group">
|
<div class="pure-control-group">
|
||||||
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
|
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
|
||||||
<span class="pure-form-message-inline">Title for all notifications</span>
|
<span class="pure-form-message-inline">Title for all notifications</span>
|
||||||
|
|||||||
@@ -113,14 +113,8 @@ def set_original_ext_response():
|
|||||||
return None
|
return None
|
||||||
|
|
||||||
def set_modified_ext_response():
|
def set_modified_ext_response():
|
||||||
data = """
|
# This should get reformatted
|
||||||
[
|
data = """ [ { "isPriceLowered": false, "status": "Sold", "statusOrig": "sold" }, {
|
||||||
{
|
|
||||||
"isPriceLowered": false,
|
|
||||||
"status": "Sold",
|
|
||||||
"statusOrig": "sold"
|
|
||||||
},
|
|
||||||
{
|
|
||||||
"_id": "5e7b3e1fb3262d306323ff1e",
|
"_id": "5e7b3e1fb3262d306323ff1e",
|
||||||
"listingsType": "consumer",
|
"listingsType": "consumer",
|
||||||
"isPriceLowered": false,
|
"isPriceLowered": false,
|
||||||
@@ -230,30 +224,15 @@ def check_json_filter(json_filter, client, live_server):
|
|||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
|
||||||
# Add our URL to the import page
|
|
||||||
res = client.post(
|
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
|
||||||
data={"include_filters": json_filter,
|
|
||||||
"url": test_url,
|
|
||||||
"tags": "",
|
|
||||||
"headers": "",
|
|
||||||
"fetch_backend": "html_requests",
|
|
||||||
"time_between_check_use_default": "y"
|
|
||||||
},
|
|
||||||
follow_redirects=True
|
|
||||||
)
|
|
||||||
assert b"Updated watch." in res.data
|
|
||||||
|
|
||||||
# Check it saved
|
# Check it saved
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
)
|
)
|
||||||
assert bytes(escape(json_filter).encode('utf-8')) in res.data
|
assert bytes(escape(json_filter).encode('utf-8')) in res.data
|
||||||
|
|
||||||
@@ -272,7 +251,7 @@ def check_json_filter(json_filter, client, live_server):
|
|||||||
assert b'has-unread-changes' in res.data
|
assert b'has-unread-changes' in res.data
|
||||||
|
|
||||||
# Should not see this, because its not in the JSONPath we entered
|
# Should not see this, because its not in the JSONPath we entered
|
||||||
res = client.get(url_for("ui.ui_views.diff_history_page", uuid="first"))
|
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
|
||||||
|
|
||||||
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
|
# But the change should be there, tho its hard to test the change was detected because it will show old and new versions
|
||||||
# And #462 - check we see the proper utf-8 string there
|
# And #462 - check we see the proper utf-8 string there
|
||||||
@@ -294,32 +273,12 @@ def test_check_jqraw_filter(client, live_server, measure_memory_usage):
|
|||||||
def check_json_filter_bool_val(json_filter, client, live_server):
|
def check_json_filter_bool_val(json_filter, client, live_server):
|
||||||
set_original_response()
|
set_original_response()
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
|
|
||||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": [json_filter]})
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
# Goto the edit page, add our ignore text
|
|
||||||
# Add our URL to the import page
|
|
||||||
res = client.post(
|
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
|
||||||
data={"include_filters": json_filter,
|
|
||||||
"url": test_url,
|
|
||||||
"tags": "",
|
|
||||||
"headers": "",
|
|
||||||
"fetch_backend": "html_requests",
|
|
||||||
"time_between_check_use_default": "y"
|
|
||||||
},
|
|
||||||
follow_redirects=True
|
|
||||||
)
|
|
||||||
assert b"Updated watch." in res.data
|
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
|
||||||
wait_for_all_checks(client)
|
|
||||||
# Make a change
|
# Make a change
|
||||||
set_modified_response()
|
set_modified_response()
|
||||||
|
|
||||||
@@ -353,21 +312,16 @@ def test_check_jqraw_filter_bool_val(client, live_server, measure_memory_usage):
|
|||||||
def check_json_ext_filter(json_filter, client, live_server):
|
def check_json_ext_filter(json_filter, client, live_server):
|
||||||
set_original_ext_response()
|
set_original_ext_response()
|
||||||
|
|
||||||
# Give the endpoint time to spin up
|
|
||||||
time.sleep(1)
|
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
|
||||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
# Give the thread time to pick it up
|
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
# Goto the edit page, add our ignore text
|
# Goto the edit page, add our ignore text
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
res = client.post(
|
res = client.post(
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
data={"include_filters": json_filter,
|
data={"include_filters": json_filter,
|
||||||
"url": test_url,
|
"url": test_url,
|
||||||
"tags": "",
|
"tags": "",
|
||||||
@@ -381,7 +335,7 @@ def check_json_ext_filter(json_filter, client, live_server):
|
|||||||
|
|
||||||
# Check it saved
|
# Check it saved
|
||||||
res = client.get(
|
res = client.get(
|
||||||
url_for("ui.ui_edit.edit_page", uuid="first"),
|
url_for("ui.ui_edit.edit_page", uuid=uuid),
|
||||||
)
|
)
|
||||||
assert bytes(escape(json_filter).encode('utf-8')) in res.data
|
assert bytes(escape(json_filter).encode('utf-8')) in res.data
|
||||||
|
|
||||||
@@ -395,6 +349,12 @@ def check_json_ext_filter(json_filter, client, live_server):
|
|||||||
# Give the thread time to pick it up
|
# Give the thread time to pick it up
|
||||||
wait_for_all_checks(client)
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||||
|
|
||||||
|
assert snapshot_contents[0] == '['
|
||||||
|
|
||||||
# It should have 'has-unread-changes'
|
# It should have 'has-unread-changes'
|
||||||
res = client.get(url_for("watchlist.index"))
|
res = client.get(url_for("watchlist.index"))
|
||||||
assert b'has-unread-changes' in res.data
|
assert b'has-unread-changes' in res.data
|
||||||
@@ -456,7 +416,7 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
|
|||||||
# Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
|
# Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
|
||||||
# Specify extra html that JSON is sometimes wrapped in - when using SockpuppetBrowser / Puppeteer / Playwrightetc
|
# Specify extra html that JSON is sometimes wrapped in - when using SockpuppetBrowser / Puppeteer / Playwrightetc
|
||||||
with open("test-datastore/endpoint-content.txt", "w") as f:
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
f.write('<html><body>{"hello" : 123, "world": 123}')
|
f.write('<html><body>{ "world": 123, "hello" : 123}')
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
# Check weird casing is cleaned up and detected also
|
# Check weird casing is cleaned up and detected also
|
||||||
@@ -474,8 +434,18 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
|
|||||||
follow_redirects=True
|
follow_redirects=True
|
||||||
)
|
)
|
||||||
|
|
||||||
assert b'"hello": 123,' in res.data
|
|
||||||
assert b'"world": 123' in res.data
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||||
|
|
||||||
|
assert b'"hello": 123,' in res.data # properly html escaped in the front end
|
||||||
|
|
||||||
|
# Should be correctly formatted and sorted, ("world" goes to end)
|
||||||
|
assert snapshot_contents == """{
|
||||||
|
"hello": 123,
|
||||||
|
"world": 123
|
||||||
|
}"""
|
||||||
|
|
||||||
delete_all_watches(client)
|
delete_all_watches(client)
|
||||||
|
|
||||||
|
|||||||
@@ -110,8 +110,9 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
|
|||||||
|
|
||||||
|
|
||||||
set_original_cdata_xml()
|
set_original_cdata_xml()
|
||||||
|
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||||
test_url = url_for('test_endpoint', content_type="application/atom+xml; charset=UTF-8", _external=True)
|
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||||
|
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
|
||||||
|
|
||||||
# Add our URL to the import page
|
# Add our URL to the import page
|
||||||
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
|
|||||||
98
changedetectionio/tests/test_rss_reader_mode.py
Normal file
98
changedetectionio/tests/test_rss_reader_mode.py
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
|
||||||
|
import time
|
||||||
|
from flask import url_for
|
||||||
|
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
|
||||||
|
extract_UUID_from_client, delete_all_watches
|
||||||
|
|
||||||
|
|
||||||
|
def set_original_cdata_xml():
|
||||||
|
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
|
||||||
|
<channel>
|
||||||
|
<title>Security Bulletins on wetscale</title>
|
||||||
|
<link>https://wetscale.com/security-bulletins/</link>
|
||||||
|
<description>Recent security bulletins from wetscale</description>
|
||||||
|
<lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate>
|
||||||
|
<docs>https://validator.w3.org/feed/docs/rss2.html</docs>
|
||||||
|
<generator>wetscale.com</generator>
|
||||||
|
<language>en-US</language>
|
||||||
|
<copyright>© 2025 wetscale Inc. All rights reserved.</copyright>
|
||||||
|
<atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/>
|
||||||
|
<item>
|
||||||
|
<title>TS-2025-005</title>
|
||||||
|
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
|
||||||
|
<guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid>
|
||||||
|
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
|
||||||
|
<description><p>Wet noodles escape<br><p>they also found themselves outside</p> </description>
|
||||||
|
</item>
|
||||||
|
|
||||||
|
|
||||||
|
<item>
|
||||||
|
<title>TS-2025-004</title>
|
||||||
|
<link>https://wetscale.com/security-bulletins/#ts-2025-004</link>
|
||||||
|
<guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid>
|
||||||
|
<pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate>
|
||||||
|
<description>
|
||||||
|
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
|
||||||
|
</description>
|
||||||
|
</item>
|
||||||
|
</channel>
|
||||||
|
</rss>
|
||||||
|
"""
|
||||||
|
|
||||||
|
with open("test-datastore/endpoint-content.txt", "w") as f:
|
||||||
|
f.write(test_return_data)
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
def test_rss_reader_mode(client, live_server, measure_memory_usage):
|
||||||
|
set_original_cdata_xml()
|
||||||
|
|
||||||
|
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||||
|
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||||
|
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
|
||||||
|
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
|
||||||
|
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
|
||||||
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||||
|
assert 'Wet noodles escape' in snapshot_contents
|
||||||
|
assert '<br>' not in snapshot_contents
|
||||||
|
assert '<' not in snapshot_contents
|
||||||
|
assert 'The days of Terminator and The Matrix' in snapshot_contents
|
||||||
|
assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents
|
||||||
|
delete_all_watches(client)
|
||||||
|
|
||||||
|
def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage):
|
||||||
|
set_original_cdata_xml()
|
||||||
|
|
||||||
|
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
|
||||||
|
# This also triggers the automatic CDATA text parser so the RSS goes back a nice content list
|
||||||
|
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
|
||||||
|
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
|
||||||
|
|
||||||
|
|
||||||
|
# Add our URL to the import page
|
||||||
|
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
|
||||||
|
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
|
||||||
|
|
||||||
|
wait_for_all_checks(client)
|
||||||
|
|
||||||
|
|
||||||
|
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
|
||||||
|
dates = list(watch.history.keys())
|
||||||
|
snapshot_contents = watch.get_history_snapshot(dates[0])
|
||||||
|
assert 'Wet noodles escape' not in snapshot_contents
|
||||||
|
assert '<br>' not in snapshot_contents
|
||||||
|
assert '<' not in snapshot_contents
|
||||||
|
assert 'The days of Terminator and The Matrix' in snapshot_contents
|
||||||
|
delete_all_watches(client)
|
||||||
|
|
||||||
@@ -1,5 +1,6 @@
|
|||||||
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
|
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
|
||||||
feedgen~=0.9
|
feedgen~=0.9
|
||||||
|
feedparser~=6.0 # For parsing RSS/Atom feeds
|
||||||
flask-compress
|
flask-compress
|
||||||
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
|
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
|
||||||
flask-login>=0.6.3
|
flask-login>=0.6.3
|
||||||
|
|||||||
Reference in New Issue
Block a user