Compare commits

...

12 Commits

Author SHA1 Message Date
dgtlmoon 0d2df7685d bump text 2025-10-10 17:36:28 +02:00
dgtlmoon 1f0811e54d Also support RDF 2025-10-10 17:34:22 +02:00
dgtlmoon bb35310b07 format tweak 2025-10-10 17:29:20 +02:00
dgtlmoon 709dadc492 Ability to apply filters (first, last etc) 2025-10-10 17:26:20 +02:00
dgtlmoon f02fb7406d Feature - RSS reader mode 2025-10-10 17:11:59 +02:00
dgtlmoon d3725da2dc Merge branch 'master' into rss-reader-mode 2025-10-10 16:29:39 +02:00
dgtlmoon bb6d4c2756 Re #3486 - Fixing and adding test for RSS/Atom not being converted to text when server sends "text/xml" instead of the "application/atom+xml" header (#3487) 2025-10-10 16:29:02 +02:00
dgtlmoon a72b13964d Adding 'rss reader mode' 2025-10-10 16:28:20 +02:00
dgtlmoon b59ce190ac Ensure JSON is always correctly reformatted with padding (#3485 #3482) 2025-10-10 16:00:32 +02:00
dgtlmoon 80be1a30f2 No need to reformat/reprocess content in the case that no filters were found (#3484, #3483) 2025-10-10 13:44:49 +02:00
dgtlmoon 93b4f79006 0.50.20 2025-10-10 10:40:04 +02:00
dgtlmoon 3009e46617 PDF - Will trigger a change - Fixing output, also reported original size of document was incorrect (it was the size of the HTML output after conversion from PDF), Improving tests (#3481) 2025-10-10 10:38:34 +02:00
16 changed files with 454 additions and 168 deletions


@@ -2,7 +2,7 @@
# Read more https://github.com/dgtlmoon/changedetection.io/wiki
__version__ = '0.50.19'
__version__ = '0.50.20'
from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError


@@ -72,17 +72,24 @@
<span class="pure-form-message-inline">Allow access to view watch diff page when password is enabled (Good for sharing the diff page)
</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.empty_pages_are_a_change) }}
<span class="pure-form-message-inline">When a request returns no content, or the HTML does not contain any text, is this considered a change?</span>
</div>
<div class="grey-form-border">
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_hide_muted_watches) }}
</div>
<div class="pure-control-group">
{{ render_field(form.application.form.rss_content_format) }}
<span class="pure-form-message-inline">Love RSS? Does your reader support HTML? Set it here</span>
</div>
<div class="pure-control-group">
{{ render_checkbox_field(form.application.form.rss_reader_mode) }}
<span class="pure-form-message-inline">Transforms RSS/RDF feed watches into beautiful text only</span>
</div>
</div>
{% if form.requests.proxy %}
<div class="pure-control-group inline-radio">
{{ render_field(form.requests.form.proxy, class="fetch-backend-proxy") }}


@@ -940,6 +940,10 @@ class globalSettingsApplicationForm(commonSettingsForm):
strip_ignored_lines = BooleanField('Strip ignored lines')
rss_hide_muted_watches = BooleanField('Hide muted watches from RSS feed', default=True,
validators=[validators.Optional()])
rss_reader_mode = BooleanField('RSS reader mode ', default=False,
validators=[validators.Optional()])
filter_failure_notification_threshold_attempts = IntegerField('Number of times the filter can be missing before sending a notification',
render_kw={"style": "width: 5em;"},
validators=[validators.NumberRange(min=0,


@@ -1,5 +1,4 @@
from loguru import logger
from lxml import etree
from typing import List
import html
import json
@@ -58,13 +57,17 @@ def include_filters(include_filters, html_content, append_pretty_line_formatting
return html_block
def subtractive_css_selector(css_selector, html_content):
def subtractive_css_selector(css_selector, content):
from bs4 import BeautifulSoup
soup = BeautifulSoup(html_content, "html.parser")
soup = BeautifulSoup(content, "html.parser")
# So that the elements don't shift their index, build a list of elements here which will be pointers to their place in the DOM
elements_to_remove = soup.select(css_selector)
if not elements_to_remove:
# Better to return the original than rebuild with BeautifulSoup
return content
# Then, remove them in a separate loop
for item in elements_to_remove:
item.decompose()
@@ -72,6 +75,7 @@ def subtractive_css_selector(css_selector, html_content):
return str(soup)
def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
from lxml import etree
# Parse the HTML content using lxml
html_tree = etree.HTML(html_content)
@@ -83,6 +87,10 @@ def subtractive_xpath_selector(selectors: List[str], html_content: str) -> str:
# Collect elements for each selector
elements_to_remove.extend(html_tree.xpath(selector))
# If no elements were found, return the original HTML content
if not elements_to_remove:
return html_content
# Then, remove them in a separate loop
for element in elements_to_remove:
if element.getparent() is not None: # Ensure the element has a parent before removing
@@ -295,70 +303,92 @@ def _get_stripped_text_from_json_match(match):
return stripped_text_from_html
def extract_json_blob_from_html(content, ensure_is_ldjson_info_type, json_filter):
from bs4 import BeautifulSoup
stripped_text_from_html = ''
# For each <script json></script> blob, just return the first that matches json_filter
# As a last resort, try to parse the whole <body>
soup = BeautifulSoup(content, 'html.parser')
if ensure_is_ldjson_info_type:
bs_result = soup.find_all('script', {"type": "application/ld+json"})
else:
bs_result = soup.find_all('script')
bs_result += soup.find_all('body')
bs_jsons = []
for result in bs_result:
# result.text is how bs4 magically strips JSON from the body
content_start = result.text.lstrip("\ufeff").strip()[:100] if result.text else ''
# Skip empty tags, and things that don't even look like JSON
if not content_start or not (content_start[0] == '{' or content_start[0] == '['):
continue
try:
json_data = json.loads(result.text)
bs_jsons.append(json_data)
except json.JSONDecodeError:
# Skip objects which cannot be parsed
continue
if not bs_jsons:
raise JSONNotFound("No parsable JSON found in this document")
for json_data in bs_jsons:
stripped_text_from_html = _parse_json(json_data, json_filter)
if ensure_is_ldjson_info_type:
# Could sometimes be list, string or something else random
if isinstance(json_data, dict):
# If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
# (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
# @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
# LD_JSON auto-extract also requires some content PLUS the ldjson to be present
# 1833 - could be either str or dict, should not be anything else
t = json_data.get('@type')
if t and stripped_text_from_html:
if isinstance(t, str) and t.lower() == ensure_is_ldjson_info_type.lower():
break
# The non-standard part, some have a list
elif isinstance(t, list):
if ensure_is_ldjson_info_type.lower() in [x.lower().strip() for x in t]:
break
elif stripped_text_from_html:
break
return stripped_text_from_html
# content - json
# json_filter - ie json:$..price
# ensure_is_ldjson_info_type - str "product", optional, "@type == product" (I don't know how to do that as a json selector)
def extract_json_as_string(content, json_filter, ensure_is_ldjson_info_type=None):
from bs4 import BeautifulSoup
stripped_text_from_html = False
# https://github.com/dgtlmoon/changedetection.io/pull/2041#issuecomment-1848397161w
# Try to parse/filter out the JSON, if we get some parser error, then maybe it's embedded within HTML tags
try:
# .lstrip("\ufeff") strips the ByteOrderMark from UTF-8 and still lets the UTF-8 decode work
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff") ), json_filter)
except json.JSONDecodeError as e:
logger.warning(str(e))
# For each <script json></script> blob, just return the first that matches json_filter
# As a last resort, try to parse the whole <body>
soup = BeautifulSoup(content, 'html.parser')
# Looks like clean JSON, don't bother extracting from HTML
if ensure_is_ldjson_info_type:
bs_result = soup.find_all('script', {"type": "application/ld+json"})
else:
bs_result = soup.find_all('script')
bs_result += soup.find_all('body')
content_start = content.lstrip("\ufeff").strip()[:100]
bs_jsons = []
for result in bs_result:
# Skip empty tags, and things that don't even look like JSON
if not result.text or '{' not in result.text:
continue
try:
json_data = json.loads(result.text)
bs_jsons.append(json_data)
except json.JSONDecodeError:
# Skip objects which cannot be parsed
continue
if not bs_jsons:
raise JSONNotFound("No parsable JSON found in this document")
for json_data in bs_jsons:
stripped_text_from_html = _parse_json(json_data, json_filter)
if ensure_is_ldjson_info_type:
# Could sometimes be list, string or something else random
if isinstance(json_data, dict):
# If it has LD JSON 'key' @type, and @type is 'product', and something was found for the search
# (Some sites have multiple of the same ld+json @type='product', but some have the review part, some have the 'price' part)
# @type could also be a list although non-standard ("@type": ["Product", "SubType"],)
# LD_JSON auto-extract also requires some content PLUS the ldjson to be present
# 1833 - could be either str or dict, should not be anything else
t = json_data.get('@type')
if t and stripped_text_from_html:
if isinstance(t, str) and t.lower() == ensure_is_ldjson_info_type.lower():
break
# The non-standard part, some have a list
elif isinstance(t, list):
if ensure_is_ldjson_info_type.lower() in [x.lower().strip() for x in t]:
break
elif stripped_text_from_html:
break
if content_start[0] == '{' or content_start[0] == '[':
try:
# .lstrip("\ufeff") strips the ByteOrderMark from UTF-8 and still lets the UTF-8 decode work
stripped_text_from_html = _parse_json(json.loads(content.lstrip("\ufeff")), json_filter)
except json.JSONDecodeError as e:
logger.warning(f"Error processing JSON {content[:20]}...{str(e)})")
else:
# Probably something else, go fish inside for it
try:
stripped_text_from_html = extract_json_blob_from_html(content=content,
ensure_is_ldjson_info_type=ensure_is_ldjson_info_type,
json_filter=json_filter )
except json.JSONDecodeError as e:
logger.warning(f"Error processing JSON while extracting JSON from HTML blob {content[:20]}...{str(e)})")
if not stripped_text_from_html:
# Re 265 - Just return an empty string when filter not found
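
The refactor above splits JSON handling into a fast path (the body already looks like bare JSON) and a fallback that goes fishing for JSON blobs inside HTML `<script>` tags. A minimal runnable sketch of that dispatch check; `looks_like_bare_json` is a hypothetical name, not part of the codebase:

```python
def looks_like_bare_json(content: str) -> bool:
    # Mirror of the fast-path test above: strip a UTF-8 BOM, then peek at
    # the first meaningful character. '{' or '[' means "parse directly";
    # anything else means "extract JSON blobs from the surrounding HTML".
    content_start = content.lstrip("\ufeff").strip()[:100]
    return bool(content_start) and content_start[0] in ('{', '[')

assert looks_like_bare_json('\ufeff{"price": 9.99}')
assert not looks_like_bare_json('<html><body>{"price": 9.99}</body></html>')
```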


@@ -55,6 +55,7 @@ class model(dict):
'rss_access_token': None,
'rss_content_format': RSS_FORMAT_TYPES[0][0],
'rss_hide_muted_watches': True,
'rss_reader_mode': False,
'schema_version' : 0,
'shared_diff_access': False,
'strip_ignored_lines': False,


@@ -94,24 +94,21 @@ class guess_stream_type():
self.is_rss = True
elif any(s in http_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
# magic will call an RSS document 'xml'
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES) or '<rdf:' in test_content_normalized:
self.is_rss = True
elif any(s in http_content_header for s in XML_CONTENT_TYPES):
# Only mark as generic XML if not already detected as RSS
if not self.is_rss:
self.is_xml = True
elif 'pdf' in magic_content_header:
self.is_pdf = True
###
elif has_html_patterns or http_content_header == 'text/html':
self.is_html = True
# If magic says text/plain and we found no HTML patterns, trust it
elif magic_result == 'text/plain':
self.is_plaintext = True
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
elif any(s in magic_content_header for s in JSON_CONTENT_TYPES):
self.is_json = True
# magic will call an RSS document 'xml'
elif '<rss' in test_content_normalized or '<feed' in test_content_normalized or any(s in magic_content_header for s in RSS_XML_CONTENT_TYPES):
self.is_rss = True
elif test_content_normalized.startswith('<?xml') or any(s in magic_content_header for s in XML_CONTENT_TYPES):
# Generic XML that's not RSS/Atom (RSS/Atom checked above)
self.is_xml = True
@@ -122,4 +119,8 @@ class guess_stream_type():
# Only trust magic for 'text' if no other patterns matched
elif 'text' in magic_content_header:
self.is_plaintext = True
# If magic says text/plain and we found no HTML patterns, trust it
elif magic_result == 'text/plain':
self.is_plaintext = True
logger.debug(f"Trusting magic's text/plain result (no HTML patterns detected)")
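
The reordering above is the whole point of the `guess_stream_type` change: RSS/RDF markers are now tested before the generic XML fallback, so a `text/xml` header with `<rss` or `<rdf:` in the body is classified as a feed. A condensed, assumed sketch of that first-match-wins chain (the real class inspects more signals):

```python
def guess_type(http_header: str, magic_header: str, body: str) -> str:
    # First match wins, so feed markers must come before generic XML.
    body = body.lstrip().lower()
    if any(marker in body for marker in ('<rss', '<feed', '<rdf:')):
        return 'rss'
    if 'pdf' in magic_header:
        return 'pdf'
    if http_header == 'text/html' or '<html' in body:
        return 'html'
    if magic_header == 'text/plain':
        return 'plaintext'
    if body.startswith('<?xml') or 'xml' in http_header:
        return 'xml'
    return 'unknown'

# A text/xml header no longer hides an RSS feed:
assert guess_type('text/xml', 'text/xml', '<rss version="2.0"></rss>') == 'rss'
```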


@@ -20,7 +20,7 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
name = 'Webpage Text/HTML, JSON and PDF changes'
description = 'Detects all text changes where possible'
json_filter_prefixes = ['json:', 'jq:', 'jqraw:']
JSON_FILTER_PREFIXES = ['json:', 'jq:', 'jqraw:']
# Assume it's this type if the server says nothing on content-type
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
@@ -99,6 +99,10 @@ class FilterConfig:
def has_include_filters(self):
return bool(self.include_filters) and bool(self.include_filters[0].strip())
@property
def has_include_json_filters(self):
return any(f.strip().startswith(prefix) for f in self.include_filters for prefix in JSON_FILTER_PREFIXES)
@property
def has_subtractive_selectors(self):
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
@@ -224,10 +228,23 @@ class ContentProcessor:
self.datastore = datastore
def preprocess_rss(self, content):
"""Convert CDATA/comments in RSS to usable text."""
return cdata_in_document_to_text(html_content=content)
"""
Convert CDATA/comments in RSS to usable text.
def preprocess_pdf(self, content, raw_content):
Supports two RSS processing modes:
- 'default': Inline CDATA replacement (original behavior)
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
"""
from changedetectionio import rss_tools
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
if rss_mode:
# Format RSS items nicely with CDATA content unmarked and converted to text
return rss_tools.format_rss_items(content)
else:
# Default: Original inline CDATA replacement
return cdata_in_document_to_text(html_content=content)
def preprocess_pdf(self, raw_content):
"""Convert PDF to HTML using external tool."""
from shutil import which
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
@@ -251,19 +268,18 @@ class ContentProcessor:
metadata = (
f"<p>Added by changedetection.io: Document checksum - "
f"{hashlib.md5(raw_content).hexdigest().upper()} "
f"Filesize - {len(html_content)} bytes</p>"
f"Original file size - {len(raw_content)} bytes</p>"
)
return html_content.replace('</body>', metadata + '</body>')
def preprocess_json(self, content, has_filters):
def preprocess_json(self, raw_content):
"""Format and sort JSON content."""
# Force reformat if no filters specified
if not has_filters:
content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
# Then we re-format it, else it does have filters (later on) which will reformat it anyway
content = html_tools.extract_json_as_string(content=raw_content, json_filter="json:$")
# Sort JSON to avoid false alerts from reordering
try:
content = json.dumps(json.loads(content), sort_keys=True)
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
except Exception:
# Might be malformed JSON, continue anyway
pass
@@ -294,7 +310,7 @@ class ContentProcessor:
)
# JSON filters
elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
elif any(filter_rule.startswith(prefix) for prefix in JSON_FILTER_PREFIXES):
filtered_content += html_tools.extract_json_as_string(
content=content,
json_filter=filter_rule
@@ -381,14 +397,23 @@ class perform_site_check(difference_detection_processor):
# RSS preprocessing
if stream_content_type.is_rss:
content = content_processor.preprocess_rss(content)
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
stream_content_type.is_rss = False
stream_content_type.is_html = True
self.fetcher.content = content
# PDF preprocessing
if watch.is_pdf or stream_content_type.is_pdf:
content = content_processor.preprocess_pdf(content, self.fetcher.raw_content)
content = content_processor.preprocess_pdf(raw_content=self.fetcher.raw_content)
stream_content_type.is_html = True
# JSON - Always reformat it nicely for consistency.
# JSON preprocessing
if stream_content_type.is_json:
content = content_processor.preprocess_json(content, filter_config.has_include_filters)
if not filter_config.has_include_json_filters:
content = content_processor.preprocess_json(raw_content=content)
#else, otherwise it gets sorted/formatted in the filter stage anyway
# HTML obfuscation workarounds
if stream_content_type.is_html:
@@ -403,6 +428,8 @@ class perform_site_check(difference_detection_processor):
html_content = content
# Apply include filters (CSS, XPath, JSON)
# Except for plaintext (in case they tried to confuse the system, it will HTML-escape)
#if not stream_content_type.is_plaintext:
if filter_config.has_include_filters:
html_content = content_processor.apply_include_filters(content, stream_content_type)
@@ -414,6 +441,9 @@ class perform_site_check(difference_detection_processor):
if watch.is_source_type_url:
# For source URLs, keep raw content
stripped_text = html_content
elif stream_content_type.is_plaintext:
# For plaintext, keep as-is without HTML-to-text conversion
stripped_text = html_content
else:
# Extract text from HTML/RSS content (not generic XML)
if stream_content_type.is_html or stream_content_type.is_rss:
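
The quiet behavioural change in this file is JSON normalisation: content is now pre-formatted only when no `json:`/`jq:` filter will reformat it later, and the reformat sorts keys and indents with 4 spaces so key reordering alone never registers as a change. A standalone approximation of that step (a hypothetical free function; the real method lives on `ContentProcessor`):

```python
import json

def normalise_json(raw_content: str) -> str:
    # Sort keys and indent so a server shuffling key order doesn't
    # look like a content change.
    try:
        return json.dumps(json.loads(raw_content), sort_keys=True, indent=4)
    except json.JSONDecodeError:
        return raw_content  # possibly malformed JSON - leave untouched

print(normalise_json('{"world": 123, "hello": 123}'))
# {
#     "hello": 123,
#     "world": 123
# }
```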


@@ -0,0 +1,130 @@
"""
RSS/Atom feed processing tools for changedetection.io
"""
from loguru import logger
import re
def cdata_in_document_to_text(html_content: str, render_anchor_tag_content=False) -> str:
"""
Process CDATA sections in HTML/XML content - inline replacement.
Args:
html_content: The HTML/XML content to process
render_anchor_tag_content: Whether to render anchor tag content
Returns:
Processed HTML/XML content with CDATA sections replaced inline
"""
from xml.sax.saxutils import escape as xml_escape
from .html_tools import html_to_text
pattern = r'<!\[CDATA\[(\s*(?:.(?<!\]\]>)\s*)*)\]\]>'  # raw string so the regex escapes survive Python string parsing
def repl(m):
text = m.group(1)
return xml_escape(html_to_text(html_content=text, render_anchor_tag_content=render_anchor_tag_content)).strip()
return re.sub(pattern, repl, html_content)
def format_rss_items(rss_content: str, render_anchor_tag_content=False) -> str:
"""
Format RSS/Atom feed items in a readable text format using feedparser.
Converts RSS <item> or Atom <entry> elements to formatted text with:
- <title> → <h1>Title</h1>
- <link> → Link: [url]
- <guid> → Guid: [id]
- <pubDate> → PubDate: [date]
- <description> or <content> → Raw HTML content (CDATA and entities automatically handled)
Args:
rss_content: The RSS/Atom feed content
render_anchor_tag_content: Whether to render anchor tag content in descriptions (unused, kept for compatibility)
Returns:
Formatted HTML content ready for html_to_text conversion
"""
try:
import feedparser
from xml.sax.saxutils import escape as xml_escape
# Parse the feed - feedparser handles all RSS/Atom variants, CDATA, entity unescaping, etc.
feed = feedparser.parse(rss_content)
formatted_items = []
# Determine feed type for appropriate labels when fields are missing
# feedparser sets feed.version to things like 'rss20', 'atom10', etc.
is_atom = feed.version and 'atom' in feed.version
for entry in feed.entries:
item_parts = []
# Title - feedparser handles CDATA and entity unescaping automatically
if hasattr(entry, 'title') and entry.title:
item_parts.append(f'<h1>{xml_escape(entry.title)}</h1>')
# Link
if hasattr(entry, 'link') and entry.link:
item_parts.append(f'Link: {xml_escape(entry.link)}<br>')
# GUID/ID
if hasattr(entry, 'id') and entry.id:
item_parts.append(f'Guid: {xml_escape(entry.id)}<br>')
# Date - feedparser normalizes all date field names to 'published'
if hasattr(entry, 'published') and entry.published:
item_parts.append(f'PubDate: {xml_escape(entry.published)}<br>')
# Description/Content - feedparser handles CDATA and entity unescaping automatically
# Only add "Summary:" label for Atom <summary> tags
content = None
add_label = False
if hasattr(entry, 'content') and entry.content:
# Atom <content> - no label, just content
content = entry.content[0].value if entry.content[0].value else None
elif hasattr(entry, 'summary'):
# Could be RSS <description> or Atom <summary>
# feedparser maps both to entry.summary
content = entry.summary if entry.summary else None
# Only add "Summary:" label for Atom feeds (which use <summary> tag)
if is_atom:
add_label = True
# Add content with or without label
if content:
if add_label:
item_parts.append(f'Summary:<br>{content}')
else:
item_parts.append(content)
else:
# No content - just show <none>
item_parts.append('&lt;none&gt;')
# Join all parts of this item
if item_parts:
formatted_items.append('\n'.join(item_parts))
# Wrap each item in a div with classes (first, last, item-N)
items_html = []
total_items = len(formatted_items)
for idx, item in enumerate(formatted_items):
classes = ['rss-item']
if idx == 0:
classes.append('first')
if idx == total_items - 1:
classes.append('last')
classes.append(f'item-{idx + 1}')
class_str = ' '.join(classes)
items_html.append(f'<div class="{class_str}">{item}</div>')
return '<html><body>\n'+"\n<br><br>".join(items_html)+'\n</body></html>'
except Exception as e:
logger.warning(f"Error formatting RSS items: {str(e)}")
# Fall back to original content
return rss_content
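
A minimal usage sketch for the new module, assuming it is importable as `changedetectionio.rss_tools` (consistent with the import in the processor diff above) and that `feedparser` is installed:

```python
from changedetectionio import rss_tools

feed = """<rss version="2.0"><channel><item>
<title>TS-2025-005</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
<description><![CDATA[<p>Wet noodles escape</p>]]></description>
</item></channel></rss>"""

html = rss_tools.format_rss_items(feed)
assert '<h1>TS-2025-005</h1>' in html                  # title promoted to a heading
assert 'class="rss-item first last item-1"' in html    # a single item is both first and last
```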


@@ -344,7 +344,7 @@ label {
}
}
#notification-customisation {
.grey-form-border {
border: 1px solid var(--color-border-notification);
padding: 0.5rem;
border-radius: 5px;

File diff suppressed because one or more lines are too long


@@ -33,7 +33,7 @@
<div id="notification-test-log" style="display: none;"><span class="pure-form-message-inline">Processing..</span></div>
</div>
</div>
<div id="notification-customisation" class="pure-control-group">
<div class="pure-control-group grey-form-border">
<div class="pure-control-group">
{{ render_field(form.notification_title, class="m-d notification-title", placeholder=settings_application['notification_title']) }}
<span class="pure-form-message-inline">Title for all notifications</span>


@@ -113,14 +113,8 @@ def set_original_ext_response():
return None
def set_modified_ext_response():
data = """
[
{
"isPriceLowered": false,
"status": "Sold",
"statusOrig": "sold"
},
{
# This should get reformatted
data = """ [ { "isPriceLowered": false, "status": "Sold", "statusOrig": "sold" }, {
"_id": "5e7b3e1fb3262d306323ff1e",
"listingsType": "consumer",
"isPriceLowered": false,
@@ -230,30 +224,15 @@ def check_json_filter(json_filter, client, live_server):
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": json_filter.splitlines()})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(escape(json_filter).encode('utf-8')) in res.data
@@ -272,7 +251,7 @@ def check_json_filter(json_filter, client, live_server):
assert b'has-unread-changes' in res.data
# Should not see this, because its not in the JSONPath we entered
res = client.get(url_for("ui.ui_views.diff_history_page", uuid="first"))
res = client.get(url_for("ui.ui_views.diff_history_page", uuid=uuid))
# But the change should be there, though it's hard to test that the change was detected because it will show old and new versions
# And #462 - check we see the proper utf-8 string there
@@ -294,32 +273,12 @@ def test_check_jqraw_filter(client, live_server, measure_memory_usage):
def check_json_filter_bool_val(json_filter, client, live_server):
set_original_response()
# Give the endpoint time to spin up
time.sleep(1)
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={"include_filters": [json_filter]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
"headers": "",
"fetch_backend": "html_requests",
"time_between_check_use_default": "y"
},
follow_redirects=True
)
assert b"Updated watch." in res.data
# Give the thread time to pick it up
wait_for_all_checks(client)
# Make a change
set_modified_response()
@@ -353,21 +312,16 @@ def test_check_jqraw_filter_bool_val(client, live_server, measure_memory_usage):
def check_json_ext_filter(json_filter, client, live_server):
set_original_ext_response()
# Give the endpoint time to spin up
time.sleep(1)
# Add our URL to the import page
test_url = url_for('test_endpoint', content_type="application/json", _external=True)
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
# Give the thread time to pick it up
wait_for_all_checks(client)
# Goto the edit page, add our ignore text
# Add our URL to the import page
res = client.post(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
data={"include_filters": json_filter,
"url": test_url,
"tags": "",
@@ -381,7 +335,7 @@ def check_json_ext_filter(json_filter, client, live_server):
# Check it saved
res = client.get(
url_for("ui.ui_edit.edit_page", uuid="first"),
url_for("ui.ui_edit.edit_page", uuid=uuid),
)
assert bytes(escape(json_filter).encode('utf-8')) in res.data
@@ -395,6 +349,12 @@ def check_json_ext_filter(json_filter, client, live_server):
# Give the thread time to pick it up
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert snapshot_contents[0] == '['
# It should have 'has-unread-changes'
res = client.get(url_for("watchlist.index"))
assert b'has-unread-changes' in res.data
@@ -456,7 +416,7 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
# Like in https://github.com/dgtlmoon/changedetection.io/pull/1593
# Specify extra html that JSON is sometimes wrapped in - when using SockpuppetBrowser / Puppeteer / Playwrightetc
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write('<html><body>{"hello" : 123, "world": 123}')
f.write('<html><body>{ "world": 123, "hello" : 123}')
# Add our URL to the import page
# Check weird casing is cleaned up and detected also
@@ -474,8 +434,18 @@ def test_correct_header_detect(client, live_server, measure_memory_usage):
follow_redirects=True
)
assert b'&#34;hello&#34;: 123,' in res.data
assert b'&#34;world&#34;: 123' in res.data
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert b'&#34;hello&#34;: 123,' in res.data # properly html escaped in the front end
# Should be correctly formatted and sorted, ("world" goes to end)
assert snapshot_contents == """{
"hello": 123,
"world": 123
}"""
delete_all_watches(client)


@@ -8,25 +8,30 @@ from .util import set_original_response, set_modified_response, live_server_setu
# `subtractive_selectors` should still work in `source:` type requests
def test_fetch_pdf(client, live_server, measure_memory_usage):
import shutil
shutil.copy("tests/test.pdf", "test-datastore/endpoint-test.pdf")
import os
shutil.copy("tests/test.pdf", "test-datastore/endpoint-test.pdf")
first_version_size = os.path.getsize("test-datastore/endpoint-test.pdf")
# live_server_setup(live_server) # Setup on conftest per function
test_url = url_for('test_pdf_endpoint', _external=True)
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
res = client.get(
url_for("ui.ui_views.preview_page", uuid="first"),
follow_redirects=True
)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
# PDF header should not be there (it was converted to text)
assert b'PDF' not in res.data[:10]
assert b'hello world' in res.data
assert 'PDF' not in snapshot_contents
# Was converted away from HTML
assert 'pdftohtml' not in snapshot_contents.lower() # Generator tag shouldn't be there
assert f'Original file size - {first_version_size}' in snapshot_contents
assert 'html' not in snapshot_contents.lower() # is converted from html
assert 'body' not in snapshot_contents.lower() # is converted from html
# And our text content was there
assert 'hello world' in snapshot_contents
# So we know if the file changes in other ways
import hashlib
@@ -34,8 +39,7 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
# We should have one
assert len(original_md5) >0
# And it's going to be in the document
assert b'Document checksum - '+bytes(str(original_md5).encode('utf-8')) in res.data
assert f'Document checksum - {original_md5}' in snapshot_contents
shutil.copy("tests/test2.pdf", "test-datastore/endpoint-test.pdf")
changed_md5 = hashlib.md5(open("test-datastore/endpoint-test.pdf", 'rb').read()).hexdigest().upper()
@@ -58,7 +62,6 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
assert original_md5.encode('utf-8') not in res.data
assert changed_md5.encode('utf-8') in res.data
res = client.get(
url_for("ui.ui_views.diff_history_page", uuid="first"),
follow_redirects=True
@@ -66,6 +69,16 @@ def test_fetch_pdf(client, live_server, measure_memory_usage):
assert original_md5.encode('utf-8') in res.data
assert changed_md5.encode('utf-8') in res.data
assert b'here is a change' in res.data
dates = list(watch.history.keys())
# new snapshot was also OK, no HTML
snapshot_contents = watch.get_history_snapshot(dates[1])
assert 'html' not in snapshot_contents.lower()
assert f'Original file size - {os.path.getsize("test-datastore/endpoint-test.pdf")}' in snapshot_contents
assert f'here is a change' in snapshot_contents
assert os.path.getsize("test-datastore/endpoint-test.pdf") != first_version_size # And the disk change worked
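
The assertions above pin down the actual fix from commit 3009e46617: the checksum/size footer must be computed from the original PDF bytes, not from the HTML that the conversion tool produces. A small sketch of building that footer, assuming a local tests/test.pdf:

```python
import hashlib

raw_content = open("tests/test.pdf", "rb").read()
# Derived from the raw PDF bytes - previously the reported size reflected
# the converted HTML output instead of the source document.
checksum = hashlib.md5(raw_content).hexdigest().upper()
footer = (f"<p>Added by changedetection.io: Document checksum - {checksum} "
          f"Original file size - {len(raw_content)} bytes</p>")
```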


@@ -110,8 +110,9 @@ def test_basic_cdata_rss_markup(client, live_server, measure_memory_usage):
set_original_cdata_xml()
test_url = url_for('test_endpoint', content_type="application/atom+xml; charset=UTF-8", _external=True)
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)


@@ -0,0 +1,98 @@
#!/usr/bin/env python3
import time
from flask import url_for
from .util import set_original_response, set_modified_response, live_server_setup, wait_for_all_checks, extract_rss_token_from_UI, \
extract_UUID_from_client, delete_all_watches
def set_original_cdata_xml():
test_return_data = """<rss xmlns:atom="http://www.w3.org/2005/Atom" version="2.0">
<channel>
<title>Security Bulletins on wetscale</title>
<link>https://wetscale.com/security-bulletins/</link>
<description>Recent security bulletins from wetscale</description>
<lastBuildDate>Fri, 10 Oct 2025 14:58:11 GMT</lastBuildDate>
<docs>https://validator.w3.org/feed/docs/rss2.html</docs>
<generator>wetscale.com</generator>
<language>en-US</language>
<copyright>© 2025 wetscale Inc. All rights reserved.</copyright>
<atom:link href="https://wetscale.com/security-bulletins/index.xml" rel="self" type="application/rss+xml"/>
<item>
<title>TS-2025-005</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-005</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-005</guid>
<pubDate>Thu, 07 Aug 2025 00:00:00 GMT</pubDate>
<description><p>Wet noodles escape<br><p>they also found themselves outside</p> </description>
</item>
<item>
<title>TS-2025-004</title>
<link>https://wetscale.com/security-bulletins/#ts-2025-004</link>
<guid>https://wetscale.com/security-bulletins/#ts-2025-004</guid>
<pubDate>Tue, 27 May 2025 00:00:00 GMT</pubDate>
<description>
<![CDATA[ <img class="type:primaryImage" src="https://testsite.com/701c981da04869e.jpg"/><p>The days of Terminator and The Matrix could be closer. But be positive.</p><p><a href="https://testsite.com">Read more link...</a></p> ]]>
</description>
</item>
</channel>
</rss>
"""
with open("test-datastore/endpoint-content.txt", "w") as f:
f.write(test_return_data)
def test_rss_reader_mode(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url)
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
assert 'PubDate: Thu, 07 Aug 2025 00:00:00 GMT' in snapshot_contents
delete_all_watches(client)
def test_rss_reader_mode_with_css_filters(client, live_server, measure_memory_usage):
set_original_cdata_xml()
# Rarely do endpoints give the right header, usually just text/xml, so we check also for <rss
# This also triggers the automatic CDATA text parser so the RSS comes back as a nice content list
test_url = url_for('test_endpoint', content_type="text/xml; charset=UTF-8", _external=True)
live_server.app.config['DATASTORE'].data['settings']['application']['rss_reader_mode'] = True
# Add our URL to the import page
uuid = client.application.config.get('DATASTORE').add_watch(url=test_url, extras={'include_filters': [".last"]})
client.get(url_for("ui.form_watch_checknow"), follow_redirects=True)
wait_for_all_checks(client)
watch = live_server.app.config['DATASTORE'].data['watching'][uuid]
dates = list(watch.history.keys())
snapshot_contents = watch.get_history_snapshot(dates[0])
assert 'Wet noodles escape' not in snapshot_contents
assert '<br>' not in snapshot_contents
assert '&lt;' not in snapshot_contents
assert 'The days of Terminator and The Matrix' in snapshot_contents
delete_all_watches(client)
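
Why the `.last` filter works at all: reader mode wraps every feed entry in `class="rss-item first last item-N"` divs before the watch is treated as HTML, so ordinary CSS selectors can slice the feed. A quick illustration with BeautifulSoup:

```python
from bs4 import BeautifulSoup

html = ('<html><body>'
        '<div class="rss-item first item-1">TS-2025-005</div>'
        '<div class="rss-item last item-2">TS-2025-004</div>'
        '</body></html>')

# ".last" selects only the final feed item, as in the test above
assert [d.text for d in BeautifulSoup(html, 'html.parser').select('.last')] == ['TS-2025-004']
```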


@@ -1,5 +1,6 @@
# eventlet>=0.38.0 # Removed - replaced with threading mode for better Python 3.12+ compatibility
feedgen~=0.9
feedparser~=6.0 # For parsing RSS/Atom feeds
flask-compress
# 0.6.3 included compatibility fix for werkzeug 3.x (2.x had deprecation of url handlers)
flask-login>=0.6.3