# HTML to TEXT/JSON DIFFERENCE self.fetcher

import hashlib
import json
import os
import re
import urllib3

from changedetectionio.conditions import execute_ruleset_against_all_plugins
from changedetectionio.processors import difference_detection_processor
from changedetectionio.html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text, TRANSLATE_WHITESPACE_TABLE
from changedetectionio import html_tools, content_fetchers
from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
from loguru import logger

urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

name = 'Webpage Text/HTML, JSON and PDF changes'
description = 'Detects all text changes where possible'

json_filter_prefixes = ['json:', 'jq:', 'jqraw:']

# Assume it's this type if the server says nothing on content-type
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'

# When to apply the 'cdata to real HTML' hack
# @todo Some heuristic check instead? first and last bytes? maybe some new def that gets header+first 200 bytes? then we can unittest
RSS_XML_CONTENT_TYPES = [
    "application/rss+xml",
    "application/rdf+xml",
    "text/xml",
    "application/xml",
    "application/atom+xml",
    "text/rss+xml",  # rare, non-standard
    "application/x-rss+xml",  # legacy (older feed software)
    "application/x-atom+xml",  # legacy (older Atom)
]

# JSON Content-types
# @todo Some heuristic check instead? first and last bytes? maybe some new def that gets header+first 200 bytes? then we can unittest
JSON_CONTENT_TYPES = [
    "application/activity+json",
    "application/feed+json",
    "application/json",
    "application/ld+json",
    "application/vnd.api+json",
]


class FilterNotFoundInResponse(ValueError):
    def __init__(self, msg, screenshot=None, xpath_data=None):
        self.screenshot = screenshot
        self.xpath_data = xpath_data
        ValueError.__init__(self, msg)


class PDFToHTMLToolNotFound(ValueError):
    def __init__(self, msg):
        ValueError.__init__(self, msg)


# Some common stuff here that can be moved to a base class
# (set_proxy_from_list)
class perform_site_check(difference_detection_processor):

    def run_changedetection(self, watch):
        changed_detected = False
        html_content = ""
        screenshot = False  # as bytes
        stripped_text_from_html = ""

        if not watch:
            raise Exception("Watch no longer exists.")

        ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()

        # Try to detect better mime types if it's a download or not announced as HTML
        # @todo also goes into our heuristic detect class
        if 'attachment' in self.fetcher.get_all_headers().get('content-disposition', '').lower() or 'octet-stream' in ctype_header:
            logger.debug(f"Got a reply that may be a download or possibly a text attachment, checking..")
            try:
                import magic
                mime = magic.from_buffer(self.fetcher.content, mime=True)
                logger.debug(f"Guessing mime type, original content_type '{ctype_header}', mime type detected '{mime}'")
                if mime and "/" in mime:  # looks valid and is a valid mime type
                    ctype_header = mime
            except Exception as e:
                logger.error(f"Error getting a more precise mime type from 'magic' library ({str(e)})")

        # Unset any existing notification error
        update_obj = {'last_notification_error': False, 'last_error': False}

        url = watch.link

        self.screenshot = self.fetcher.screenshot
        self.xpath_data = self.fetcher.xpath_data

        # Track the content type
        update_obj['content_type'] = ctype_header

        # Watches added automatically in the queue manager will skip if it's the same checksum as the previous run
        # Saves a lot of CPU
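        # 'previous_md5_before_filters' is an MD5 of the raw fetched body taken *before* any filters run,
        # so an identical server response can be skipped cheaply without re-running the whole filter pipeline.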
        update_obj['previous_md5_before_filters'] = hashlib.md5(self.fetcher.content.encode('utf-8')).hexdigest()

        # Fetching complete, now filters
        # @note: I feel like the following should be in a more obvious chain system
        # - Check filter text
        # - Is the checksum different?
        # - Do we convert to JSON?
        # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
        # return content().textfilter().jsonextract().checksumcompare() ?

        is_json = any(s in ctype_header for s in JSON_CONTENT_TYPES)
        is_html = not is_json
        is_rss = False

        # Go into RSS preprocess for converting CDATA/comment to usable text
        if '<rss' in self.fetcher.content[:100].lower():
            self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
            is_rss = True

        # source: support, basically treat it as plaintext
        if watch.is_source_type_url:
            is_html = False
            is_json = False

        if watch.is_pdf or 'application/pdf' in ctype_header:
            from shutil import which
            tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
            if not which(tool):
                raise PDFToHTMLToolNotFound(f"Command-line `{tool}` tool was not found in system PATH, was it installed?")

            import subprocess
            proc = subprocess.Popen(
                [tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
                stdout=subprocess.PIPE,
                stdin=subprocess.PIPE)
            proc.stdin.write(self.fetcher.raw_content)
            proc.stdin.close()
            self.fetcher.content = proc.stdout.read().decode('utf-8')
            proc.wait(timeout=60)

            # Add a little metadata so we know if the file changes (like if an image changes, but the text is the same)
            # @todo may cause problems with non-UTF8?
            metadata = "<p>Added by changedetection.io: Document checksum - {} Filesize - {} bytes</p>".format(
                hashlib.md5(self.fetcher.raw_content).hexdigest().upper(),
                len(self.fetcher.content))
            self.fetcher.content = self.fetcher.content.replace('</body>', metadata + '</body>')

        # Better would be if Watch.model could access the global data also
        # and then use getattr https://docs.python.org/3/reference/datamodel.html#object.__getitem__
        # https://realpython.com/inherit-python-dict/ instead of doing it procedurally
        include_filters_from_tags = self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='include_filters')

        # 1845 - remove duplicated filters in both group and watch include filter
        include_filters_rule = list(dict.fromkeys(watch.get('include_filters', []) + include_filters_from_tags))

        subtractive_selectors = [*self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='subtractive_selectors'),
                                 *watch.get("subtractive_selectors", []),
                                 *self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
                                 ]

        # Inject a virtual LD+JSON price tracker rule
        if watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
            include_filters_rule += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS

        has_filter_rule = len(include_filters_rule) and len(include_filters_rule[0].strip())
        has_subtractive_selectors = len(subtractive_selectors) and len(subtractive_selectors[0].strip())

        if is_json and not has_filter_rule:
            include_filters_rule.append("json:$")
            has_filter_rule = True

        if is_json:
            # Sort the JSON so we don't get false alerts when the content is just re-ordered
            try:
                self.fetcher.content = json.dumps(json.loads(self.fetcher.content), sort_keys=True)
            except Exception as e:
                # Might have just been a snippet, or otherwise bad JSON, continue
                pass

        if has_filter_rule:
            for filter in include_filters_rule:
                if any(prefix in filter for prefix in json_filter_prefixes):
                    stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
                    is_html = False

        if is_html or watch.is_source_type_url:

            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
            self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
            html_content = self.fetcher.content

            # Some kind of "text" but definitely not RSS looking
            if (not is_rss and
                    not 'html' in ctype_header and
                    'text/' in ctype_header and
                    not any(s in ctype_header for s in RSS_XML_CONTENT_TYPES) and
                    not '<html' in self.fetcher.content[:100].lower()):
                # Plain text, don't run the HTML filters or text extraction over it
                stripped_text_from_html = html_content
            else:
                # Does it have some ld+json price data? used for easier monitoring
                update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)

                # Then we assume HTML
                if has_filter_rule:
                    html_content = ""
                    for filter_rule in include_filters_rule:
                        # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
                        if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
                            html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
                                                                    html_content=self.fetcher.content,
                                                                    append_pretty_line_formatting=not watch.is_source_type_url,
                                                                    is_rss=is_rss)
                        elif filter_rule.startswith('xpath1:'):
                            html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''),
                                                                     html_content=self.fetcher.content,
                                                                     append_pretty_line_formatting=not watch.is_source_type_url,
                                                                     is_rss=is_rss)
                        else:
                            html_content += html_tools.include_filters(include_filters=filter_rule,
                                                                       html_content=self.fetcher.content,
                                                                       append_pretty_line_formatting=not watch.is_source_type_url)

                    if not html_content.strip():
                        raise FilterNotFoundInResponse(msg=include_filters_rule,
                                                       screenshot=self.fetcher.screenshot,
                                                       xpath_data=self.fetcher.xpath_data)

                if has_subtractive_selectors:
                    html_content = html_tools.element_removal(subtractive_selectors, html_content)

                if watch.is_source_type_url:
                    stripped_text_from_html = html_content
                else:
                    # extract text
                    do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
                    stripped_text_from_html = html_tools.html_to_text(html_content=html_content,
                                                                      render_anchor_tag_content=do_anchor,
                                                                      is_rss=is_rss)

        # Treat pages with no renderable text content as a change? No by default
        empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
        if not is_json and not empty_pages_are_a_change and len(stripped_text_from_html.strip()) == 0:
            raise content_fetchers.exceptions.ReplyWithContentButNoText(url=url,
                                                                        status_code=self.fetcher.get_last_status_code(),
                                                                        screenshot=screenshot,
                                                                        has_filters=has_filter_rule,
                                                                        html_content=html_content,
                                                                        xpath_data=self.fetcher.xpath_data)

        # We rely on the actual text in the html output.. many sites have random script vars etc,
        # in the future we'll implement other mechanisms.
        update_obj["last_check_status"] = self.fetcher.get_last_status_code()

        # extract text - regex or plain-text rules to pull out of the filtered result
        extract_text = list(dict.fromkeys(watch.get('extract_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='extract_text')))
        if len(extract_text) > 0:
            regex_matched_output = []
            for s_re in extract_text:
                # in case they specified something in '/.../x'
                if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
                    regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
                    result = re.findall(regex, stripped_text_from_html)
                    for l in result:
                        if type(l) is tuple:
                            # @todo - some formatter option default (between groups)
                            regex_matched_output += list(l) + ['\n']
                        else:
                            # @todo - some formatter option default (between each ungrouped result)
                            regex_matched_output += [l] + ['\n']
                else:
                    # Doesn't look like regex, just hunt for plaintext and return that which matches
                    # `stripped_text_from_html` will be bytes, so we must encode s_re also to bytes
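                    # (re.escape() makes the rule a literal, case-insensitive substring search; each hit is
                    # emitted on its own line below.)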
                    r = re.compile(re.escape(s_re), re.IGNORECASE)
                    res = r.findall(stripped_text_from_html)
                    if res:
                        for match in res:
                            regex_matched_output += [match] + ['\n']

            ##########################################################
            stripped_text_from_html = ''

            if regex_matched_output:
                # @todo some formatter for presentation?
                stripped_text_from_html = ''.join(regex_matched_output)

        if watch.get('remove_duplicate_lines'):
            stripped_text_from_html = '\n'.join(dict.fromkeys(line for line in stripped_text_from_html.replace("\n\n", "\n").splitlines()))

        if watch.get('sort_text_alphabetically'):
            # Note: Because a <p>something</p> will add an extra line feed to signify the paragraph gap
            # we end up with 'Some text\n\n', sorting will add all those extra \n at the start, so we remove them here.
            stripped_text_from_html = stripped_text_from_html.replace("\n\n", "\n")
            stripped_text_from_html = '\n'.join(sorted(stripped_text_from_html.splitlines(), key=lambda x: x.lower()))

        ### CALCULATE MD5
        # If there's text to ignore
        text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
        text_to_ignore += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='ignore_text')

        text_for_checksuming = stripped_text_from_html
        if text_to_ignore:
            text_for_checksuming = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)

        # Some people prefer to also completely remove it
        strip_ignored_lines = watch.get('strip_ignored_lines') if watch.get('strip_ignored_lines') is not None else self.datastore.data['settings']['application'].get('strip_ignored_lines')
        if strip_ignored_lines:
            # @todo add test in the 'preview' mode, check the widget works? compare to datastruct
            stripped_text_from_html = text_for_checksuming

        # Re #133 - if we should strip whitespace from triggering the change detected comparison
        if text_for_checksuming and self.datastore.data['settings']['application'].get('ignore_whitespace', False):
            fetched_md5 = hashlib.md5(text_for_checksuming.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
        else:
            fetched_md5 = hashlib.md5(text_for_checksuming.encode('utf-8')).hexdigest()

        ############ Blocking rules, after checksum #################
        blocked = False

        trigger_text = list(dict.fromkeys(watch.get('trigger_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')))
        if len(trigger_text):
            # Assume blocked
            blocked = True
            # Filter and trigger works the same, so reuse it
            # It should return the line numbers that match
            # Unblock flow if the trigger was found (some text remained after stripping what didn't match)
            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
                                                  wordlist=trigger_text,
                                                  mode="line numbers")
            # Unblock if the trigger was found
            if result:
                blocked = False

        text_should_not_be_present = list(dict.fromkeys(watch.get('text_should_not_be_present', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')))
        if len(text_should_not_be_present):
            # If anything matched, then we should block a change from happening
            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
                                                  wordlist=text_should_not_be_present,
                                                  mode="line numbers")
            if result:
                blocked = True

        # And check if 'conditions' will let this pass through
        if watch.get('conditions') and watch.get('conditions_match_logic'):
            conditions_result = execute_ruleset_against_all_plugins(current_watch_uuid=watch.get('uuid'),
                                                                    application_datastruct=self.datastore.data,
                                                                    ephemeral_data={'text': stripped_text_from_html})

            if not conditions_result.get('result'):
                # Conditions say "Condition not met" so we block it.
                blocked = True

        # Looks like something changed, but did it match all the rules?
        if blocked:
            changed_detected = False
        else:
            # The main thing that all this at the moment comes down to :)
            if watch.get('previous_md5') != fetched_md5:
                changed_detected = True

            # Always record the new checksum
            update_obj["previous_md5"] = fetched_md5

            # On the first run of a site, watch['previous_md5'] will be None, set it to the current one.
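            # (Assigning directly onto the watch here, rather than only via update_obj, means the baseline
            # checksum exists immediately rather than waiting for the update results to be merged back.)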
            if not watch.get('previous_md5'):
                watch['previous_md5'] = fetched_md5

        logger.debug(f"Watch UUID {watch.get('uuid')} content check - Previous MD5: {watch.get('previous_md5')}, Fetched MD5 {fetched_md5}")

        if changed_detected:
            if watch.get('check_unique_lines', False):
                ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace')

                has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
                    lines=stripped_text_from_html.splitlines(),
                    ignore_whitespace=ignore_whitespace
                )
                # One or more lines? unsure?
                if not has_unique_lines:
                    logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didn't have anything new, setting changed_detected=False")
                    changed_detected = False
                else:
                    logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")

        # stripped_text_from_html - Everything after filters and NO 'ignored' content
        return changed_detected, update_obj, stripped_text_from_html
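
# run_changedetection() returns a 3-tuple for the caller to act on:
#   changed_detected        - True when the post-filter checksum differs from the stored 'previous_md5'
#                             and no trigger / "text should not be present" / conditions rule blocked it
#   update_obj              - fields to merge back into the watch record (checksums, content type, error flags)
#   stripped_text_from_html - the post-filter text snapshot, typically saved as the new history entry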