#!/usr/bin/env python3

# Read more https://github.com/dgtlmoon/changedetection.io/wiki
# Semver means never use .01, or 00. Should be .1.
__version__ = '0.54.6'

from changedetectionio.strtobool import strtobool
from json.decoder import JSONDecodeError
from loguru import logger
import getopt
import logging
import os
import platform
import signal
import threading
import time

# Eventlet completely removed - using threading mode for SocketIO
# This provides better Python 3.12+ compatibility and eliminates eventlet/asyncio conflicts
# Note: store and changedetection_app are imported inside main() to avoid
# initialization before argument parsing (allows --help to work without loading everything)

# ==============================================================================
# Multiprocessing Configuration - CRITICAL for Thread Safety
# ==============================================================================
#
# PROBLEM: Python 3.12+ warns about fork() with multi-threaded processes:
#   "This process is multi-threaded, use of fork() may lead to deadlocks"
#
# WHY IT'S DANGEROUS:
#   1. This Flask app has multiple threads (HTTP handlers, workers, SocketIO)
#   2. fork() copies ONLY the calling thread to the child process
#   3. BUT fork() also copies all locks/mutexes in their current state
#   4. If another thread held a lock during fork() → child has locked lock with no owner
#   5. Result: PERMANENT DEADLOCK if child tries to acquire that lock
#
# SOLUTION: Use 'spawn' instead of 'fork'
#   - spawn starts a fresh Python interpreter (no inherited threads or locks)
#   - Slower (~200ms vs ~1ms) but safe with multi-threaded parent
#   - Consistent across all platforms (Windows already uses spawn by default)
#
# IMPLEMENTATION:
#   1. Explicit contexts everywhere (primary protection):
#      - playwright.py: ctx = multiprocessing.get_context('spawn')
#      - puppeteer.py: ctx = multiprocessing.get_context('spawn')
#      - isolated_opencv.py: ctx = multiprocessing.get_context('spawn')
#      - isolated_libvips.py: ctx = multiprocessing.get_context('spawn')
#
#   2. Global default (defense-in-depth, below):
#      - Safety net if future code forgets explicit context
#      - Protects against third-party libraries using Process()
#      - Costs nothing (explicit contexts always override it)
#
# WHY BOTH?
#   - Explicit contexts: Clear, self-documenting, always works
#   - Global default: Safety net for forgotten contexts or library code
#   - If someone writes "Process()" instead of "ctx.Process()", still safe!
#
# See: https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods
# ==============================================================================

import multiprocessing
import os
import sys

# Limit glibc malloc arena count to prevent RSS growth from concurrent requests.
# Default: glibc creates up to 8×CPU_cores arenas. Each concurrent thread/connection
# can trigger a new arena, and freed memory stays mapped in those arenas as RSS forever.
# With MALLOC_ARENA_MAX=2, at most 2 arenas are used; freed pages return to the OS faster.
# Must be set before worker threads start; env var is read lazily by glibc on first arena creation.
if 'MALLOC_ARENA_MAX' not in os.environ:
    os.environ['MALLOC_ARENA_MAX'] = '2'
    try:
        import ctypes as _ctypes
        _ctypes.CDLL('libc.so.6').mallopt(-8, 2)  # M_ARENA_MAX = -8
    except Exception:
        # Deliberately best-effort: libc.so.6 / mallopt is not available on every platform
        pass

# Set spawn as global default (safety net - all our code uses explicit contexts anyway)
# Skip in tests to avoid breaking pytest-flask's LiveServer fixture (uses unpicklable local functions)
if 'pytest' not in sys.modules:
    try:
        if multiprocessing.get_start_method(allow_none=True) is None:
            multiprocessing.set_start_method('spawn', force=False)
            logger.debug("Set multiprocessing default to 'spawn' for thread safety (explicit contexts used everywhere)")
    except RuntimeError:
        logger.debug(f"Multiprocessing start method already set: {multiprocessing.get_start_method()}")

# Only global so we can access it in the signal handler
app = None
datastore = None


def get_version():
    """Return the package version string (``__version__``)."""
    return __version__


# Parent wrapper or OS sends us a SIGTERM/SIGINT, do everything required for a clean shutdown
def sigshutdown_handler(_signo, _stack_frame):
    """Signal handler for SIGTERM/SIGINT: stop workers, close queues and exit.

    Reads the module-level ``app`` and ``datastore`` globals, which main() assigns
    before it registers this handler. Every shutdown step is wrapped so that a
    failure in one step is logged and does not prevent the following steps.

    Args:
        _signo: Number of the received signal.
        _stack_frame: Current stack frame (unused).

    Raises:
        SystemExit: Always, via ``sys.exit()`` once the shutdown steps have run.
    """
    name = signal.Signals(_signo).name
    logger.critical(f'Shutdown: Got Signal - {name} ({_signo}), Fast shutdown initiated')

    # Set exit flag immediately to stop all loops
    app.config.exit.set()
    datastore.stop_thread = True

    # Log memory consumption before shutting down workers (cross-platform)
    try:
        import psutil
        process = psutil.Process()
        mem_info = process.memory_info()
        rss_mb = mem_info.rss / 1024 / 1024
        vms_mb = mem_info.vms / 1024 / 1024
        logger.info(f"Memory consumption before worker shutdown: RSS={rss_mb:,.2f} MB, VMS={vms_mb:,.2f} MB")
    except Exception as e:
        logger.warning(f"Could not retrieve memory stats: {str(e)}")

    # Shutdown workers and queues immediately
    try:
        from changedetectionio import worker_pool
        worker_pool.shutdown_workers()
    except Exception as e:
        logger.error(f"Error shutting down workers: {str(e)}")

    # Close janus queues properly
    try:
        from changedetectionio.flask_app import update_q, notification_q
        update_q.close()
        notification_q.close()
        logger.debug("Queues closed successfully")
    except Exception as e:
        logger.critical(f"CRITICAL: Failed to close queues: {e}")

    # Shutdown socketio server fast
    from changedetectionio.flask_app import socketio_server
    if socketio_server and hasattr(socketio_server, 'shutdown'):
        try:
            socketio_server.shutdown()
        except Exception as e:
            logger.error(f"Error shutting down Socket.IO server: {str(e)}")

    # With immediate persistence, all data is already saved
    logger.success('All data already persisted (immediate commits enabled).')
    sys.exit()


def print_help():
    """Print help text for command line options"""
    # NOTE(review): '-c' is listed below, but the getopt spec used in main()
    # ("6Csd:h:p:l:P:") does not accept it - confirm whether the help line or
    # the option spec is the one that is out of date.
    print('Usage: changedetection.py [options]')
    print('')
    print('Standard options:')
    print(' -s SSL enable')
    print(' -h HOST Listen host (default: 0.0.0.0)')
    print(' -p PORT Listen port (default: 5000)')
    print(' -d PATH Datastore path')
    print(' -l LEVEL Log level (TRACE, DEBUG, INFO, SUCCESS, WARNING, ERROR, CRITICAL)')
    print(' -c Cleanup unused snapshots')
    print(' -C Create datastore directory if it doesn\'t exist')
    print(' -P true/false Set all watches paused (true) or active (false)')
    print('')
    print('Add URLs on startup:')
    print(' -u URL Add URL to watch (can be used multiple times)')
    print(' -u0 \'JSON\' Set options for first -u URL (e.g. \'{"processor":"text_json_diff"}\')')
    print(' -u1 \'JSON\' Set options for second -u URL (0-indexed)')
    print(' -u2 \'JSON\' Set options for third -u URL, etc.')
    print(' Available options: processor, fetch_backend, headers, method, etc.')
    print(' See model/Watch.py for all available options')
    print('')
    print('Recheck on startup:')
    print(' -r all Queue all watches for recheck on startup')
    print(' -r UUID,... Queue specific watches (comma-separated UUIDs)')
    print(' -r all N Queue all watches, wait for completion, repeat N times')
    print(' -r UUID,... N Queue specific watches, wait for completion, repeat N times')
    print('')
    print('Batch mode:')
    print(' -b Run in batch mode (process queue then exit)')
    print(' Useful for CI/CD, cron jobs, or one-time checks')
    print(' NOTE: Batch mode checks if Flask is running and aborts if port is in use')
    print(' Use -p PORT to specify a different port if needed')
    print('')


def main():
    """Parse the command line, load the datastore and run the application.

    Steps, in order:
      1. Handle --help / --version before importing anything heavy.
      2. Pre-process sys.argv for -u, -uN, -r and -b, then hand the rest to getopt.
      3. Configure logging and create the datastore and the Flask app.
      4. Add and queue watches requested on the command line.
      5. Either run the batch-mode monitor (-b) or start the HTTP server.

    Assigns the module-level ``app``, ``datastore`` and ``socketio`` globals so
    the signal handlers can reach them.

    Raises:
        SystemExit: On --help/--version, on invalid arguments (exit code 2), on a
            corrupt JSON datastore or an occupied port in batch mode (exit code 1).
    """
    global datastore
    global app
    global socketio

    # Early help/version check before any initialization
    if '--help' in sys.argv or '-help' in sys.argv:
        print_help()
        sys.exit(0)

    if '--version' in sys.argv or '-v' in sys.argv:
        print(f'changedetection.io {__version__}')
        sys.exit(0)

    # Import heavy modules after help/version checks to keep startup fast for those flags
    import json
    from changedetectionio import store
    from changedetectionio.flask_app import changedetection_app

    datastore_path = None
    # Set a default logger level
    logger_level = 'DEBUG'
    include_default_watches = True
    all_paused = None  # None means don't change, True/False to set
    host = os.environ.get("LISTEN_HOST", "0.0.0.0").strip()
    port = int(os.environ.get('PORT', 5000))
    ssl_mode = False

    # Lists for multiple URLs and their options
    urls_to_add = []
    url_options = {}  # Key: index (0-based), Value: dict of options
    recheck_watches = None  # None, 'all', or list of UUIDs
    recheck_repeat_count = 1  # Number of times to repeat recheck cycle
    batch_mode = False  # Run once then exit when queue is empty

    # On Windows, create and use a default path.
    if os.name == 'nt':
        datastore_path = os.path.expandvars(r'%APPDATA%\changedetection.io')
        os.makedirs(datastore_path, exist_ok=True)
    else:
        # Must be absolute so that send_from_directory doesnt try to make it relative to backend/
        datastore_path = os.path.join(os.getcwd(), "../datastore")

    # Pre-process arguments to extract -u, -uN, and -r options before getopt
    # This allows unlimited -u0, -u1, -u2, ... options without predefining them
    cleaned_argv = ['changedetection.py']  # Start with program name
    i = 1
    while i < len(sys.argv):
        arg = sys.argv[i]

        # Handle -u (add URL)
        if arg == '-u' and i + 1 < len(sys.argv):
            urls_to_add.append(sys.argv[i + 1])
            i += 2
            continue

        # Handle -uN (set options for URL at index N)
        if arg.startswith('-u') and len(arg) > 2 and arg[2:].isdigit():
            idx = int(arg[2:])
            if i + 1 < len(sys.argv):
                try:
                    parsed_options = json.loads(sys.argv[i + 1])
                except JSONDecodeError as e:
                    print(f'Error: Invalid JSON for {arg}: {sys.argv[i + 1]}')
                    print(f'JSON decode error: {e}')
                    sys.exit(2)
                # The options are passed on as the watch "extras", which must be a mapping
                if not isinstance(parsed_options, dict):
                    print(f'Error: Options for {arg} must be a JSON object, got: {sys.argv[i + 1]}')
                    sys.exit(2)
                url_options[idx] = parsed_options
                i += 2
                continue

        # Handle -r (recheck watches)
        if arg == '-r' and i + 1 < len(sys.argv):
            recheck_arg = sys.argv[i + 1]
            if recheck_arg.lower() == 'all':
                recheck_watches = 'all'
            else:
                # Parse comma-separated list of UUIDs
                recheck_watches = [uuid.strip() for uuid in recheck_arg.split(',') if uuid.strip()]

            # Check for optional repeat count as third argument
            if i + 2 < len(sys.argv) and sys.argv[i + 2].isdigit():
                recheck_repeat_count = int(sys.argv[i + 2])
                if recheck_repeat_count < 1:
                    print(f'Error: Repeat count must be at least 1, got {recheck_repeat_count}')
                    sys.exit(2)
                i += 3
            else:
                i += 2
            continue

        # Handle -b (batch mode - run once and exit)
        if arg == '-b':
            batch_mode = True
            i += 1
            continue

        # Keep other arguments for getopt
        cleaned_argv.append(arg)
        i += 1

    try:
        opts, args = getopt.getopt(cleaned_argv[1:], "6Csd:h:p:l:P:", "port")
    except getopt.GetoptError as e:
        print_help()
        print(f'Error: {e}')
        sys.exit(2)

    create_datastore_dir = False

    # Set a logger level via shell env variable
    # Used: Dockerfile for CICD
    # To set logger level for pytest, see the app function in tests/conftest.py
    if os.getenv("LOGGER_LEVEL"):
        level = os.getenv("LOGGER_LEVEL")
        logger_level = int(level) if level.isdigit() else level.upper()

    for opt, arg in opts:
        if opt == '-s':
            ssl_mode = True

        if opt == '-h':
            host = arg

        if opt == '-p':
            try:
                port = int(arg)
            except ValueError:
                print(f'Error: Invalid value for -p option: {arg}')
                print('Expected: an integer port number')
                sys.exit(2)

        if opt == '-d':
            datastore_path = arg

        # Create the datadir if it doesnt exist
        if opt == '-C':
            create_datastore_dir = True

        if opt == '-l':
            logger_level = int(arg) if arg.isdigit() else arg.upper()

        if opt == '-P':
            try:
                all_paused = bool(strtobool(arg))
            except ValueError:
                print(f'Error: Invalid value for -P option: {arg}')
                print('Expected: true, false, yes, no, 1, or 0')
                sys.exit(2)

    # If URLs are provided, don't include default watches
    if urls_to_add:
        include_default_watches = False

    logger.success(f"changedetection.io version {get_version()} starting.")

    # Launch using SocketIO run method for proper integration (if enabled)
    ssl_cert_file = os.getenv("SSL_CERT_FILE", 'cert.pem')
    ssl_privkey_file = os.getenv("SSL_PRIVKEY_FILE", 'privkey.pem')
    if os.getenv("SSL_CERT_FILE") and os.getenv("SSL_PRIVKEY_FILE"):
        ssl_mode = True

    # SSL mode could have been set by -s too, therefor fallback to default values
    if ssl_mode:
        if not os.path.isfile(ssl_cert_file) or not os.path.isfile(ssl_privkey_file):
            logger.critical(f"Cannot start SSL/HTTPS mode, Please be sure that '{ssl_cert_file}' and '{ssl_privkey_file}' exist in {os.getcwd()}")
            os._exit(2)

    # Without this, a logger will be duplicated
    logger.remove()
    try:
        # Levels in this set go to stdout, everything else goes to stderr
        log_level_for_stdout = {'TRACE', 'DEBUG', 'INFO', 'SUCCESS'}
        logger.configure(handlers=[
            {"sink": sys.stdout, "level": logger_level,
             "filter": lambda record: record['level'].name in log_level_for_stdout},
            {"sink": sys.stderr, "level": logger_level,
             "filter": lambda record: record['level'].name not in log_level_for_stdout},
        ])
    # Catch negative number or wrong log level name
    except ValueError:
        print("Available log level names: TRACE, DEBUG(default), INFO, SUCCESS,"
              " WARNING, ERROR, CRITICAL")
        sys.exit(2)

    # Disable verbose pyppeteer logging to prevent memory leaks from large CDP messages
    # Set both parent and child loggers since pyppeteer hardcodes DEBUG level
    logging.getLogger('pyppeteer.connection').setLevel(logging.WARNING)
    logging.getLogger('pyppeteer.connection.Connection').setLevel(logging.WARNING)

    # isnt there some @thingy to attach to each route to tell it, that this route needs a datastore
    app_config = {
        'datastore_path': datastore_path,
        'batch_mode': batch_mode,
        'recheck_watches': recheck_watches,
        'recheck_repeat_count': recheck_repeat_count
    }

    if not os.path.isdir(app_config['datastore_path']):
        if create_datastore_dir:
            os.makedirs(app_config['datastore_path'], exist_ok=True)
        else:
            logger.critical(
                f"ERROR: Directory path for the datastore '{app_config['datastore_path']}'"
                f" does not exist, cannot start, please make sure the"
                f" directory exists or specify a directory with the -d option.\n"
                f"Or use the -C parameter to create the directory.")
            sys.exit(2)

    try:
        datastore = store.ChangeDetectionStore(datastore_path=app_config['datastore_path'],
                                               version_tag=__version__,
                                               include_default_watches=include_default_watches)
    except JSONDecodeError as e:
        # Dont' start if the JSON DB looks corrupt
        logger.critical(f"ERROR: JSON DB or Proxy List JSON at '{app_config['datastore_path']}' appears to be corrupt, aborting.")
        logger.critical(str(e))
        sys.exit(1)

    # Testing mode: Exit cleanly after datastore initialization (for CI/CD upgrade tests)
    if os.environ.get('TESTING_SHUTDOWN_AFTER_DATASTORE_LOAD'):
        logger.success(f"TESTING MODE: Datastore loaded successfully from {app_config['datastore_path']}")
        logger.success(f"TESTING MODE: Schema version: {datastore.data['settings']['application'].get('schema_version', 'unknown')}")
        logger.success(f"TESTING MODE: Loaded {len(datastore.data['watching'])} watches")
        logger.success("TESTING MODE: Exiting cleanly (TESTING_SHUTDOWN_AFTER_DATASTORE_LOAD is set)")
        sys.exit(0)

    # Apply all_paused setting if specified via CLI
    if all_paused is not None:
        datastore.data['settings']['application']['all_paused'] = all_paused
        logger.info(f"Setting all watches paused: {all_paused}")

    # Inject datastore into plugins that need access to settings
    from changedetectionio.pluggy_interface import inject_datastore_into_plugins
    inject_datastore_into_plugins(datastore)

    # Step 1: Add URLs with their options (if provided via -u flags)
    added_watch_uuids = []
    if urls_to_add:
        logger.info(f"Adding {len(urls_to_add)} URL(s) from command line")
        for idx, url in enumerate(urls_to_add):
            extras = url_options.get(idx, {})
            if extras:
                logger.debug(f"Adding watch {idx}: {url} with options: {extras}")
            else:
                logger.debug(f"Adding watch {idx}: {url}")

            new_uuid = datastore.add_watch(url=url, extras=extras)
            if new_uuid:
                added_watch_uuids.append(new_uuid)
                logger.success(f"Added watch: {url} (UUID: {new_uuid})")
            else:
                logger.error(f"Failed to add watch: {url}")

    app = changedetection_app(app_config, datastore)

    # Step 2: Queue newly added watches (if -u was provided in batch mode)
    # This must happen AFTER app initialization so update_q is available
    if batch_mode and added_watch_uuids:
        from changedetectionio.flask_app import update_q
        from changedetectionio import queuedWatchMetaData, worker_pool

        logger.info(f"Batch mode: Queuing {len(added_watch_uuids)} newly added watches")
        for watch_uuid in added_watch_uuids:
            try:
                worker_pool.queue_item_async_safe(
                    update_q,
                    queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                )
                logger.debug(f"Queued newly added watch: {watch_uuid}")
            except Exception as e:
                logger.error(f"Failed to queue watch {watch_uuid}: {e}")

    # Step 3: Queue watches for recheck (if -r was provided)
    # This must happen AFTER app initialization so update_q is available
    if recheck_watches is not None:
        from changedetectionio.flask_app import update_q
        from changedetectionio import queuedWatchMetaData, worker_pool

        watches_to_queue = []
        if recheck_watches == 'all':
            # Queue all watches, excluding those already queued in batch mode
            all_watches = list(datastore.data['watching'].keys())
            if batch_mode and added_watch_uuids:
                # Exclude newly added watches that were already queued in batch mode
                already_queued = set(added_watch_uuids)
                watches_to_queue = [uuid for uuid in all_watches if uuid not in already_queued]
                logger.info(f"Queuing {len(watches_to_queue)} existing watches for recheck ({len(added_watch_uuids)} newly added watches already queued)")
            else:
                watches_to_queue = all_watches
                logger.info(f"Queuing all {len(watches_to_queue)} watches for recheck")
        else:
            # Queue specific UUIDs
            watches_to_queue = recheck_watches
            logger.info(f"Queuing {len(watches_to_queue)} specific watches for recheck")

        queued_count = 0
        for watch_uuid in watches_to_queue:
            if watch_uuid in datastore.data['watching']:
                try:
                    worker_pool.queue_item_async_safe(
                        update_q,
                        queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                    )
                    queued_count += 1
                    logger.debug(f"Queued watch for recheck: {watch_uuid}")
                except Exception as e:
                    logger.error(f"Failed to queue watch {watch_uuid}: {e}")
            else:
                logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")

        logger.success(f"Successfully queued {queued_count} watches for recheck")

    # Step 4: Setup batch mode monitor (if -b was provided)
    if batch_mode:
        from changedetectionio.flask_app import update_q
        # The nested helpers below use these as closure variables. They must be
        # imported here too: Steps 2 and 3 only import them when -u or -r was
        # given, so "-b" on its own would otherwise hit a NameError in the monitor.
        from changedetectionio import queuedWatchMetaData, worker_pool

        # Safety check: Ensure Flask app is not already running on this port
        # Batch mode should never run alongside the web server
        import socket
        test_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            # Try to bind to the configured host:port (no SO_REUSEADDR - strict check)
            test_socket.bind((host, port))
            logger.debug(f"Batch mode: Port {port} is available (Flask app not running)")
        except OSError as e:
            # errno 98 = EADDRINUSE (Linux)
            # errno 48 = EADDRINUSE (macOS)
            # errno 10048 = WSAEADDRINUSE (Windows)
            if e.errno in (48, 98, 10048) or "Address already in use" in str(e) or "already in use" in str(e).lower():
                logger.critical(f"ERROR: Batch mode cannot run - port {port} is already in use")
                logger.critical(f"The Flask web server appears to be running on {host}:{port}")
                logger.critical("Batch mode is designed for standalone operation (CI/CD, cron jobs, etc.)")
                logger.critical("Please either stop the Flask web server, or use a different port with -p PORT")
                sys.exit(1)
            else:
                # Some other socket error - log but continue (might be network configuration issue)
                logger.warning(f"Port availability check failed with unexpected error: {e}")
                logger.warning("Continuing with batch mode anyway - be aware of potential conflicts")
        finally:
            # Always release the probe socket, including on sys.exit() or an unexpected error
            test_socket.close()

        def queue_watches_for_recheck(datastore, iteration):
            """Queue the watches selected by -r for one batch iteration.

            Args:
                datastore: Datastore whose ``data['watching']`` mapping is consulted.
                iteration: 1-based iteration number, used for logging and to decide
                    whether newly added watches are skipped (first iteration only).

            Returns:
                Number of watches that were queued successfully.
            """
            watches_to_queue = []
            if recheck_watches == 'all':
                all_watches = list(datastore.data['watching'].keys())
                if batch_mode and added_watch_uuids and iteration == 1:
                    # Only exclude newly added watches on first iteration
                    already_queued = set(added_watch_uuids)
                    watches_to_queue = [uuid for uuid in all_watches if uuid not in already_queued]
                else:
                    watches_to_queue = all_watches
                logger.info(f"Batch mode (iteration {iteration}): Queuing all {len(watches_to_queue)} watches")
            elif recheck_watches:
                watches_to_queue = recheck_watches
                logger.info(f"Batch mode (iteration {iteration}): Queuing {len(watches_to_queue)} specific watches")

            queued_count = 0
            for watch_uuid in watches_to_queue:
                if watch_uuid in datastore.data['watching']:
                    try:
                        worker_pool.queue_item_async_safe(
                            update_q,
                            queuedWatchMetaData.PrioritizedItem(priority=1, item={'uuid': watch_uuid})
                        )
                        queued_count += 1
                    except Exception as e:
                        logger.error(f"Failed to queue watch {watch_uuid}: {e}")
                else:
                    logger.warning(f"Watch UUID not found in datastore: {watch_uuid}")

            logger.success(f"Batch mode (iteration {iteration}): Successfully queued {queued_count} watches")
            return queued_count

        def batch_mode_monitor():
            """Monitor queue and workers, shutdown or repeat when work is complete"""
            # Track iterations if repeat mode is enabled
            current_iteration = 1
            total_iterations = recheck_repeat_count if recheck_watches and recheck_repeat_count > 1 else 1

            if total_iterations > 1:
                logger.info(f"Batch mode: Will repeat recheck {total_iterations} times")
            else:
                logger.info("Batch mode: Waiting for all queued items to complete...")

            # Wait a bit for workers to start processing
            time.sleep(3)

            try:
                while current_iteration <= total_iterations:
                    logger.info(f"Batch mode: Waiting for iteration {current_iteration}/{total_iterations} to complete...")

                    # Use the shared wait_for_all_checks function
                    completed = worker_pool.wait_for_all_checks(update_q, timeout=300)

                    if completed:
                        logger.success(f"Batch mode: Iteration {current_iteration}/{total_iterations} completed")
                    else:
                        logger.warning(f"Batch mode: Iteration {current_iteration} timed out after 300 seconds")

                    # Check if we need to repeat
                    if current_iteration < total_iterations:
                        logger.info(f"Batch mode: Starting iteration {current_iteration + 1}...")
                        current_iteration += 1

                        # Re-queue watches for next iteration
                        queue_watches_for_recheck(datastore, current_iteration)

                        # Brief pause before continuing
                        time.sleep(2)
                    else:
                        # All iterations complete
                        logger.success(f"Batch mode: All {total_iterations} iterations completed, initiating shutdown")
                        # Trigger shutdown through the regular SIGTERM handler
                        os.kill(os.getpid(), signal.SIGTERM)
                        return

            except Exception as e:
                logger.error(f"Batch mode monitor error: {e}")
                logger.error("Initiating emergency shutdown")
                os.kill(os.getpid(), signal.SIGTERM)

        # Start monitor in background thread
        monitor_thread = threading.Thread(target=batch_mode_monitor, daemon=True, name="BatchModeMonitor")
        monitor_thread.start()
        logger.info("Batch mode enabled: Will exit after all queued items are processed")

    # Get the SocketIO instance from the Flask app (created in flask_app.py)
    from changedetectionio.flask_app import socketio_server
    socketio = socketio_server

    signal.signal(signal.SIGTERM, sigshutdown_handler)
    signal.signal(signal.SIGINT, sigshutdown_handler)

    # Custom signal handler for memory cleanup
    def sigusr_clean_handler(_signo, _stack_frame):
        """Run the memory cleanup routine when SIGUSR1 is received."""
        from changedetectionio.gc_cleanup import memory_cleanup
        logger.info('SIGUSR1 received: Running memory cleanup')
        return memory_cleanup(app)

    # Register the SIGUSR1 signal handler
    # Only register the signal handler if running on Linux
    if platform.system() == "Linux":
        signal.signal(signal.SIGUSR1, sigusr_clean_handler)
    else:
        logger.info("SIGUSR1 handler only registered on Linux, skipped.")

    app.config['datastore_path'] = datastore_path

    @app.context_processor
    def inject_template_globals():
        """Expose version and application-wide settings to every template."""
        return dict(right_sticky="v" + __version__,
                    new_version_available=app.config['NEW_VERSION_AVAILABLE'],
                    has_password=datastore.data['settings']['application']['password'] != False,
                    socket_io_enabled=datastore.data['settings']['application'].get('ui', {}).get('socket_io_enabled', True),
                    all_paused=datastore.data['settings']['application'].get('all_paused', False),
                    all_muted=datastore.data['settings']['application'].get('all_muted', False)
                    )

    # Monitored websites will not receive a Referer header when a user clicks on an outgoing link.
    @app.after_request
    def hide_referrer(response):
        """Set the Referrer-Policy header when HIDE_REFERER is enabled."""
        if strtobool(os.getenv("HIDE_REFERER", 'false')):
            response.headers["Referrer-Policy"] = "same-origin"

        return response

    # Proxy sub-directory support
    # Set environment var USE_X_SETTINGS=1 on this script
    # And then in your proxy_pass settings
    #
    #         proxy_set_header Host "localhost";
    #         proxy_set_header X-Forwarded-Prefix /app;
    if os.getenv('USE_X_SETTINGS'):
        logger.info("USE_X_SETTINGS is ENABLED")
        from werkzeug.middleware.proxy_fix import ProxyFix
        app.wsgi_app = ProxyFix(
            app.wsgi_app,
            x_for=1,     # X-Forwarded-For (client IP)
            x_proto=1,   # X-Forwarded-Proto (http/https)
            x_host=1,    # X-Forwarded-Host (original host)
            x_port=1,    # X-Forwarded-Port (original port)
            x_prefix=1   # X-Forwarded-Prefix (URL prefix)
        )

    # In batch mode, skip starting the HTTP server - just keep workers running
    if batch_mode:
        logger.info("Batch mode: Skipping HTTP server startup, workers will process queue")
        logger.info("Batch mode: Main thread will wait for shutdown signal")
        # Keep main thread alive until batch monitor triggers shutdown
        try:
            while True:
                time.sleep(1)
        except KeyboardInterrupt:
            logger.info("Batch mode: Keyboard interrupt received")
    else:
        # Normal mode: Start HTTP server
        # SocketIO instance is already initialized in flask_app.py
        if socketio_server:
            if ssl_mode:
                logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
                socketio.run(app, host=host, port=int(port), debug=False,
                             ssl_context=(ssl_cert_file, ssl_privkey_file), allow_unsafe_werkzeug=True)
            else:
                socketio.run(app, host=host, port=int(port), debug=False, allow_unsafe_werkzeug=True)
        else:
            # Run Flask app without Socket.IO if disabled
            logger.info("Starting Flask app without Socket.IO server")
            if ssl_mode:
                logger.success(f"SSL mode enabled, attempting to start with '{ssl_cert_file}' and '{ssl_privkey_file}' in {os.getcwd()}")
                app.run(host=host, port=int(port), debug=False,
                        ssl_context=(ssl_cert_file, ssl_privkey_file))
            else:
                app.run(host=host, port=int(port), debug=False)