mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2025-12-17 13:35:50 +00:00
Some checks failed
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (alpine) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/amd64 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v7 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm/v8 (main) (push) Has been cancelled
ChangeDetection.io Container Build Test / Build linux/arm64 (main) (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
420 lines
14 KiB
Python
420 lines
14 KiB
Python
"""
|
|
Screenshot diff visualization for fast image comparison processor.
|
|
|
|
Generates side-by-side comparison with red-highlighted differences using
|
|
OpenCV or pixelmatch for fast rendering (10-100x faster than SSIM).
|
|
"""
|
|
|
|
import os
|
|
import json
|
|
import base64
|
|
import io
|
|
import time
|
|
from loguru import logger
|
|
from PIL import Image
|
|
import numpy as np
|
|
import cv2
|
|
from . import DEFAULT_COMPARISON_METHOD, DEFAULT_COMPARISON_THRESHOLD_OPENCV, DEFAULT_COMPARISON_THRESHOLD_PIXELMATCH
|
|
|
|
# Maximum dimensions for diff visualization (can be overridden via environment variable)
|
|
# Large screenshots don't need full resolution for visual inspection
|
|
MAX_DIFF_HEIGHT = int(os.getenv('MAX_DIFF_HEIGHT', '8000'))
|
|
MAX_DIFF_WIDTH = int(os.getenv('MAX_DIFF_WIDTH', '900'))
|
|
|
|
|
|
def _resize_for_diff(img, max_height=MAX_DIFF_HEIGHT, max_width=MAX_DIFF_WIDTH):
|
|
"""
|
|
Downscale image if too large for faster diff visualization.
|
|
|
|
Users don't need pixel-perfect diffs at 20000px resolution.
|
|
Downscaling to 2000px is 100x faster and still shows all visible changes.
|
|
|
|
Args:
|
|
img: PIL Image
|
|
max_height: Maximum height in pixels
|
|
max_width: Maximum width in pixels
|
|
|
|
Returns:
|
|
PIL Image (resized if needed)
|
|
"""
|
|
if img.height > max_height or img.width > max_width:
|
|
# Calculate scaling factor to fit within max dimensions
|
|
height_ratio = max_height / img.height if img.height > max_height else 1.0
|
|
width_ratio = max_width / img.width if img.width > max_width else 1.0
|
|
ratio = min(height_ratio, width_ratio)
|
|
|
|
new_size = (int(img.width * ratio), int(img.height * ratio))
|
|
logger.debug(f"Downscaling diff visualization: {img.size} -> {new_size} ({ratio*100:.1f}% scale)")
|
|
return img.resize(new_size, Image.Resampling.LANCZOS)
|
|
|
|
return img
|
|
|
|
|
|
def calculate_diff_opencv(img_bytes_from, img_bytes_to, threshold=30):
|
|
"""
|
|
Calculate image difference using OpenCV cv2.absdiff.
|
|
|
|
This is the fastest method for diff visualization.
|
|
|
|
Args:
|
|
img_bytes_from: Previous screenshot (bytes)
|
|
img_bytes_to: Current screenshot (bytes)
|
|
threshold: Pixel difference threshold (0-255)
|
|
|
|
Returns:
|
|
tuple: (change_percentage, diff_mask) where diff_mask is a 2D numpy binary mask
|
|
"""
|
|
# Load images from BytesIO buffers
|
|
buf_from = io.BytesIO(img_bytes_from)
|
|
buf_to = io.BytesIO(img_bytes_to)
|
|
img_from = Image.open(buf_from)
|
|
img_to = Image.open(buf_to)
|
|
|
|
# Ensure images are the same size
|
|
if img_from.size != img_to.size:
|
|
img_from = img_from.resize(img_to.size, Image.Resampling.LANCZOS)
|
|
|
|
# Downscale large images for faster diff visualization
|
|
# A 20000px tall screenshot doesn't need full resolution for visual inspection
|
|
img_from = _resize_for_diff(img_from)
|
|
img_to = _resize_for_diff(img_to)
|
|
|
|
# Convert to numpy arrays
|
|
arr_from = np.array(img_from)
|
|
arr_to = np.array(img_to)
|
|
|
|
# Convert to grayscale
|
|
if len(arr_from.shape) == 3:
|
|
gray_from = cv2.cvtColor(arr_from, cv2.COLOR_RGB2GRAY)
|
|
gray_to = cv2.cvtColor(arr_to, cv2.COLOR_RGB2GRAY)
|
|
else:
|
|
gray_from = arr_from
|
|
gray_to = arr_to
|
|
|
|
# Optional Gaussian blur to reduce noise
|
|
blur_sigma = float(os.getenv("OPENCV_BLUR_SIGMA", "0.8"))
|
|
if blur_sigma > 0:
|
|
gray_from = cv2.GaussianBlur(gray_from, (0, 0), blur_sigma)
|
|
gray_to = cv2.GaussianBlur(gray_to, (0, 0), blur_sigma)
|
|
|
|
# Calculate absolute difference
|
|
diff = cv2.absdiff(gray_from, gray_to)
|
|
|
|
# Apply threshold to create binary mask
|
|
_, diff_mask = cv2.threshold(diff, threshold, 255, cv2.THRESH_BINARY)
|
|
|
|
# Calculate change percentage
|
|
changed_pixels = np.count_nonzero(diff_mask)
|
|
total_pixels = diff_mask.size
|
|
change_percentage = (changed_pixels / total_pixels) * 100
|
|
|
|
# Explicit memory cleanup - close images and buffers, delete large arrays
|
|
img_from.close()
|
|
img_to.close()
|
|
buf_from.close()
|
|
buf_to.close()
|
|
del arr_from, arr_to, gray_from, gray_to, diff
|
|
|
|
return float(change_percentage), diff_mask
|
|
|
|
|
|
def calculate_diff_pixelmatch(img_bytes_from, img_bytes_to, threshold=0.1):
|
|
"""
|
|
Calculate image difference using pixelmatch.
|
|
|
|
Args:
|
|
img_bytes_from: Previous screenshot (bytes)
|
|
img_bytes_to: Current screenshot (bytes)
|
|
threshold: Color difference threshold (0-1)
|
|
|
|
Returns:
|
|
tuple: (change_percentage, diff_array) where diff_array is RGBA numpy array
|
|
"""
|
|
try:
|
|
from pybind11_pixelmatch import pixelmatch, Options
|
|
except ImportError:
|
|
logger.warning("pybind11-pixelmatch not installed, falling back to OpenCV")
|
|
return calculate_diff_opencv(img_bytes_from, img_bytes_to, threshold * 255)
|
|
|
|
# Load images from BytesIO buffers
|
|
buf_from = io.BytesIO(img_bytes_from)
|
|
buf_to = io.BytesIO(img_bytes_to)
|
|
img_from = Image.open(buf_from)
|
|
img_to = Image.open(buf_to)
|
|
|
|
# Ensure images are the same size
|
|
if img_from.size != img_to.size:
|
|
img_from = img_from.resize(img_to.size, Image.Resampling.LANCZOS)
|
|
|
|
# Downscale large images for faster diff visualization
|
|
img_from = _resize_for_diff(img_from)
|
|
img_to = _resize_for_diff(img_to)
|
|
|
|
# Convert to RGB
|
|
if img_from.mode != 'RGB':
|
|
img_from = img_from.convert('RGB')
|
|
if img_to.mode != 'RGB':
|
|
img_to = img_to.convert('RGB')
|
|
|
|
# Convert to numpy arrays
|
|
arr_from = np.array(img_from)
|
|
arr_to = np.array(img_to)
|
|
|
|
# Add alpha channel (pixelmatch expects RGBA)
|
|
if arr_from.shape[2] == 3:
|
|
alpha = np.ones((arr_from.shape[0], arr_from.shape[1], 1), dtype=np.uint8) * 255
|
|
arr_from = np.concatenate([arr_from, alpha], axis=2)
|
|
arr_to = np.concatenate([arr_to, alpha], axis=2)
|
|
|
|
# Create diff output array
|
|
diff_array = np.zeros_like(arr_from)
|
|
|
|
# Configure pixelmatch options
|
|
opts = Options()
|
|
opts.threshold = threshold
|
|
opts.includeAA = True # Detect anti-aliasing
|
|
opts.alpha = 0.1 # Opacity of diff overlay
|
|
|
|
# Run pixelmatch
|
|
width, height = img_from.size
|
|
num_diff_pixels = pixelmatch(
|
|
arr_from,
|
|
arr_to,
|
|
output=diff_array,
|
|
options=opts
|
|
)
|
|
|
|
# Calculate change percentage
|
|
total_pixels = width * height
|
|
change_percentage = (num_diff_pixels / total_pixels) * 100
|
|
|
|
# Explicit memory cleanup - close images and buffers, delete large arrays
|
|
img_from.close()
|
|
img_to.close()
|
|
buf_from.close()
|
|
buf_to.close()
|
|
del arr_from, arr_to
|
|
if 'alpha' in locals():
|
|
del alpha
|
|
|
|
return float(change_percentage), diff_array
|
|
|
|
|
|
def generate_diff_image_opencv(img_bytes_to, diff_mask):
|
|
"""
|
|
Generate a difference image with red highlights using OpenCV.
|
|
|
|
This is the fastest method for generating diff visualization.
|
|
|
|
Args:
|
|
img_bytes_to: Current screenshot (bytes)
|
|
diff_mask: Binary mask of changed pixels (2D numpy array)
|
|
|
|
Returns:
|
|
bytes: PNG image with red highlights on changed pixels
|
|
"""
|
|
# Load current image as base from BytesIO buffer
|
|
buf_to = io.BytesIO(img_bytes_to)
|
|
img_to = Image.open(buf_to)
|
|
|
|
# Downscale for faster diff visualization
|
|
img_to = _resize_for_diff(img_to)
|
|
|
|
# Convert to RGB
|
|
if img_to.mode != 'RGB':
|
|
img_to = img_to.convert('RGB')
|
|
|
|
result_array = np.array(img_to)
|
|
|
|
# Ensure mask is same size as image
|
|
if diff_mask.shape != result_array.shape[:2]:
|
|
diff_mask = cv2.resize(diff_mask, (result_array.shape[1], result_array.shape[0]))
|
|
|
|
# Create boolean mask
|
|
changed_mask = diff_mask > 0
|
|
|
|
# Apply red highlight where mask is True (50% blend)
|
|
result_array[changed_mask] = (
|
|
result_array[changed_mask] * 0.5 + np.array([255, 0, 0]) * 0.5
|
|
).astype(np.uint8)
|
|
|
|
# Convert back to PIL Image
|
|
diff_img = Image.fromarray(result_array.astype(np.uint8))
|
|
|
|
# Save to bytes as PNG (faster rendering than JPEG for diff images)
|
|
buf = io.BytesIO()
|
|
diff_img.save(buf, format='PNG', optimize=True)
|
|
diff_bytes = buf.getvalue()
|
|
|
|
# Explicit memory cleanup - close files and buffers, delete large objects
|
|
buf.close()
|
|
buf_to.close()
|
|
diff_img.close()
|
|
img_to.close()
|
|
del result_array, changed_mask, diff_mask
|
|
|
|
return diff_bytes
|
|
|
|
|
|
def generate_diff_image_pixelmatch(diff_array):
|
|
"""
|
|
Generate a difference image from pixelmatch diff array.
|
|
|
|
Args:
|
|
diff_array: RGBA diff array from pixelmatch (4D numpy array)
|
|
|
|
Returns:
|
|
bytes: PNG image with highlighted differences
|
|
"""
|
|
# Convert diff array to PIL Image
|
|
diff_img = Image.fromarray(diff_array.astype(np.uint8), mode='RGBA')
|
|
|
|
# Save to bytes as PNG
|
|
buf = io.BytesIO()
|
|
diff_img.save(buf, format='PNG', optimize=True)
|
|
diff_bytes = buf.getvalue()
|
|
|
|
# Explicit memory cleanup - close files first, then delete
|
|
buf.close()
|
|
diff_img.close()
|
|
|
|
return diff_bytes
|
|
|
|
|
|
def render(watch, datastore, request, url_for, render_template, flash, redirect):
|
|
"""
|
|
Render the screenshot comparison diff page.
|
|
|
|
Args:
|
|
watch: Watch object
|
|
datastore: Datastore object
|
|
request: Flask request
|
|
url_for: Flask url_for function
|
|
render_template: Flask render_template function
|
|
flash: Flask flash function
|
|
redirect: Flask redirect function
|
|
|
|
Returns:
|
|
Rendered template or redirect
|
|
"""
|
|
# Get version parameters (from_version, to_version)
|
|
versions = list(watch.history.keys())
|
|
|
|
if len(versions) < 2:
|
|
flash("Not enough history to compare. Need at least 2 snapshots.", "error")
|
|
return redirect(url_for('watchlist.index'))
|
|
|
|
# Default: compare latest two versions
|
|
from_version = request.args.get('from_version', versions[-2] if len(versions) >= 2 else versions[0])
|
|
to_version = request.args.get('to_version', versions[-1])
|
|
|
|
# Validate versions exist
|
|
if from_version not in versions:
|
|
from_version = versions[-2] if len(versions) >= 2 else versions[0]
|
|
if to_version not in versions:
|
|
to_version = versions[-1]
|
|
|
|
# Get comparison method (per-watch > global > env default)
|
|
comparison_method = (
|
|
watch.get('comparison_method') or
|
|
datastore.data['settings']['application'].get('comparison_method') or
|
|
DEFAULT_COMPARISON_METHOD
|
|
)
|
|
|
|
logger.debug(f"Using comparison_method {comparison_method}")
|
|
|
|
# Get threshold (per-watch > global > env default)
|
|
threshold = watch.get('comparison_threshold')
|
|
if not threshold or threshold == '':
|
|
default_threshold = (
|
|
DEFAULT_COMPARISON_THRESHOLD_OPENCV if comparison_method == 'opencv'
|
|
else DEFAULT_COMPARISON_THRESHOLD_PIXELMATCH
|
|
)
|
|
threshold = datastore.data['settings']['application'].get('comparison_threshold', default_threshold)
|
|
|
|
# Convert threshold to appropriate type
|
|
try:
|
|
threshold = float(threshold)
|
|
# For pixelmatch, convert from 0-100 scale to 0-1 scale
|
|
if comparison_method == 'pixelmatch':
|
|
threshold = threshold / 100.0
|
|
except (ValueError, TypeError):
|
|
logger.warning(f"Invalid threshold value '{threshold}', using default")
|
|
threshold = 30.0 if comparison_method == 'opencv' else 0.1
|
|
|
|
# Load screenshots from history
|
|
try:
|
|
img_bytes_from = watch.get_history_snapshot(timestamp=from_version)
|
|
img_bytes_to = watch.get_history_snapshot(timestamp=to_version)
|
|
|
|
# Convert to bytes if needed (should already be bytes for screenshots)
|
|
if isinstance(img_bytes_from, str):
|
|
img_bytes_from = img_bytes_from.encode('utf-8')
|
|
if isinstance(img_bytes_to, str):
|
|
img_bytes_to = img_bytes_to.encode('utf-8')
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to load screenshots: {e}")
|
|
flash(f"Failed to load screenshots: {e}", "error")
|
|
return redirect(url_for('watchlist.index'))
|
|
|
|
# Calculate diff and generate difference image based on method
|
|
now = time.time()
|
|
try:
|
|
if comparison_method == 'pixelmatch':
|
|
change_percentage, diff_data = calculate_diff_pixelmatch(img_bytes_from, img_bytes_to, threshold)
|
|
diff_image_bytes = generate_diff_image_pixelmatch(diff_data)
|
|
method_display = f"Pixelmatch (threshold: {threshold*100:.0f}%)"
|
|
del diff_data # Clean up diff array
|
|
else: # opencv
|
|
change_percentage, diff_mask = calculate_diff_opencv(img_bytes_from, img_bytes_to, threshold)
|
|
diff_image_bytes = generate_diff_image_opencv(img_bytes_to, diff_mask)
|
|
method_display = f"OpenCV (threshold: {threshold:.0f})"
|
|
del diff_mask # Clean up diff mask
|
|
|
|
except Exception as e:
|
|
logger.error(f"Failed to generate diff: {e}")
|
|
flash(f"Failed to generate diff: {e}", "error")
|
|
return redirect(url_for('watchlist.index'))
|
|
finally:
|
|
logger.debug(f"Done '{comparison_method}' in {time.time() - now:.2f}s")
|
|
|
|
# Convert images to base64 for embedding in template
|
|
img_from_b64 = base64.b64encode(img_bytes_from).decode('utf-8')
|
|
img_to_b64 = base64.b64encode(img_bytes_to).decode('utf-8')
|
|
diff_image_b64 = base64.b64encode(diff_image_bytes).decode('utf-8')
|
|
|
|
# Clean up large byte objects after base64 encoding
|
|
del img_bytes_from
|
|
del img_bytes_to
|
|
del diff_image_bytes
|
|
|
|
# Load historical data if available (for charts/visualization)
|
|
comparison_data = {}
|
|
comparison_config_path = os.path.join(watch.watch_data_dir, "visual_comparison_data.json")
|
|
if os.path.isfile(comparison_config_path):
|
|
try:
|
|
with open(comparison_config_path, 'r') as f:
|
|
comparison_data = json.load(f)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to load comparison history data: {e}")
|
|
|
|
# Render custom template
|
|
# Template path is namespaced to avoid conflicts with other processors
|
|
return render_template(
|
|
'image_ssim_diff/diff.html',
|
|
watch=watch,
|
|
uuid=watch.get('uuid'),
|
|
img_from_b64=img_from_b64,
|
|
img_to_b64=img_to_b64,
|
|
diff_image_b64=diff_image_b64,
|
|
change_percentage=change_percentage,
|
|
comparison_data=comparison_data, # Full history for charts/visualization
|
|
threshold=threshold,
|
|
comparison_method=method_display,
|
|
versions=versions,
|
|
from_version=from_version,
|
|
to_version=to_version,
|
|
percentage_different=change_percentage
|
|
)
|