mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-01-19 13:40:34 +00:00
Compare commits
2 Commits
python-314
...
brotli-nat
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
11dd28b115 | ||
|
|
109bbaf144 |
@@ -41,9 +41,10 @@ from loguru import logger
|
|||||||
#
|
#
|
||||||
# IMPLEMENTATION:
|
# IMPLEMENTATION:
|
||||||
# 1. Explicit contexts everywhere (primary protection):
|
# 1. Explicit contexts everywhere (primary protection):
|
||||||
# - Watch.py: ctx = multiprocessing.get_context('spawn')
|
|
||||||
# - playwright.py: ctx = multiprocessing.get_context('spawn')
|
# - playwright.py: ctx = multiprocessing.get_context('spawn')
|
||||||
# - puppeteer.py: ctx = multiprocessing.get_context('spawn')
|
# - puppeteer.py: ctx = multiprocessing.get_context('spawn')
|
||||||
|
# - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
|
||||||
|
# - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
|
||||||
#
|
#
|
||||||
# 2. Global default (defense-in-depth, below):
|
# 2. Global default (defense-in-depth, below):
|
||||||
# - Safety net if future code forgets explicit context
|
# - Safety net if future code forgets explicit context
|
||||||
|
|||||||
@@ -18,21 +18,31 @@ BROTLI_COMPRESS_SIZE_THRESHOLD = int(os.getenv('SNAPSHOT_BROTLI_COMPRESSION_THRE
|
|||||||
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
minimum_seconds_recheck_time = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3))
|
||||||
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
mtable = {'seconds': 1, 'minutes': 60, 'hours': 3600, 'days': 86400, 'weeks': 86400 * 7}
|
||||||
|
|
||||||
def _brotli_compress_worker(conn, filepath, mode=None):
|
def _brotli_save(contents, filepath, mode=None, fallback_uncompressed=False):
|
||||||
"""
|
"""
|
||||||
Worker function to compress data with brotli in a separate process.
|
Save compressed data using native brotli.
|
||||||
This isolates memory - when process exits, OS reclaims all memory.
|
Testing shows no memory leak when using gc.collect() after compression.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
conn: multiprocessing.Pipe connection to receive data
|
contents: data to compress (str or bytes)
|
||||||
filepath: destination file path
|
filepath: destination file path
|
||||||
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
||||||
|
fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
str: actual filepath saved (may differ from input if fallback used)
|
||||||
|
|
||||||
|
Raises:
|
||||||
|
Exception: if compression fails and fallback_uncompressed is False
|
||||||
"""
|
"""
|
||||||
import brotli
|
import brotli
|
||||||
|
import gc
|
||||||
|
|
||||||
|
# Ensure contents are bytes
|
||||||
|
if isinstance(contents, str):
|
||||||
|
contents = contents.encode('utf-8')
|
||||||
|
|
||||||
try:
|
try:
|
||||||
# Receive data from parent process via pipe (avoids pickle overhead)
|
|
||||||
contents = conn.recv()
|
|
||||||
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
logger.debug(f"Starting brotli compression of {len(contents)} bytes.")
|
||||||
|
|
||||||
if mode is not None:
|
if mode is not None:
|
||||||
@@ -43,111 +53,25 @@ def _brotli_compress_worker(conn, filepath, mode=None):
|
|||||||
with open(filepath, 'wb') as f:
|
with open(filepath, 'wb') as f:
|
||||||
f.write(compressed_data)
|
f.write(compressed_data)
|
||||||
|
|
||||||
# Send success status back
|
|
||||||
conn.send(True)
|
|
||||||
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
logger.debug(f"Finished brotli compression - From {len(contents)} to {len(compressed_data)} bytes.")
|
||||||
# No need for explicit cleanup - process exit frees all memory
|
|
||||||
except Exception as e:
|
|
||||||
logger.critical(f"Brotli compression worker failed: {e}")
|
|
||||||
conn.send(False)
|
|
||||||
finally:
|
|
||||||
conn.close()
|
|
||||||
|
|
||||||
|
# Force garbage collection to prevent memory buildup
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
def _brotli_subprocess_save(contents, filepath, mode=None, timeout=30, fallback_uncompressed=False):
|
return filepath
|
||||||
"""
|
|
||||||
Save compressed data using subprocess to isolate memory.
|
|
||||||
Uses Pipe to avoid pickle overhead for large data.
|
|
||||||
|
|
||||||
Args:
|
|
||||||
contents: data to compress (str or bytes)
|
|
||||||
filepath: destination file path
|
|
||||||
mode: brotli compression mode (e.g., brotli.MODE_TEXT)
|
|
||||||
timeout: subprocess timeout in seconds
|
|
||||||
fallback_uncompressed: if True, save uncompressed on failure; if False, raise exception
|
|
||||||
|
|
||||||
Returns:
|
|
||||||
str: actual filepath saved (may differ from input if fallback used)
|
|
||||||
|
|
||||||
Raises:
|
|
||||||
Exception: if compression fails and fallback_uncompressed is False
|
|
||||||
"""
|
|
||||||
import multiprocessing
|
|
||||||
import sys
|
|
||||||
|
|
||||||
# Ensure contents are bytes
|
|
||||||
if isinstance(contents, str):
|
|
||||||
contents = contents.encode('utf-8')
|
|
||||||
|
|
||||||
# Use explicit spawn context for thread safety (avoids fork() with multi-threaded parent)
|
|
||||||
# Always use spawn - consistent behavior in tests and production
|
|
||||||
ctx = multiprocessing.get_context('spawn')
|
|
||||||
parent_conn, child_conn = ctx.Pipe()
|
|
||||||
|
|
||||||
# Run compression in subprocess using spawn (not fork)
|
|
||||||
proc = ctx.Process(target=_brotli_compress_worker, args=(child_conn, filepath, mode))
|
|
||||||
|
|
||||||
# Windows-safe: Set daemon=False explicitly to avoid issues with process cleanup
|
|
||||||
proc.daemon = False
|
|
||||||
proc.start()
|
|
||||||
|
|
||||||
try:
|
|
||||||
# Send data to subprocess via pipe (avoids pickle)
|
|
||||||
parent_conn.send(contents)
|
|
||||||
|
|
||||||
# Wait for result with timeout
|
|
||||||
if parent_conn.poll(timeout):
|
|
||||||
success = parent_conn.recv()
|
|
||||||
else:
|
|
||||||
success = False
|
|
||||||
logger.warning(f"Brotli compression subprocess timed out after {timeout}s")
|
|
||||||
# Graceful termination with platform-aware cleanup
|
|
||||||
try:
|
|
||||||
proc.terminate()
|
|
||||||
except Exception as term_error:
|
|
||||||
logger.debug(f"Process termination issue (may be normal on Windows): {term_error}")
|
|
||||||
|
|
||||||
parent_conn.close()
|
|
||||||
proc.join(timeout=5)
|
|
||||||
|
|
||||||
# Force kill if still alive after graceful termination
|
|
||||||
if proc.is_alive():
|
|
||||||
try:
|
|
||||||
if sys.platform == 'win32':
|
|
||||||
# Windows: use kill() which is more forceful
|
|
||||||
proc.kill()
|
|
||||||
else:
|
|
||||||
# Unix: terminate() already sent SIGTERM, now try SIGKILL
|
|
||||||
proc.kill()
|
|
||||||
proc.join(timeout=2)
|
|
||||||
except Exception as kill_error:
|
|
||||||
logger.warning(f"Failed to kill brotli compression process: {kill_error}")
|
|
||||||
|
|
||||||
# Check if file was created successfully
|
|
||||||
if success and os.path.exists(filepath):
|
|
||||||
return filepath
|
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Brotli compression error: {e}")
|
logger.error(f"Brotli compression error: {e}")
|
||||||
try:
|
|
||||||
parent_conn.close()
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
try:
|
|
||||||
proc.terminate()
|
|
||||||
proc.join(timeout=2)
|
|
||||||
except:
|
|
||||||
pass
|
|
||||||
|
|
||||||
# Compression failed
|
# Compression failed
|
||||||
if fallback_uncompressed:
|
if fallback_uncompressed:
|
||||||
logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
|
logger.warning(f"Brotli compression failed for {filepath}, saving uncompressed")
|
||||||
fallback_path = filepath.replace('.br', '')
|
fallback_path = filepath.replace('.br', '')
|
||||||
with open(fallback_path, 'wb') as f:
|
with open(fallback_path, 'wb') as f:
|
||||||
f.write(contents)
|
f.write(contents)
|
||||||
return fallback_path
|
return fallback_path
|
||||||
else:
|
else:
|
||||||
raise Exception(f"Brotli compression subprocess failed for {filepath}")
|
raise Exception(f"Brotli compression failed for {filepath}: {e}")
|
||||||
|
|
||||||
|
|
||||||
class model(watch_base):
|
class model(watch_base):
|
||||||
@@ -523,7 +447,7 @@ class model(watch_base):
|
|||||||
|
|
||||||
if not os.path.exists(dest):
|
if not os.path.exists(dest):
|
||||||
try:
|
try:
|
||||||
actual_dest = _brotli_subprocess_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
|
actual_dest = _brotli_save(contents, dest, mode=brotli.MODE_TEXT, fallback_uncompressed=True)
|
||||||
if actual_dest != dest:
|
if actual_dest != dest:
|
||||||
snapshot_fname = os.path.basename(actual_dest)
|
snapshot_fname = os.path.basename(actual_dest)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
@@ -949,13 +873,13 @@ class model(watch_base):
|
|||||||
def save_last_text_fetched_before_filters(self, contents):
|
def save_last_text_fetched_before_filters(self, contents):
|
||||||
import brotli
|
import brotli
|
||||||
filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
|
filepath = os.path.join(self.watch_data_dir, 'last-fetched.br')
|
||||||
_brotli_subprocess_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
|
_brotli_save(contents, filepath, mode=brotli.MODE_TEXT, fallback_uncompressed=False)
|
||||||
|
|
||||||
def save_last_fetched_html(self, timestamp, contents):
|
def save_last_fetched_html(self, timestamp, contents):
|
||||||
self.ensure_data_dir_exists()
|
self.ensure_data_dir_exists()
|
||||||
snapshot_fname = f"{timestamp}.html.br"
|
snapshot_fname = f"{timestamp}.html.br"
|
||||||
filepath = os.path.join(self.watch_data_dir, snapshot_fname)
|
filepath = os.path.join(self.watch_data_dir, snapshot_fname)
|
||||||
_brotli_subprocess_save(contents, filepath, mode=None, fallback_uncompressed=True)
|
_brotli_save(contents, filepath, mode=None, fallback_uncompressed=True)
|
||||||
self._prune_last_fetched_html_snapshots()
|
self._prune_last_fetched_html_snapshots()
|
||||||
|
|
||||||
def get_fetched_html(self, timestamp):
|
def get_fetched_html(self, timestamp):
|
||||||
|
|||||||
@@ -13,14 +13,9 @@ Research: https://github.com/libvips/pyvips/issues/234
|
|||||||
|
|
||||||
import multiprocessing
|
import multiprocessing
|
||||||
|
|
||||||
# CRITICAL: Use 'spawn' instead of 'fork' to avoid inheriting parent's
|
# CRITICAL: Use 'spawn' context instead of 'fork' to avoid inheriting parent's
|
||||||
# LibVIPS threading state which can cause hangs in gaussblur operations
|
# LibVIPS threading state which can cause hangs in gaussblur operations
|
||||||
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
# https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
|
||||||
try:
|
|
||||||
multiprocessing.set_start_method('spawn', force=False)
|
|
||||||
except RuntimeError:
|
|
||||||
# Already set, ignore
|
|
||||||
pass
|
|
||||||
|
|
||||||
|
|
||||||
def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
|
def _worker_generate_diff(conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height):
|
||||||
@@ -95,9 +90,10 @@ def generate_diff_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
Returns:
|
Returns:
|
||||||
bytes: JPEG diff image or None on failure
|
bytes: JPEG diff image or None on failure
|
||||||
"""
|
"""
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
p = multiprocessing.Process(
|
p = ctx.Process(
|
||||||
target=_worker_generate_diff,
|
target=_worker_generate_diff,
|
||||||
args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
|
args=(child_conn, img_bytes_from, img_bytes_to, threshold, blur_sigma, max_width, max_height)
|
||||||
)
|
)
|
||||||
@@ -140,7 +136,8 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
|
|||||||
Returns:
|
Returns:
|
||||||
float: Change percentage
|
float: Change percentage
|
||||||
"""
|
"""
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
def _worker_calculate(conn):
|
def _worker_calculate(conn):
|
||||||
try:
|
try:
|
||||||
@@ -185,7 +182,7 @@ def calculate_change_percentage_isolated(img_bytes_from, img_bytes_to, threshold
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
p = multiprocessing.Process(target=_worker_calculate, args=(child_conn,))
|
p = ctx.Process(target=_worker_calculate, args=(child_conn,))
|
||||||
p.start()
|
p.start()
|
||||||
|
|
||||||
result = 0.0
|
result = 0.0
|
||||||
@@ -233,7 +230,8 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
tuple: (changed_detected, change_percentage)
|
tuple: (changed_detected, change_percentage)
|
||||||
"""
|
"""
|
||||||
print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
|
print(f"[Parent] Starting compare_images_isolated subprocess", flush=True)
|
||||||
parent_conn, child_conn = multiprocessing.Pipe()
|
ctx = multiprocessing.get_context('spawn')
|
||||||
|
parent_conn, child_conn = ctx.Pipe()
|
||||||
|
|
||||||
def _worker_compare(conn):
|
def _worker_compare(conn):
|
||||||
try:
|
try:
|
||||||
@@ -301,7 +299,7 @@ def compare_images_isolated(img_bytes_from, img_bytes_to, threshold, blur_sigma,
|
|||||||
finally:
|
finally:
|
||||||
conn.close()
|
conn.close()
|
||||||
|
|
||||||
p = multiprocessing.Process(target=_worker_compare, args=(child_conn,))
|
p = ctx.Process(target=_worker_compare, args=(child_conn,))
|
||||||
print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
|
print(f"[Parent] Starting subprocess (pid will be assigned)", flush=True)
|
||||||
p.start()
|
p.start()
|
||||||
print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)
|
print(f"[Parent] Subprocess started (pid={p.pid}), waiting for result (30s timeout)", flush=True)
|
||||||
|
|||||||
Reference in New Issue
Block a user