mirror of
https://github.com/dgtlmoon/changedetection.io.git
synced 2026-04-08 03:58:04 +00:00
Some checks failed
Build and push containers / metadata (push) Has been cancelled
Build and push containers / build-push-containers (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Build distribution 📦 (push) Has been cancelled
ChangeDetection.io App Test / lint-code (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Test the built package works basically. (push) Has been cancelled
Publish Python 🐍distribution 📦 to PyPI and TestPyPI / Publish Python 🐍 distribution 📦 to PyPI (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-10 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-11 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-12 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-13 (push) Has been cancelled
ChangeDetection.io App Test / test-application-3-14 (push) Has been cancelled
CodeQL / Analyze (javascript) (push) Has been cancelled
CodeQL / Analyze (python) (push) Has been cancelled
694 lines
31 KiB
Python
694 lines
31 KiB
Python
#!/usr/bin/env python3

# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.54.6'

from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError

from loguru import logger
import getopt
import logging
import os
import platform
import signal
import threading
import time

# Eventlet completely removed - using threading mode for SocketIO
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
# Note: store and changedetection_app are imported inside main() to avoid
# initialization before argument parsing (allows --help to work without loading everything)

# ==============================================================================
# Multiprocessing Configuration - CRITICAL for Thread Safety
# ==============================================================================
#
# PROBLEM: Python 3.12+ warns about fork() with multi-threaded processes:
#   "This process is multi-threaded, use of fork() may lead to deadlocks"
#
# WHY IT'S DANGEROUS:
#   1. This Flask app has multiple threads (HTTP handlers, workers, SocketIO)
#   2. fork() copies ONLY the calling thread to the child process
#   3. BUT fork() also copies all locks/mutexes in their current state
#   4. If another thread held a lock during fork() -> child has a locked lock with no owner
#   5. Result: PERMANENT DEADLOCK if the child tries to acquire that lock
#
# SOLUTION: Use 'spawn' instead of 'fork'
#   - spawn starts a fresh Python interpreter (no inherited threads or locks)
#   - Slower (~200ms vs ~1ms) but safe with a multi-threaded parent
#   - Consistent across all platforms (Windows already uses spawn by default)
#
# IMPLEMENTATION:
#   1. Explicit contexts everywhere (primary protection):
#      - playwright.py: ctx = multiprocessing.get_context('spawn')
#      - puppeteer.py: ctx = multiprocessing.get_context('spawn')
#      - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
#      - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
#
#   2. Global default (defense-in-depth, below):
#      - Safety net if future code forgets an explicit context
#      - Protects against third-party libraries using Process()
#      - Costs nothing (explicit contexts always override it)
#
# WHY BOTH?
#   - Explicit contexts: clear, self-documenting, always work
#   - Global default: safety net for forgotten contexts or library code
#   - If someone writes "Process()" instead of "ctx.Process()", it is still safe
#
# See: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# ==============================================================================

import multiprocessing
import os
import sys

# Limit glibc malloc arena count to curb RSS growth from concurrent requests.
# By default glibc may create up to 8 x CPU-cores arenas (64-bit). Each concurrent
# thread/connection can trigger a new arena, and memory freed inside those arenas tends
# to stay resident. With at most 2 arenas, freed pages are reused/returned more readily.
#
# NOTE: glibc reads MALLOC_ARENA_MAX from the environment at process start-up, before
# any Python code runs, so exporting it here does NOT change this process's allocator:
#   - os.environ[...] only affects processes started afterwards (e.g. the 'spawn'
#     multiprocessing children and subprocesses), which inherit the environment.
#   - mallopt(M_ARENA_MAX, 2) is what applies the limit to the *current* process; it has
#     to run before worker threads start so their arenas are created under the limit.
# If the user already exported MALLOC_ARENA_MAX, glibc honoured it at start-up and we
# leave both the environment and the allocator alone.
if 'MALLOC_ARENA_MAX' not in os.environ:
    os.environ['MALLOC_ARENA_MAX'] = '2'
    # 'libc.so.6' is the glibc soname; on other platforms the dlopen would just fail.
    if sys.platform.startswith('linux'):
        try:
            import ctypes as _ctypes
            _ctypes.CDLL('libc.so.6').mallopt(-8, 2)  # M_ARENA_MAX = -8
        except Exception:
            # Best effort only: e.g. a musl-based Linux has no libc.so.6 / glibc mallopt.
            pass

# Set spawn as global default (safety net - all our code uses explicit contexts anyway)
# Skip in tests to avoid breaking pytest-flask's LiveServer fixture (uses unpicklable local functions)
if 'pytest' not in sys.modules:
    try:
        if multiprocessing.get_start_method(allow_none=True) is None:
            multiprocessing.set_start_method('spawn', force=False)
            logger.debug("Set multiprocessing default to 'spawn' for thread safety (explicit contexts used everywhere)")
    except RuntimeError:
        logger.debug(f"Multiprocessing start method already set: {multiprocessing.get_start_method()}")

# Process-wide handles filled in by main(); they live at module level only so
# that the signal handlers can reach them.
app = datastore = None


def get_version():
    """Return this package's version string (the module-level ``__version__``)."""
    version = __version__
    return version

# Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown
def sigshutdown_handler(_signo, _stack_frame):
    """Handle SIGTERM/SIGINT: stop loops, tear down workers/queues/Socket.IO, then exit.

    Each teardown step runs in its own try/except so that a failure in one step
    is logged and does not prevent the remaining steps, or the final
    ``sys.exit()``, from running.

    Relies on the module-level ``app`` and ``datastore`` globals, which
    ``main()`` sets before it registers this handler.

    Args:
        _signo: the signal number that was received.
        _stack_frame: the interrupted stack frame (unused).
    """
    name = signal.Signals(_signo).name
    logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')

    # Set exit flag immediately to stop all loops
    app.config.exit.set()
    datastore.stop_thread = True

    # Log memory consumption before shutting down workers (cross-platform)
    try:
        import psutil
        process = psutil.Process()
        mem_info = process.memory_info()
        rss_mb = mem_info.rss / 1024 / 1024
        vms_mb = mem_info.vms / 1024 / 1024
        logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
    except Exception as e:
        logger.warning(f"Could not retrieve memory stats: {str(e)}")

    # Shutdown workers and queues immediately
    try:
        from changedetectionio import worker_pool
        worker_pool.shutdown_workers()
    except Exception as e:
        logger.error(f"Error shutting down workers: {str(e)}")

    # Close janus queues properly
    try:
        from changedetectionio.flask_app import update_q, notification_q
        update_q.close()
        notification_q.close()
        logger.debug("Queues closed successfully")
    except Exception as e:
        logger.critical(f"CRITICAL: Failed to close queues: {e}")

    # Shutdown socketio server fast. The import is inside the try (like every other
    # step) so that a failure here cannot skip the sys.exit() below.
    try:
        from changedetectionio.flask_app import socketio_server
        if socketio_server and hasattr(socketio_server, 'shutdown'):
            socketio_server.shutdown()
    except Exception as e:
        logger.error(f"Error shutting down Socket.IO server: {str(e)}")

    # With immediate persistence, all data is already saved
    logger.success('All data already persisted (immediate commits enabled).')

    sys.exit()

def print_help():
    """Print the command-line usage text to stdout.

    Only options that ``main()`` actually accepts are listed. There is no
    ``-c`` entry: the getopt spec in ``main()`` rejects ``-c``.
    """
    help_lines = [
        'Usage: changedetection.py [options]',
        '',
        'Standard options:',
        ' -s SSL enable',
        ' -h HOST Listen host (default: 0.0.0.0)',
        ' -p PORT Listen port (default: 5000)',
        ' -d PATH Datastore path',
        ' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)',
        ' -C Create datastore directory if it doesn\'t exist',
        ' -P true/false Set all watches paused (true) or active (false)',
        ' -v, --version Print the version and exit',
        ' --help Print this help and exit',
        '',
        'Add URLs on startup:',
        ' -u URL Add URL to watch (can be used multiple times)',
        ' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')',
        ' -u1 \'JSON\' Set options for second -u URL (0-indexed)',
        ' -u2 \'JSON\' Set options for third -u URL, etc.',
        ' Available options: processor, fetch_backend, headers, method, etc.',
        ' See model/Watch.py for all available options',
        '',
        'Recheck on startup:',
        ' -r all Queue all watches for recheck on startup',
        ' -r UUID,... Queue specific watches (comma-separated UUIDs)',
        ' -r all N Queue all watches, wait for completion, repeat N times',
        ' -r UUID,... N Queue specific watches, wait for completion, repeat N times',
        '',
        'Batch mode:',
        ' -b Run in batch mode (process queue then exit)',
        ' Useful for CI/CD, cron jobs, or one-time checks',
        ' NOTE: Batch mode checks if Flask is running and aborts if port is in use',
        ' Use -p PORT to specify a different port if needed',
        '',
    ]
    for line in help_lines:
        print(line)

def main():
    """Command-line entry point for changedetection.io.

    Parses the command line, loads (or creates) the datastore, builds the Flask
    app, registers signal handlers, and then either serves HTTP (normal mode) or
    waits for a background monitor to drain the update queue (batch mode, ``-b``).

    Arguments are parsed in two passes. ``-u``, ``-u<N>``, ``-r`` and ``-b`` are
    stripped from ``sys.argv`` by hand, so any number of ``-u<N>`` options can be
    given. The remainder is passed to :func:`getopt.getopt`.

    Sets the module-level ``app``, ``datastore`` and ``socketio`` globals so the
    signal handlers can reach them.

    Exit codes:
        0: ``--help`` / ``--version``.
        2: invalid arguments, missing SSL files, or a missing datastore directory.
        1: a corrupt JSON datastore, or (in batch mode) a port already in use.
    """
    global datastore
    global app
    global socketio

    # Early help/version check before any initialization
    if '--help' in sys.argv or '-help' in sys.argv:
        print_help()
        sys.exit(0)

    if '--version' in sys.argv or '-v' in sys.argv:
        print(f'changedetection.io {__version__}')
        sys.exit(0)

    # Import heavy modules after help/version checks to keep startup fast for those flags
    import json
    from changedetectionio import store
    from changedetectionio.flask_app import changedetection_app

    datastore_path = None
    # Set a default logger level
    logger_level = 'DEBUG'
    include_default_watches = True
    all_paused = None  # None means don't change, True/False to set

    host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
    try:
        port = int(os.environ.get('PORT', 5000))
    except ValueError:
        print(f"Error: Invalid PORT environment variable: {os.environ.get('PORT')}")
        sys.exit(2)
    ssl_mode = False

    # Lists for multiple URLs and their options
    urls_to_add = []
    url_options = {}  # Key: index (0-based), Value: dict of options
    recheck_watches = None  # None, 'all', or list of UUIDs
    recheck_repeat_count = 1  # Number of times to repeat recheck cycle
    batch_mode = False  # Run once then exit when queue is empty

    # On Windows, create and use a default path.
    if os.name == 'nt':
        datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
        os.makedirs(datastore_path, exist_ok=True)
    else:
        # Must be absolute so that send_from_directory doesnt try to make it relative to backend/
        datastore_path = os.path.join(os.getcwd(), "../datastore")

    # Pre-process arguments to extract -u, -u<N>, -r and -b options before getopt
    # This allows unlimited -u0, -u1, -u2, ... options without predefining them
    cleaned_argv = ['changedetection.py']  # Start with program name
    i = 1
    while i < len(sys.argv):
        arg = sys.argv[i]

        # -u and -r always need a value; say so instead of a confusing getopt error
        if arg in ('-u', '-r') and i + 1 >= len(sys.argv):
            print(f'Error: option {arg} requires an argument')
            sys.exit(2)

        # Handle -u (add URL)
        if arg == '-u':
            urls_to_add.append(sys.argv[i + 1])
            i += 2
            continue

        # Handle -u<N> (set options for URL at index N)
        if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
            if i + 1 >= len(sys.argv):
                print(f'Error: option {arg} requires a JSON argument')
                sys.exit(2)
            try:
                options = json.loads(sys.argv[i + 1])
            except json.JSONDecodeError as e:
                print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
                print(f'JSON decode error: {e}')
                sys.exit(2)
            # The options are passed as the watch's "extras" mapping, so they must be an object
            if not isinstance(options, dict):
                print(f'Error: JSON for {arg} must be an object (e.g. {{"processor":"text_json_diff"}}), got: {sys.argv[i + 1]}')
                sys.exit(2)
            url_options[int(arg[2:])] = options
            i += 2
            continue

        # Handle -r (recheck watches)
        if arg == '-r':
            recheck_arg = sys.argv[i + 1]
            if recheck_arg.lower() == 'all':
                recheck_watches = 'all'
            else:
                # Parse comma-separated list of UUIDs
                recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]

            # Check for optional repeat count as third argument
            if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit():
                recheck_repeat_count = int(sys.argv[i + 2])
                if recheck_repeat_count < 1:
                    print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}')
                    sys.exit(2)
                i += 3
            else:
                i += 2
            continue

        # Handle -b (batch mode - run once and exit)
        if arg == '-b':
            batch_mode = True
            i += 1
            continue

        # Keep other arguments for getopt
        cleaned_argv.append(arg)
        i += 1

    try:
        opts, _args = getopt.getopt(cleaned_argv[1:], "6Csd:h:p:l:P:", "port")
    except getopt.GetoptError as e:
        print_help()
        print(f'Error: {e}')
        sys.exit(2)

    create_datastore_dir = False

    # Set a logger level via shell env variable
    # Used: Dockerfile for CICD
    # To set logger level for pytest, see the app function in tests/conftest.py
    if os.getenv("LOGGER_LEVEL"):
        level = os.getenv("LOGGER_LEVEL")
        logger_level = int(level) if level.isdigit() else level.upper()

    for opt, arg in opts:
        if opt == '-s':
            ssl_mode = True
        elif opt == '-h':
            host = arg
        elif opt == '-p':
            try:
                port = int(arg)
            except ValueError:
                print(f'Error: Invalid value for -p option: {arg}')
                sys.exit(2)
        elif opt == '-d':
            datastore_path = arg
        elif opt == '-C':
            # Create the datadir if it doesnt exist
            create_datastore_dir = True
        elif opt == '-l':
            logger_level = int(arg) if arg.isdigit() else arg.upper()
        elif opt == '-P':
            try:
                all_paused = bool(strtobool(arg))
            except ValueError:
                print(f'Error: Invalid value for -P option: {arg}')
                print('Expected: true, false, yes, no, 1, or 0')
                sys.exit(2)

    # If URLs are provided, don't include default watches
    if urls_to_add:
        include_default_watches = False

    logger.success(f"changedetection.io version {get_version()} starting.")
    # Launch using SocketIO run method for proper integration (if enabled)
    ssl_cert_file = os.getenv("SSL_CERT_FILE", 'cert.pem')
    ssl_privkey_file = os.getenv("SSL_PRIVKEY_FILE", 'privkey.pem')
    if os.getenv("SSL_CERT_FILE") and os.getenv("SSL_PRIVKEY_FILE"):
        ssl_mode = True

    # SSL mode could have been set by -s too, therefore fall back to default values
    if ssl_mode:
        if not os.path.isfile(ssl_cert_file) or not os.path.isfile(ssl_privkey_file):
            logger.critical(f"Cannot start SSL/HTTPS mode, Please be sure that '{ssl_cert_file}' and '{ssl_privkey_file}' exist in {os.getcwd()}")
            os._exit(2)

    # Without this, a logger will be duplicated
    logger.remove()
    try:
        # Informational levels go to stdout, everything else (WARNING and up) to stderr
        log_level_for_stdout = {'TRACE', 'DEBUG', 'INFO', 'SUCCESS'}
        logger.configure(handlers=[
            {"sink": sys.stdout, "level": logger_level,
             "filter": lambda record: record['level'].name in log_level_for_stdout},
            {"sink": sys.stderr, "level": logger_level,
             "filter": lambda record: record['level'].name not in log_level_for_stdout},
        ])
    # Catch negative number or wrong log level name
    except ValueError:
        print("Available log level names: TRACE, DEBUG(default), INFO, SUCCESS,"
              " WARNING, ERROR, CRITICAL")
        sys.exit(2)

    # The repeat count is only consumed by the batch-mode monitor below
    if recheck_repeat_count > 1 and not batch_mode:
        logger.warning(f"Recheck repeat count ({recheck_repeat_count}) only applies in batch mode (-b); watches will be queued once")

    # Disable verbose pyppeteer logging to prevent memory leaks from large CDP messages
    # Set both parent and child loggers since pyppeteer hardcodes DEBUG level
    logging.getLogger('pyppeteer.connection').setLevel(logging.WARNING)
    logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)

    # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
    app_config = {
        'datastore_path': datastore_path,
        'batch_mode': batch_mode,
        'recheck_watches': recheck_watches,
        'recheck_repeat_count': recheck_repeat_count
    }

    if not os.path.isdir(app_config['datastore_path']):
        if create_datastore_dir:
            os.makedirs(app_config['datastore_path'], exist_ok=True)
        else:
            logger.critical(
                f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
                f" does not exist, cannot start, please make sure the"
                f" directory exists or specify a directory with the -d option.\n"
                f"Or use the -C parameter to create the directory.")
            sys.exit(2)

    try:
        datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'], version_tag=__version__, include_default_watches=include_default_watches)
    except JSONDecodeError as e:
        # Dont' start if the JSON DB looks corrupt
        logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
        logger.critical(str(e))
        sys.exit(1)

    # Testing mode: Exit cleanly after datastore initialization (for CI/CD upgrade tests)
    if os.environ.get('TESTING_SHUTDOWN_AFTER_DATASTORE_LOAD'):
        logger.success(f"TESTING MODE: Datastore loaded successfully from {app_config['datastore_path']}")
        logger.success(f"TESTING MODE: Schema version: {datastore.data['settings']['application'].get('schema_version', 'unknown')}")
        logger.success(f"TESTING MODE: Loaded {len(datastore.data['watching'])} watches")
        logger.success("TESTING MODE: Exiting cleanly (TESTING_SHUTDOWN_AFTER_DATASTORE_LOAD is set)")
        sys.exit(0)

    # Apply all_paused setting if specified via CLI
    if all_paused is not None:
        datastore.data['settings']['application']['all_paused'] = all_paused
        logger.info(f"Setting all watches paused: {all_paused}")

    # Inject datastore into plugins that need access to settings
    from changedetectionio.pluggy_interface import inject_datastore_into_plugins
    inject_datastore_into_plugins(datastore)

    # Step 1: Add URLs with their options (if provided via -u flags)
    added_watch_uuids = []
    if urls_to_add:
        logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
        for idx, url in enumerate(urls_to_add):
            extras = url_options.get(idx, {})
            if extras:
                logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
            else:
                logger.debug(f"Adding watch {idx}: {url}")

            new_uuid = datastore.add_watch(url=url, extras=extras)
            if new_uuid:
                added_watch_uuids.append(new_uuid)
                logger.success(f"Added watch: {url} (UUID: {new_uuid})")
            else:
                logger.error(f"Failed to add watch: {url}")

    app = changedetection_app(app_config, datastore)

    # Step 2: Queue newly added watches (if -u was provided in batch mode)
    # This must happen AFTER app initialization so update_q is available
    if batch_mode and added_watch_uuids:
        from changedetectionio.flask_app import update_q
        from changedetectionio import queuedWatchMetaData, worker_pool

        logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
        for watch_uuid in added_watch_uuids:
            try:
                worker_pool.queue_item_async_safe(
                    update_q,
                    queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                )
                logger.debug(f"Queued newly added watch: {watch_uuid}")
            except Exception as e:
                logger.error(f"Failed to queue watch {watch_uuid}: {e}")

    # Step 3: Queue watches for recheck (if -r was provided)
    # This must happen AFTER app initialization so update_q is available
    if recheck_watches is not None:
        from changedetectionio.flask_app import update_q
        from changedetectionio import queuedWatchMetaData, worker_pool

        watches_to_queue = []
        if recheck_watches == 'all':
            # Queue all watches, excluding those already queued in batch mode
            all_watches = list(datastore.data['watching'].keys())
            if batch_mode and added_watch_uuids:
                # Exclude newly added watches that were already queued in batch mode
                watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
                logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
            else:
                watches_to_queue = all_watches
                logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
        else:
            # Queue specific UUIDs
            watches_to_queue = recheck_watches
            logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")

        queued_count = 0
        for watch_uuid in watches_to_queue:
            if watch_uuid in datastore.data['watching']:
                try:
                    worker_pool.queue_item_async_safe(
                        update_q,
                        queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                    )
                    queued_count += 1
                    logger.debug(f"Queued watch for recheck: {watch_uuid}")
                except Exception as e:
                    logger.error(f"Failed to queue watch {watch_uuid}: {e}")
            else:
                logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")

        logger.success(f"Successfully queued {queued_count} watches for recheck")

    # Step 4: Setup batch mode monitor (if -b was provided)
    if batch_mode:
        # worker_pool / queuedWatchMetaData must be bound here as well: the monitor closures
        # below use them, and the imports in Steps 2/3 only run when -u or -r queued work.
        # Without this, a plain "-b" run died with an unbound-variable error in the monitor.
        from changedetectionio.flask_app import update_q
        from changedetectionio import queuedWatchMetaData, worker_pool

        # Safety check: Ensure Flask app is not already running on this port
        # Batch mode should never run alongside the web server
        import socket
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as test_socket:
            try:
                # Try to bind to the configured host:port (no SO_REUSEADDR - strict check)
                test_socket.bind((host, port))
                logger.debug(f"Batch mode: Port {port} is available (Flask app not running)")
            except OSError as e:
                # errno 98 = EADDRINUSE (Linux)
                # errno 48 = EADDRINUSE (macOS)
                # errno 10048 = WSAEADDRINUSE (Windows)
                if e.errno in (48, 98, 10048) or "already in use" in str(e).lower():
                    logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use")
                    logger.critical(f"The Flask web server appears to be running on {host}:{port}")
                    logger.critical("Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)")
                    logger.critical("Please either stop the Flask web server, or use a different port with -p PORT")
                    sys.exit(1)
                else:
                    # Some other socket error - log but continue (might be network configuration issue)
                    logger.warning(f"Port availability check failed with unexpected error: {e}")
                    logger.warning("Continuing with batch mode anyway - be aware of potential conflicts")

        def queue_watches_for_recheck(datastore, iteration):
            """Queue the -r watches for one batch-mode iteration; return how many were queued.

            On iteration 1, watches just added via -u are skipped because Step 2
            already queued them.
            """
            watches_to_queue = []
            if recheck_watches == 'all':
                all_watches = list(datastore.data['watching'].keys())
                if batch_mode and added_watch_uuids and iteration == 1:
                    # Only exclude newly added watches on first iteration
                    watches_to_queue = [uuid for uuid in all_watches if uuid not in added_watch_uuids]
                else:
                    watches_to_queue = all_watches
                logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches")
            elif recheck_watches:
                watches_to_queue = recheck_watches
                logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches")

            queued_count = 0
            for watch_uuid in watches_to_queue:
                if watch_uuid in datastore.data['watching']:
                    try:
                        worker_pool.queue_item_async_safe(
                            update_q,
                            queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                        )
                        queued_count += 1
                    except Exception as e:
                        logger.error(f"Failed to queue watch {watch_uuid}: {e}")
                else:
                    logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")
            logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches")
            return queued_count

        def batch_mode_monitor():
            """Wait for queued checks to finish (repeating -r rechecks if requested), then SIGTERM ourselves.

            The SIGTERM is handled by sigshutdown_handler. Any unexpected error
            also triggers SIGTERM so batch mode never hangs.
            """
            # Track iterations if repeat mode is enabled
            current_iteration = 1
            total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1

            if total_iterations > 1:
                logger.info(f"Batch mode: Will repeat recheck {total_iterations} times")
            else:
                logger.info("Batch mode: Waiting for all queued items to complete...")

            # Wait a bit for workers to start processing
            time.sleep(3)

            try:
                while current_iteration <= total_iterations:
                    logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")

                    # Use the shared wait_for_all_checks function
                    completed = worker_pool.wait_for_all_checks(update_q, timeout=300)

                    if not completed:
                        logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")

                    logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed")

                    # Check if we need to repeat
                    if current_iteration < total_iterations:
                        logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...")
                        current_iteration += 1

                        # Re-queue watches for next iteration
                        queue_watches_for_recheck(datastore, current_iteration)

                        # Brief pause before continuing
                        time.sleep(2)
                    else:
                        # All iterations complete
                        logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown")
                        # Trigger shutdown
                        os.kill(os.getpid(), signal.SIGTERM)
                        return

            except Exception as e:
                logger.error(f"Batch mode monitor error: {e}")
                logger.error("Initiating emergency shutdown")
                os.kill(os.getpid(), signal.SIGTERM)

        # Start monitor in background thread
        monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
        monitor_thread.start()
        logger.info("Batch mode enabled: Will exit after all queued items are processed")

    # Get the SocketIO instance from the Flask app (created in flask_app.py)
    from changedetectionio.flask_app import socketio_server
    socketio = socketio_server

    signal.signal(signal.SIGTERM, sigshutdown_handler)
    signal.signal(signal.SIGINT, sigshutdown_handler)

    # Custom signal handler for memory cleanup
    def sigusr_clean_handler(_signo, _stack_frame):
        """SIGUSR1 handler: run the project's memory cleanup against the global app."""
        from changedetectionio.gc_cleanup import memory_cleanup
        logger.info('SIGUSR1 received: Running memory cleanup')
        return memory_cleanup(app)

    # Register the SIGUSR1 signal handler
    # Only register the signal handler if running on Linux
    if platform.system() == "Linux":
        signal.signal(signal.SIGUSR1, sigusr_clean_handler)
    else:
        logger.info("SIGUSR1 handler only registered on Linux, skipped.")

    app.config['datastore_path'] = datastore_path

    @app.context_processor
    def inject_template_globals():
        """Expose version, update and application-state flags to every template."""
        return dict(right_sticky="v" + __version__,
                    new_version_available=app.config['NEW_VERSION_AVAILABLE'],
                    has_password=datastore.data['settings']['application']['password'] != False,
                    socket_io_enabled=datastore.data['settings']['application'].get('ui', {}).get('socket_io_enabled', True),
                    all_paused=datastore.data['settings']['application'].get('all_paused', False),
                    all_muted=datastore.data['settings']['application'].get('all_muted', False)
                    )

    # Monitored websites will not receive a Referer header when a user clicks on an outgoing link.
    @app.after_request
    def hide_referrer(response):
        """Add a same-origin Referrer-Policy header when HIDE_REFERER is truthy."""
        if strtobool(os.getenv("HIDE_REFERER", 'false')):
            response.headers["Referrer-Policy"] = "same-origin"

        return response

    # Proxy sub-directory support
    # Set environment var USE_X_SETTINGS=1 on this script
    # And then in your proxy_pass settings
    #
    #   proxy_set_header Host "localhost";
    #   proxy_set_header X-Forwarded-Prefix /app;

    if os.getenv('USE_X_SETTINGS'):
        logger.info("USE_X_SETTINGS is ENABLED")
        from werkzeug.middleware.proxy_fix import ProxyFix
        app.wsgi_app = ProxyFix(
            app.wsgi_app,
            x_for=1,  # X-Forwarded-For (client IP)
            x_proto=1,  # X-Forwarded-Proto (http/https)
            x_host=1,  # X-Forwarded-Host (original host)
            x_port=1,  # X-Forwarded-Port (original port)
            x_prefix=1  # X-Forwarded-Prefix (URL prefix)
        )

    # In batch mode, skip starting the HTTP server - just keep workers running
    if batch_mode:
        logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
        logger.info("Batch mode: Main thread will wait for shutdown signal")
        # Keep main thread alive until batch monitor triggers shutdown
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Batch mode: Keyboard interrupt received")
    else:
        # Normal mode: Start HTTP server
        # SocketIO instance is already initialized in flask_app.py
        if socketio_server:
            if ssl_mode:
                logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
                socketio.run(app, host=host, port=int(port), debug=False,
                             ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
            else:
                socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
        else:
            # Run Flask app without Socket.IO if disabled
            logger.info("Starting Flask app without Socket.IO server")
            if ssl_mode:
                logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
                app.run(host=host, port=int(port), debug=False,
                        ssl_context=(ssl_cert_file, ssl_privkey_file))
            else:
                app.run(host=host, port=int(port), debug=False)