Files
changedetection.io/changedetectionio/worker_handler.py

427 lines
14 KiB
Python

"""
Worker management module for changedetection.io
Handles asynchronous workers for dynamic worker scaling.
Each worker runs in its own thread with its own event loop for isolation.
"""
import asyncio
import os
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from loguru import logger
# Global worker state - each worker has its own thread and event loop
worker_threads = [] # List of WorkerThread objects
# Track currently processing UUIDs for async workers - maps {uuid: worker_id}
currently_processing_uuids = {}
# Configuration - async workers only
USE_ASYNC_WORKERS = True
# Custom ThreadPoolExecutor for queue operations with named threads
# Scale executor threads with FETCH_WORKERS to avoid bottleneck at high concurrency
_max_executor_workers = max(50, int(os.getenv("FETCH_WORKERS", "10")))
queue_executor = ThreadPoolExecutor(
max_workers=_max_executor_workers,
thread_name_prefix="QueueGetter-"
)
class WorkerThread:
"""Container for a worker thread with its own event loop"""
def __init__(self, worker_id, update_q, notification_q, app, datastore):
self.worker_id = worker_id
self.update_q = update_q
self.notification_q = notification_q
self.app = app
self.datastore = datastore
self.thread = None
self.loop = None
self.running = False
def run(self):
"""Run the worker in its own event loop"""
try:
# Create a new event loop for this thread
self.loop = asyncio.new_event_loop()
asyncio.set_event_loop(self.loop)
self.running = True
# Run the worker coroutine
self.loop.run_until_complete(
start_single_async_worker(
self.worker_id,
self.update_q,
self.notification_q,
self.app,
self.datastore,
queue_executor
)
)
except asyncio.CancelledError:
# Normal shutdown - worker was cancelled
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info(f"Worker {self.worker_id} shutting down gracefully")
except RuntimeError as e:
# Ignore expected shutdown errors
if "Event loop stopped" not in str(e) and "Event loop is closed" not in str(e):
logger.error(f"Worker {self.worker_id} runtime error: {e}")
except Exception as e:
logger.error(f"Worker {self.worker_id} thread error: {e}")
finally:
# Clean up
if self.loop and not self.loop.is_closed():
self.loop.close()
self.running = False
self.loop = None
def start(self):
"""Start the worker thread"""
self.thread = threading.Thread(
target=self.run,
daemon=True,
name=f"PageFetchAsyncUpdateWorker-{self.worker_id}"
)
self.thread.start()
def stop(self):
"""Stop the worker thread"""
if self.loop and self.running:
try:
# Signal the loop to stop
self.loop.call_soon_threadsafe(self.loop.stop)
except RuntimeError:
pass
if self.thread and self.thread.is_alive():
self.thread.join(timeout=2.0)
def start_async_workers(n_workers, update_q, notification_q, app, datastore):
"""Start async workers, each with its own thread and event loop for isolation"""
global worker_threads, currently_processing_uuids
# Clear any stale state
currently_processing_uuids.clear()
# Start each worker in its own thread with its own event loop
logger.info(f"Starting {n_workers} async workers (isolated threads)")
for i in range(n_workers):
try:
worker = WorkerThread(i, update_q, notification_q, app, datastore)
worker.start()
worker_threads.append(worker)
# No sleep needed - threads start independently and asynchronously
except Exception as e:
logger.error(f"Failed to start async worker {i}: {e}")
continue
async def start_single_async_worker(worker_id, update_q, notification_q, app, datastore, executor=None):
"""Start a single async worker with auto-restart capability"""
from changedetectionio.async_update_worker import async_update_worker
# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
while not app.config.exit.is_set():
try:
await async_update_worker(worker_id, update_q, notification_q, app, datastore, executor)
# If we reach here, worker exited cleanly
if not in_pytest:
logger.info(f"Async worker {worker_id} exited cleanly")
break
except asyncio.CancelledError:
# Task was cancelled (normal shutdown)
if not in_pytest:
logger.info(f"Async worker {worker_id} cancelled")
break
except Exception as e:
logger.error(f"Async worker {worker_id} crashed: {e}")
if not in_pytest:
logger.info(f"Restarting async worker {worker_id} in 5 seconds...")
await asyncio.sleep(5)
if not in_pytest:
logger.info(f"Async worker {worker_id} shutdown complete")
def start_workers(n_workers, update_q, notification_q, app, datastore):
"""Start async workers - sync workers are deprecated"""
start_async_workers(n_workers, update_q, notification_q, app, datastore)
def add_worker(update_q, notification_q, app, datastore):
"""Add a new async worker (for dynamic scaling)"""
global worker_threads
worker_id = len(worker_threads)
logger.info(f"Adding async worker {worker_id}")
try:
worker = WorkerThread(worker_id, update_q, notification_q, app, datastore)
worker.start()
worker_threads.append(worker)
return True
except Exception as e:
logger.error(f"Failed to add worker {worker_id}: {e}")
return False
def remove_worker():
"""Remove an async worker (for dynamic scaling)"""
global worker_threads
if not worker_threads:
return False
# Stop the last worker
worker = worker_threads.pop()
worker.stop()
logger.info(f"Removed async worker, {len(worker_threads)} workers remaining")
return True
def get_worker_count():
"""Get current number of async workers"""
return len(worker_threads)
def get_running_uuids():
"""Get list of UUIDs currently being processed by async workers"""
return list(currently_processing_uuids.keys())
def set_uuid_processing(uuid, worker_id=None, processing=True):
"""Mark a UUID as being processed or completed by a specific worker"""
global currently_processing_uuids
if processing:
currently_processing_uuids[uuid] = worker_id
logger.debug(f"Worker {worker_id} started processing UUID: {uuid}")
else:
currently_processing_uuids.pop(uuid, None)
logger.debug(f"Worker {worker_id} finished processing UUID: {uuid}")
def is_watch_running(watch_uuid):
"""Check if a specific watch is currently being processed by any worker"""
return watch_uuid in currently_processing_uuids
def is_watch_running_by_another_worker(watch_uuid, current_worker_id):
"""Check if a specific watch is currently being processed by a different worker"""
if watch_uuid not in currently_processing_uuids:
return False
processing_worker_id = currently_processing_uuids[watch_uuid]
return processing_worker_id != current_worker_id
def queue_item_async_safe(update_q, item, silent=False):
"""Bulletproof queue operation with comprehensive error handling"""
item_uuid = 'unknown'
try:
# Safely extract UUID for logging
if hasattr(item, 'item') and isinstance(item.item, dict):
item_uuid = item.item.get('uuid', 'unknown')
except Exception as uuid_e:
logger.critical(f"CRITICAL: Failed to extract UUID from queue item: {uuid_e}")
# Validate inputs
if not update_q:
logger.critical(f"CRITICAL: Queue is None/invalid for item {item_uuid}")
return False
if not item:
logger.critical(f"CRITICAL: Item is None/invalid")
return False
# Attempt queue operation with multiple fallbacks
try:
# Primary: Use sync interface (thread-safe)
success = update_q.put(item, block=True, timeout=5.0)
if success is False: # Explicit False return means failure
logger.critical(f"CRITICAL: Queue.put() returned False for item {item_uuid}")
return False
if not silent:
logger.debug(f"Successfully queued item: {item_uuid}")
return True
except Exception as e:
logger.critical(f"CRITICAL: Exception during queue operation for item {item_uuid}: {type(e).__name__}: {e}")
# Secondary: Attempt queue health check
try:
queue_size = update_q.qsize()
is_empty = update_q.empty()
logger.critical(f"CRITICAL: Queue health - size: {queue_size}, empty: {is_empty}")
except Exception as health_e:
logger.critical(f"CRITICAL: Queue health check failed: {health_e}")
# Log queue type for debugging
try:
logger.critical(f"CRITICAL: Queue type: {type(update_q)}, has sync_q: {hasattr(update_q, 'sync_q')}")
except Exception:
logger.critical(f"CRITICAL: Cannot determine queue type")
return False
def shutdown_workers():
"""Shutdown all async workers fast and aggressively"""
global worker_threads
# Check if we're in pytest environment - if so, be more gentle with logging
import os
in_pytest = "pytest" in os.sys.modules or "PYTEST_CURRENT_TEST" in os.environ
if not in_pytest:
logger.info("Fast shutdown of async workers initiated...")
# Stop all worker threads
for worker in worker_threads:
worker.stop()
worker_threads.clear()
if not in_pytest:
logger.info("Async workers fast shutdown complete")
def adjust_async_worker_count(new_count, update_q=None, notification_q=None, app=None, datastore=None):
"""
Dynamically adjust the number of async workers.
Args:
new_count: Target number of workers
update_q, notification_q, app, datastore: Required for adding new workers
Returns:
dict: Status of the adjustment operation
"""
global worker_threads
current_count = get_worker_count()
if new_count == current_count:
return {
'status': 'no_change',
'message': f'Worker count already at {current_count}',
'current_count': current_count
}
if new_count > current_count:
# Add workers
workers_to_add = new_count - current_count
logger.info(f"Adding {workers_to_add} async workers (from {current_count} to {new_count})")
if not all([update_q, notification_q, app, datastore]):
return {
'status': 'error',
'message': 'Missing required parameters to add workers',
'current_count': current_count
}
for i in range(workers_to_add):
add_worker(update_q, notification_q, app, datastore)
return {
'status': 'success',
'message': f'Added {workers_to_add} workers',
'previous_count': current_count,
'current_count': len(worker_threads)
}
else:
# Remove workers
workers_to_remove = current_count - new_count
logger.info(f"Removing {workers_to_remove} async workers (from {current_count} to {new_count})")
removed_count = 0
for _ in range(workers_to_remove):
if remove_worker():
removed_count += 1
return {
'status': 'success',
'message': f'Removed {removed_count} workers',
'previous_count': current_count,
'current_count': current_count - removed_count
}
def get_worker_status():
"""Get status information about async workers"""
return {
'worker_type': 'async',
'worker_count': get_worker_count(),
'running_uuids': get_running_uuids(),
'active_threads': sum(1 for w in worker_threads if w.thread and w.thread.is_alive()),
}
def check_worker_health(expected_count, update_q=None, notification_q=None, app=None, datastore=None):
"""
Check if the expected number of async workers are running and restart any missing ones.
Args:
expected_count: Expected number of workers
update_q, notification_q, app, datastore: Required for restarting workers
Returns:
dict: Health check results
"""
global worker_threads
current_count = get_worker_count()
# Check which workers are actually alive
alive_count = sum(1 for w in worker_threads if w.thread and w.thread.is_alive())
if alive_count == expected_count:
return {
'status': 'healthy',
'expected_count': expected_count,
'actual_count': alive_count,
'message': f'All {expected_count} async workers running'
}
# Find dead workers
dead_workers = []
for i, worker in enumerate(worker_threads[:]):
if not worker.thread or not worker.thread.is_alive():
dead_workers.append(i)
logger.warning(f"Async worker {worker.worker_id} thread is dead")
# Remove dead workers from tracking
for i in reversed(dead_workers):
if i < len(worker_threads):
worker_threads.pop(i)
missing_workers = expected_count - alive_count
restarted_count = 0
if missing_workers > 0 and all([update_q, notification_q, app, datastore]):
logger.info(f"Restarting {missing_workers} crashed async workers")
for i in range(missing_workers):
if add_worker(update_q, notification_q, app, datastore):
restarted_count += 1
return {
'status': 'repaired' if restarted_count > 0 else 'degraded',
'expected_count': expected_count,
'actual_count': alive_count,
'dead_workers': len(dead_workers),
'restarted_workers': restarted_count,
'message': f'Found {len(dead_workers)} dead workers, restarted {restarted_count}'
}