mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-20 08:26:12 +00:00
Compare commits
1 Commits
0.50.20
...
3434-detec
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
45f8e546d6 |
@@ -153,12 +153,26 @@ class perform_site_check(difference_detection_processor):
|
|||||||
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
|
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
|
||||||
self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
|
self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
|
||||||
html_content = self.fetcher.content
|
html_content = self.fetcher.content
|
||||||
|
content_type = self.fetcher.get_all_headers().get('content-type', '').lower()
|
||||||
|
is_attachment = 'attachment' in self.fetcher.get_all_headers().get('content-disposition', '').lower()
|
||||||
|
|
||||||
# If not JSON, and if it's not text/plain..
|
# Try to detect better mime types if it's a download or not announced as HTML
|
||||||
if 'text/plain' in self.fetcher.get_all_headers().get('content-type', '').lower():
|
if is_attachment or 'octet-stream' in content_type or not 'html' in content_type:
|
||||||
|
logger.debug(f"Got a reply that may be a download or possibly a text attachment, checking..")
|
||||||
|
try:
|
||||||
|
import magic
|
||||||
|
mime = magic.from_buffer(html_content, mime=True)
|
||||||
|
logger.debug(f"Guessing mime type, original content_type '{content_type}', mime type detected '{mime}'")
|
||||||
|
if mime and "/" in mime: # looks valid and is a valid mime type
|
||||||
|
content_type = mime
|
||||||
|
except Exception as e:
|
||||||
|
logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}")
|
||||||
|
|
||||||
|
if 'text/' in content_type and not 'html' in content_type:
|
||||||
# Don't run get_text or xpath/css filters on plaintext
|
# Don't run get_text or xpath/css filters on plaintext
|
||||||
stripped_text_from_html = html_content
|
stripped_text_from_html = html_content
|
||||||
else:
|
else:
|
||||||
|
# If not JSON, and if it's not text/plain..
|
||||||
# Does it have some ld+json price data? used for easier monitoring
|
# Does it have some ld+json price data? used for easier monitoring
|
||||||
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)
|
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)
|
||||||
|
|
||||||
|
|||||||
@@ -165,3 +165,53 @@ def test_check_basic_change_detection_functionality(client, live_server, measure
|
|||||||
# Cleanup everything
|
# Cleanup everything
|
||||||
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
|
||||||
assert b'Deleted' in res.data
|
assert b'Deleted' in res.data
|
||||||
|
|
||||||
|
def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage):
    """Check that text announced as ``application/octet-stream`` keeps its line breaks.

    The test writes a multi-line plain-text fixture, imports a watch on the
    ``test_endpoint`` URL with ``content_type="application/octet-stream"``
    (presumably that endpoint serves the fixture file - confirm against the
    test server), waits for the check to finish, and then verifies that the
    first fixture line, still followed by a line feed, is present in both the
    preview page and the latest snapshot fetched through the API.

    Args:
        client: test client used for all UI and API requests.
        live_server: live server fixture; its app config exposes the datastore.
        measure_memory_usage: fixture requested by the test but not referenced
            in the body.
    """
    # The lines of the literal start at column 0 on purpose: the content must
    # be written exactly as-is, including the explicit CR/LF escape sequences.
    with open("test-datastore/endpoint-content.txt", "w") as f:
        f.write("""some random text that should be split by line
and not parsed with html_to_text
this way we know that it correctly parsed as plain text
\r\n
ok\r\n
got it\r\n
""")

    test_url = url_for('test_endpoint', content_type="application/octet-stream", _external=True)

    # Add our URL to the import page
    res = client.post(
        url_for("imports.import_page"),
        data={"urls": test_url},
        follow_redirects=True
    )

    assert b"1 Imported" in res.data

    wait_for_all_checks(client)

    ### check the front end
    res = client.get(
        url_for("ui.ui_views.preview_page", uuid="first"),
        follow_redirects=True
    )
    assert b"some random text that should be split by line\n" in res.data
    ####

    # Check the snapshot by API that it has linefeeds too
    datastore = live_server.app.config['DATASTORE']
    watch_uuid = next(iter(datastore.data['watching']))
    api_key = datastore.data['settings']['application'].get('api_access_token')
    res = client.get(
        url_for("watchhistory", uuid=watch_uuid),
        headers={'x-api-key': api_key},
    )

    # Fail with a readable message instead of an IndexError/AttributeError
    # when the history listing is empty or not JSON.
    history = res.json
    assert history, "Expected at least one snapshot in the watch history"

    # Fetch a snapshot by timestamp, check the right one was found
    latest_timestamp = list(history.keys())[-1]
    res = client.get(
        url_for("watchsinglehistory", uuid=watch_uuid, timestamp=latest_timestamp),
        headers={'x-api-key': api_key},
    )
    assert b"some random text that should be split by line\n" in res.data

    # Cleanup everything, and verify it like the other tests in this file do
    res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True)
    assert b'Deleted' in res.data
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user