Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-12-16 04:58:15 +00:00)

Compare commits: 0.48.01 ... enhanced-R (4 commits)
| Author | SHA1 | Date |
|---|---|---|
| | 365df6cf81 | |
| | 76729f2106 | |
| | 591dd5b570 | |
| | 919812bf8b | |
```diff
@@ -729,6 +729,12 @@ def changedetection_app(config=None, datastore_o=None):
             for p in datastore.proxy_list:
                 form.proxy.choices.append(tuple((p, datastore.proxy_list[p]['label'])))
 
+        # Add some HTML to be used for form validation
+        if datastore.data['watching'][uuid].history.keys():
+            timestamp = list(datastore.data['watching'][uuid].history.keys())[-1]
+            form.last_html_for_form_validation = datastore.data['watching'][uuid].get_fetched_html(timestamp)
+        else:
+            form.last_html_for_form_validation = "<html><body></body></html>"
 
         if request.method == 'POST' and form.validate():
```
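For orientation, a minimal sketch of what this hunk wires up, assuming the watch object's `history` mapping and `get_fetched_html()` call behave as used above: the most recent snapshot's HTML is attached to the form so the filter validators can run against real content rather than an empty document.

```python
def last_fetched_html(watch) -> str:
    """Hypothetical helper (not in the diff): newest snapshot HTML, or an empty page."""
    timestamps = list(watch.history.keys())   # assumed: history is keyed by snapshot timestamp
    if timestamps:
        return watch.get_fetched_html(timestamps[-1])
    return "<html><body></body></html>"       # nothing fetched yet, same fallback as above
```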
```diff
@@ -1,6 +1,9 @@
 import os
 import re
 
+import elementpath
+
+from changedetectionio.html_tools import xpath_filter, xpath1_filter
 from changedetectionio.strtobool import strtobool
 
 from wtforms import (
```
```diff
@@ -322,52 +325,39 @@ class ValidateCSSJSONXPATHInput(object):
         self.allow_json = allow_json
 
     def __call__(self, form, field):
 
+        from lxml.etree import XPathEvalError
         if isinstance(field.data, str):
             data = [field.data]
         else:
             data = field.data
 
         for line in data:
             # Nothing to see here
-            if not len(line.strip()):
-                return
-
-            # Does it look like XPath?
-            if line.strip()[0] == '/' or line.strip().startswith('xpath:'):
+            line = line.strip()
+            if not line:
+                continue
+            if line.startswith('xpath') or line.startswith('/'):
                 if not self.allow_xpath:
                     raise ValidationError("XPath not permitted in this field!")
                 from lxml import etree, html
-                import elementpath
                 # xpath 2.0-3.1
                 from elementpath.xpath3 import XPath3Parser
                 tree = html.fromstring("<html></html>")
-                line = line.replace('xpath:', '')
-
+                if line.startswith('xpath1:'):
+                    filter_function = xpath1_filter
+                else:
+                    line = line.replace('xpath:', '')
+                    filter_function = xpath_filter
                 try:
-                    elementpath.select(tree, line.strip(), parser=XPath3Parser)
-                except elementpath.ElementPathError as e:
+                    # Call the determined function
+                    res = filter_function(xpath_filter=line, html_content=form.last_html_for_form_validation)
+                    # It's OK if this is an empty result, we just want to check that it doesn't crash the parser
+                except (elementpath.ElementPathError,XPathEvalError) as e:
                     message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
                     raise ValidationError(message % (line, str(e)))
-                except:
+                except Exception as e:
                     raise ValidationError("A system-error occurred when validating your XPath expression")
-
-            if line.strip().startswith('xpath1:'):
-                if not self.allow_xpath:
-                    raise ValidationError("XPath not permitted in this field!")
-                from lxml import etree, html
-                tree = html.fromstring("<html></html>")
-                line = re.sub(r'^xpath1:', '', line)
-
-                try:
-                    tree.xpath(line.strip())
-                except etree.XPathEvalError as e:
-                    message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
-                    raise ValidationError(message % (line, str(e)))
-                except:
-                    raise ValidationError("A system-error occurred when validating your XPath expression")
-
-            if 'json:' in line:
+            elif 'json:' in line:
                 if not self.allow_json:
                     raise ValidationError("JSONPath not permitted in this field!")
```
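A rough standalone illustration of the failure mode the rewritten validator is meant to surface, assuming `xpath_filter` from `changedetectionio.html_tools` behaves as shown further down: a malformed expression raises while being exercised against the last fetched HTML, and the validator turns that into a field error. The HTML string and the broken expression here are invented.

```python
import elementpath
from lxml.etree import XPathEvalError

from changedetectionio.html_tools import xpath_filter

last_html_for_form_validation = "<html><body><p>hello</p></body></html>"

try:
    # '//p[' is deliberately malformed, so the underlying XPath parser should complain
    xpath_filter(xpath_filter='//p[', html_content=last_html_for_form_validation)
except (elementpath.ElementPathError, XPathEvalError) as e:
    print(f"'//p[' is not a valid XPath expression. ({e})")
```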
```diff
@@ -392,7 +382,7 @@ class ValidateCSSJSONXPATHInput(object):
                 if not self.allow_json:
                     raise ValidationError("jq not permitted in this field!")
 
-            if 'jq:' in line:
+            elif line.startswith('jq:'):
                 try:
                     import jq
                 except ModuleNotFoundError:
```
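A small aside (my illustration, not from the commit) on why `startswith()` is the stricter test here: the old substring check also fired on filter lines that merely contain the text `jq:` somewhere.

```python
# A filter line that is really an XPath expression, not a jq program
line = "xpath://a[contains(@href, 'jq:')]"

print('jq:' in line)            # True  -> old check would route this into the jq branch
print(line.startswith('jq:'))   # False -> new check leaves it alone
```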
```diff
@@ -8,6 +8,7 @@ from xml.sax.saxutils import escape as xml_escape
 import json
 import re
 
+from loguru import logger
 
 # HTML added to be sure each result matching a filter (.example) gets converted to a new line by Inscriptis
 TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
```
```diff
@@ -108,6 +109,20 @@ def elementpath_tostring(obj):
 
     return str(obj)
 
+def extract_namespaces(xml_content):
+    """
+    Extracts all namespaces from the XML content.
+    """
+    from lxml import etree
+    from io import BytesIO
+
+    it = etree.iterparse(BytesIO(xml_content), events=('start-ns',))
+    namespaces = {}
+    for _, ns in it:
+        prefix, uri = ns
+        namespaces[prefix] = uri
+    return namespaces
+
 # Return str Utf-8 of matched rules
 def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
     from lxml import etree, html
```
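To make the new helper concrete, here is a small standalone run of the same `iterparse`/`start-ns` technique; the Media RSS fragment is invented purely for illustration.

```python
from io import BytesIO

from lxml import etree

sample = b"""<?xml version="1.0"?>
<rss xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
  <channel>
    <item><media:thumbnail url="https://example.com/t.jpg"/></item>
  </channel>
</rss>"""

# Same approach as extract_namespaces() above: walk the 'start-ns' events
# and collect each prefix -> URI mapping declared in the document.
namespaces = {}
for _, (prefix, uri) in etree.iterparse(BytesIO(sample), events=('start-ns',)):
    namespaces[prefix] = uri

print(namespaces)  # {'media': 'http://search.yahoo.com/mrss/'}
```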
```diff
@@ -123,7 +138,14 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False
     tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
     html_block = ""
 
-    r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser)
+    # Automatically extract all namespaces from the XML content
+    namespaces = {'re': 'http://exslt.org/regular-expressions'}
+    try:
+        namespaces.update(extract_namespaces(html_content.encode('utf-8')))
+    except Exception as e:
+        logger.warning(f"Problem extracting namespaces from HTMl/XML content {str(e)}")
+
+    r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
     #@note: //title/text() wont work where <title>CDATA..
 
     if type(r) != list:
```
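The practical effect is that prefixed selectors such as `//media:thumbnail/@url` resolve once the document's own namespace declarations are in the map. A minimal sketch, using lxml's XPath 1.0 engine for brevity rather than `elementpath`, and the same invented feed as above:

```python
from lxml import etree

sample = b"""<rss xmlns:media="http://search.yahoo.com/mrss/" version="2.0">
  <channel>
    <item><media:thumbnail url="https://example.com/t.jpg"/></item>
  </channel>
</rss>"""

root = etree.fromstring(sample)

# Without a prefix -> URI mapping, lxml rejects the unknown 'media' prefix
try:
    root.xpath('//media:thumbnail/@url')
except etree.XPathEvalError as e:
    print(f"no namespace map: {e}")

# With the mapping (what the auto-extraction above now supplies) the selector works
print(root.xpath('//media:thumbnail/@url',
                 namespaces={'media': 'http://search.yahoo.com/mrss/'}))
# ['https://example.com/t.jpg']
```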
```diff
@@ -77,11 +77,12 @@ class perform_site_check(difference_detection_processor):
 
         ctype_header = self.fetcher.get_all_headers().get('content-type', '').lower()
-        # Go into RSS preprocess for converting CDATA/comment to usable text
-        if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']):
-            if '<rss' in self.fetcher.content[:100].lower():
+        # Ctype_header could be unset if we are just reprocessing the existin content
+        if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']) or not ctype_header:
+            top_text = self.fetcher.content[:200].lower().strip()
+            if '<rss' in top_text or 'search.yahoo.com/mrss/' in top_text:
                 self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
                 is_rss = True
 
         # source: support, basically treat it as plaintext
         if watch.is_source_type_url:
             is_html = False
```
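Restated as a standalone predicate (the function name is mine, not the project's), the broadened RSS detection rule is roughly:

```python
def looks_like_rss(ctype_header: str, content: str) -> bool:
    """Sketch of the check above: XML-ish (or missing) content type, plus an
    <rss tag or the Media RSS namespace within the first ~200 characters."""
    ctype = (ctype_header or '').lower()
    xml_ish = any(s in ctype for s in ['application/xml', 'application/rss', 'text/xml']) or not ctype
    top_text = content[:200].lower().strip()
    return xml_ish and ('<rss' in top_text or 'search.yahoo.com/mrss/' in top_text)


print(looks_like_rss('', '<rss xmlns:media="http://search.yahoo.com/mrss/">...'))  # True
print(looks_like_rss('text/html', '<!DOCTYPE html><html></html>'))                 # False
```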
```diff
@@ -164,3 +164,46 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
     assert b'Some other description' not in res.data # Should NOT be selected by the xpath
 
     res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
+
+def test_namespace_selectors(live_server, client):
+    set_original_cdata_xml()
+    #live_server_setup(live_server)
+
+    test_url = url_for('test_endpoint', content_type="application/xml", _external=True)
+
+    res = client.post(
+        url_for("import_page"),
+        data={"urls": test_url},
+        follow_redirects=True
+    )
+
+    assert b"1 Imported" in res.data
+
+    wait_for_all_checks(client)
+
+    uuid = extract_UUID_from_client(client)
+    # because it will look for the namespaced stuff during form validation, but on the first check it wont exist..
+    res = client.post(
+        url_for("edit_page", uuid=uuid),
+        data={
+            "include_filters": "//media:thumbnail/@url",
+            "fetch_backend": "html_requests",
+            "headers": "",
+            "proxy": "no-proxy",
+            "tags": "",
+            "url": test_url,
+        },
+        follow_redirects=True
+    )
+
+    wait_for_all_checks(client)
+
+    res = client.get(
+        url_for("preview_page", uuid="first"),
+        follow_redirects=True
+    )
+    assert b'CDATA' not in res.data
+    assert b'<![' not in res.data
+    assert b'https://testsite.com/thumbnail-c224e10d81488e818701c981da04869e.jpg' in res.data
+
+    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
```