Compare commits

...

2 Commits

Author     SHA1        Message                                                                                       Date
dgtlmoon   11dd28b115  Fixes for image                                                                               2026-01-15 13:44:40 +01:00
dgtlmoon   109bbaf144  Revert sub-process brotli saving because it could fork-bomb/use up too many system resources  2026-01-15 13:41:10 +01:00
3 changed files with 43 additions and 120 deletions

View File

@@ -41,9 +41,10 @@ from loguru import logger
 #
 # IMPLEMENTATION:
 # 1. Explicit contexts everywhere (primary protection):
-#    - Watch.py: ctx = multiprocessing.get_context('spawn')
 #    - playwright.py: ctx = multiprocessing.get_context('spawn')
 #    - puppeteer.py: ctx = multiprocessing.get_context('spawn')
+#    - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
+#    - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
 #
 # 2. Global default (defense-in-depth, below):
 #    - Safety net if future code forgets explicit context
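
For reference, the explicit-context pattern this comment block describes looks roughly like the sketch below; a minimal illustration, not a call site from this repository:

import multiprocessing

def _worker(conn):
    # Under 'spawn' this runs in a fresh interpreter, inheriting no
    # threads, locks, or native-library state from the parent.
    conn.send('done')
    conn.close()

if __name__ == '__main__':
    # Explicit context: correct even if some other module changed the
    # global start method (the defense-in-depth layer above).
    ctx = multiprocessing.get_context('spawn')
    parent_conn, child_conn = ctx.Pipe()
    p = ctx.Process(target=_worker, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # -> 'done'
    p.join()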

View File

@@ -18,21 +18,31 @@ BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRE
 minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
 mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
 
-def _brotli_compress_worker(conn, filepath, mode=None):
+def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
     """
-    Worker function to compress data with brotli in a separate process.
-    This isolates memory - when process exits, OS reclaims all memory.
+    Save compressed data using native brotli.
+    Testing shows no memory leak when using gc.collect() after compression.
 
     Args:
-        conn: multiprocessing.Pipe connection to receive data
+        contents: data to compress (str or bytes)
         filepath: destination file path
        mode: brotli compression mode (e.g., brotli.MODE_TEXT)
+        fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
+
+    Returns:
+        str: actual filepath saved (may differ from input if fallback used)
+
+    Raises:
+        Exception: if compression fails and fallback_uncompressed is False
     """
     import brotli
+    import gc
+
+    # Ensure contents are bytes
+    if isinstance(contents, str):
+        contents = contents.encode('utf-8')
 
     try:
-        # Receive data from parent process via pipe (avoids pickle overhead)
-        contents = conn.recv()
         logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
 
         if mode is not None:
@@ -43,111 +53,25 @@ def _brotli_compress_worker(conn, filepath, mode=None):
         with open(filepath, 'wb') as f:
             f.write(compressed_data)
 
-        # Send success status back
-        conn.send(True)
         logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
 
-        # No need for explicit cleanup - process exit frees all memory
-    except Exception as e:
-        logger.critical(f"Brotli compression worker failed: {e}")
-        conn.send(False)
-    finally:
-        conn.close()
-
-
-def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_uncompressed=False):
-    """
-    Save compressed data using subprocess to isolate memory.
-    Uses Pipe to avoid pickle overhead for large data.
-
-    Args:
-        contents: data to compress (str or bytes)
-        filepath: destination file path
-        mode: brotli compression mode (e.g., brotli.MODE_TEXT)
-        timeout: subprocess timeout in seconds
-        fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
-
-    Returns:
-        str: actual filepath saved (may differ from input if fallback used)
-
-    Raises:
-        Exception: if compression fails and fallback_uncompressed is False
-    """
-    import multiprocessing
-    import sys
-
-    # Ensure contents are bytes
-    if isinstance(contents, str):
-        contents = contents.encode('utf-8')
-
-    # Use explicit spawn context for thread safety (avoids fork() with multi-threaded parent)
-    # Always use spawn - consistent behavior in tests and production
-    ctx = multiprocessing.get_context('spawn')
-    parent_conn, child_conn = ctx.Pipe()
-
-    # Run compression in subprocess using spawn (not fork)
-    proc = ctx.Process(target=_brotli_compress_worker, args=(child_conn, filepath, mode))
-    # Windows-safe: Set daemon=False explicitly to avoid issues with process cleanup
-    proc.daemon = False
-    proc.start()
-
-    try:
-        # Send data to subprocess via pipe (avoids pickle)
-        parent_conn.send(contents)
-
-        # Wait for result with timeout
-        if parent_conn.poll(timeout):
-            success = parent_conn.recv()
-        else:
-            success = False
-            logger.warning(f"Brotli compression subprocess timed out after {timeout}s")
-            # Graceful termination with platform-aware cleanup
-            try:
-                proc.terminate()
-            except Exception as term_error:
-                logger.debug(f"Process termination issue (may be normal on Windows): {term_error}")
-
-        parent_conn.close()
-        proc.join(timeout=5)
-
-        # Force kill if still alive after graceful termination
-        if proc.is_alive():
-            try:
-                if sys.platform == 'win32':
-                    # Windows: use kill() which is more forceful
-                    proc.kill()
-                else:
-                    # Unix: terminate() already sent SIGTERM, now try SIGKILL
-                    proc.kill()
-                proc.join(timeout=2)
-            except Exception as kill_error:
-                logger.warning(f"Failed to kill brotli compression process: {kill_error}")
-
-        # Check if file was created successfully
-        if success and os.path.exists(filepath):
-            return filepath
-
+        # Force garbage collection to prevent memory buildup
+        gc.collect()
+
+        return filepath
     except Exception as e:
         logger.error(f"Brotli compression error: {e}")
-        try:
-            parent_conn.close()
-        except:
-            pass
-        try:
-            proc.terminate()
-            proc.join(timeout=2)
-        except:
-            pass
 
     # Compression failed
     if fallback_uncompressed:
         logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
         fallback_path = filepath.replace('.br', '')
         with open(fallback_path, 'wb') as f:
             f.write(contents)
         return fallback_path
     else:
-        raise Exception(f"Brotli compression subprocess failed for {filepath}")
+        raise Exception(f"Brotli compression failed for {filepath}: {e}")
 
 
 class model(watch_base):
@@ -523,7 +447,7 @@ class model(watch_base):
         if not os.path.exists(dest):
             try:
-                actual_dest = _brotli_subprocess_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
+                actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
                 if actual_dest != dest:
                     snapshot_fname = os.path.basename(actual_dest)
             except Exception as e:
@@ -949,13 +873,13 @@ class model(watch_base):
     def save_last_text_fetched_before_filters(self, contents):
         import brotli
         filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
-        _brotli_subprocess_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
+        _brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
 
     def save_last_fetched_html(self, timestamp, contents):
         self.ensure_data_dir_exists()
         snapshot_fname = f"{timestamp}.html.br"
         filepath = os.path.join(self.watch_data_dir, snapshot_fname)
-        _brotli_subprocess_save(contents, filepath, mode=None, fallback_uncompressed=True)
+        _brotli_save(contents, filepath, mode=None, fallback_uncompressed=True)
         self._prune_last_fetched_html_snapshots()
 
     def get_fetched_html(self, timestamp):
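
Taken together, the revert reduces every call site to a plain in-process compress-and-write. A minimal standalone sketch of that pattern (the file path and sample data are illustrative, not from the repository):

import brotli
import gc

def save_compressed(contents: bytes, filepath: str) -> str:
    # Compress in-process and write; gc.collect() afterwards keeps
    # peak memory flat, per the docstring of the new _brotli_save().
    compressed = brotli.compress(contents, mode=brotli.MODE_TEXT)
    with open(filepath, 'wb') as f:
        f.write(compressed)
    gc.collect()
    return filepath

data = ('hello world ' * 5000).encode('utf-8')
path = save_compressed(data, '/tmp/example.br')

# Round trip: decompressing the file must yield the original bytes.
with open(path, 'rb') as f:
    assert brotli.decompress(f.read()) == data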

View File

@@ -13,14 +13,9 @@ Research: https://github.com/libvips/pyvips/issues/234
 import multiprocessing
 
-# CRITICAL: Use 'spawn' instead of 'fork' to avoid inheriting parent's
+# CRITICAL: Use 'spawn' context instead of 'fork' to avoid inheriting parent's
 # LibVIPS threading state which can cause hangs in gaussblur operations
 # https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
-try:
-    multiprocessing.set_start_method('spawn', force=False)
-except RuntimeError:
-    # Already set, ignore
-    pass
 
 
 def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
@@ -95,9 +90,10 @@ def generate_diff_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
     Returns:
         bytes: JPEG diff image or None on failure
     """
-    parent_conn, child_conn = multiprocessing.Pipe()
+    ctx = multiprocessing.get_context('spawn')
+    parent_conn, child_conn = ctx.Pipe()
 
-    p = multiprocessing.Process(
+    p = ctx.Process(
         target=_worker_generate_diff,
         args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
     )
@@ -140,7 +136,8 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
     Returns:
         float: Change percentage
     """
-    parent_conn, child_conn = multiprocessing.Pipe()
+    ctx = multiprocessing.get_context('spawn')
+    parent_conn, child_conn = ctx.Pipe()
 
     def _worker_calculate(conn):
         try:
@@ -185,7 +182,7 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
         finally:
             conn.close()
 
-    p = multiprocessing.Process(target=_worker_calculate, args=(child_conn,))
+    p = ctx.Process(target=_worker_calculate, args=(child_conn,))
     p.start()
 
     result = 0.0
@@ -233,7 +230,8 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
         tuple: (changed_detected, change_percentage)
     """
     print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
-    parent_conn, child_conn = multiprocessing.Pipe()
+    ctx = multiprocessing.get_context('spawn')
+    parent_conn, child_conn = ctx.Pipe()
 
     def _worker_compare(conn):
         try:
@@ -301,7 +299,7 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
         finally:
             conn.close()
 
-    p = multiprocessing.Process(target=_worker_compare, args=(child_conn,))
+    p = ctx.Process(target=_worker_compare, args=(child_conn,))
     print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
     p.start()
     print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)