# HTML to TEXT/JSON DIFFERENCE self.fetcher import hashlib import json import os import re import urllib3 from changedetectionio.conditions import execute_ruleset_against_all_plugins from changedetectionio.processors import difference_detection_processor from changedetectionio.html_tools import PERL_STYLE_REGEX, cdata_in_document_to_text, TRANSLATE_WHITESPACE_TABLE from changedetectionio import html_tools, content_fetchers from changedetectionio.blueprint.price_data_follower import PRICE_DATA_TRACK_ACCEPT, PRICE_DATA_TRACK_REJECT from loguru import logger from changedetectionio.processors.magic import guess_stream_type urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) name = 'Webpage Text/HTML, JSON and PDF changes' description = 'Detects all text changes where possible' JSON_FILTER_PREFIXES = ['json:', 'jq:', 'jqraw:'] # Assume it's this type if the server says nothing on content-type DEFAULT_WHEN_NO_CONTENT_TYPE_HEADER = 'text/html' class FilterNotFoundInResponse(ValueError): def __init__(self, msg, screenshot=None, xpath_data=None): self.screenshot = screenshot self.xpath_data = xpath_data ValueError.__init__(self, msg) class PDFToHTMLToolNotFound(ValueError): def __init__(self, msg): ValueError.__init__(self, msg) class FilterConfig: """Consolidates all filter and rule configurations from watch, tags, and global settings.""" def __init__(self, watch, datastore): self.watch = watch self.datastore = datastore self.watch_uuid = watch.get('uuid') # Cache computed properties to avoid repeated list operations self._include_filters_cache = None self._subtractive_selectors_cache = None def _get_merged_rules(self, attr, include_global=False): """Merge rules from watch, tags, and optionally global settings.""" watch_rules = self.watch.get(attr, []) tag_rules = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr=attr) rules = list(dict.fromkeys(watch_rules + tag_rules)) if include_global: global_rules = self.datastore.data['settings']['application'].get(f'global_{attr}', []) rules = list(dict.fromkeys(rules + global_rules)) return rules @property def include_filters(self): if self._include_filters_cache is None: filters = self._get_merged_rules('include_filters') # Inject LD+JSON price tracker rule if enabled if self.watch.get('track_ldjson_price_data', '') == PRICE_DATA_TRACK_ACCEPT: filters += html_tools.LD_JSON_PRODUCT_OFFER_SELECTORS self._include_filters_cache = filters return self._include_filters_cache @property def subtractive_selectors(self): if self._subtractive_selectors_cache is None: watch_selectors = self.watch.get("subtractive_selectors", []) tag_selectors = self.datastore.get_tag_overrides_for_watch(uuid=self.watch_uuid, attr='subtractive_selectors') global_selectors = self.datastore.data["settings"]["application"].get("global_subtractive_selectors", []) self._subtractive_selectors_cache = [*tag_selectors, *watch_selectors, *global_selectors] return self._subtractive_selectors_cache @property def extract_text(self): return self._get_merged_rules('extract_text') @property def ignore_text(self): return self._get_merged_rules('ignore_text', include_global=True) @property def trigger_text(self): return self._get_merged_rules('trigger_text') @property def text_should_not_be_present(self): return self._get_merged_rules('text_should_not_be_present') @property def has_include_filters(self): return bool(self.include_filters) and bool(self.include_filters[0].strip()) @property def has_include_json_filters(self): return any(f.strip().startswith(prefix) for f in self.include_filters for prefix in JSON_FILTER_PREFIXES) @property def has_subtractive_selectors(self): return bool(self.subtractive_selectors) and bool(self.subtractive_selectors[0].strip()) class ContentTransformer: """Handles text transformations like trimming, sorting, and deduplication.""" @staticmethod def trim_whitespace(text): """Remove leading/trailing whitespace from each line.""" # Use generator expression to avoid building intermediate list return '\n'.join(line.strip() for line in text.replace("\n\n", "\n").splitlines()) @staticmethod def remove_duplicate_lines(text): """Remove duplicate lines while preserving order.""" return '\n'.join(dict.fromkeys(line for line in text.replace("\n\n", "\n").splitlines())) @staticmethod def sort_alphabetically(text): """Sort lines alphabetically (case-insensitive).""" # Remove double line feeds before sorting text = text.replace("\n\n", "\n") return '\n'.join(sorted(text.splitlines(), key=lambda x: x.lower())) @staticmethod def extract_by_regex(text, regex_patterns): """Extract text matching regex patterns.""" # Use list of strings instead of concatenating lists repeatedly (avoids O(n²) behavior) regex_matched_output = [] for s_re in regex_patterns: # Check if it's perl-style regex /.../ if re.search(PERL_STYLE_REGEX, s_re, re.IGNORECASE): regex = html_tools.perl_style_slash_enclosed_regex_to_options(s_re) result = re.findall(regex, text) for match in result: if type(match) is tuple: regex_matched_output.extend(match) regex_matched_output.append('\n') else: regex_matched_output.append(match) regex_matched_output.append('\n') else: # Plain text search (case-insensitive) r = re.compile(re.escape(s_re), re.IGNORECASE) res = r.findall(text) if res: for match in res: regex_matched_output.append(match) regex_matched_output.append('\n') return ''.join(regex_matched_output) if regex_matched_output else '' class RuleEngine: """Evaluates blocking rules (triggers, conditions, text_should_not_be_present).""" @staticmethod def evaluate_trigger_text(content, trigger_patterns): """ Check if trigger text is present. If trigger_text is configured, content is blocked UNLESS the trigger is found. Returns True if blocked, False if allowed. """ if not trigger_patterns: return False # Assume blocked if trigger_text is configured result = html_tools.strip_ignore_text( content=str(content), wordlist=trigger_patterns, mode="line numbers" ) # Unblock if trigger was found return not bool(result) @staticmethod def evaluate_text_should_not_be_present(content, patterns): """ Check if forbidden text is present. If found, block the change. Returns True if blocked, False if allowed. """ if not patterns: return False result = html_tools.strip_ignore_text( content=str(content), wordlist=patterns, mode="line numbers" ) # Block if forbidden text was found return bool(result) @staticmethod def evaluate_conditions(watch, datastore, content): """ Evaluate custom conditions ruleset. Returns True if blocked, False if allowed. """ if not watch.get('conditions') or not watch.get('conditions_match_logic'): return False conditions_result = execute_ruleset_against_all_plugins( current_watch_uuid=watch.get('uuid'), application_datastruct=datastore.data, ephemeral_data={'text': content} ) # Block if conditions not met return not conditions_result.get('result') class ContentProcessor: """Handles content preprocessing, filtering, and extraction.""" def __init__(self, fetcher, watch, filter_config, datastore): self.fetcher = fetcher self.watch = watch self.filter_config = filter_config self.datastore = datastore def preprocess_rss(self, content): """ Convert CDATA/comments in RSS to usable text. Supports two RSS processing modes: - 'default': Inline CDATA replacement (original behavior) - 'formatted': Format RSS items with title, link, guid, pubDate, and description (CDATA unmarked) """ from changedetectionio import rss_tools rss_mode = self.datastore.data["settings"]["application"].get("rss_reader_mode") if rss_mode: # Format RSS items nicely with CDATA content unmarked and converted to text return rss_tools.format_rss_items(content) else: # Default: Original inline CDATA replacement return cdata_in_document_to_text(html_content=content) def preprocess_pdf(self, raw_content): """Convert PDF to HTML using external tool.""" from shutil import which tool = os.getenv("PDF_TO_HTML_TOOL", "pdftohtml") if not which(tool): raise PDFToHTMLToolNotFound( f"Command-line `{tool}` tool was not found in system PATH, was it installed?" ) import subprocess proc = subprocess.Popen( [tool, '-stdout', '-', '-s', 'out.pdf', '-i'], stdout=subprocess.PIPE, stdin=subprocess.PIPE ) proc.stdin.write(raw_content) proc.stdin.close() html_content = proc.stdout.read().decode('utf-8') proc.wait(timeout=60) # Add metadata for change detection metadata = ( f"
Added by changedetection.io: Document checksum - " f"{hashlib.md5(raw_content).hexdigest().upper()} " f"Original file size - {len(raw_content)} bytes
" ) return html_content.replace('