Files
changedetection.io/changedetectionio/blueprint/ui/diff.py
2025-12-12 15:13:45 +01:00

265 lines
10 KiB
Python

from flask import Blueprint, request, redirect, url_for, flash, render_template, make_response, send_from_directory
import os
import time
import re
import importlib
from loguru import logger
from markupsafe import Markup
from changedetectionio.diff import (
REMOVED_STYLE, ADDED_STYLE, REMOVED_INNER_STYLE, ADDED_INNER_STYLE,
REMOVED_PLACEMARKER_OPEN, REMOVED_PLACEMARKER_CLOSED,
ADDED_PLACEMARKER_OPEN, ADDED_PLACEMARKER_CLOSED,
CHANGED_PLACEMARKER_OPEN, CHANGED_PLACEMARKER_CLOSED,
CHANGED_INTO_PLACEMARKER_OPEN, CHANGED_INTO_PLACEMARKER_CLOSED
)
from changedetectionio.store import ChangeDetectionStore
from changedetectionio.auth_decorator import login_optionally_required
def build_diff_cell_visualizer(content, resolution=100):
"""
Build a visual cell grid for the diff visualizer.
Analyzes the content for placemarkers indicating changes and creates a
grid of cells representing the document, with each cell marked as:
- 'deletion' for removed content
- 'insertion' for added content
- 'mixed' for cells containing both deletions and insertions
- empty string for cells with no changes
Args:
content: The diff content with placemarkers
resolution: Number of cells to create (default 100)
Returns:
List of dicts with 'class' key for each cell's CSS class
"""
if not content:
return [{'class': ''} for _ in range(resolution)]
now = time.time()
# Work with character positions for better accuracy
content_length = len(content)
if content_length == 0:
return [{'class': ''} for _ in range(resolution)]
chars_per_cell = max(1, content_length / resolution)
# Track change type for each cell
cell_data = {}
# Placemarkers to detect
change_markers = {
REMOVED_PLACEMARKER_OPEN: 'deletion',
ADDED_PLACEMARKER_OPEN: 'insertion',
CHANGED_PLACEMARKER_OPEN: 'deletion',
CHANGED_INTO_PLACEMARKER_OPEN: 'insertion',
}
# Find all occurrences of each marker
for marker, change_type in change_markers.items():
pos = 0
while True:
pos = content.find(marker, pos)
if pos == -1:
break
# Calculate which cell this marker falls into
cell_index = min(int(pos / chars_per_cell), resolution - 1)
if cell_index not in cell_data:
cell_data[cell_index] = change_type
elif cell_data[cell_index] != change_type:
# Mixed changes in this cell
cell_data[cell_index] = 'mixed'
pos += len(marker)
# Build the cell list
cells = []
for i in range(resolution):
change_type = cell_data.get(i, '')
cells.append({'class': change_type})
logger.debug(f"Built diff cell visualizer: {len([c for c in cells if c['class']])} cells with changes out of {resolution} in {time.time() - now:.2f}s")
return cells
def construct_blueprint(datastore: ChangeDetectionStore):
diff_blueprint = Blueprint('ui_diff', __name__, template_folder="../ui/templates")
@diff_blueprint.app_template_filter('diff_unescape_difference_spans')
def diff_unescape_difference_spans(content):
"""Emulate Jinja2's auto-escape, then selectively unescape our diff spans."""
from markupsafe import escape
if not content:
return Markup('')
# Step 1: Escape everything like Jinja2 would (this makes it XSS-safe)
escaped_content = escape(str(content))
# Step 2: Unescape only our exact diff spans generated by apply_html_color_to_body()
# Pattern matches the exact structure:
# <span style="{STYLE}" role="{ROLE}" aria-label="{LABEL}" title="{TITLE}">
# Unescape outer span opening tags with full attributes (role, aria-label, title)
# Matches removed/added/changed/changed_into spans
result = re.sub(
rf'&lt;span style=&#34;({re.escape(REMOVED_STYLE)}|{re.escape(ADDED_STYLE)})&#34; '
rf'role=&#34;(deletion|insertion|note)&#34; '
rf'aria-label=&#34;([^&]+?)&#34; '
rf'title=&#34;([^&]+?)&#34;&gt;',
r'<span style="\1" role="\2" aria-label="\3" title="\4">',
str(escaped_content),
flags=re.IGNORECASE
)
# Unescape inner span opening tags (without additional attributes)
# This matches the darker background styles for changed parts within lines
result = re.sub(
rf'&lt;span style=&#34;({re.escape(REMOVED_INNER_STYLE)}|{re.escape(ADDED_INNER_STYLE)})&#34;&gt;',
r'<span style="\1">',
result,
flags=re.IGNORECASE
)
# Unescape closing tags (but only as many as we opened)
open_count = result.count('<span style=')
close_count = str(escaped_content).count('&lt;/span&gt;')
# Replace up to the number of spans we opened
for _ in range(min(open_count, close_count)):
result = result.replace('&lt;/span&gt;', '</span>', 1)
return Markup(result)
@diff_blueprint.route("/diff/<string:uuid>", methods=['POST'])
@login_optionally_required
def diff_history_page_build_report(uuid):
from changedetectionio import forms
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
flash("No history found for the specified link, bad link?", "error")
return redirect(url_for('watchlist.index'))
# For submission of requesting an extract
extract_form = forms.extractDataForm(formdata=request.form,
data={'extract_regex': request.form.get('extract_regex', '')}
)
if not extract_form.validate():
flash("An error occurred, please see below.", "error")
# Use processor-specific render with the error form
processor_name = watch.get('processor', 'text_json_diff')
try:
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
if hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect,
extract_form=extract_form
)
except (ImportError, ModuleNotFoundError):
pass
# Fallback to text_json_diff
from changedetectionio.processors.text_json_diff.difference import render as default_render
return default_render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect,
extract_form=extract_form
)
else:
extract_regex = request.form.get('extract_regex', '').strip()
output = watch.extract_regex_from_all_history(extract_regex)
if output:
watch_dir = os.path.join(datastore.datastore_path, uuid)
response = make_response(send_from_directory(directory=watch_dir, path=output, as_attachment=True))
response.headers['Content-type'] = 'text/csv'
response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate'
response.headers['Pragma'] = 'no-cache'
response.headers['Expires'] = "0"
return response
flash('No matches found while scanning all of the watch history for that RegEx.', 'error')
return redirect(url_for('ui.ui_diff.diff_history_page', uuid=uuid) + '#extract')
@diff_blueprint.route("/diff/<string:uuid>", methods=['GET'])
@login_optionally_required
def diff_history_page(uuid):
"""
Render the history/diff page for a watch.
This route is processor-aware: it delegates rendering to the processor's
difference.py module, allowing different processor types to provide
custom visualizations:
- text_json_diff: Text/HTML diff with syntax highlighting
- restock_diff: Could show price charts and stock history
- image_diff: Could show image comparison slider/overlay
Each processor implements processors/{type}/difference.py::render()
If a processor doesn't have a difference module, falls back to text_json_diff.
"""
# More for testing, possible to return the first/only
if uuid == 'first':
uuid = list(datastore.data['watching'].keys()).pop()
try:
watch = datastore.data['watching'][uuid]
except KeyError:
flash("No history found for the specified link, bad link?", "error")
return redirect(url_for('watchlist.index'))
# Get the processor type for this watch
processor_name = watch.get('processor', 'text_json_diff')
try:
# Try to import the processor's difference module
processor_module = importlib.import_module(f'changedetectionio.processors.{processor_name}.difference')
# Call the processor's render() function
if hasattr(processor_module, 'render'):
return processor_module.render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
except (ImportError, ModuleNotFoundError) as e:
logger.warning(f"Processor {processor_name} does not have a difference module, falling back to text_json_diff: {e}")
# Fallback: if processor doesn't have difference module, use text_json_diff as default
from changedetectionio.processors.text_json_diff.difference import render as default_render
return default_render(
watch=watch,
datastore=datastore,
request=request,
url_for=url_for,
render_template=render_template,
flash=flash,
redirect=redirect
)
return diff_blueprint