mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-11-05 09:04:55 +00:00
598 lines
24 KiB
Python
598 lines
24 KiB
Python
# HTML to TEXT/JSON DIFFERENCE self.fetcher
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import re
|
|
import urllib3
|
|
|
|
from changedetectionio.conditions import execute_ruleset_against_all_plugins
|
|
from changedetectionio.diff import ADDED_PLACEMARKER_OPEN
|
|
from changedetectionio.processors import difference_detection_processor
|
|
from changedetectionio.html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text, TRANSLATE_WHITESPACE_TABLE
|
|
from changedetectionio import html_tools, content_fetchers
|
|
from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT
|
|
from loguru import logger
|
|
|
|
from changedetectionio.processors.magic import guess_stream_type
|
|
|
|
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
|
|
|
|
name = 'Webpage Text/HTML, JSON and PDF changes'
|
|
description = 'Detects all text changes where possible'
|
|
|
|
JSON_FILTER_PREFIXES = ['json:', 'jq:', 'jqraw:']
|
|
|
|
# Assume it's this type if the server says nothing on content-type
|
|
DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html'
|
|
|
|
class FilterNotFoundInResponse(ValueError):
|
|
def __init__(self, msg, screenshot=None, xpath_data=None):
|
|
self.screenshot = screenshot
|
|
self.xpath_data = xpath_data
|
|
ValueError.__init__(self, msg)
|
|
|
|
|
|
class PDFToHTMLToolNotFound(ValueError):
|
|
def __init__(self, msg):
|
|
ValueError.__init__(self, msg)
|
|
|
|
|
|
class FilterConfig:
|
|
"""Consolidates all filter and rule configurations from watch, tags, and global settings."""
|
|
|
|
def __init__(self, watch, datastore):
|
|
self.watch = watch
|
|
self.datastore = datastore
|
|
self.watch_uuid = watch.get('uuid')
|
|
# Cache computed properties to avoid repeated list operations
|
|
self._include_filters_cache = None
|
|
self._subtractive_selectors_cache = None
|
|
|
|
def _get_merged_rules(self, attr, include_global=False):
|
|
"""Merge rules from watch, tags, and optionally global settings."""
|
|
watch_rules = self.watch.get(attr, [])
|
|
tag_rules = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr=attr)
|
|
rules = list(dict.fromkeys(watch_rules + tag_rules))
|
|
|
|
if include_global:
|
|
global_rules = self.datastore.data['settings']['application'].get(f'global_{attr}', [])
|
|
rules = list(dict.fromkeys(rules + global_rules))
|
|
|
|
return rules
|
|
|
|
@property
|
|
def include_filters(self):
|
|
if self._include_filters_cache is None:
|
|
filters = self._get_merged_rules('include_filters')
|
|
# Inject LD+JSON price tracker rule if enabled
|
|
if self.watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT:
|
|
filters += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS
|
|
self._include_filters_cache = filters
|
|
return self._include_filters_cache
|
|
|
|
@property
|
|
def subtractive_selectors(self):
|
|
if self._subtractive_selectors_cache is None:
|
|
watch_selectors = self.watch.get("subtractive_selectors", [])
|
|
tag_selectors = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr='subtractive_selectors')
|
|
global_selectors = self.datastore.data["settings"]["application"].get("global_subtractive_selectors", [])
|
|
self._subtractive_selectors_cache = [*tag_selectors, *watch_selectors, *global_selectors]
|
|
return self._subtractive_selectors_cache
|
|
|
|
@property
|
|
def extract_text(self):
|
|
return self._get_merged_rules('extract_text')
|
|
|
|
@property
|
|
def ignore_text(self):
|
|
return self._get_merged_rules('ignore_text', include_global=True)
|
|
|
|
@property
|
|
def trigger_text(self):
|
|
return self._get_merged_rules('trigger_text')
|
|
|
|
@property
|
|
def text_should_not_be_present(self):
|
|
return self._get_merged_rules('text_should_not_be_present')
|
|
|
|
@property
|
|
def has_include_filters(self):
|
|
return bool(self.include_filters) and bool(self.include_filters[0].strip())
|
|
|
|
@property
|
|
def has_include_json_filters(self):
|
|
return any(f.strip().startswith(prefix) for f in self.include_filters for prefix in JSON_FILTER_PREFIXES)
|
|
|
|
@property
|
|
def has_subtractive_selectors(self):
|
|
return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip())
|
|
|
|
|
|
class ContentTransformer:
|
|
"""Handles text transformations like trimming, sorting, and deduplication."""
|
|
|
|
@staticmethod
|
|
def trim_whitespace(text):
|
|
"""Remove leading/trailing whitespace from each line."""
|
|
# Use generator expression to avoid building intermediate list
|
|
return '\n'.join(line.strip() for line in text.replace("\n\n", "\n").splitlines())
|
|
|
|
@staticmethod
|
|
def remove_duplicate_lines(text):
|
|
"""Remove duplicate lines while preserving order."""
|
|
return '\n'.join(dict.fromkeys(line for line in text.replace("\n\n", "\n").splitlines()))
|
|
|
|
@staticmethod
|
|
def sort_alphabetically(text):
|
|
"""Sort lines alphabetically (case-insensitive)."""
|
|
# Remove double line feeds before sorting
|
|
text = text.replace("\n\n", "\n")
|
|
return '\n'.join(sorted(text.splitlines(), key=lambda x: x.lower()))
|
|
|
|
@staticmethod
|
|
def extract_by_regex(text, regex_patterns):
|
|
"""Extract text matching regex patterns."""
|
|
# Use list of strings instead of concatenating lists repeatedly (avoids O(n²) behavior)
|
|
regex_matched_output = []
|
|
|
|
for s_re in regex_patterns:
|
|
# Check if it's perl-style regex /.../
|
|
if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE):
|
|
regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re)
|
|
result = re.findall(regex, text)
|
|
|
|
for match in result:
|
|
if type(match) is tuple:
|
|
regex_matched_output.extend(match)
|
|
regex_matched_output.append('\n')
|
|
else:
|
|
regex_matched_output.append(match)
|
|
regex_matched_output.append('\n')
|
|
else:
|
|
# Plain text search (case-insensitive)
|
|
r = re.compile(re.escape(s_re), re.IGNORECASE)
|
|
res = r.findall(text)
|
|
if res:
|
|
for match in res:
|
|
regex_matched_output.append(match)
|
|
regex_matched_output.append('\n')
|
|
|
|
return ''.join(regex_matched_output) if regex_matched_output else ''
|
|
|
|
|
|
class RuleEngine:
|
|
"""Evaluates blocking rules (triggers, conditions, text_should_not_be_present)."""
|
|
|
|
@staticmethod
|
|
def evaluate_trigger_text(content, trigger_patterns):
|
|
"""
|
|
Check if trigger text is present. If trigger_text is configured,
|
|
content is blocked UNLESS the trigger is found.
|
|
Returns True if blocked, False if allowed.
|
|
"""
|
|
if not trigger_patterns:
|
|
return False
|
|
|
|
# Assume blocked if trigger_text is configured
|
|
result = html_tools.strip_ignore_text(
|
|
content=str(content),
|
|
wordlist=trigger_patterns,
|
|
mode="line numbers"
|
|
)
|
|
# Unblock if trigger was found
|
|
return not bool(result)
|
|
|
|
@staticmethod
|
|
def evaluate_text_should_not_be_present(content, patterns):
|
|
"""
|
|
Check if forbidden text is present. If found, block the change.
|
|
Returns True if blocked, False if allowed.
|
|
"""
|
|
if not patterns:
|
|
return False
|
|
|
|
result = html_tools.strip_ignore_text(
|
|
content=str(content),
|
|
wordlist=patterns,
|
|
mode="line numbers"
|
|
)
|
|
# Block if forbidden text was found
|
|
return bool(result)
|
|
|
|
@staticmethod
|
|
def evaluate_conditions(watch, datastore, content):
|
|
"""
|
|
Evaluate custom conditions ruleset.
|
|
Returns True if blocked, False if allowed.
|
|
"""
|
|
if not watch.get('conditions') or not watch.get('conditions_match_logic'):
|
|
return False
|
|
|
|
conditions_result = execute_ruleset_against_all_plugins(
|
|
current_watch_uuid=watch.get('uuid'),
|
|
application_datastruct=datastore.data,
|
|
ephemeral_data={'text': content}
|
|
)
|
|
|
|
# Block if conditions not met
|
|
return not conditions_result.get('result')
|
|
|
|
|
|
class ContentProcessor:
|
|
"""Handles content preprocessing, filtering, and extraction."""
|
|
|
|
def __init__(self, fetcher, watch, filter_config, datastore):
|
|
self.fetcher = fetcher
|
|
self.watch = watch
|
|
self.filter_config = filter_config
|
|
self.datastore = datastore
|
|
|
|
def preprocess_rss(self, content):
|
|
"""
|
|
Convert CDATA/comments in RSS to usable text.
|
|
|
|
Supports two RSS processing modes:
|
|
- 'default': Inline CDATA replacement (original behavior)
|
|
- 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked)
|
|
"""
|
|
from changedetectionio import rss_tools
|
|
rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode")
|
|
if rss_mode:
|
|
# Format RSS items nicely with CDATA content unmarked and converted to text
|
|
return rss_tools.format_rss_items(content)
|
|
else:
|
|
# Default: Original inline CDATA replacement
|
|
return cdata_in_document_to_text(html_content=content)
|
|
|
|
def preprocess_pdf(self, raw_content):
|
|
"""Convert PDF to HTML using external tool."""
|
|
from shutil import which
|
|
tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml")
|
|
if not which(tool):
|
|
raise PDFToHTMLToolNotFound(
|
|
f"Command-line `{tool}` tool was not found in system PATH, was it installed?"
|
|
)
|
|
|
|
import subprocess
|
|
proc = subprocess.Popen(
|
|
[tool, '-stdout', '-', '-s', 'out.pdf', '-i'],
|
|
stdout=subprocess.PIPE,
|
|
stdin=subprocess.PIPE
|
|
)
|
|
proc.stdin.write(raw_content)
|
|
proc.stdin.close()
|
|
html_content = proc.stdout.read().decode('utf-8')
|
|
proc.wait(timeout=60)
|
|
|
|
# Add metadata for change detection
|
|
metadata = (
|
|
f"<p>Added by changedetection.io: Document checksum - "
|
|
f"{hashlib.md5(raw_content).hexdigest().upper()} "
|
|
f"Original file size - {len(raw_content)} bytes</p>"
|
|
)
|
|
return html_content.replace('</body>', metadata + '</body>')
|
|
|
|
def preprocess_json(self, raw_content):
|
|
"""Format and sort JSON content."""
|
|
# Then we re-format it, else it does have filters (later on) which will reformat it anyway
|
|
content = html_tools.extract_json_as_string(content=raw_content, json_filter="json:$")
|
|
|
|
# Sort JSON to avoid false alerts from reordering
|
|
try:
|
|
content = json.dumps(json.loads(content), sort_keys=True, indent=4)
|
|
except Exception:
|
|
# Might be malformed JSON, continue anyway
|
|
pass
|
|
|
|
return content
|
|
|
|
def apply_include_filters(self, content, stream_content_type):
|
|
"""Apply CSS, XPath, or JSON filters to extract specific content."""
|
|
filtered_content = ""
|
|
|
|
for filter_rule in self.filter_config.include_filters:
|
|
# XPath filters
|
|
if filter_rule[0] == '/' or filter_rule.startswith('xpath:'):
|
|
filtered_content += html_tools.xpath_filter(
|
|
xpath_filter=filter_rule.replace('xpath:', ''),
|
|
html_content=content,
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url,
|
|
is_rss=stream_content_type.is_rss
|
|
)
|
|
|
|
# XPath1 filters (first match only)
|
|
elif filter_rule.startswith('xpath1:'):
|
|
filtered_content += html_tools.xpath1_filter(
|
|
xpath_filter=filter_rule.replace('xpath1:', ''),
|
|
html_content=content,
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url,
|
|
is_rss=stream_content_type.is_rss
|
|
)
|
|
|
|
# JSON filters
|
|
elif any(filter_rule.startswith(prefix) for prefix in JSON_FILTER_PREFIXES):
|
|
filtered_content += html_tools.extract_json_as_string(
|
|
content=content,
|
|
json_filter=filter_rule
|
|
)
|
|
|
|
# CSS selectors, default fallback
|
|
else:
|
|
filtered_content += html_tools.include_filters(
|
|
include_filters=filter_rule,
|
|
html_content=content,
|
|
append_pretty_line_formatting=not self.watch.is_source_type_url
|
|
)
|
|
|
|
# Raise error if filter returned nothing
|
|
if not filtered_content.strip():
|
|
raise FilterNotFoundInResponse(
|
|
msg=self.filter_config.include_filters,
|
|
screenshot=self.fetcher.screenshot,
|
|
xpath_data=self.fetcher.xpath_data
|
|
)
|
|
|
|
return filtered_content
|
|
|
|
def apply_subtractive_selectors(self, content):
|
|
"""Remove elements matching subtractive selectors."""
|
|
return html_tools.element_removal(self.filter_config.subtractive_selectors, content)
|
|
|
|
def extract_text_from_html(self, html_content, stream_content_type):
|
|
"""Convert HTML to plain text."""
|
|
do_anchor = self.datastore.data["settings"]["application"].get("render_anchor_tag_content", False)
|
|
return html_tools.html_to_text(
|
|
html_content=html_content,
|
|
render_anchor_tag_content=do_anchor,
|
|
is_rss=stream_content_type.is_rss
|
|
)
|
|
|
|
|
|
class ChecksumCalculator:
|
|
"""Calculates checksums with various options."""
|
|
|
|
@staticmethod
|
|
def calculate(text, ignore_whitespace=False):
|
|
"""Calculate MD5 checksum of text content."""
|
|
if ignore_whitespace:
|
|
text = text.translate(TRANSLATE_WHITESPACE_TABLE)
|
|
return hashlib.md5(text.encode('utf-8')).hexdigest()
|
|
|
|
|
|
# Some common stuff here that can be moved to a base class
|
|
# (set_proxy_from_list)
|
|
class perform_site_check(difference_detection_processor):
|
|
|
|
def run_changedetection(self, watch):
|
|
changed_detected = False
|
|
|
|
if not watch:
|
|
raise Exception("Watch no longer exists.")
|
|
|
|
# Initialize components
|
|
filter_config = FilterConfig(watch, self.datastore)
|
|
content_processor = ContentProcessor(self.fetcher, watch, filter_config, self.datastore)
|
|
transformer = ContentTransformer()
|
|
rule_engine = RuleEngine()
|
|
|
|
# Get content type and stream info
|
|
ctype_header = self.fetcher.get_all_headers().get('content-type', DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER).lower()
|
|
stream_content_type = guess_stream_type(http_content_header=ctype_header, content=self.fetcher.content)
|
|
|
|
# Unset any existing notification error
|
|
update_obj = {'last_notification_error': False, 'last_error': False}
|
|
url = watch.link
|
|
|
|
self.screenshot = self.fetcher.screenshot
|
|
self.xpath_data = self.fetcher.xpath_data
|
|
|
|
# Track the content type and checksum before filters
|
|
update_obj['content_type'] = ctype_header
|
|
update_obj['previous_md5_before_filters'] = hashlib.md5(self.fetcher.content.encode('utf-8')).hexdigest()
|
|
|
|
# === CONTENT PREPROCESSING ===
|
|
# Avoid creating unnecessary intermediate string copies by reassigning only when needed
|
|
content = self.fetcher.content
|
|
|
|
# RSS preprocessing
|
|
if stream_content_type.is_rss:
|
|
content = content_processor.preprocess_rss(content)
|
|
if self.datastore.data["settings"]["application"].get("rss_reader_mode"):
|
|
# Now just becomes regular HTML that can have xpath/CSS applied (first of the set etc)
|
|
stream_content_type.is_rss = False
|
|
stream_content_type.is_html = True
|
|
self.fetcher.content = content
|
|
|
|
# PDF preprocessing
|
|
if watch.is_pdf or stream_content_type.is_pdf:
|
|
content = content_processor.preprocess_pdf(raw_content=self.fetcher.raw_content)
|
|
stream_content_type.is_html = True
|
|
|
|
# JSON - Always reformat it nicely for consistency.
|
|
|
|
if stream_content_type.is_json:
|
|
if not filter_config.has_include_json_filters:
|
|
content = content_processor.preprocess_json(raw_content=content)
|
|
#else, otherwise it gets sorted/formatted in the filter stage anyway
|
|
|
|
# HTML obfuscation workarounds
|
|
if stream_content_type.is_html:
|
|
content = html_tools.workarounds_for_obfuscations(content)
|
|
|
|
# Check for LD+JSON price data (for HTML content)
|
|
if stream_content_type.is_html:
|
|
update_obj['has_ldjson_price_data'] = html_tools.has_ldjson_product_info(content)
|
|
|
|
# === FILTER APPLICATION ===
|
|
# Start with content reference, avoid copy until modification
|
|
html_content = content
|
|
|
|
# Apply include filters (CSS, XPath, JSON)
|
|
# Except for plaintext (incase they tried to confuse the system, it will HTML escape
|
|
#if not stream_content_type.is_plaintext:
|
|
if filter_config.has_include_filters:
|
|
html_content = content_processor.apply_include_filters(content, stream_content_type)
|
|
|
|
# Apply subtractive selectors
|
|
if filter_config.has_subtractive_selectors:
|
|
html_content = content_processor.apply_subtractive_selectors(html_content)
|
|
|
|
# === TEXT EXTRACTION ===
|
|
if watch.is_source_type_url:
|
|
# For source URLs, keep raw content
|
|
stripped_text = html_content
|
|
elif stream_content_type.is_plaintext:
|
|
# For plaintext, keep as-is without HTML-to-text conversion
|
|
stripped_text = html_content
|
|
else:
|
|
# Extract text from HTML/RSS content (not generic XML)
|
|
if stream_content_type.is_html or stream_content_type.is_rss:
|
|
stripped_text = content_processor.extract_text_from_html(html_content, stream_content_type)
|
|
else:
|
|
stripped_text = html_content
|
|
|
|
# === TEXT TRANSFORMATIONS ===
|
|
if watch.get('trim_text_whitespace'):
|
|
stripped_text = transformer.trim_whitespace(stripped_text)
|
|
|
|
# Save text before ignore filters (for diff calculation)
|
|
text_content_before_ignored_filter = stripped_text
|
|
|
|
# === DIFF FILTERING ===
|
|
# If user wants specific diff types (added/removed/replaced only)
|
|
if watch.has_special_diff_filter_options_set() and len(watch.history.keys()):
|
|
stripped_text = self._apply_diff_filtering(watch, stripped_text, text_content_before_ignored_filter)
|
|
if stripped_text is None:
|
|
# No differences found, but content exists
|
|
c = ChecksumCalculator.calculate(text_content_before_ignored_filter, ignore_whitespace=True)
|
|
return False, {'previous_md5': c}, text_content_before_ignored_filter.encode('utf-8')
|
|
|
|
# === EMPTY PAGE CHECK ===
|
|
empty_pages_are_a_change = self.datastore.data['settings']['application'].get('empty_pages_are_a_change', False)
|
|
if not stream_content_type.is_json and not empty_pages_are_a_change and len(stripped_text.strip()) == 0:
|
|
raise content_fetchers.exceptions.ReplyWithContentButNoText(
|
|
url=url,
|
|
status_code=self.fetcher.get_last_status_code(),
|
|
screenshot=self.fetcher.screenshot,
|
|
has_filters=filter_config.has_include_filters,
|
|
html_content=html_content,
|
|
xpath_data=self.fetcher.xpath_data
|
|
)
|
|
|
|
update_obj["last_check_status"] = self.fetcher.get_last_status_code()
|
|
|
|
# === REGEX EXTRACTION ===
|
|
if filter_config.extract_text:
|
|
extracted = transformer.extract_by_regex(stripped_text, filter_config.extract_text)
|
|
stripped_text = extracted
|
|
|
|
# === MORE TEXT TRANSFORMATIONS ===
|
|
if watch.get('remove_duplicate_lines'):
|
|
stripped_text = transformer.remove_duplicate_lines(stripped_text)
|
|
|
|
if watch.get('sort_text_alphabetically'):
|
|
stripped_text = transformer.sort_alphabetically(stripped_text)
|
|
|
|
# === CHECKSUM CALCULATION ===
|
|
text_for_checksuming = stripped_text
|
|
|
|
# Apply ignore_text for checksum calculation
|
|
if filter_config.ignore_text:
|
|
text_for_checksuming = html_tools.strip_ignore_text(stripped_text, filter_config.ignore_text)
|
|
|
|
# Optionally remove ignored lines from output
|
|
strip_ignored_lines = watch.get('strip_ignored_lines')
|
|
if strip_ignored_lines is None:
|
|
strip_ignored_lines = self.datastore.data['settings']['application'].get('strip_ignored_lines')
|
|
if strip_ignored_lines:
|
|
stripped_text = text_for_checksuming
|
|
|
|
# Calculate checksum
|
|
ignore_whitespace = self.datastore.data['settings']['application'].get('ignore_whitespace', False)
|
|
fetched_md5 = ChecksumCalculator.calculate(text_for_checksuming, ignore_whitespace=ignore_whitespace)
|
|
|
|
# === BLOCKING RULES EVALUATION ===
|
|
blocked = False
|
|
|
|
# Check trigger_text
|
|
if rule_engine.evaluate_trigger_text(stripped_text, filter_config.trigger_text):
|
|
blocked = True
|
|
|
|
# Check text_should_not_be_present
|
|
if rule_engine.evaluate_text_should_not_be_present(stripped_text, filter_config.text_should_not_be_present):
|
|
blocked = True
|
|
|
|
# Check custom conditions
|
|
if rule_engine.evaluate_conditions(watch, self.datastore, stripped_text):
|
|
blocked = True
|
|
|
|
# === CHANGE DETECTION ===
|
|
if blocked:
|
|
changed_detected = False
|
|
else:
|
|
# Compare checksums
|
|
if watch.get('previous_md5') != fetched_md5:
|
|
changed_detected = True
|
|
|
|
# Always record the new checksum
|
|
update_obj["previous_md5"] = fetched_md5
|
|
|
|
# On first run, initialize previous_md5
|
|
if not watch.get('previous_md5'):
|
|
watch['previous_md5'] = fetched_md5
|
|
|
|
logger.debug(f"Watch UUID {watch.get('uuid')} content check - Previous MD5: {watch.get('previous_md5')}, Fetched MD5 {fetched_md5}")
|
|
|
|
# === UNIQUE LINES CHECK ===
|
|
if changed_detected and watch.get('check_unique_lines', False):
|
|
has_unique_lines = watch.lines_contain_something_unique_compared_to_history(
|
|
lines=stripped_text.splitlines(),
|
|
ignore_whitespace=ignore_whitespace
|
|
)
|
|
|
|
if not has_unique_lines:
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} didnt have anything new setting change_detected=False")
|
|
changed_detected = False
|
|
else:
|
|
logger.debug(f"check_unique_lines: UUID {watch.get('uuid')} had unique content")
|
|
|
|
# Note: Explicit cleanup is only needed here because text_json_diff handles
|
|
# large strings (100KB-300KB for RSS/HTML). The other processors work with
|
|
# small strings and don't need this.
|
|
#
|
|
# Python would clean these up automatically, but explicit `del` frees memory
|
|
# immediately rather than waiting for function return, reducing peak memory usage.
|
|
del content
|
|
if 'html_content' in locals() and html_content is not stripped_text:
|
|
del html_content
|
|
if 'text_content_before_ignored_filter' in locals() and text_content_before_ignored_filter is not stripped_text:
|
|
del text_content_before_ignored_filter
|
|
if 'text_for_checksuming' in locals() and text_for_checksuming is not stripped_text:
|
|
del text_for_checksuming
|
|
|
|
return changed_detected, update_obj, stripped_text
|
|
|
|
def _apply_diff_filtering(self, watch, stripped_text, text_before_filter):
|
|
"""Apply user's diff filtering preferences (show only added/removed/replaced lines)."""
|
|
from changedetectionio import diff
|
|
|
|
rendered_diff = diff.render_diff(
|
|
previous_version_file_contents=watch.get_last_fetched_text_before_filters(),
|
|
newest_version_file_contents=stripped_text,
|
|
include_equal=False,
|
|
include_added=watch.get('filter_text_added', True),
|
|
include_removed=watch.get('filter_text_removed', True),
|
|
include_replaced=watch.get('filter_text_replaced', True),
|
|
line_feed_sep="\n",
|
|
include_change_type_prefix=False
|
|
)
|
|
|
|
watch.save_last_text_fetched_before_filters(text_before_filter.encode('utf-8'))
|
|
|
|
if not rendered_diff and stripped_text:
|
|
# No differences found
|
|
return None
|
|
|
|
return rendered_diff
|