|
|
|
|
@@ -37,355 +37,516 @@ class PDFToHTMLToolNotFound(ValueError):
|
|
|
|
|
ValueError.__init__(self, msg)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class FilterConfig:
|
|
|
|
|
"""Consolidates all filter and rule configurations from watch, tags, and global settings."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, watch, datastore):
|
|
|
|
|
self.watch = watch
|
|
|
|
|
self.datastore = datastore
|
|
|
|
|
self.watch_uuid = watch.get('uuid')
|
|
|
|
|
# Cache computed properties to avoid repeated list operations
|
|
|
|
|
self._include_filters_cache = None
|
|
|
|
|
self._subtractive_selectors_cache = None
|
|
|
|
|
|
|
|
|
|
def _get_merged_rules(self, attr, include_global=False):
|
|
|
|
|
"""Merge rules from watch, tags, and optionally global settings."""
|
|
|
|
|
watch_rules = self.watch.get(attr, [])
|
|
|
|
|
tag_rules = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr=attr)
|
|
|
|
|
rules = list(dict.fromkeys(watch_rules + tag_rules))
|
|
|
|
|
|
|
|
|
|
if include_global:
|
|
|
|
|
global_rules = self.datastore.data['settings']['application'].get(f'global_{attr}', [])
|
|
|
|
|
rules = list(dict.fromkeys(rules + global_rules))
|
|
|
|
|
|
|
|
|
|
return rules
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def include_filters(self):
|
|
|
|
|
if self._include_filters_cache is None:
|
|
|
|
|
filters = self._get_merged_rules('include_filters')
|
|
|
|
|
# Inject LD+JSON price tracker rule if enabled
|
|
|
|
|
if self.watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
|
|
|
|
|
filters += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
|
|
|
|
|
self._include_filters_cache = filters
|
|
|
|
|
return self._include_filters_cache
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def subtractive_selectors(self):
|
|
|
|
|
if self._subtractive_selectors_cache is None:
|
|
|
|
|
watch_selectors = self.watch.get("subtractive_selectors", [])
|
|
|
|
|
tag_selectors = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr='subtractive_selectors')
|
|
|
|
|
global_selectors = self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
|
|
|
|
|
self._subtractive_selectors_cache = [*tag_selectors, *watch_selectors, *global_selectors]
|
|
|
|
|
return self._subtractive_selectors_cache
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def extract_text(self):
|
|
|
|
|
return self._get_merged_rules('extract_text')
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def ignore_text(self):
|
|
|
|
|
return self._get_merged_rules('ignore_text', include_global=True)
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def trigger_text(self):
|
|
|
|
|
return self._get_merged_rules('trigger_text')
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def text_should_not_be_present(self):
|
|
|
|
|
return self._get_merged_rules('text_should_not_be_present')
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def has_include_filters(self):
|
|
|
|
|
return bool(self.include_filters) and bool(self.include_filters[0].strip())
|
|
|
|
|
|
|
|
|
|
@property
|
|
|
|
|
def has_subtractive_selectors(self):
|
|
|
|
|
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContentTransformer:
|
|
|
|
|
"""Handles text transformations like trimming, sorting, and deduplication."""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def trim_whitespace(text):
|
|
|
|
|
"""Remove leading/trailing whitespace from each line."""
|
|
|
|
|
# Use generator expression to avoid building intermediate list
|
|
|
|
|
return '\n'.join(line.strip() for line in text.replace("\n\n", "\n").splitlines())
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def remove_duplicate_lines(text):
|
|
|
|
|
"""Remove duplicate lines while preserving order."""
|
|
|
|
|
return '\n'.join(dict.fromkeys(line for line in text.replace("\n\n", "\n").splitlines()))
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def sort_alphabetically(text):
|
|
|
|
|
"""Sort lines alphabetically (case-insensitive)."""
|
|
|
|
|
# Remove double line feeds before sorting
|
|
|
|
|
text = text.replace("\n\n", "\n")
|
|
|
|
|
return '\n'.join(sorted(text.splitlines(), key=lambda x: x.lower()))
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def extract_by_regex(text, regex_patterns):
|
|
|
|
|
"""Extract text matching regex patterns."""
|
|
|
|
|
# Use list of strings instead of concatenating lists repeatedly (avoids O(n²) behavior)
|
|
|
|
|
regex_matched_output = []
|
|
|
|
|
|
|
|
|
|
for s_re in regex_patterns:
|
|
|
|
|
# Check if it's perl-style regex /.../
|
|
|
|
|
if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
|
|
|
|
|
regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
|
|
|
|
|
result = re.findall(regex, text)
|
|
|
|
|
|
|
|
|
|
for match in result:
|
|
|
|
|
if type(match) is tuple:
|
|
|
|
|
regex_matched_output.extend(match)
|
|
|
|
|
regex_matched_output.append('\n')
|
|
|
|
|
else:
|
|
|
|
|
regex_matched_output.append(match)
|
|
|
|
|
regex_matched_output.append('\n')
|
|
|
|
|
else:
|
|
|
|
|
# Plain text search (case-insensitive)
|
|
|
|
|
r = re.compile(re.escape(s_re), re.IGNORECASE)
|
|
|
|
|
res = r.findall(text)
|
|
|
|
|
if res:
|
|
|
|
|
for match in res:
|
|
|
|
|
regex_matched_output.append(match)
|
|
|
|
|
regex_matched_output.append('\n')
|
|
|
|
|
|
|
|
|
|
return ''.join(regex_matched_output) if regex_matched_output else ''
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class RuleEngine:
|
|
|
|
|
"""Evaluates blocking rules (triggers, conditions, text_should_not_be_present)."""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def evaluate_trigger_text(content, trigger_patterns):
|
|
|
|
|
"""
|
|
|
|
|
Check if trigger text is present. If trigger_text is configured,
|
|
|
|
|
content is blocked UNLESS the trigger is found.
|
|
|
|
|
Returns True if blocked, False if allowed.
|
|
|
|
|
"""
|
|
|
|
|
if not trigger_patterns:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
# Assume blocked if trigger_text is configured
|
|
|
|
|
result = html_tools.strip_ignore_text(
|
|
|
|
|
content=str(content),
|
|
|
|
|
wordlist=trigger_patterns,
|
|
|
|
|
mode="line numbers"
|
|
|
|
|
)
|
|
|
|
|
# Unblock if trigger was found
|
|
|
|
|
return not bool(result)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def evaluate_text_should_not_be_present(content, patterns):
|
|
|
|
|
"""
|
|
|
|
|
Check if forbidden text is present. If found, block the change.
|
|
|
|
|
Returns True if blocked, False if allowed.
|
|
|
|
|
"""
|
|
|
|
|
if not patterns:
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
result = html_tools.strip_ignore_text(
|
|
|
|
|
content=str(content),
|
|
|
|
|
wordlist=patterns,
|
|
|
|
|
mode="line numbers"
|
|
|
|
|
)
|
|
|
|
|
# Block if forbidden text was found
|
|
|
|
|
return bool(result)
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def evaluate_conditions(watch, datastore, content):
|
|
|
|
|
"""
|
|
|
|
|
Evaluate custom conditions ruleset.
|
|
|
|
|
Returns True if blocked, False if allowed.
|
|
|
|
|
"""
|
|
|
|
|
if not watch.get('conditions') or not watch.get('conditions_match_logic'):
|
|
|
|
|
return False
|
|
|
|
|
|
|
|
|
|
conditions_result = execute_ruleset_against_all_plugins(
|
|
|
|
|
current_watch_uuid=watch.get('uuid'),
|
|
|
|
|
application_datastruct=datastore.data,
|
|
|
|
|
ephemeral_data={'text': content}
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Block if conditions not met
|
|
|
|
|
return not conditions_result.get('result')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ContentProcessor:
|
|
|
|
|
"""Handles content preprocessing, filtering, and extraction."""
|
|
|
|
|
|
|
|
|
|
def __init__(self, fetcher, watch, filter_config, datastore):
|
|
|
|
|
self.fetcher = fetcher
|
|
|
|
|
self.watch = watch
|
|
|
|
|
self.filter_config = filter_config
|
|
|
|
|
self.datastore = datastore
|
|
|
|
|
|
|
|
|
|
def preprocess_rss(self, content):
|
|
|
|
|
"""Convert CDATA/comments in RSS to usable text."""
|
|
|
|
|
return cdata_in_document_to_text(html_content=content)
|
|
|
|
|
|
|
|
|
|
def preprocess_pdf(self, content, raw_content):
|
|
|
|
|
"""Convert PDF to HTML using external tool."""
|
|
|
|
|
from shutil import which
|
|
|
|
|
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
|
|
|
|
|
if not which(tool):
|
|
|
|
|
raise PDFToHTMLToolNotFound(
|
|
|
|
|
f"Command-line `{tool}` tool was not found in system PATH, was it installed?"
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
import subprocess
|
|
|
|
|
proc = subprocess.Popen(
|
|
|
|
|
[tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
|
|
|
|
|
stdout=subprocess.PIPE,
|
|
|
|
|
stdin=subprocess.PIPE
|
|
|
|
|
)
|
|
|
|
|
proc.stdin.write(raw_content)
|
|
|
|
|
proc.stdin.close()
|
|
|
|
|
html_content = proc.stdout.read().decode('utf-8')
|
|
|
|
|
proc.wait(timeout=60)
|
|
|
|
|
|
|
|
|
|
# Add metadata for change detection
|
|
|
|
|
metadata = (
|
|
|
|
|
f"<p>Added by changedetection.io: Document checksum - "
|
|
|
|
|
f"{hashlib.md5(raw_content).hexdigest().upper()} "
|
|
|
|
|
f"Filesize - {len(html_content)} bytes</p>"
|
|
|
|
|
)
|
|
|
|
|
return html_content.replace('</body>', metadata + '</body>')
|
|
|
|
|
|
|
|
|
|
def preprocess_json(self, content, has_filters):
|
|
|
|
|
"""Format and sort JSON content."""
|
|
|
|
|
# Force reformat if no filters specified
|
|
|
|
|
if not has_filters:
|
|
|
|
|
content = html_tools.extract_json_as_string(content=content, json_filter="json:$")
|
|
|
|
|
|
|
|
|
|
# Sort JSON to avoid false alerts from reordering
|
|
|
|
|
try:
|
|
|
|
|
content = json.dumps(json.loads(content), sort_keys=True)
|
|
|
|
|
except Exception:
|
|
|
|
|
# Might be malformed JSON, continue anyway
|
|
|
|
|
pass
|
|
|
|
|
|
|
|
|
|
return content
|
|
|
|
|
|
|
|
|
|
def apply_include_filters(self, content, stream_content_type):
|
|
|
|
|
"""Apply CSS, XPath, or JSON filters to extract specific content."""
|
|
|
|
|
filtered_content = ""
|
|
|
|
|
|
|
|
|
|
for filter_rule in self.filter_config.include_filters:
|
|
|
|
|
# XPath filters
|
|
|
|
|
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
|
|
|
|
|
filtered_content += html_tools.xpath_filter(
|
|
|
|
|
xpath_filter=filter_rule.replace('xpath:', ''),
|
|
|
|
|
html_content=content,
|
|
|
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url,
|
|
|
|
|
is_rss=stream_content_type.is_rss
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# XPath1 filters (first match only)
|
|
|
|
|
elif filter_rule.startswith('xpath1:'):
|
|
|
|
|
filtered_content += html_tools.xpath1_filter(
|
|
|
|
|
xpath_filter=filter_rule.replace('xpath1:', ''),
|
|
|
|
|
html_content=content,
|
|
|
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url,
|
|
|
|
|
is_rss=stream_content_type.is_rss
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# JSON filters
|
|
|
|
|
elif any(filter_rule.startswith(prefix) for prefix in json_filter_prefixes):
|
|
|
|
|
filtered_content += html_tools.extract_json_as_string(
|
|
|
|
|
content=content,
|
|
|
|
|
json_filter=filter_rule
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# CSS selectors, default fallback
|
|
|
|
|
else:
|
|
|
|
|
filtered_content += html_tools.include_filters(
|
|
|
|
|
include_filters=filter_rule,
|
|
|
|
|
html_content=content,
|
|
|
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# Raise error if filter returned nothing
|
|
|
|
|
if not filtered_content.strip():
|
|
|
|
|
raise FilterNotFoundInResponse(
|
|
|
|
|
msg=self.filter_config.include_filters,
|
|
|
|
|
screenshot=self.fetcher.screenshot,
|
|
|
|
|
xpath_data=self.fetcher.xpath_data
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
return filtered_content
|
|
|
|
|
|
|
|
|
|
def apply_subtractive_selectors(self, content):
|
|
|
|
|
"""Remove elements matching subtractive selectors."""
|
|
|
|
|
return html_tools.element_removal(self.filter_config.subtractive_selectors, content)
|
|
|
|
|
|
|
|
|
|
def extract_text_from_html(self, html_content, stream_content_type):
|
|
|
|
|
"""Convert HTML to plain text."""
|
|
|
|
|
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
|
|
|
|
|
return html_tools.html_to_text(
|
|
|
|
|
html_content=html_content,
|
|
|
|
|
render_anchor_tag_content=do_anchor,
|
|
|
|
|
is_rss=stream_content_type.is_rss
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class ChecksumCalculator:
|
|
|
|
|
"""Calculates checksums with various options."""
|
|
|
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
|
def calculate(text, ignore_whitespace=False):
|
|
|
|
|
"""Calculate MD5 checksum of text content."""
|
|
|
|
|
if ignore_whitespace:
|
|
|
|
|
text = text.translate(TRANSLATE_WHITESPACE_TABLE)
|
|
|
|
|
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Some common stuff here that can be moved to a base class
|
|
|
|
|
# (set_proxy_from_list)
|
|
|
|
|
class perform_site_check(difference_detection_processor):
|
|
|
|
|
|
|
|
|
|
def run_changedetection(self, watch):
|
|
|
|
|
changed_detected = False
|
|
|
|
|
html_content = ""
|
|
|
|
|
screenshot = False # as bytes
|
|
|
|
|
stripped_text_from_html = ""
|
|
|
|
|
|
|
|
|
|
if not watch:
|
|
|
|
|
raise Exception("Watch no longer exists.")
|
|
|
|
|
|
|
|
|
|
# Initialize components
|
|
|
|
|
filter_config = FilterConfig(watch, self.datastore)
|
|
|
|
|
content_processor = ContentProcessor(self.fetcher, watch, filter_config, self.datastore)
|
|
|
|
|
transformer = ContentTransformer()
|
|
|
|
|
rule_engine = RuleEngine()
|
|
|
|
|
|
|
|
|
|
# Get content type and stream info
|
|
|
|
|
ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()
|
|
|
|
|
stream_content_type = guess_stream_type(http_content_header=ctype_header, content=self.fetcher.content)
|
|
|
|
|
|
|
|
|
|
# Unset any existing notification error
|
|
|
|
|
update_obj = {'last_notification_error': False, 'last_error': False}
|
|
|
|
|
|
|
|
|
|
url = watch.link
|
|
|
|
|
|
|
|
|
|
self.screenshot = self.fetcher.screenshot
|
|
|
|
|
self.xpath_data = self.fetcher.xpath_data
|
|
|
|
|
|
|
|
|
|
# Track the content type
|
|
|
|
|
# Track the content type and checksum before filters
|
|
|
|
|
update_obj['content_type'] = ctype_header
|
|
|
|
|
|
|
|
|
|
# Watches added automatically in the queue manager will skip if its the same checksum as the previous run
|
|
|
|
|
# Saves a lot of CPU
|
|
|
|
|
update_obj['previous_md5_before_filters'] = hashlib.md5(self.fetcher.content.encode('utf-8')).hexdigest()
|
|
|
|
|
|
|
|
|
|
# Fetching complete, now filters
|
|
|
|
|
# === CONTENT PREPROCESSING ===
|
|
|
|
|
# Avoid creating unnecessary intermediate string copies by reassigning only when needed
|
|
|
|
|
content = self.fetcher.content
|
|
|
|
|
|
|
|
|
|
# @note: I feel like the following should be in a more obvious chain system
|
|
|
|
|
# - Check filter text
|
|
|
|
|
# - Is the checksum different?
|
|
|
|
|
# - Do we convert to JSON?
|
|
|
|
|
# https://stackoverflow.com/questions/41817578/basic-method-chaining ?
|
|
|
|
|
# return content().textfilter().jsonextract().checksumcompare() ?
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# Go into RSS preprocess for converting CDATA/comment to usable text
|
|
|
|
|
# RSS preprocessing
|
|
|
|
|
if stream_content_type.is_rss:
|
|
|
|
|
self.fetcher.content = cdata_in_document_to_text(html_content=self.fetcher.content)
|
|
|
|
|
content = content_processor.preprocess_rss(content)
|
|
|
|
|
|
|
|
|
|
# PDF preprocessing
|
|
|
|
|
if watch.is_pdf or stream_content_type.is_pdf:
|
|
|
|
|
from shutil import which
|
|
|
|
|
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
|
|
|
|
|
if not which(tool):
|
|
|
|
|
raise PDFToHTMLToolNotFound("Command-line `{}` tool was not found in system PATH, was it installed?".format(tool))
|
|
|
|
|
|
|
|
|
|
import subprocess
|
|
|
|
|
proc = subprocess.Popen(
|
|
|
|
|
[tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
|
|
|
|
|
stdout=subprocess.PIPE,
|
|
|
|
|
stdin=subprocess.PIPE)
|
|
|
|
|
proc.stdin.write(self.fetcher.raw_content)
|
|
|
|
|
proc.stdin.close()
|
|
|
|
|
self.fetcher.content = proc.stdout.read().decode('utf-8')
|
|
|
|
|
proc.wait(timeout=60)
|
|
|
|
|
|
|
|
|
|
# Add a little metadata so we know if the file changes (like if an image changes, but the text is the same
|
|
|
|
|
# @todo may cause problems with non-UTF8?
|
|
|
|
|
metadata = "<p>Added by changedetection.io: Document checksum - {} Filesize - {} bytes</p>".format(
|
|
|
|
|
hashlib.md5(self.fetcher.raw_content).hexdigest().upper(),
|
|
|
|
|
len(self.fetcher.content))
|
|
|
|
|
|
|
|
|
|
self.fetcher.content = self.fetcher.content.replace('</body>', metadata + '</body>')
|
|
|
|
|
|
|
|
|
|
# Better would be if Watch.model could access the global data also
|
|
|
|
|
# and then use getattr https://docs.python.org/3/reference/datamodel.html#object.__getitem__
|
|
|
|
|
# https://realpython.com/inherit-python-dict/ instead of doing it procedurely
|
|
|
|
|
include_filters_from_tags = self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='include_filters')
|
|
|
|
|
|
|
|
|
|
# 1845 - remove duplicated filters in both group and watch include filter
|
|
|
|
|
include_filters_rule = list(dict.fromkeys(watch.get('include_filters', []) + include_filters_from_tags))
|
|
|
|
|
|
|
|
|
|
subtractive_selectors = [*self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='subtractive_selectors'),
|
|
|
|
|
*watch.get("subtractive_selectors", []),
|
|
|
|
|
*self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# Inject a virtual LD+JSON price tracker rule
|
|
|
|
|
if watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
|
|
|
|
|
include_filters_rule += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
|
|
|
|
|
|
|
|
|
|
has_filter_rule = len(include_filters_rule) and len(include_filters_rule[0].strip())
|
|
|
|
|
has_subtractive_selectors = len(subtractive_selectors) and len(subtractive_selectors[0].strip())
|
|
|
|
|
content = content_processor.preprocess_pdf(content, self.fetcher.raw_content)
|
|
|
|
|
|
|
|
|
|
# JSON preprocessing
|
|
|
|
|
if stream_content_type.is_json:
|
|
|
|
|
if not has_filter_rule:
|
|
|
|
|
# Force a reformat
|
|
|
|
|
include_filters_rule.append("json:$")
|
|
|
|
|
has_filter_rule = True
|
|
|
|
|
content = content_processor.preprocess_json(content, filter_config.has_include_filters)
|
|
|
|
|
|
|
|
|
|
# Sort the JSON so we dont get false alerts when the content is just re-ordered
|
|
|
|
|
try:
|
|
|
|
|
self.fetcher.content = json.dumps(json.loads(self.fetcher.content), sort_keys=True)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
# Might have just been a snippet, or otherwise bad JSON, continue
|
|
|
|
|
pass
|
|
|
|
|
# HTML obfuscation workarounds
|
|
|
|
|
if stream_content_type.is_html:
|
|
|
|
|
content = html_tools.workarounds_for_obfuscations(content)
|
|
|
|
|
|
|
|
|
|
if has_filter_rule:
|
|
|
|
|
for filter in include_filters_rule:
|
|
|
|
|
if any(prefix in filter for prefix in json_filter_prefixes):
|
|
|
|
|
stripped_text_from_html += html_tools.extract_json_as_string(content=self.fetcher.content, json_filter=filter)
|
|
|
|
|
if stripped_text_from_html:
|
|
|
|
|
stream_content_type.is_json = True
|
|
|
|
|
stream_content_type.is_html = False
|
|
|
|
|
# Check for LD+JSON price data (for HTML content)
|
|
|
|
|
if stream_content_type.is_html:
|
|
|
|
|
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(content)
|
|
|
|
|
|
|
|
|
|
# We have 'watch.is_source_type_url' because we should be able to use selectors on the raw HTML but return just that selected HTML
|
|
|
|
|
if stream_content_type.is_html or watch.is_source_type_url or stream_content_type.is_plaintext or stream_content_type.is_rss or stream_content_type.is_xml or stream_content_type.is_pdf:
|
|
|
|
|
# === FILTER APPLICATION ===
|
|
|
|
|
# Start with content reference, avoid copy until modification
|
|
|
|
|
html_content = content
|
|
|
|
|
|
|
|
|
|
# CSS Filter, extract the HTML that matches and feed that into the existing inscriptis::get_text
|
|
|
|
|
self.fetcher.content = html_tools.workarounds_for_obfuscations(self.fetcher.content)
|
|
|
|
|
html_content = self.fetcher.content
|
|
|
|
|
# Apply include filters (CSS, XPath, JSON)
|
|
|
|
|
if filter_config.has_include_filters:
|
|
|
|
|
html_content = content_processor.apply_include_filters(content, stream_content_type)
|
|
|
|
|
|
|
|
|
|
# Some kind of "text" but definitely not RSS looking
|
|
|
|
|
if stream_content_type.is_plaintext:
|
|
|
|
|
# Don't run get_text or xpath/css filters on plaintext
|
|
|
|
|
# We are not HTML, we are not any kind of RSS, doesnt even look like HTML
|
|
|
|
|
stripped_text_from_html = html_content
|
|
|
|
|
# Apply subtractive selectors
|
|
|
|
|
if filter_config.has_subtractive_selectors:
|
|
|
|
|
html_content = content_processor.apply_subtractive_selectors(html_content)
|
|
|
|
|
|
|
|
|
|
# === TEXT EXTRACTION ===
|
|
|
|
|
if watch.is_source_type_url:
|
|
|
|
|
# For source URLs, keep raw content
|
|
|
|
|
stripped_text = html_content
|
|
|
|
|
else:
|
|
|
|
|
# Extract text from HTML/RSS content (not generic XML)
|
|
|
|
|
if stream_content_type.is_html or stream_content_type.is_rss:
|
|
|
|
|
stripped_text = content_processor.extract_text_from_html(html_content, stream_content_type)
|
|
|
|
|
else:
|
|
|
|
|
# If not JSON, and if it's not text/plain..
|
|
|
|
|
# Does it have some ld+json price data? used for easier monitoring
|
|
|
|
|
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(self.fetcher.content)
|
|
|
|
|
|
|
|
|
|
# Then we assume HTML
|
|
|
|
|
if has_filter_rule:
|
|
|
|
|
html_content = ""
|
|
|
|
|
|
|
|
|
|
for filter_rule in include_filters_rule:
|
|
|
|
|
# For HTML/XML we offer xpath as an option, just start a regular xPath "/.."
|
|
|
|
|
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
|
|
|
|
|
html_content += html_tools.xpath_filter(xpath_filter=filter_rule.replace('xpath:', ''),
|
|
|
|
|
html_content=self.fetcher.content,
|
|
|
|
|
append_pretty_line_formatting=not watch.is_source_type_url,
|
|
|
|
|
is_rss=stream_content_type.is_rss)
|
|
|
|
|
|
|
|
|
|
elif filter_rule.startswith('xpath1:'):
|
|
|
|
|
html_content += html_tools.xpath1_filter(xpath_filter=filter_rule.replace('xpath1:', ''),
|
|
|
|
|
html_content=self.fetcher.content,
|
|
|
|
|
append_pretty_line_formatting=not watch.is_source_type_url,
|
|
|
|
|
is_rss=stream_content_type.is_rss)
|
|
|
|
|
else:
|
|
|
|
|
html_content += html_tools.include_filters(include_filters=filter_rule,
|
|
|
|
|
html_content=self.fetcher.content,
|
|
|
|
|
append_pretty_line_formatting=not watch.is_source_type_url)
|
|
|
|
|
|
|
|
|
|
if not html_content.strip():
|
|
|
|
|
raise FilterNotFoundInResponse(msg=include_filters_rule, screenshot=self.fetcher.screenshot, xpath_data=self.fetcher.xpath_data)
|
|
|
|
|
|
|
|
|
|
if has_subtractive_selectors:
|
|
|
|
|
html_content = html_tools.element_removal(subtractive_selectors, html_content)
|
|
|
|
|
|
|
|
|
|
if watch.is_source_type_url:
|
|
|
|
|
stripped_text_from_html = html_content
|
|
|
|
|
else:
|
|
|
|
|
# extract text
|
|
|
|
|
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
|
|
|
|
|
stripped_text_from_html = html_tools.html_to_text(html_content=html_content,
|
|
|
|
|
render_anchor_tag_content=do_anchor,
|
|
|
|
|
is_rss=stream_content_type.is_rss) # 1874 activate the <title workaround hack
|
|
|
|
|
stripped_text = html_content
|
|
|
|
|
|
|
|
|
|
# === TEXT TRANSFORMATIONS ===
|
|
|
|
|
if watch.get('trim_text_whitespace'):
|
|
|
|
|
stripped_text_from_html = '\n'.join(line.strip() for line in stripped_text_from_html.replace("\n\n", "\n").splitlines())
|
|
|
|
|
stripped_text = transformer.trim_whitespace(stripped_text)
|
|
|
|
|
|
|
|
|
|
# Re #340 - return the content before the 'ignore text' was applied
|
|
|
|
|
# Also used to calculate/show what was removed
|
|
|
|
|
text_content_before_ignored_filter = stripped_text_from_html
|
|
|
|
|
|
|
|
|
|
# @todo whitespace coming from missing rtrim()?
|
|
|
|
|
# stripped_text_from_html could be based on their preferences, replace the processed text with only that which they want to know about.
|
|
|
|
|
# Rewrite's the processing text based on only what diff result they want to see
|
|
|
|
|
# Save text before ignore filters (for diff calculation)
|
|
|
|
|
text_content_before_ignored_filter = stripped_text
|
|
|
|
|
|
|
|
|
|
# === DIFF FILTERING ===
|
|
|
|
|
# If user wants specific diff types (added/removed/replaced only)
|
|
|
|
|
if watch.has_special_diff_filter_options_set() and len(watch.history.keys()):
|
|
|
|
|
# Now the content comes from the diff-parser and not the returned HTTP traffic, so could be some differences
|
|
|
|
|
from changedetectionio import diff
|
|
|
|
|
# needs to not include (added) etc or it may get used twice
|
|
|
|
|
# Replace the processed text with the preferred result
|
|
|
|
|
rendered_diff = diff.render_diff(previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
|
|
|
|
|
newest_version_file_contents=stripped_text_from_html,
|
|
|
|
|
include_equal=False, # not the same lines
|
|
|
|
|
include_added=watch.get('filter_text_added', True),
|
|
|
|
|
include_removed=watch.get('filter_text_removed', True),
|
|
|
|
|
include_replaced=watch.get('filter_text_replaced', True),
|
|
|
|
|
line_feed_sep="\n",
|
|
|
|
|
include_change_type_prefix=False)
|
|
|
|
|
stripped_text = self._apply_diff_filtering(watch, stripped_text, text_content_before_ignored_filter)
|
|
|
|
|
if stripped_text is None:
|
|
|
|
|
# No differences found, but content exists
|
|
|
|
|
c = ChecksumCalculator.calculate(text_content_before_ignored_filter, ignore_whitespace=True)
|
|
|
|
|
return False, {'previous_md5': c}, text_content_before_ignored_filter.encode('utf-8')
|
|
|
|
|
|
|
|
|
|
watch.save_last_text_fetched_before_filters(text_content_before_ignored_filter.encode('utf-8'))
|
|
|
|
|
|
|
|
|
|
if not rendered_diff and stripped_text_from_html:
|
|
|
|
|
# We had some content, but no differences were found
|
|
|
|
|
# Store our new file as the MD5 so it will trigger in the future
|
|
|
|
|
c = hashlib.md5(stripped_text_from_html.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
|
|
|
|
|
return False, {'previous_md5': c}, stripped_text_from_html.encode('utf-8')
|
|
|
|
|
else:
|
|
|
|
|
stripped_text_from_html = rendered_diff
|
|
|
|
|
|
|
|
|
|
# Treat pages with no renderable text content as a change? No by default
|
|
|
|
|
# === EMPTY PAGE CHECK ===
|
|
|
|
|
empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
|
|
|
|
|
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text_from_html.strip()) == 0:
|
|
|
|
|
raise content_fetchers.exceptions.ReplyWithContentButNoText(url=url,
|
|
|
|
|
status_code=self.fetcher.get_last_status_code(),
|
|
|
|
|
screenshot=self.fetcher.screenshot,
|
|
|
|
|
has_filters=has_filter_rule,
|
|
|
|
|
html_content=html_content,
|
|
|
|
|
xpath_data=self.fetcher.xpath_data
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
# We rely on the actual text in the html output.. many sites have random script vars etc,
|
|
|
|
|
# in the future we'll implement other mechanisms.
|
|
|
|
|
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text.strip()) == 0:
|
|
|
|
|
raise content_fetchers.exceptions.ReplyWithContentButNoText(
|
|
|
|
|
url=url,
|
|
|
|
|
status_code=self.fetcher.get_last_status_code(),
|
|
|
|
|
screenshot=self.fetcher.screenshot,
|
|
|
|
|
has_filters=filter_config.has_include_filters,
|
|
|
|
|
html_content=html_content,
|
|
|
|
|
xpath_data=self.fetcher.xpath_data
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
update_obj["last_check_status"] = self.fetcher.get_last_status_code()
|
|
|
|
|
|
|
|
|
|
# 615 Extract text by regex
|
|
|
|
|
extract_text = list(dict.fromkeys(watch.get('extract_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='extract_text')))
|
|
|
|
|
if len(extract_text) > 0:
|
|
|
|
|
regex_matched_output = []
|
|
|
|
|
for s_re in extract_text:
|
|
|
|
|
# incase they specified something in '/.../x'
|
|
|
|
|
if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
|
|
|
|
|
regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
|
|
|
|
|
result = re.findall(regex, stripped_text_from_html)
|
|
|
|
|
|
|
|
|
|
for l in result:
|
|
|
|
|
if type(l) is tuple:
|
|
|
|
|
# @todo - some formatter option default (between groups)
|
|
|
|
|
regex_matched_output += list(l) + ['\n']
|
|
|
|
|
else:
|
|
|
|
|
# @todo - some formatter option default (between each ungrouped result)
|
|
|
|
|
regex_matched_output += [l] + ['\n']
|
|
|
|
|
else:
|
|
|
|
|
# Doesnt look like regex, just hunt for plaintext and return that which matches
|
|
|
|
|
# `stripped_text_from_html` will be bytes, so we must encode s_re also to bytes
|
|
|
|
|
r = re.compile(re.escape(s_re), re.IGNORECASE)
|
|
|
|
|
res = r.findall(stripped_text_from_html)
|
|
|
|
|
if res:
|
|
|
|
|
for match in res:
|
|
|
|
|
regex_matched_output += [match] + ['\n']
|
|
|
|
|
|
|
|
|
|
##########################################################
|
|
|
|
|
stripped_text_from_html = ''
|
|
|
|
|
|
|
|
|
|
if regex_matched_output:
|
|
|
|
|
# @todo some formatter for presentation?
|
|
|
|
|
stripped_text_from_html = ''.join(regex_matched_output)
|
|
|
|
|
# === REGEX EXTRACTION ===
|
|
|
|
|
if filter_config.extract_text:
|
|
|
|
|
extracted = transformer.extract_by_regex(stripped_text, filter_config.extract_text)
|
|
|
|
|
stripped_text = extracted
|
|
|
|
|
|
|
|
|
|
# === MORE TEXT TRANSFORMATIONS ===
|
|
|
|
|
if watch.get('remove_duplicate_lines'):
|
|
|
|
|
stripped_text_from_html = '\n'.join(dict.fromkeys(line for line in stripped_text_from_html.replace("\n\n", "\n").splitlines()))
|
|
|
|
|
|
|
|
|
|
stripped_text = transformer.remove_duplicate_lines(stripped_text)
|
|
|
|
|
|
|
|
|
|
if watch.get('sort_text_alphabetically'):
|
|
|
|
|
# Note: Because a <p>something</p> will add an extra line feed to signify the paragraph gap
|
|
|
|
|
# we end up with 'Some text\n\n', sorting will add all those extra \n at the start, so we remove them here.
|
|
|
|
|
stripped_text_from_html = stripped_text_from_html.replace("\n\n", "\n")
|
|
|
|
|
stripped_text_from_html = '\n'.join(sorted(stripped_text_from_html.splitlines(), key=lambda x: x.lower()))
|
|
|
|
|
stripped_text = transformer.sort_alphabetically(stripped_text)
|
|
|
|
|
|
|
|
|
|
### CALCULATE MD5
|
|
|
|
|
# If there's text to ignore
|
|
|
|
|
text_to_ignore = watch.get('ignore_text', []) + self.datastore.data['settings']['application'].get('global_ignore_text', [])
|
|
|
|
|
text_to_ignore += self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='ignore_text')
|
|
|
|
|
# === CHECKSUM CALCULATION ===
|
|
|
|
|
text_for_checksuming = stripped_text
|
|
|
|
|
|
|
|
|
|
text_for_checksuming = stripped_text_from_html
|
|
|
|
|
if text_to_ignore:
|
|
|
|
|
text_for_checksuming = html_tools.strip_ignore_text(stripped_text_from_html, text_to_ignore)
|
|
|
|
|
# Some people prefer to also completely remove it
|
|
|
|
|
strip_ignored_lines = watch.get('strip_ignored_lines') if watch.get('strip_ignored_lines') is not None else self.datastore.data['settings']['application'].get('strip_ignored_lines')
|
|
|
|
|
# Apply ignore_text for checksum calculation
|
|
|
|
|
if filter_config.ignore_text:
|
|
|
|
|
text_for_checksuming = html_tools.strip_ignore_text(stripped_text, filter_config.ignore_text)
|
|
|
|
|
|
|
|
|
|
# Optionally remove ignored lines from output
|
|
|
|
|
strip_ignored_lines = watch.get('strip_ignored_lines')
|
|
|
|
|
if strip_ignored_lines is None:
|
|
|
|
|
strip_ignored_lines = self.datastore.data['settings']['application'].get('strip_ignored_lines')
|
|
|
|
|
if strip_ignored_lines:
|
|
|
|
|
# @todo add test in the 'preview' mode, check the widget works? compare to datastruct
|
|
|
|
|
stripped_text_from_html = text_for_checksuming
|
|
|
|
|
stripped_text = text_for_checksuming
|
|
|
|
|
|
|
|
|
|
# Re #133 - if we should strip whitespaces from triggering the change detected comparison
|
|
|
|
|
if text_for_checksuming and self.datastore.data['settings']['application'].get('ignore_whitespace', False):
|
|
|
|
|
fetched_md5 = hashlib.md5(text_for_checksuming.translate(TRANSLATE_WHITESPACE_TABLE).encode('utf-8')).hexdigest()
|
|
|
|
|
else:
|
|
|
|
|
fetched_md5 = hashlib.md5(text_for_checksuming.encode('utf-8')).hexdigest()
|
|
|
|
|
# Calculate checksum
|
|
|
|
|
ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace', False)
|
|
|
|
|
fetched_md5 = ChecksumCalculator.calculate(text_for_checksuming, ignore_whitespace=ignore_whitespace)
|
|
|
|
|
|
|
|
|
|
############ Blocking rules, after checksum #################
|
|
|
|
|
# === BLOCKING RULES EVALUATION ===
|
|
|
|
|
blocked = False
|
|
|
|
|
trigger_text = list(dict.fromkeys(watch.get('trigger_text', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='trigger_text')))
|
|
|
|
|
if len(trigger_text):
|
|
|
|
|
# Assume blocked
|
|
|
|
|
|
|
|
|
|
# Check trigger_text
|
|
|
|
|
if rule_engine.evaluate_trigger_text(stripped_text, filter_config.trigger_text):
|
|
|
|
|
blocked = True
|
|
|
|
|
# Filter and trigger works the same, so reuse it
|
|
|
|
|
# It should return the line numbers that match
|
|
|
|
|
# Unblock flow if the trigger was found (some text remained after stripped what didnt match)
|
|
|
|
|
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
|
|
|
|
|
wordlist=trigger_text,
|
|
|
|
|
mode="line numbers")
|
|
|
|
|
# Unblock if the trigger was found
|
|
|
|
|
if result:
|
|
|
|
|
blocked = False
|
|
|
|
|
|
|
|
|
|
text_should_not_be_present = list(dict.fromkeys(watch.get('text_should_not_be_present', []) + self.datastore.get_tag_overrides_for_watch(uuid=watch.get('uuid'), attr='text_should_not_be_present')))
|
|
|
|
|
if len(text_should_not_be_present):
|
|
|
|
|
# If anything matched, then we should block a change from happening
|
|
|
|
|
result = html_tools.strip_ignore_text(content=str(stripped_text_from_html),
|
|
|
|
|
wordlist=text_should_not_be_present,
|
|
|
|
|
mode="line numbers")
|
|
|
|
|
if result:
|
|
|
|
|
blocked = True
|
|
|
|
|
# Check text_should_not_be_present
|
|
|
|
|
if rule_engine.evaluate_text_should_not_be_present(stripped_text, filter_config.text_should_not_be_present):
|
|
|
|
|
blocked = True
|
|
|
|
|
|
|
|
|
|
# And check if 'conditions' will let this pass through
|
|
|
|
|
if watch.get('conditions') and watch.get('conditions_match_logic'):
|
|
|
|
|
conditions_result = execute_ruleset_against_all_plugins(current_watch_uuid=watch.get('uuid'),
|
|
|
|
|
application_datastruct=self.datastore.data,
|
|
|
|
|
ephemeral_data={
|
|
|
|
|
'text': stripped_text_from_html
|
|
|
|
|
}
|
|
|
|
|
)
|
|
|
|
|
# Check custom conditions
|
|
|
|
|
if rule_engine.evaluate_conditions(watch, self.datastore, stripped_text):
|
|
|
|
|
blocked = True
|
|
|
|
|
|
|
|
|
|
if not conditions_result.get('result'):
|
|
|
|
|
# Conditions say "Condition not met" so we block it.
|
|
|
|
|
blocked = True
|
|
|
|
|
|
|
|
|
|
# Looks like something changed, but did it match all the rules?
|
|
|
|
|
# === CHANGE DETECTION ===
|
|
|
|
|
if blocked:
|
|
|
|
|
changed_detected = False
|
|
|
|
|
else:
|
|
|
|
|
# The main thing that all this at the moment comes down to :)
|
|
|
|
|
# Compare checksums
|
|
|
|
|
if watch.get('previous_md5') != fetched_md5:
|
|
|
|
|
changed_detected = True
|
|
|
|
|
|
|
|
|
|
# Always record the new checksum
|
|
|
|
|
update_obj["previous_md5"] = fetched_md5
|
|
|
|
|
|
|
|
|
|
# On the first run of a site, watch['previous_md5'] will be None, set it the current one.
|
|
|
|
|
# On first run, initialize previous_md5
|
|
|
|
|
if not watch.get('previous_md5'):
|
|
|
|
|
watch['previous_md5'] = fetched_md5
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Watch UUID {watch.get('uuid')} content check - Previous MD5: {watch.get('previous_md5')}, Fetched MD5 {fetched_md5}")
|
|
|
|
|
|
|
|
|
|
if changed_detected:
|
|
|
|
|
if watch.get('check_unique_lines', False):
|
|
|
|
|
ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace')
|
|
|
|
|
# === UNIQUE LINES CHECK ===
|
|
|
|
|
if changed_detected and watch.get('check_unique_lines', False):
|
|
|
|
|
has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
|
|
|
|
|
lines=stripped_text.splitlines(),
|
|
|
|
|
ignore_whitespace=ignore_whitespace
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
|
|
|
|
|
lines=stripped_text_from_html.splitlines(),
|
|
|
|
|
ignore_whitespace=ignore_whitespace
|
|
|
|
|
)
|
|
|
|
|
if not has_unique_lines:
|
|
|
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
|
|
|
|
|
changed_detected = False
|
|
|
|
|
else:
|
|
|
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
|
|
|
|
|
|
|
|
|
# One or more lines? unsure?
|
|
|
|
|
if not has_unique_lines:
|
|
|
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
|
|
|
|
|
changed_detected = False
|
|
|
|
|
else:
|
|
|
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
|
|
|
|
return changed_detected, update_obj, stripped_text
|
|
|
|
|
|
|
|
|
|
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
|
|
|
|
|
"""Apply user's diff filtering preferences (show only added/removed/replaced lines)."""
|
|
|
|
|
from changedetectionio import diff
|
|
|
|
|
|
|
|
|
|
# stripped_text_from_html - Everything after filters and NO 'ignored' content
|
|
|
|
|
return changed_detected, update_obj, stripped_text_from_html
|
|
|
|
|
rendered_diff = diff.render_diff(
|
|
|
|
|
previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
|
|
|
|
|
newest_version_file_contents=stripped_text,
|
|
|
|
|
include_equal=False,
|
|
|
|
|
include_added=watch.get('filter_text_added', True),
|
|
|
|
|
include_removed=watch.get('filter_text_removed', True),
|
|
|
|
|
include_replaced=watch.get('filter_text_replaced', True),
|
|
|
|
|
line_feed_sep="\n",
|
|
|
|
|
include_change_type_prefix=False
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
watch.save_last_text_fetched_before_filters(text_before_filter.encode('utf-8'))
|
|
|
|
|
|
|
|
|
|
if not rendered_diff and stripped_text:
|
|
|
|
|
# No differences found
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
return rendered_diff
|
|
|
|
|
|