mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-02-26 08:06:09 +00:00
Compare commits
4 Commits
API-fields
...
memory-imp
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fbdd993c04 | ||
|
|
499f480950 | ||
|
|
b2e75c312e | ||
|
|
103e2d879b |
@@ -16,6 +16,7 @@ recursive-include changedetectionio/widgets *
|
|||||||
prune changedetectionio/static/package-lock.json
|
prune changedetectionio/static/package-lock.json
|
||||||
prune changedetectionio/static/styles/node_modules
|
prune changedetectionio/static/styles/node_modules
|
||||||
prune changedetectionio/static/styles/package-lock.json
|
prune changedetectionio/static/styles/package-lock.json
|
||||||
|
include changedetectionio/favicon_utils.py
|
||||||
include changedetection.py
|
include changedetection.py
|
||||||
include requirements.txt
|
include requirements.txt
|
||||||
include README-pip.md
|
include README-pip.md
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ import os
|
|||||||
import threading
|
import threading
|
||||||
|
|
||||||
from changedetectionio.validate_url import is_safe_valid_url
|
from changedetectionio.validate_url import is_safe_valid_url
|
||||||
|
from changedetectionio.favicon_utils import get_favicon_mime_type
|
||||||
|
|
||||||
from . import auth
|
from . import auth
|
||||||
from changedetectionio import queuedWatchMetaData, strtobool
|
from changedetectionio import queuedWatchMetaData, strtobool
|
||||||
@@ -402,16 +403,9 @@ class WatchFavicon(Resource):
|
|||||||
|
|
||||||
favicon_filename = watch.get_favicon_filename()
|
favicon_filename = watch.get_favicon_filename()
|
||||||
if favicon_filename:
|
if favicon_filename:
|
||||||
try:
|
# Use cached MIME type detection
|
||||||
import magic
|
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
|
||||||
mime = magic.from_file(
|
mime = get_favicon_mime_type(filepath)
|
||||||
os.path.join(watch.watch_data_dir, favicon_filename),
|
|
||||||
mime=True
|
|
||||||
)
|
|
||||||
except ImportError:
|
|
||||||
# Fallback, no python-magic
|
|
||||||
import mimetypes
|
|
||||||
mime, encoding = mimetypes.guess_type(favicon_filename)
|
|
||||||
|
|
||||||
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
||||||
response.headers['Content-type'] = mime
|
response.headers['Content-type'] = mime
|
||||||
|
|||||||
@@ -30,13 +30,23 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
app: Flask application instance
|
app: Flask application instance
|
||||||
datastore: Application datastore
|
datastore: Application datastore
|
||||||
executor: ThreadPoolExecutor for queue operations (optional)
|
executor: ThreadPoolExecutor for queue operations (optional)
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
"restart" if worker should restart, "shutdown" for clean exit
|
||||||
"""
|
"""
|
||||||
# Set a descriptive name for this task
|
# Set a descriptive name for this task
|
||||||
task = asyncio.current_task()
|
task = asyncio.current_task()
|
||||||
if task:
|
if task:
|
||||||
task.set_name(f"async-worker-{worker_id}")
|
task.set_name(f"async-worker-{worker_id}")
|
||||||
|
|
||||||
logger.info(f"Starting async worker {worker_id}")
|
# Read restart policy from environment
|
||||||
|
max_jobs = int(os.getenv("WORKER_MAX_JOBS", "10"))
|
||||||
|
max_runtime_seconds = int(os.getenv("WORKER_MAX_RUNTIME", "3600")) # 1 hour default
|
||||||
|
|
||||||
|
jobs_processed = 0
|
||||||
|
start_time = time.time()
|
||||||
|
|
||||||
|
logger.info(f"Starting async worker {worker_id} (max_jobs={max_jobs}, max_runtime={max_runtime_seconds}s)")
|
||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
update_handler = None
|
update_handler = None
|
||||||
@@ -51,7 +61,11 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
)
|
)
|
||||||
|
|
||||||
except asyncio.TimeoutError:
|
except asyncio.TimeoutError:
|
||||||
# No jobs available, continue loop
|
# No jobs available - check if we should restart based on time while idle
|
||||||
|
runtime = time.time() - start_time
|
||||||
|
if runtime >= max_runtime_seconds:
|
||||||
|
logger.info(f"Worker {worker_id} idle and reached max runtime ({runtime:.0f}s), restarting")
|
||||||
|
return "restart"
|
||||||
continue
|
continue
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
# Handle expected Empty exception from queue timeout
|
# Handle expected Empty exception from queue timeout
|
||||||
@@ -488,6 +502,19 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
# Small yield for normal completion
|
# Small yield for normal completion
|
||||||
await asyncio.sleep(0.01)
|
await asyncio.sleep(0.01)
|
||||||
|
|
||||||
|
# Job completed - increment counter and check restart conditions
|
||||||
|
jobs_processed += 1
|
||||||
|
runtime = time.time() - start_time
|
||||||
|
|
||||||
|
# Check if we should restart (only when idle, between jobs)
|
||||||
|
should_restart_jobs = jobs_processed >= max_jobs
|
||||||
|
should_restart_time = runtime >= max_runtime_seconds
|
||||||
|
|
||||||
|
if should_restart_jobs or should_restart_time:
|
||||||
|
reason = f"{jobs_processed} jobs" if should_restart_jobs else f"{runtime:.0f}s runtime"
|
||||||
|
logger.info(f"Worker {worker_id} restarting after {reason} ({jobs_processed} jobs, {runtime:.0f}s runtime)")
|
||||||
|
return "restart"
|
||||||
|
|
||||||
# Check if we should exit
|
# Check if we should exit
|
||||||
if app.config.exit.is_set():
|
if app.config.exit.is_set():
|
||||||
break
|
break
|
||||||
@@ -495,10 +522,12 @@ async def async_update_worker(worker_id, q, notification_q, app, datastore, exec
|
|||||||
# Check if we're in pytest environment - if so, be more gentle with logging
|
# Check if we're in pytest environment - if so, be more gentle with logging
|
||||||
import sys
|
import sys
|
||||||
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
|
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info(f"Worker {worker_id} shutting down")
|
logger.info(f"Worker {worker_id} shutting down")
|
||||||
|
|
||||||
|
return "shutdown"
|
||||||
|
|
||||||
|
|
||||||
def cleanup_error_artifacts(uuid, datastore):
|
def cleanup_error_artifacts(uuid, datastore):
|
||||||
"""Helper function to clean up error artifacts"""
|
"""Helper function to clean up error artifacts"""
|
||||||
|
|||||||
43
changedetectionio/favicon_utils.py
Normal file
43
changedetectionio/favicon_utils.py
Normal file
@@ -0,0 +1,43 @@
|
|||||||
|
"""
|
||||||
|
Favicon utilities for changedetection.io
|
||||||
|
Handles favicon MIME type detection with caching
|
||||||
|
"""
|
||||||
|
|
||||||
|
from functools import lru_cache
|
||||||
|
|
||||||
|
|
||||||
|
@lru_cache(maxsize=1000)
|
||||||
|
def get_favicon_mime_type(filepath):
|
||||||
|
"""
|
||||||
|
Detect MIME type of favicon by reading file content using puremagic.
|
||||||
|
Results are cached to avoid repeatedly reading the same files.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
filepath: Full path to the favicon file
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
MIME type string (e.g., 'image/png')
|
||||||
|
"""
|
||||||
|
mime = None
|
||||||
|
|
||||||
|
try:
|
||||||
|
import puremagic
|
||||||
|
with open(filepath, 'rb') as f:
|
||||||
|
content_bytes = f.read(200) # Read first 200 bytes
|
||||||
|
|
||||||
|
detections = puremagic.magic_string(content_bytes)
|
||||||
|
if detections:
|
||||||
|
mime = detections[0].mime_type
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Fallback to mimetypes if puremagic fails
|
||||||
|
if not mime:
|
||||||
|
import mimetypes
|
||||||
|
mime, _ = mimetypes.guess_type(filepath)
|
||||||
|
|
||||||
|
# Final fallback based on extension
|
||||||
|
if not mime:
|
||||||
|
mime = 'image/x-icon' if filepath.endswith('.ico') else 'image/png'
|
||||||
|
|
||||||
|
return mime
|
||||||
@@ -44,6 +44,8 @@ from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, Watch
|
|||||||
from changedetectionio.api.Search import Search
|
from changedetectionio.api.Search import Search
|
||||||
from .time_handler import is_within_schedule
|
from .time_handler import is_within_schedule
|
||||||
from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale
|
from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale
|
||||||
|
from changedetectionio.favicon_utils import get_favicon_mime_type
|
||||||
|
|
||||||
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
IN_PYTEST = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ
|
||||||
|
|
||||||
datastore = None
|
datastore = None
|
||||||
@@ -69,9 +71,13 @@ socketio_server = None
|
|||||||
CORS(app)
|
CORS(app)
|
||||||
|
|
||||||
# Super handy for compressing large BrowserSteps responses and others
|
# Super handy for compressing large BrowserSteps responses and others
|
||||||
FlaskCompress(app)
|
# Flask-Compress handles HTTP compression, Socket.IO compression disabled to prevent memory leak
|
||||||
app.config['COMPRESS_MIN_SIZE'] = 4096
|
compress = FlaskCompress()
|
||||||
|
app.config['COMPRESS_MIN_SIZE'] = 2096
|
||||||
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml']
|
app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml']
|
||||||
|
# Use gzip only - smaller memory footprint than zstd/brotli (4-8KB vs 200-500KB contexts)
|
||||||
|
app.config['COMPRESS_ALGORITHM'] = ['gzip']
|
||||||
|
compress.init_app(app)
|
||||||
app.config['TEMPLATES_AUTO_RELOAD'] = False
|
app.config['TEMPLATES_AUTO_RELOAD'] = False
|
||||||
|
|
||||||
|
|
||||||
@@ -682,16 +688,9 @@ def changedetection_app(config=None, datastore_o=None):
|
|||||||
|
|
||||||
favicon_filename = watch.get_favicon_filename()
|
favicon_filename = watch.get_favicon_filename()
|
||||||
if favicon_filename:
|
if favicon_filename:
|
||||||
try:
|
# Use cached MIME type detection
|
||||||
import magic
|
filepath = os.path.join(watch.watch_data_dir, favicon_filename)
|
||||||
mime = magic.from_file(
|
mime = get_favicon_mime_type(filepath)
|
||||||
os.path.join(watch.watch_data_dir, favicon_filename),
|
|
||||||
mime=True
|
|
||||||
)
|
|
||||||
except ImportError:
|
|
||||||
# Fallback, no python-magic
|
|
||||||
import mimetypes
|
|
||||||
mime, encoding = mimetypes.guess_type(favicon_filename)
|
|
||||||
|
|
||||||
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename))
|
||||||
response.headers['Content-type'] = mime
|
response.headers['Content-type'] = mime
|
||||||
|
|||||||
@@ -69,7 +69,7 @@ class RecheckPriorityQueue:
|
|||||||
# Emit signals
|
# Emit signals
|
||||||
self._emit_put_signals(item)
|
self._emit_put_signals(item)
|
||||||
|
|
||||||
logger.debug(f"Successfully queued item: {self._get_item_uuid(item)}")
|
logger.trace(f"Successfully queued item: {self._get_item_uuid(item)}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
@@ -240,7 +240,10 @@ def init_socketio(app, datastore):
|
|||||||
async_mode=async_mode,
|
async_mode=async_mode,
|
||||||
cors_allowed_origins=cors_origins, # None means same-origin only
|
cors_allowed_origins=cors_origins, # None means same-origin only
|
||||||
logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
|
logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
|
||||||
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')))
|
engineio_logger=strtobool(os.getenv('SOCKETIO_LOGGING', 'False')),
|
||||||
|
# Disable WebSocket compression to prevent memory accumulation
|
||||||
|
# Flask-Compress already handles HTTP response compression
|
||||||
|
engineio_options={'http_compression': False, 'compression_threshold': 0})
|
||||||
|
|
||||||
# Set up event handlers
|
# Set up event handlers
|
||||||
logger.info("Socket.IO: Registering connect event handler")
|
logger.info("Socket.IO: Registering connect event handler")
|
||||||
|
|||||||
@@ -91,8 +91,8 @@ REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 tests/test_notif
|
|||||||
# And again with brotli+screenshot attachment
|
# And again with brotli+screenshot attachment
|
||||||
SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5 REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --dist=load tests/test_backend.py tests/test_rss.py tests/test_unique_lines.py tests/test_notification.py tests/test_access_control.py
|
SNAPSHOT_BROTLI_COMPRESSION_THRESHOLD=5 REMOVE_REQUESTS_OLD_SCREENSHOTS=false pytest -vv -s --maxfail=1 --dist=load tests/test_backend.py tests/test_rss.py tests/test_unique_lines.py tests/test_notification.py tests/test_access_control.py
|
||||||
|
|
||||||
# Try high concurrency
|
# Try high concurrency with aggressive worker restarts
|
||||||
FETCH_WORKERS=50 pytest tests/test_history_consistency.py -vv -l -s
|
FETCH_WORKERS=50 WORKER_MAX_RUNTIME=2 WORKER_MAX_JOBS=1 pytest tests/test_history_consistency.py -vv -l -s
|
||||||
|
|
||||||
# Check file:// will pickup a file when enabled
|
# Check file:// will pickup a file when enabled
|
||||||
echo "Hello world" > /tmp/test-file.txt
|
echo "Hello world" > /tmp/test-file.txt
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
import gc
|
||||||
import shutil
|
import shutil
|
||||||
|
|
||||||
from changedetectionio.strtobool import strtobool
|
from changedetectionio.strtobool import strtobool
|
||||||
@@ -125,6 +126,10 @@ class ChangeDetectionStore:
|
|||||||
if 'application' in from_disk['settings']:
|
if 'application' in from_disk['settings']:
|
||||||
self.__data['settings']['application'].update(from_disk['settings']['application'])
|
self.__data['settings']['application'].update(from_disk['settings']['application'])
|
||||||
|
|
||||||
|
# from_disk no longer needed - free memory immediately
|
||||||
|
del from_disk
|
||||||
|
gc.collect()
|
||||||
|
|
||||||
# Convert each existing watch back to the Watch.model object
|
# Convert each existing watch back to the Watch.model object
|
||||||
for uuid, watch in self.__data['watching'].items():
|
for uuid, watch in self.__data['watching'].items():
|
||||||
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
|
self.__data['watching'][uuid] = self.rehydrate_entity(uuid, watch)
|
||||||
@@ -450,7 +455,7 @@ class ChangeDetectionStore:
|
|||||||
data = deepcopy(self.__data)
|
data = deepcopy(self.__data)
|
||||||
except RuntimeError as e:
|
except RuntimeError as e:
|
||||||
# Try again in 15 seconds
|
# Try again in 15 seconds
|
||||||
time.sleep(15)
|
time.sleep(1)
|
||||||
logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
|
logger.error(f"! Data changed when writing to JSON, trying again.. {str(e)}")
|
||||||
self.sync_to_json()
|
self.sync_to_json()
|
||||||
return
|
return
|
||||||
|
|||||||
@@ -132,11 +132,19 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
|
|||||||
|
|
||||||
while not app.config.exit.is_set():
|
while not app.config.exit.is_set():
|
||||||
try:
|
try:
|
||||||
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
result = await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
|
||||||
# If we reach here, worker exited cleanly
|
|
||||||
if not in_pytest:
|
if result == "restart":
|
||||||
logger.info(f"Async worker {worker_id} exited cleanly")
|
# Worker requested restart - immediately loop back and restart
|
||||||
break
|
if not in_pytest:
|
||||||
|
logger.debug(f"Async worker {worker_id} restarting")
|
||||||
|
continue
|
||||||
|
else:
|
||||||
|
# Worker exited cleanly (shutdown)
|
||||||
|
if not in_pytest:
|
||||||
|
logger.info(f"Async worker {worker_id} exited cleanly")
|
||||||
|
break
|
||||||
|
|
||||||
except asyncio.CancelledError:
|
except asyncio.CancelledError:
|
||||||
# Task was cancelled (normal shutdown)
|
# Task was cancelled (normal shutdown)
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
@@ -147,7 +155,7 @@ async def start_single_async_worker(worker_id, update_q, notification_q, app, da
|
|||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
|
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
|
||||||
await asyncio.sleep(5)
|
await asyncio.sleep(5)
|
||||||
|
|
||||||
if not in_pytest:
|
if not in_pytest:
|
||||||
logger.info(f"Async worker {worker_id} shutdown complete")
|
logger.info(f"Async worker {worker_id} shutdown complete")
|
||||||
|
|
||||||
@@ -161,7 +169,11 @@ def add_worker(update_q, notification_q, app, datastore):
|
|||||||
"""Add a new async worker (for dynamic scaling)"""
|
"""Add a new async worker (for dynamic scaling)"""
|
||||||
global worker_threads
|
global worker_threads
|
||||||
|
|
||||||
worker_id = len(worker_threads)
|
# Reuse lowest available ID to prevent unbounded growth over time
|
||||||
|
used_ids = {w.worker_id for w in worker_threads}
|
||||||
|
worker_id = 0
|
||||||
|
while worker_id in used_ids:
|
||||||
|
worker_id += 1
|
||||||
logger.info(f"Adding async worker {worker_id}")
|
logger.info(f"Adding async worker {worker_id}")
|
||||||
|
|
||||||
try:
|
try:
|
||||||
@@ -251,7 +263,7 @@ def queue_item_async_safe(update_q, item, silent=False):
|
|||||||
return False
|
return False
|
||||||
|
|
||||||
if not silent:
|
if not silent:
|
||||||
logger.debug(f"Successfully queued item: {item_uuid}")
|
logger.trace(f"Successfully queued item: {item_uuid}")
|
||||||
return True
|
return True
|
||||||
|
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
|
|||||||
Reference in New Issue
Block a user