mirror of
				https://github.com/dgtlmoon/changedetection.io.git
				synced 2025-10-31 14:47:21 +00:00 
			
		
		
		
	Compare commits
	
		
			1 Commits
		
	
	
		
			docker-bui
			...
			3434-detec
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 45f8e546d6 | 
| @@ -153,12 +153,26 @@ class perform_site_check(difference_detection_processor): | ||||
|             # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text | ||||
|             self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content) | ||||
|             html_content = self.fetcher.content | ||||
|             content_type = self.fetcher.get_all_headers().get('content-type', '').lower() | ||||
|             is_attachment = 'attachment' in self.fetcher.get_all_headers().get('content-disposition', '').lower() | ||||
|  | ||||
|             # If not JSON,  and if it's not text/plain.. | ||||
|             if 'text/plain' in self.fetcher.get_all_headers().get('content-type', '').lower(): | ||||
|             # Try to detect a more precise MIME type if it's a download or not announced as HTML | ||||
|             if is_attachment or 'octet-stream' in content_type or not 'html' in content_type: | ||||
|                 logger.debug(f"Got a reply that may be a download or possibly a text attachment, checking..") | ||||
|                 try: | ||||
|                     import magic | ||||
|                     mime = magic.from_buffer(html_content, mime=True) | ||||
|                     logger.debug(f"Guessing mime type, original content_type '{content_type}', mime type detected '{mime}'") | ||||
|                     if mime and "/" in mime: # looks valid and is a valid mime type | ||||
|                         content_type = mime | ||||
|                 except Exception as e: | ||||
|                     logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)}") | ||||
|  | ||||
|             if 'text/' in content_type and not 'html' in content_type: | ||||
|                 # Don't run get_text or xpath/css filters on plaintext | ||||
|                 stripped_text_from_html = html_content | ||||
|             else: | ||||
|                 # If not JSON, and if it's not text/plain.. | ||||
|                 # Does it have some ld+json price data? used for easier monitoring | ||||
|                 update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content) | ||||
|  | ||||
|   | ||||
| @@ -165,3 +165,53 @@ def test_check_basic_change_detection_functionality(client, live_server, measure | ||||
|     # Cleanup everything | ||||
|     res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) | ||||
|     assert b'Deleted' in res.data | ||||
|  | ||||
| def test_non_text_mime_or_downloads(client, live_server, measure_memory_usage): | ||||
|     with open("test-datastore/endpoint-content.txt", "w") as f: | ||||
|         f.write("""some random text that should be split by line | ||||
| and not parsed with html_to_text | ||||
| this way we know that it correctly parsed as plain text | ||||
| \r\n | ||||
| ok\r\n | ||||
| got it\r\n | ||||
| """) | ||||
|  | ||||
|     test_url = url_for('test_endpoint', content_type="application/octet-stream", _external=True) | ||||
|  | ||||
|     # Add our URL to the import page | ||||
|     res = client.post( | ||||
|         url_for("imports.import_page"), | ||||
|         data={"urls": test_url}, | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|  | ||||
|     assert b"1 Imported" in res.data | ||||
|  | ||||
|     wait_for_all_checks(client) | ||||
|  | ||||
|     ### check the front end | ||||
|     res = client.get( | ||||
|         url_for("ui.ui_views.preview_page", uuid="first"), | ||||
|         follow_redirects=True | ||||
|     ) | ||||
|     assert b"some random text that should be split by line\n" in res.data | ||||
|     #### | ||||
|  | ||||
|     # Check via the API that the snapshot has linefeeds too | ||||
|     watch_uuid = next(iter(live_server.app.config['DATASTORE'].data['watching'])) | ||||
|     api_key = live_server.app.config['DATASTORE'].data['settings']['application'].get('api_access_token') | ||||
|     res = client.get( | ||||
|         url_for("watchhistory", uuid=watch_uuid), | ||||
|         headers={'x-api-key': api_key}, | ||||
|     ) | ||||
|  | ||||
|     # Fetch a snapshot by timestamp, check the right one was found | ||||
|     res = client.get( | ||||
|         url_for("watchsinglehistory", uuid=watch_uuid, timestamp=list(res.json.keys())[-1]), | ||||
|         headers={'x-api-key': api_key}, | ||||
|     ) | ||||
|     assert b"some random text that should be split by line\n" in res.data | ||||
|  | ||||
|  | ||||
|     res = client.get(url_for("ui.form_delete", uuid="all"), follow_redirects=True) | ||||
|  | ||||
|   | ||||
		Reference in New Issue
	
	Block a user