Mirror of https://github.com/dgtlmoon/changedetection.io.git (synced 2025-11-04 00:27:48 +00:00)
Compare commits: OpenAPI-va ... fetcher-ti (1 commit)
| Author | SHA1 | Date |
|---|---|---|
|  | 33f1dd6828 |  |
@@ -46,139 +46,137 @@ class perform_site_check():
        if 'Accept-Encoding' in request_headers and "br" in request_headers['Accept-Encoding']:
            request_headers['Accept-Encoding'] = request_headers['Accept-Encoding'].replace(', br', '')

        # @todo check the failures are really handled how we expect
        timeout = self.datastore.data['settings']['requests']['timeout']
        url = self.datastore.get_val(uuid, 'url')
        request_body = self.datastore.get_val(uuid, 'body')
        request_method = self.datastore.get_val(uuid, 'method')
        ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')

        # Pluggable content fetcher
        prefer_backend = watch['fetch_backend']
        if hasattr(content_fetcher, prefer_backend):
            klass = getattr(content_fetcher, prefer_backend)
        else:
            timeout = self.datastore.data['settings']['requests']['timeout']
            url = self.datastore.get_val(uuid, 'url')
            request_body = self.datastore.get_val(uuid, 'body')
            request_method = self.datastore.get_val(uuid, 'method')
            ignore_status_code = self.datastore.get_val(uuid, 'ignore_status_codes')
            # If the klass doesnt exist, just use a default
            klass = getattr(content_fetcher, "html_requests")

            # Pluggable content fetcher
            prefer_backend = watch['fetch_backend']
            if hasattr(content_fetcher, prefer_backend):
                klass = getattr(content_fetcher, prefer_backend)

        fetcher = klass()
        fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
        # Fetching complete, now filters
        # @todo move to class / maybe inside of fetcher abstract base?

        # @note: I feel like the following should be in a more obvious chain system
        #  - Check filter text
        #  - Is the checksum different?
        #  - Do we convert to JSON?
        # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
        # return content().textfilter().jsonextract().checksumcompare() ?

        is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
        is_html = not is_json
        css_filter_rule = watch['css_filter']
        subtractive_selectors = watch.get(
            "subtractive_selectors", []
        ) + self.datastore.data["settings"]["application"].get(
            "global_subtractive_selectors", []
        )

        has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
        has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip())

        if is_json and not has_filter_rule:
            css_filter_rule = "json:$"
            has_filter_rule = True

        if has_filter_rule:
            if 'json:' in css_filter_rule:
                stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
                is_html = False

        if is_html:
            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
            html_content = fetcher.content

            # If not JSON,  and if it's not text/plain..
            if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
                # Don't run get_text or xpath/css filters on plaintext
                stripped_text_from_html = html_content
            else:
                # If the klass doesnt exist, just use a default
                klass = getattr(content_fetcher, "html_requests")
                # Then we assume HTML
                if has_filter_rule:
                    # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
                    if css_filter_rule[0] == '/':
                        html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule, html_content=fetcher.content)
                    else:
                        # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
                        html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
                if has_subtractive_selectors:
                    html_content = html_tools.element_removal(subtractive_selectors, html_content)
                # extract text
                stripped_text_from_html = \
                    html_tools.html_to_text(
                        html_content,
                        render_anchor_tag_content=self.datastore.data["settings"][
                            "application"].get(
                            "render_anchor_tag_content", False)
                    )

        # Re #340 - return the content before the 'ignore text' was applied
        text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')

        # We rely on the actual text in the html output.. many sites have random script vars etc,
        # in the future we'll implement other mechanisms.

        update_obj["last_check_status"] = fetcher.get_last_status_code()

        # If there's text to skip
        # @todo we could abstract out the get_text() to handle this cleaner
        text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
        if len(text_to_ignore):
            stripped_text_from_html = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
        else:
            stripped_text_from_html = stripped_text_from_html.encode('utf8')

        # Re #133 - if we should strip whitespaces from triggering the change detected comparison
        if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
            fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
        else:
            fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()

        # On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
        if not len(watch['previous_md5']):
            watch['previous_md5'] = fetched_md5
            update_obj["previous_md5"] = fetched_md5

        blocked_by_not_found_trigger_text = False

        if len(watch['trigger_text']):
            # Yeah, lets block first until something matches
            blocked_by_not_found_trigger_text = True
            # Filter and trigger works the same, so reuse it
            result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
                                                  wordlist=watch['trigger_text'],
                                                  mode="line numbers")
            if result:
                blocked_by_not_found_trigger_text = False


            fetcher = klass()
            fetcher.run(url, timeout, request_headers, request_body, request_method, ignore_status_code)
            # Fetching complete, now filters
            # @todo move to class / maybe inside of fetcher abstract base?

            # @note: I feel like the following should be in a more obvious chain system
            #  - Check filter text
            #  - Is the checksum different?
            #  - Do we convert to JSON?
            # https://stackoverflow.com/questions/41817578/basic-method-chaining ?
            # return content().textfilter().jsonextract().checksumcompare() ?

            is_json = 'application/json' in fetcher.headers.get('Content-Type', '')
            is_html = not is_json
            css_filter_rule = watch['css_filter']
            subtractive_selectors = watch.get(
                "subtractive_selectors", []
            ) + self.datastore.data["settings"]["application"].get(
                "global_subtractive_selectors", []
            )

            has_filter_rule = css_filter_rule and len(css_filter_rule.strip())
            has_subtractive_selectors = subtractive_selectors and len(subtractive_selectors[0].strip())

            if is_json and not has_filter_rule:
                css_filter_rule = "json:$"
                has_filter_rule = True

            if has_filter_rule:
                if 'json:' in css_filter_rule:
                    stripped_text_from_html = html_tools.extract_json_as_string(content=fetcher.content, jsonpath_filter=css_filter_rule)
                    is_html = False

            if is_html:
                # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
                html_content = fetcher.content

                # If not JSON,  and if it's not text/plain..
                if 'text/plain' in fetcher.headers.get('Content-Type', '').lower():
                    # Don't run get_text or xpath/css filters on plaintext
                    stripped_text_from_html = html_content
                else:
                    # Then we assume HTML
                    if has_filter_rule:
                        # For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
                        if css_filter_rule[0] == '/':
                            html_content = html_tools.xpath_filter(xpath_filter=css_filter_rule, html_content=fetcher.content)
                        else:
                            # CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
                            html_content = html_tools.css_filter(css_filter=css_filter_rule, html_content=fetcher.content)
                    if has_subtractive_selectors:
                        html_content = html_tools.element_removal(subtractive_selectors, html_content)
                    # extract text
                    stripped_text_from_html = \
                        html_tools.html_to_text(
                            html_content,
                            render_anchor_tag_content=self.datastore.data["settings"][
                                "application"].get(
                                "render_anchor_tag_content", False)
                        )
            # Re #340 - return the content before the 'ignore text' was applied
            text_content_before_ignored_filter = stripped_text_from_html.encode('utf-8')

            # We rely on the actual text in the html output.. many sites have random script vars etc,
            # in the future we'll implement other mechanisms.

            update_obj["last_check_status"] = fetcher.get_last_status_code()

            # If there's text to skip
            # @todo we could abstract out the get_text() to handle this cleaner
            text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
            if len(text_to_ignore):
                stripped_text_from_html = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
            else:
                stripped_text_from_html = stripped_text_from_html.encode('utf8')

            # Re #133 - if we should strip whitespaces from triggering the change detected comparison
            if self.datastore.data['settings']['application'].get('ignore_whitespace', False):
                fetched_md5 = hashlib.md5(stripped_text_from_html.translate(None, b'\r\n\t ')).hexdigest()
            else:
                fetched_md5 = hashlib.md5(stripped_text_from_html).hexdigest()

            # On the first run of a site, watch['previous_md5'] will be an empty string, set it the current one.
            if not len(watch['previous_md5']):
                watch['previous_md5'] = fetched_md5
                update_obj["previous_md5"] = fetched_md5

            blocked_by_not_found_trigger_text = False

            if len(watch['trigger_text']):
                # Yeah, lets block first until something matches
                blocked_by_not_found_trigger_text = True
                # Filter and trigger works the same, so reuse it
                result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
                                                      wordlist=watch['trigger_text'],
                                                      mode="line numbers")
                if result:
                    blocked_by_not_found_trigger_text = False
        if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
            changed_detected = True
            update_obj["previous_md5"] = fetched_md5
            update_obj["last_changed"] = timestamp


            if not blocked_by_not_found_trigger_text and watch['previous_md5'] != fetched_md5:
                changed_detected = True
                update_obj["previous_md5"] = fetched_md5
                update_obj["last_changed"] = timestamp
        # Extract title as title
        if is_html:
            if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
                if not watch['title'] or not len(watch['title']):
                    update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)

        if self.datastore.data['settings']['application'].get('real_browser_save_screenshot', True):
            screenshot = fetcher.screenshot()

            # Extract title as title
            if is_html:
                if self.datastore.data['settings']['application']['extract_title_as_title'] or watch['extract_title_as_title']:
                    if not watch['title'] or not len(watch['title']):
                        update_obj['title'] = html_tools.extract_element(find='title', html_content=fetcher.content)

            if self.datastore.data['settings']['application'].get('real_browser_save_screenshot', True):
                screenshot = fetcher.screenshot()

            fetcher.quit()
        fetcher.quit()

        return changed_detected, update_obj, text_content_before_ignored_filter, screenshot
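For readers skimming the hunk: the fetcher selection at the top resolves the watch's `fetch_backend` string to a class on the `content_fetcher` module via `hasattr`/`getattr`, and falls back to `html_requests` when the name is unknown. The sketch below is a hypothetical, self-contained illustration of that lookup pattern; the stand-in module and fetcher classes are invented here and are not the real changedetection.io API.

```python
import types

# Stand-in for the real content_fetcher module; these classes are invented for the
# sketch and only mimic the shape of a fetcher (something with a run()-style method).
content_fetcher = types.SimpleNamespace()

class html_requests:
    def run(self, url):
        return f"fetched {url} with plain HTTP requests"

class html_webdriver:
    def run(self, url):
        return f"fetched {url} with a real browser"

content_fetcher.html_requests = html_requests
content_fetcher.html_webdriver = html_webdriver

def pick_fetcher(prefer_backend):
    # Same shape as the diff: look the name up on the module, fall back to a default.
    if hasattr(content_fetcher, prefer_backend):
        klass = getattr(content_fetcher, prefer_backend)
    else:
        # If the klass doesn't exist, just use a default
        klass = getattr(content_fetcher, "html_requests")
    return klass()

print(pick_fetcher("html_webdriver").run("https://example.com"))   # browser backend
print(pick_fetcher("no_such_backend").run("https://example.com"))  # falls back to html_requests
```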
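The `# @note` comment in the hunk suggests refactoring the filter steps into a method chain (`return content().textfilter().jsonextract().checksumcompare() ?`, with a link to a basic method-chaining question on Stack Overflow). A minimal sketch of what such a chain could look like, assuming invented class and method names that are not part of the repository:

```python
import hashlib
import json

class ContentPipeline:
    """Hypothetical fluent wrapper around fetched content (names invented)."""

    def __init__(self, content):
        self.content = content

    def jsonextract(self, key=None):
        # Tiny stand-in for a jsonpath-style extraction step.
        data = json.loads(self.content)
        self.content = json.dumps(data if key is None else data[key], sort_keys=True)
        return self  # returning self is what makes the calls chainable

    def textfilter(self, ignore=()):
        # Drop lines containing any "ignore" fragment, in the spirit of strip_ignore_text.
        kept = [line for line in self.content.splitlines()
                if not any(fragment in line for fragment in ignore)]
        self.content = "\n".join(kept)
        return self

    def checksum(self):
        # Terminal step, comparable to fetched_md5 in the hunk above.
        return hashlib.md5(self.content.encode('utf-8')).hexdigest()

# One readable chain instead of a page of intermediate variables:
digest = ContentPipeline('{"price": 19.99, "stock": true}').jsonextract("price").checksum()
print(digest)
```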
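The hunk dispatches a single filter string three ways: rules containing `json:` go to `extract_json_as_string`, rules beginning with `/` are treated as XPath, and anything else as a CSS selector. A small stand-alone sketch of that dispatch, with a placeholder classifier instead of the real `html_tools` calls:

```python
def classify_filter_rule(css_filter_rule):
    # Mirrors the checks in the hunk: 'json:' anywhere in the rule means JSONPath,
    # a leading '/' means XPath, everything else is treated as a CSS selector.
    if 'json:' in css_filter_rule:
        return 'jsonpath'
    if css_filter_rule[0] == '/':
        return 'xpath'
    return 'css'

print(classify_filter_rule('json:$.price'))     # jsonpath
print(classify_filter_rule('//div[@id="p"]'))   # xpath
print(classify_filter_rule('#price .value'))    # css
```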
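Finally, the `ignore_whitespace` branch hashes the text only after deleting `\r`, `\n`, tab and space with `bytes.translate`, so purely cosmetic reformatting does not register as a change. A quick illustration (the sample strings are invented here):

```python
import hashlib

def whitespace_insensitive_md5(data: bytes) -> str:
    # Same call shape as the hunk: bytes.translate(None, delete_bytes) removes the
    # listed bytes before hashing.
    return hashlib.md5(data.translate(None, b'\r\n\t ')).hexdigest()

before = b"Price: 19.99\nIn stock\n"
after = b"Price:  19.99\r\n\tIn stock"  # same text, different whitespace

print(whitespace_insensitive_md5(before) == whitespace_insensitive_md5(after))  # True
print(hashlib.md5(before).hexdigest() == hashlib.md5(after).hexdigest())        # False
```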