Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-12-16 13:06:37 +00:00)

Compare commits: 0.50.41...enhanced-R (4 commits)
| Author | SHA1 | Date |
|---|---|---|
|  | 365df6cf81 |  |
|  | 76729f2106 |  |
|  | 591dd5b570 |  |
|  | 919812bf8b |  |
```diff
@@ -729,6 +729,12 @@ def changedetection_app(config=None, datastore_o=None):
             for p in datastore.proxy_list:
                 form.proxy.choices.append(tuple((p, datastore.proxy_list[p]['label'])))
 
+        # Add some HTML to be used for form validation
+        if datastore.data['watching'][uuid].history.keys():
+            timestamp = list(datastore.data['watching'][uuid].history.keys())[-1]
+            form.last_html_for_form_validation = datastore.data['watching'][uuid].get_fetched_html(timestamp)
+        else:
+            form.last_html_for_form_validation = "<html><body></body></html>"
+
         if request.method == 'POST' and form.validate():
```
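The block above stores the newest snapshot's HTML on the form object so that later field validators can run filters against real page content instead of an empty stub. A minimal sketch of just that selection logic, with the history dict and fetch callback stubbed out (the names and sample data here are invented for illustration):

```python
# Minimal sketch (illustrative names): pick the newest snapshot's HTML,
# or fall back to an empty document so validation still has something to parse.

def html_for_validation(history: dict, get_fetched_html) -> str:
    if history:
        # History keys are ordered oldest -> newest, so the last one is the latest snapshot
        newest_timestamp = list(history.keys())[-1]
        return get_fetched_html(newest_timestamp)
    return "<html><body></body></html>"

# Illustrative usage with fake data
snapshots = {"1700000000": "<html><body><p>hello</p></body></html>"}
print(html_for_validation(snapshots, lambda ts: snapshots[ts]))
```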
```diff
@@ -1,6 +1,9 @@
 import os
 import re
 
+import elementpath
+
+from changedetectionio.html_tools import xpath_filter, xpath1_filter
 from changedetectionio.strtobool import strtobool
 
 from wtforms import (
```
```diff
@@ -322,52 +325,39 @@ class ValidateCSSJSONXPATHInput(object):
         self.allow_json = allow_json
 
     def __call__(self, form, field):
+        from lxml.etree import XPathEvalError
         if isinstance(field.data, str):
             data = [field.data]
         else:
             data = field.data
 
         for line in data:
-            # Nothing to see here
-            if not len(line.strip()):
-                return
+            line = line.strip()
 
-            # Does it look like XPath?
-            if line.strip()[0] == '/' or line.strip().startswith('xpath:'):
+            if not line:
+                continue
+
+            if line.startswith('xpath') or line.startswith('/'):
                 if not self.allow_xpath:
                     raise ValidationError("XPath not permitted in this field!")
-                from lxml import etree, html
-                import elementpath
-                # xpath 2.0-3.1
-                from elementpath.xpath3 import XPath3Parser
-                tree = html.fromstring("<html></html>")
-                line = line.replace('xpath:', '')
+                if line.startswith('xpath1:'):
+                    filter_function = xpath1_filter
+                else:
+                    line = line.replace('xpath:', '')
+                    filter_function = xpath_filter
 
                 try:
-                    elementpath.select(tree, line.strip(), parser=XPath3Parser)
-                except elementpath.ElementPathError as e:
+                    # Call the determined function
+                    res = filter_function(xpath_filter=line, html_content=form.last_html_for_form_validation)
+                    # It's OK if this is an empty result, we just want to check that it doesn't crash the parser
+                except (elementpath.ElementPathError,XPathEvalError) as e:
                     message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
                     raise ValidationError(message % (line, str(e)))
-                except:
+                except Exception as e:
                     raise ValidationError("A system-error occurred when validating your XPath expression")
 
-            if line.strip().startswith('xpath1:'):
-                if not self.allow_xpath:
-                    raise ValidationError("XPath not permitted in this field!")
-                from lxml import etree, html
-                tree = html.fromstring("<html></html>")
-                line = re.sub(r'^xpath1:', '', line)
-
-                try:
-                    tree.xpath(line.strip())
-                except etree.XPathEvalError as e:
-                    message = field.gettext('\'%s\' is not a valid XPath expression. (%s)')
-                    raise ValidationError(message % (line, str(e)))
-                except:
-                    raise ValidationError("A system-error occurred when validating your XPath expression")
-
-            if 'json:' in line:
+            elif 'json:' in line:
                 if not self.allow_json:
                     raise ValidationError("JSONPath not permitted in this field!")
```
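With `last_html_for_form_validation` populated, the validator no longer parses expressions against a bare `<html></html>` stub: it picks the XPath 1.0 filter (`xpath1_filter`) or the elementpath-backed XPath 2.0-3.1 filter (`xpath_filter`) by prefix and evaluates it against the stored HTML, so namespaced selectors only fail when the parser genuinely rejects them. A rough sketch of just that dispatch step, assuming the changedetection.io package is importable; `check_xpath_line` is an invented helper name for this example:

```python
# Rough sketch of the new dispatch step (not the project's own API surface).
from changedetectionio.html_tools import xpath_filter, xpath1_filter

def check_xpath_line(line: str, last_html: str):
    line = line.strip()
    if line.startswith('xpath1:'):
        filter_function = xpath1_filter          # lxml's native XPath 1.0
    else:
        line = line.replace('xpath:', '')
        filter_function = xpath_filter           # elementpath, XPath 2.0-3.1
    # An empty result is fine; we only care that the expression parses and evaluates
    return filter_function(xpath_filter=line, html_content=last_html)

# Illustrative usage:
# check_xpath_line("xpath://title/text()", "<html><head><title>hi</title></head></html>")
```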
```diff
@@ -392,7 +382,7 @@ class ValidateCSSJSONXPATHInput(object):
                 if not self.allow_json:
                     raise ValidationError("jq not permitted in this field!")
 
-            if 'jq:' in line:
+            elif line.startswith('jq:'):
                 try:
                     import jq
                 except ModuleNotFoundError:
```
```diff
@@ -8,6 +8,7 @@ from xml.sax.saxutils import escape as xml_escape
 import json
 import re
 
+from loguru import logger
 
 # HTML added to be sure each result matching a filter (.example) gets converted to a new line by Inscriptis
 TEXT_FILTER_LIST_LINE_SUFFIX = "<br>"
```
```diff
@@ -108,6 +109,20 @@ def elementpath_tostring(obj):
 
     return str(obj)
 
+def extract_namespaces(xml_content):
+    """
+    Extracts all namespaces from the XML content.
+    """
+    from lxml import etree
+    from io import BytesIO
+
+    it = etree.iterparse(BytesIO(xml_content), events=('start-ns',))
+    namespaces = {}
+    for _, ns in it:
+        prefix, uri = ns
+        namespaces[prefix] = uri
+    return namespaces
+
 # Return str Utf-8 of matched rules
 def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
     from lxml import etree, html
```
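`extract_namespaces()` makes a single streaming pass over the document and records every namespace prefix it declares, which is what later lets a filter such as `//media:thumbnail/@url` resolve. The same technique in a self-contained form, using an invented mRSS fragment:

```python
from io import BytesIO
from lxml import etree

# Invented mRSS fragment for illustration
xml_content = b"""<?xml version="1.0"?>
<rss version="2.0" xmlns:media="http://search.yahoo.com/mrss/">
  <channel>
    <item><media:thumbnail url="https://example.com/thumb.jpg"/></item>
  </channel>
</rss>"""

# 'start-ns' events fire once per namespace declaration as a (prefix, uri) pair
namespaces = {}
for _, (prefix, uri) in etree.iterparse(BytesIO(xml_content), events=('start-ns',)):
    namespaces[prefix] = uri

print(namespaces)  # {'media': 'http://search.yahoo.com/mrss/'}
```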
```diff
@@ -123,7 +138,14 @@ def xpath_filter(xpath_filter, html_content, append_pretty_line_formatting=False, is_rss=False):
     tree = html.fromstring(bytes(html_content, encoding='utf-8'), parser=parser)
     html_block = ""
 
-    r = elementpath.select(tree, xpath_filter.strip(), namespaces={'re': 'http://exslt.org/regular-expressions'}, parser=XPath3Parser)
+    # Automatically extract all namespaces from the XML content
+    namespaces = {'re': 'http://exslt.org/regular-expressions'}
+    try:
+        namespaces.update(extract_namespaces(html_content.encode('utf-8')))
+    except Exception as e:
+        logger.warning(f"Problem extracting namespaces from HTMl/XML content {str(e)}")
+
+    r = elementpath.select(tree, xpath_filter.strip(), namespaces=namespaces, parser=XPath3Parser)
     #@note: //title/text() wont work where <title>CDATA..
 
     if type(r) != list:
```
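Merging the extracted prefixes into the map handed to `elementpath.select()` is what makes prefixed steps usable in filters. A small standalone illustration of that call, assuming `elementpath` with XPath 3 support is installed and reusing the invented fragment from the previous example (parsed here with lxml's XML parser so the namespace declarations survive):

```python
import elementpath
from elementpath.xpath3 import XPath3Parser
from lxml import etree

doc = b"""<rss version="2.0" xmlns:media="http://search.yahoo.com/mrss/">
  <channel><item><media:thumbnail url="https://example.com/thumb.jpg"/></item></channel>
</rss>"""

# Parse as XML so the namespace declarations are kept (an HTML parse would drop them)
tree = etree.fromstring(doc, parser=etree.XMLParser(strip_cdata=False))

# 're' is always present in the repo's map; 'media' is what extract_namespaces()
# would have pulled out of the document above.
namespaces = {'re': 'http://exslt.org/regular-expressions',
              'media': 'http://search.yahoo.com/mrss/'}

r = elementpath.select(tree, "//media:thumbnail/@url", namespaces=namespaces, parser=XPath3Parser)
print(r)  # e.g. ['https://example.com/thumb.jpg']
```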
```diff
@@ -77,11 +77,12 @@ class perform_site_check(difference_detection_processor):
 
         ctype_header = self.fetcher.get_all_headers().get('content-type', '').lower()
         # Go into RSS preprocess for converting CDATA/comment to usable text
-        if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']):
-            if '<rss' in self.fetcher.content[:100].lower():
+        # Ctype_header could be unset if we are just reprocessing the existin content
+        if any(substring in ctype_header for substring in ['application/xml', 'application/rss', 'text/xml']) or not ctype_header:
+            top_text = self.fetcher.content[:200].lower().strip()
+            if '<rss' in top_text or 'search.yahoo.com/mrss/' in top_text:
                 self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
                 is_rss = True
 
         # source: support, basically treat it as plaintext
         if watch.is_source_type_url:
             is_html = False
```
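The processor change widens RSS detection: it now also runs when no Content-Type header is present (for example when stored content is being reprocessed) and sniffs the first 200 characters for either an `<rss` tag or the Media RSS namespace URI. A compact sketch of that decision as a standalone function (the function name and sample inputs are illustrative, not from the commit):

```python
# Sketch of the broadened detection: an XML-ish Content-Type OR no header at all,
# followed by a sniff of the first 200 characters of the body.
XML_CTYPES = ('application/xml', 'application/rss', 'text/xml')

def looks_like_rss(ctype_header: str, content: str) -> bool:
    ctype_header = (ctype_header or '').lower()
    if any(substring in ctype_header for substring in XML_CTYPES) or not ctype_header:
        top_text = content[:200].lower().strip()
        return '<rss' in top_text or 'search.yahoo.com/mrss/' in top_text
    return False

# Illustrative inputs
print(looks_like_rss('', '<feed xmlns:media="http://search.yahoo.com/mrss/">'))  # True
print(looks_like_rss('text/html', '<html><body>hello</body></html>'))            # False
```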
```diff
@@ -164,3 +164,46 @@ def test_rss_xpath_filtering(client, live_server, measure_memory_usage):
     assert b'Some other description' not in res.data  # Should NOT be selected by the xpath
 
     res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
+
+def test_namespace_selectors(live_server, client):
+    set_original_cdata_xml()
+    #live_server_setup(live_server)
+
+    test_url = url_for('test_endpoint', content_type="application/xml", _external=True)
+
+    res = client.post(
+        url_for("import_page"),
+        data={"urls": test_url},
+        follow_redirects=True
+    )
+
+    assert b"1 Imported" in res.data
+
+    wait_for_all_checks(client)
+
+    uuid = extract_UUID_from_client(client)
+    # because it will look for the namespaced stuff during form validation, but on the first check it wont exist..
+    res = client.post(
+        url_for("edit_page", uuid=uuid),
+        data={
+            "include_filters": "//media:thumbnail/@url",
+            "fetch_backend": "html_requests",
+            "headers": "",
+            "proxy": "no-proxy",
+            "tags": "",
+            "url": test_url,
+        },
+        follow_redirects=True
+    )
+
+    wait_for_all_checks(client)
+
+    res = client.get(
+        url_for("preview_page", uuid="first"),
+        follow_redirects=True
+    )
+    assert b'CDATA' not in res.data
+    assert b'<![' not in res.data
+    assert b'https://testsite.com/thumbnail-c224e10d81488e818701c981da04869e.jpg' in res.data
+
+    res = client.get(url_for("form_delete", uuid="all"), follow_redirects=True)
```