#!/usr/bin/env python3 import flask_login import locale import os import queue import sys import threading import time import timeago from blinker import signal from pathlib import Path from changedetectionio.strtobool import strtobool from threading import Event from changedetectionio.queue_handlers import RecheckPriorityQueue, NotificationQueue from changedetectionio import worker_handler from flask import ( Flask, abort, flash, redirect, render_template, request, send_from_directory, session, url_for, ) from urllib.parse import urlparse from flask_compress import Compress as FlaskCompress from flask_login import current_user from flask_restful import abort, Api from flask_cors import CORS # Create specific signals for application events # Make this a global singleton to avoid multiple signal objects watch_check_update = signal('watch_check_update', doc='Signal sent when a watch check is completed') from flask_wtf import CSRFProtect from flask_babel import Babel, gettext, get_locale from loguru import logger from changedetectionio import __version__ from changedetectionio import queuedWatchMetaData from changedetectionio.api import Watch, WatchHistory, WatchSingleHistory, WatchHistoryDiff, CreateWatch, Import, SystemInfo, Tag, Tags, Notifications, WatchFavicon from changedetectionio.api.Search import Search from .time_handler import is_within_schedule from changedetectionio.languages import get_available_languages, get_language_codes, get_flag_for_locale, get_timeago_locale datastore = None # Local ticker_thread = None extra_stylesheets = [] # Use bulletproof janus-based queues for sync/async reliability update_q = RecheckPriorityQueue() notification_q = NotificationQueue() MAX_QUEUE_SIZE = 2000 app = Flask(__name__, static_url_path="", static_folder="static", template_folder="templates") # Will be initialized in changedetection_app socketio_server = None # Enable CORS, especially useful for the Chrome extension to operate from anywhere CORS(app) # Super handy for compressing large BrowserSteps responses and others FlaskCompress(app) app.config['COMPRESS_MIN_SIZE'] = 4096 app.config['COMPRESS_MIMETYPES'] = ['text/html', 'text/css', 'text/javascript', 'application/json', 'application/javascript', 'image/svg+xml'] app.config['TEMPLATES_AUTO_RELOAD'] = False # Stop browser caching of assets app.config['SEND_FILE_MAX_AGE_DEFAULT'] = 0 app.config.exit = Event() app.config['NEW_VERSION_AVAILABLE'] = False if os.getenv('FLASK_SERVER_NAME'): app.config['SERVER_NAME'] = os.getenv('FLASK_SERVER_NAME') # Babel/i18n configuration app.config['BABEL_TRANSLATION_DIRECTORIES'] = str(Path(__file__).parent / 'translations') app.config['BABEL_DEFAULT_LOCALE'] = 'en_GB' #app.config["EXPLAIN_TEMPLATE_LOADING"] = True app.jinja_env.add_extension('jinja2.ext.loopcontrols') # Configure Jinja2 to search for templates in plugin directories def _configure_plugin_templates(): """Configure Jinja2 loader to include plugin template directories.""" from jinja2 import ChoiceLoader, FileSystemLoader from changedetectionio.pluggy_interface import get_plugin_template_paths # Get plugin template paths plugin_template_paths = get_plugin_template_paths() if plugin_template_paths: # Create a ChoiceLoader that searches app templates first, then plugin templates loaders = [app.jinja_loader] # Keep the default app loader first for path in plugin_template_paths: loaders.append(FileSystemLoader(path)) app.jinja_loader = ChoiceLoader(loaders) logger.info(f"Configured Jinja2 to search {len(plugin_template_paths)} plugin template directories") # Configure plugin templates (called after plugins are loaded) _configure_plugin_templates() csrf = CSRFProtect() csrf.init_app(app) notification_debug_log=[] # Locale for correct presentation of prices etc default_locale = locale.getdefaultlocale() logger.info(f"System locale default is {default_locale}") try: locale.setlocale(locale.LC_ALL, default_locale) except locale.Error: logger.warning(f"Unable to set locale {default_locale}, locale is not installed maybe?") watch_api = Api(app, decorators=[csrf.exempt]) def init_app_secret(datastore_path): secret = "" path = os.path.join(datastore_path, "secret.txt") try: with open(path, "r", encoding='utf-8') as f: secret = f.read() except FileNotFoundError: import secrets with open(path, "w", encoding='utf-8') as f: secret = secrets.token_hex(32) f.write(secret) return secret @app.template_global() def get_darkmode_state(): css_dark_mode = request.cookies.get('css_dark_mode', 'false') return 'true' if css_dark_mode and strtobool(css_dark_mode) else 'false' @app.template_global() def get_css_version(): return __version__ @app.template_global() def get_socketio_path(): """Generate the correct Socket.IO path prefix for the client""" # If behind a proxy with a sub-path, we need to respect that path prefix = "" if os.getenv('USE_X_SETTINGS') and 'X-Forwarded-Prefix' in request.headers: prefix = request.headers['X-Forwarded-Prefix'] # Socket.IO will be available at {prefix}/socket.io/ return prefix @app.template_global('is_safe_valid_url') def _is_safe_valid_url(test_url): from .validate_url import is_safe_valid_url return is_safe_valid_url(test_url) @app.template_filter('format_number_locale') def _jinja2_filter_format_number_locale(value: float) -> str: "Formats for example 4000.10 to the local locale default of 4,000.10" # Format the number with two decimal places (locale format string will return 6 decimal) formatted_value = locale.format_string("%.2f", value, grouping=True) return formatted_value @app.template_global('is_checking_now') def _watch_is_checking_now(watch_obj, format="%Y-%m-%d %H:%M:%S"): return worker_handler.is_watch_running(watch_obj['uuid']) @app.template_global('get_watch_queue_position') def _get_watch_queue_position(watch_obj): """Get the position of a watch in the queue""" uuid = watch_obj['uuid'] return update_q.get_uuid_position(uuid) @app.template_global('get_current_worker_count') def _get_current_worker_count(): """Get the current number of operational workers""" return worker_handler.get_worker_count() @app.template_global('get_worker_status_info') def _get_worker_status_info(): """Get detailed worker status information for display""" status = worker_handler.get_worker_status() running_uuids = worker_handler.get_running_uuids() return { 'count': status['worker_count'], 'type': status['worker_type'], 'active_workers': len(running_uuids), 'processing_watches': running_uuids, 'loop_running': status.get('async_loop_running', None) } # We use the whole watch object from the store/JSON so we can see if there's some related status in terms of a thread # running or something similar. @app.template_filter('format_last_checked_time') def _jinja2_filter_datetime(watch_obj, format="%Y-%m-%d %H:%M:%S"): if watch_obj['last_checked'] == 0: return gettext('Not yet') locale = get_timeago_locale(str(get_locale())) try: return timeago.format(int(watch_obj['last_checked']), time.time(), locale) except: # Fallback to English if locale not supported by timeago return timeago.format(int(watch_obj['last_checked']), time.time(), 'en') @app.template_filter('format_timestamp_timeago') def _jinja2_filter_datetimestamp(timestamp, format="%Y-%m-%d %H:%M:%S"): if not timestamp: return gettext('Not yet') locale = get_timeago_locale(str(get_locale())) try: return timeago.format(int(timestamp), time.time(), locale) except: # Fallback to English if locale not supported by timeago return timeago.format(int(timestamp), time.time(), 'en') @app.template_filter('pagination_slice') def _jinja2_filter_pagination_slice(arr, skip): per_page = datastore.data['settings']['application'].get('pager_size', 50) if per_page: return arr[skip:skip + per_page] return arr @app.template_filter('format_seconds_ago') def _jinja2_filter_seconds_precise(timestamp): if timestamp == False: return gettext('Not yet') return format(int(time.time()-timestamp), ',d') @app.template_filter('fetcher_status_icons') def _jinja2_filter_fetcher_status_icons(fetcher_name): """Get status icon HTML for a given fetcher. This filter checks both built-in fetchers and plugin fetchers for status icons. Args: fetcher_name: The fetcher name (e.g., 'html_webdriver', 'html_js_zyte') Returns: str: HTML string containing status icon elements """ from changedetectionio import content_fetchers from changedetectionio.pluggy_interface import collect_fetcher_status_icons from markupsafe import Markup from flask import url_for icon_data = None # First check if it's a plugin fetcher (plugins have priority) plugin_icon_data = collect_fetcher_status_icons(fetcher_name) if plugin_icon_data: icon_data = plugin_icon_data # Check if it's a built-in fetcher elif hasattr(content_fetchers, fetcher_name): fetcher_class = getattr(content_fetchers, fetcher_name) if hasattr(fetcher_class, 'get_status_icon_data'): icon_data = fetcher_class.get_status_icon_data() # Build HTML from icon data if icon_data and isinstance(icon_data, dict): # Use 'group' from icon_data if specified, otherwise default to 'images' group = icon_data.get('group', 'images') # Try to use url_for, but fall back to manual URL building if endpoint not registered yet try: icon_url = url_for('static_content', group=group, filename=icon_data['filename']) except: # Fallback: build URL manually respecting APPLICATION_ROOT from flask import request app_root = request.script_root if hasattr(request, 'script_root') else '' icon_url = f"{app_root}/static/{group}/{icon_data['filename']}" style_attr = f' style="{icon_data["style"]}"' if icon_data.get('style') else '' html = f'{icon_data[' return Markup(html) return '' @app.template_filter('sanitize_tag_class') def _jinja2_filter_sanitize_tag_class(tag_title): """Sanitize a tag title to create a valid CSS class name. Removes all non-alphanumeric characters and converts to lowercase. Args: tag_title: The tag title string Returns: str: A sanitized string suitable for use as a CSS class name """ import re # Remove all non-alphanumeric characters and convert to lowercase sanitized = re.sub(r'[^a-zA-Z0-9]', '', tag_title).lower() # Ensure it starts with a letter (CSS requirement) if sanitized and not sanitized[0].isalpha(): sanitized = 'tag' + sanitized return sanitized if sanitized else 'tag' # Import login_optionally_required from auth_decorator from changedetectionio.auth_decorator import login_optionally_required # When nobody is logged in Flask-Login's current_user is set to an AnonymousUser object. class User(flask_login.UserMixin): id=None def set_password(self, password): return True def get_user(self, email="defaultuser@changedetection.io"): return self def is_authenticated(self): return True def is_active(self): return True def is_anonymous(self): return False def get_id(self): return str(self.id) # Compare given password against JSON store or Env var def check_password(self, password): import base64 import hashlib # Can be stored in env (for deployments) or in the general configs raw_salt_pass = os.getenv("SALTED_PASS", False) if not raw_salt_pass: raw_salt_pass = datastore.data['settings']['application'].get('password') raw_salt_pass = base64.b64decode(raw_salt_pass) salt_from_storage = raw_salt_pass[:32] # 32 is the length of the salt # Use the exact same setup you used to generate the key, but this time put in the password to check new_key = hashlib.pbkdf2_hmac( 'sha256', password.encode('utf-8'), # Convert the password to bytes salt_from_storage, 100000 ) new_key = salt_from_storage + new_key return new_key == raw_salt_pass pass def changedetection_app(config=None, datastore_o=None): logger.trace("TRACE log is enabled") global datastore, socketio_server datastore = datastore_o # Import and create a wrapper for is_safe_url that has access to app from changedetectionio.is_safe_url import is_safe_url as _is_safe_url def is_safe_url(target): """Wrapper for is_safe_url that passes the app instance""" return _is_safe_url(target, app) # so far just for read-only via tests, but this will be moved eventually to be the main source # (instead of the global var) app.config['DATASTORE'] = datastore_o # Store the signal in the app config to ensure it's accessible everywhere app.config['watch_check_update_SIGNAL'] = watch_check_update login_manager = flask_login.LoginManager(app) login_manager.login_view = 'login' app.secret_key = init_app_secret(config['datastore_path']) # Initialize Flask-Babel for i18n support available_languages = get_available_languages() language_codes = get_language_codes() def get_locale(): # 1. Try to get locale from session (user explicitly selected) if 'locale' in session: return session['locale'] # 2. Fall back to Accept-Language header return request.accept_languages.best_match(language_codes) # Initialize Babel with locale selector babel = Babel(app, locale_selector=get_locale) # Make i18n functions available to templates app.jinja_env.globals.update( _=gettext, get_locale=get_locale, get_flag_for_locale=get_flag_for_locale, available_languages=available_languages ) # Set up a request hook to check authentication for all routes @app.before_request def check_authentication(): has_password_enabled = datastore.data['settings']['application'].get('password') or os.getenv("SALTED_PASS", False) if has_password_enabled and not flask_login.current_user.is_authenticated: # Permitted if request.endpoint and request.endpoint == 'static_content' and request.view_args: # Handled by static_content handler return None # Permitted - static flag icons need to load on login page elif request.endpoint and request.endpoint == 'static_flags': return None # Permitted - language selection should work on login page elif request.endpoint and request.endpoint == 'set_language': return None # Permitted elif request.endpoint and 'login' in request.endpoint: return None elif request.endpoint and 'diff_history_page' in request.endpoint and datastore.data['settings']['application'].get('shared_diff_access'): return None elif request.method in flask_login.config.EXEMPT_METHODS: return None elif app.config.get('LOGIN_DISABLED'): return None # RSS access with token is allowed elif request.endpoint and 'rss.feed' in request.endpoint: return None # Socket.IO routes - need separate handling elif request.path.startswith('/socket.io/'): return None # API routes - use their own auth mechanism (@auth.check_token) elif request.path.startswith('/api/'): return None else: return login_manager.unauthorized() watch_api.add_resource(WatchHistoryDiff, '/api/v1/watch//difference//', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(WatchSingleHistory, '/api/v1/watch//history/', resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) watch_api.add_resource(WatchFavicon, '/api/v1/watch//favicon', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(WatchHistory, '/api/v1/watch//history', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(CreateWatch, '/api/v1/watch', resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) watch_api.add_resource(Watch, '/api/v1/watch/', resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) watch_api.add_resource(SystemInfo, '/api/v1/systeminfo', resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) watch_api.add_resource(Import, '/api/v1/import', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(Tags, '/api/v1/tags', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(Tag, '/api/v1/tag', '/api/v1/tag/', resource_class_kwargs={'datastore': datastore, 'update_q': update_q}) watch_api.add_resource(Search, '/api/v1/search', resource_class_kwargs={'datastore': datastore}) watch_api.add_resource(Notifications, '/api/v1/notifications', resource_class_kwargs={'datastore': datastore}) @login_manager.user_loader def user_loader(email): user = User() user.get_user(email) return user @login_manager.unauthorized_handler def unauthorized_handler(): # Pass the current request path so users are redirected back after login return redirect(url_for('login', redirect=request.path)) @app.route('/logout') def logout(): flask_login.logout_user() # Check if there's a redirect parameter to return to after re-login redirect_url = request.args.get('redirect') # If redirect is provided and safe, pass it to login page if redirect_url and is_safe_url(redirect_url): return redirect(url_for('login', redirect=redirect_url)) # Otherwise just go to watchlist return redirect(url_for('watchlist.index')) @app.route('/set-language/') def set_language(locale): """Set the user's preferred language in the session""" if not request.cookies: logger.error("Cannot set language without session cookie") flash("Cannot set language without session cookie", 'error') return redirect(url_for('watchlist.index')) # Validate the locale against available languages if locale in language_codes: session['locale'] = locale # CRITICAL: Flask-Babel caches the locale in the request context (ctx.babel_locale) # We must refresh to clear this cache so the new locale takes effect immediately # This is especially important for tests where multiple requests happen rapidly from flask_babel import refresh refresh() else: logger.error(f"Invalid locale {locale}, available: {language_codes}") # Check if there's a redirect parameter to return to the same page redirect_url = request.args.get('redirect') # If redirect is provided and safe, use it if redirect_url and is_safe_url(redirect_url): return redirect(redirect_url) # Otherwise redirect to watchlist return redirect(url_for('watchlist.index')) # https://github.com/pallets/flask/blob/93dd1709d05a1cf0e886df6223377bdab3b077fb/examples/tutorial/flaskr/__init__.py#L39 # You can divide up the stuff like this @app.route('/login', methods=['GET', 'POST']) def login(): # Extract and validate the redirect parameter redirect_url = request.args.get('redirect') or request.form.get('redirect') # Validate the redirect URL - default to watchlist if invalid if redirect_url and is_safe_url(redirect_url): validated_redirect = redirect_url else: validated_redirect = url_for('watchlist.index') if request.method == 'GET': if flask_login.current_user.is_authenticated: # Already logged in - redirect immediately to the target flash(gettext("Already logged in")) return redirect(validated_redirect) flash(gettext("You must be logged in, please log in."), 'error') output = render_template("login.html", redirect_url=validated_redirect) return output user = User() user.id = "defaultuser@changedetection.io" password = request.form.get('password') if (user.check_password(password)): flask_login.login_user(user, remember=True) # Redirect to the validated URL after successful login return redirect(validated_redirect) else: flash(gettext('Incorrect password'), 'error') return redirect(url_for('login', redirect=redirect_url if redirect_url else None)) @app.before_request def before_request_handle_cookie_x_settings(): # Set the auth cookie path if we're running as X-settings/X-Forwarded-Prefix if os.getenv('USE_X_SETTINGS') and 'X-Forwarded-Prefix' in request.headers: app.config['REMEMBER_COOKIE_PATH'] = request.headers['X-Forwarded-Prefix'] app.config['SESSION_COOKIE_PATH'] = request.headers['X-Forwarded-Prefix'] return None @app.route("/static/flags/", methods=['GET']) def static_flags(flag_path): """Handle flag icon files with subdirectories""" from flask import make_response import re # flag_path comes in as "1x1/de.svg" or "4x3/de.svg" if re.match(r'^(1x1|4x3)/[a-z0-9-]+\.svg$', flag_path.lower()): # Reconstruct the path safely with additional validation parts = flag_path.lower().split('/') if len(parts) != 2: abort(404) subdir = parts[0] svg_file = parts[1] # Extra validation: ensure subdir is exactly 1x1 or 4x3 if subdir not in ['1x1', '4x3']: abort(404) # Extra validation: ensure svg_file only contains safe characters if not re.match(r'^[a-z0-9-]+\.svg$', svg_file): abort(404) try: response = make_response(send_from_directory(f"static/flags/{subdir}", svg_file)) response.headers['Content-type'] = 'image/svg+xml' response.headers['Cache-Control'] = 'max-age=86400, public' # Cache for 24 hours return response except FileNotFoundError: abort(404) else: abort(404) @app.route("/static//", methods=['GET']) def static_content(group, filename): from flask import make_response import re group = re.sub(r'[^\w.-]+', '', group.lower()) filename = re.sub(r'[^\w.-]+', '', filename.lower()) if group == 'screenshot': # Could be sensitive, follow password requirements if datastore.data['settings']['application']['password'] and not flask_login.current_user.is_authenticated: if not datastore.data['settings']['application'].get('shared_diff_access'): abort(403) screenshot_filename = "last-screenshot.png" if not request.args.get('error_screenshot') else "last-error-screenshot.png" # These files should be in our subdirectory try: # set nocache, set content-type response = make_response(send_from_directory(os.path.join(datastore_o.datastore_path, filename), screenshot_filename)) response.headers['Content-type'] = 'image/png' response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = 0 return response except FileNotFoundError: abort(404) if group == 'favicon': # Could be sensitive, follow password requirements if datastore.data['settings']['application']['password'] and not flask_login.current_user.is_authenticated: abort(403) # Get the watch object watch = datastore.data['watching'].get(filename) if not watch: abort(404) favicon_filename = watch.get_favicon_filename() if favicon_filename: try: import magic mime = magic.from_file( os.path.join(watch.watch_data_dir, favicon_filename), mime=True ) except ImportError: # Fallback, no python-magic import mimetypes mime, encoding = mimetypes.guess_type(favicon_filename) response = make_response(send_from_directory(watch.watch_data_dir, favicon_filename)) response.headers['Content-type'] = mime response.headers['Cache-Control'] = 'max-age=300, must-revalidate' # Cache for 5 minutes, then revalidate return response if group == 'visual_selector_data': # Could be sensitive, follow password requirements if datastore.data['settings']['application']['password'] and not flask_login.current_user.is_authenticated: abort(403) # These files should be in our subdirectory try: # set nocache, set content-type, # `filename` is actually directory UUID of the watch watch_directory = str(os.path.join(datastore_o.datastore_path, filename)) response = None if os.path.isfile(os.path.join(watch_directory, "elements.deflate")): response = make_response(send_from_directory(watch_directory, "elements.deflate")) response.headers['Content-Type'] = 'application/json' response.headers['Content-Encoding'] = 'deflate' else: logger.error(f'Request elements.deflate at "{watch_directory}" but was not found.') abort(404) if response: response.headers['Cache-Control'] = 'no-cache, no-store, must-revalidate' response.headers['Pragma'] = 'no-cache' response.headers['Expires'] = "0" return response except FileNotFoundError: abort(404) # Handle plugin group specially if group == 'plugin': # Serve files from plugin static directories from changedetectionio.pluggy_interface import plugin_manager import os as os_check for plugin_name, plugin_obj in plugin_manager.list_name_plugin(): if hasattr(plugin_obj, 'plugin_static_path'): try: static_path = plugin_obj.plugin_static_path() if static_path and os_check.path.isdir(static_path): # Check if file exists in plugin's static directory plugin_file_path = os_check.path.join(static_path, filename) if os_check.path.isfile(plugin_file_path): # Found the file in a plugin response = make_response(send_from_directory(static_path, filename)) response.headers['Cache-Control'] = 'max-age=3600, public' # Cache for 1 hour return response except Exception as e: logger.debug(f"Error checking plugin {plugin_name} for static file: {e}") pass # File not found in any plugin abort(404) # These files should be in our subdirectory try: return send_from_directory(f"static/{group}", path=filename) except FileNotFoundError: abort(404) import changedetectionio.blueprint.browser_steps as browser_steps app.register_blueprint(browser_steps.construct_blueprint(datastore), url_prefix='/browser-steps') from changedetectionio.blueprint.imports import construct_blueprint as construct_import_blueprint app.register_blueprint(construct_import_blueprint(datastore, update_q, queuedWatchMetaData), url_prefix='/imports') import changedetectionio.blueprint.price_data_follower as price_data_follower app.register_blueprint(price_data_follower.construct_blueprint(datastore, update_q), url_prefix='/price_data_follower') import changedetectionio.blueprint.tags as tags app.register_blueprint(tags.construct_blueprint(datastore), url_prefix='/tags') import changedetectionio.blueprint.check_proxies as check_proxies app.register_blueprint(check_proxies.construct_blueprint(datastore=datastore), url_prefix='/check_proxy') import changedetectionio.blueprint.backups as backups app.register_blueprint(backups.construct_blueprint(datastore), url_prefix='/backups') import changedetectionio.blueprint.settings as settings app.register_blueprint(settings.construct_blueprint(datastore), url_prefix='/settings') import changedetectionio.conditions.blueprint as conditions app.register_blueprint(conditions.construct_blueprint(datastore), url_prefix='/conditions') import changedetectionio.blueprint.rss.blueprint as rss app.register_blueprint(rss.construct_blueprint(datastore), url_prefix='/rss') # watchlist UI buttons etc import changedetectionio.blueprint.ui as ui app.register_blueprint(ui.construct_blueprint(datastore, update_q, worker_handler, queuedWatchMetaData, watch_check_update)) import changedetectionio.blueprint.watchlist as watchlist app.register_blueprint(watchlist.construct_blueprint(datastore=datastore, update_q=update_q, queuedWatchMetaData=queuedWatchMetaData), url_prefix='') # Initialize Socket.IO server conditionally based on settings socket_io_enabled = datastore.data['settings']['application']['ui'].get('socket_io_enabled', True) if socket_io_enabled: from changedetectionio.realtime.socket_server import init_socketio global socketio_server socketio_server = init_socketio(app, datastore) logger.info("Socket.IO server initialized") else: logger.info("Socket.IO server disabled via settings") socketio_server = None # Memory cleanup endpoint @app.route('/gc-cleanup', methods=['GET']) @login_optionally_required def gc_cleanup(): from changedetectionio.gc_cleanup import memory_cleanup from flask import jsonify result = memory_cleanup(app) return jsonify({"status": "success", "message": "Memory cleanup completed", "result": result}) # Worker health check endpoint @app.route('/worker-health', methods=['GET']) @login_optionally_required def worker_health(): from flask import jsonify expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) # Get basic status status = worker_handler.get_worker_status() # Perform health check health_result = worker_handler.check_worker_health( expected_count=expected_workers, update_q=update_q, notification_q=notification_q, app=app, datastore=datastore ) return jsonify({ "status": "success", "worker_status": status, "health_check": health_result, "expected_workers": expected_workers }) # Queue status endpoint @app.route('/queue-status', methods=['GET']) @login_optionally_required def queue_status(): from flask import jsonify, request # Get specific UUID position if requested target_uuid = request.args.get('uuid') if target_uuid: position_info = update_q.get_uuid_position(target_uuid) return jsonify({ "status": "success", "uuid": target_uuid, "queue_position": position_info }) else: # Get pagination parameters limit = request.args.get('limit', type=int) offset = request.args.get('offset', type=int, default=0) summary_only = request.args.get('summary', type=bool, default=False) if summary_only: # Fast summary for large queues summary = update_q.get_queue_summary() return jsonify({ "status": "success", "queue_summary": summary }) else: # Get queued items with pagination support if limit is None: # Default limit for large queues to prevent performance issues queue_size = update_q.qsize() if queue_size > 100: limit = 50 logger.warning(f"Large queue ({queue_size} items) detected, limiting to {limit} items. Use ?limit=N for more.") all_queued = update_q.get_all_queued_uuids(limit=limit, offset=offset) return jsonify({ "status": "success", "queue_size": update_q.qsize(), "queued_data": all_queued }) # Start the async workers during app initialization # Can be overridden by ENV or use the default settings n_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) logger.info(f"Starting {n_workers} workers during app initialization") worker_handler.start_workers(n_workers, update_q, notification_q, app, datastore) # @todo handle ctrl break ticker_thread = threading.Thread(target=ticker_thread_check_time_launch_checks, daemon=True, name="TickerThread-ScheduleChecker").start() threading.Thread(target=notification_runner, daemon=True, name="NotificationRunner").start() in_pytest = "pytest" in sys.modules or "PYTEST_CURRENT_TEST" in os.environ # Check for new release version, but not when running in test/build or pytest if not os.getenv("GITHUB_REF", False) and not strtobool(os.getenv('DISABLE_VERSION_CHECK', 'no')) and not in_pytest: threading.Thread(target=check_for_new_version, daemon=True, name="VersionChecker").start() # Return the Flask app - the Socket.IO will be attached to it but initialized separately # This avoids circular dependencies return app # Check for new version and anonymous stats def check_for_new_version(): import requests import urllib3 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning) while not app.config.exit.is_set(): try: r = requests.post("https://changedetection.io/check-ver.php", data={'version': __version__, 'app_guid': datastore.data['app_guid'], 'watch_count': len(datastore.data['watching']) }, verify=False) except: pass try: if "new_version" in r.text: app.config['NEW_VERSION_AVAILABLE'] = True except: pass # Check daily app.config.exit.wait(86400) def notification_runner(): global notification_debug_log from datetime import datetime import json with app.app_context(): while not app.config.exit.is_set(): try: # At the moment only one thread runs (single runner) n_object = notification_q.get(block=False) except queue.Empty: app.config.exit.wait(1) else: now = datetime.now() sent_obj = None try: from changedetectionio.notification.handler import process_notification # Fallback to system config if not set if not n_object.get('notification_body') and datastore.data['settings']['application'].get('notification_body'): n_object['notification_body'] = datastore.data['settings']['application'].get('notification_body') if not n_object.get('notification_title') and datastore.data['settings']['application'].get('notification_title'): n_object['notification_title'] = datastore.data['settings']['application'].get('notification_title') if not n_object.get('notification_format') and datastore.data['settings']['application'].get('notification_format'): n_object['notification_format'] = datastore.data['settings']['application'].get('notification_format') if n_object.get('notification_urls', {}): sent_obj = process_notification(n_object, datastore) except Exception as e: logger.error(f"Watch URL: {n_object['watch_url']} Error {str(e)}") # UUID wont be present when we submit a 'test' from the global settings if 'uuid' in n_object: datastore.update_watch(uuid=n_object['uuid'], update_obj={'last_notification_error': "Notification error detected, goto notification log."}) log_lines = str(e).splitlines() notification_debug_log += log_lines with app.app_context(): app.config['watch_check_update_SIGNAL'].send(app_context=app, watch_uuid=n_object.get('uuid')) # Process notifications notification_debug_log+= ["{} - SENDING - {}".format(now.strftime("%c"), json.dumps(sent_obj))] # Trim the log length notification_debug_log = notification_debug_log[-100:] # Threaded runner, look for new watches to feed into the Queue. def ticker_thread_check_time_launch_checks(): import random proxy_last_called_time = {} last_health_check = 0 recheck_time_minimum_seconds = int(os.getenv('MINIMUM_SECONDS_RECHECK_TIME', 3)) logger.debug(f"System env MINIMUM_SECONDS_RECHECK_TIME {recheck_time_minimum_seconds}") # Workers are now started during app initialization, not here while not app.config.exit.is_set(): # Periodic worker health check (every 60 seconds) now = time.time() if now - last_health_check > 60: expected_workers = int(os.getenv("FETCH_WORKERS", datastore.data['settings']['requests']['workers'])) health_result = worker_handler.check_worker_health( expected_count=expected_workers, update_q=update_q, notification_q=notification_q, app=app, datastore=datastore ) if health_result['status'] != 'healthy': logger.warning(f"Worker health check: {health_result['message']}") last_health_check = now # Get a list of watches by UUID that are currently fetching data running_uuids = worker_handler.get_running_uuids() # Re #232 - Deepcopy the data incase it changes while we're iterating through it all watch_uuid_list = [] while True: try: # Get a list of watches sorted by last_checked, [1] because it gets passed a tuple # This is so we examine the most over-due first for k in sorted(datastore.data['watching'].items(), key=lambda item: item[1].get('last_checked',0)): watch_uuid_list.append(k[0]) except RuntimeError as e: # RuntimeError: dictionary changed size during iteration time.sleep(0.1) watch_uuid_list = [] else: break # Re #438 - Don't place more watches in the queue to be checked if the queue is already large while update_q.qsize() >= 2000: logger.warning(f"Recheck watches queue size limit reached ({MAX_QUEUE_SIZE}), skipping adding more items") app.config.exit.wait(10.0) recheck_time_system_seconds = int(datastore.threshold_seconds) # Check for watches outside of the time threshold to put in the thread queue. for uuid in watch_uuid_list: now = time.time() watch = datastore.data['watching'].get(uuid) if not watch: logger.error(f"Watch: {uuid} no longer present.") continue # No need todo further processing if it's paused if watch['paused']: continue # @todo - Maybe make this a hook? # Time schedule limit - Decide between watch or global settings scheduler_source = None if watch.get('time_between_check_use_default'): time_schedule_limit = datastore.data['settings']['requests'].get('time_schedule_limit', {}) scheduler_source = 'system/global settings' else: time_schedule_limit = watch.get('time_schedule_limit') scheduler_source = 'watch' tz_name = datastore.data['settings']['application'].get('scheduler_timezone_default', os.getenv('TZ', 'UTC').strip()) if time_schedule_limit and time_schedule_limit.get('enabled'): logger.trace(f"{uuid} Time scheduler - Using scheduler settings from {scheduler_source}") try: result = is_within_schedule(time_schedule_limit=time_schedule_limit, default_tz=tz_name ) if not result: logger.trace(f"{uuid} Time scheduler - not within schedule skipping.") continue except Exception as e: logger.error( f"{uuid} - Recheck scheduler, error handling timezone, check skipped - TZ name '{tz_name}' - {str(e)}") return False # If they supplied an individual entry minutes to threshold. threshold = recheck_time_system_seconds if watch.get('time_between_check_use_default') else watch.threshold_seconds() # #580 - Jitter plus/minus amount of time to make the check seem more random to the server jitter = datastore.data['settings']['requests'].get('jitter_seconds', 0) if jitter > 0: if watch.jitter_seconds == 0: watch.jitter_seconds = random.uniform(-abs(jitter), jitter) seconds_since_last_recheck = now - watch['last_checked'] if seconds_since_last_recheck >= (threshold + watch.jitter_seconds) and seconds_since_last_recheck >= recheck_time_minimum_seconds: if not uuid in running_uuids and uuid not in [q_uuid.item['uuid'] for q_uuid in update_q.queue]: # Proxies can be set to have a limit on seconds between which they can be called watch_proxy = datastore.get_preferred_proxy_for_watch(uuid=uuid) if watch_proxy and watch_proxy in list(datastore.proxy_list.keys()): # Proxy may also have some threshold minimum proxy_list_reuse_time_minimum = int(datastore.proxy_list.get(watch_proxy, {}).get('reuse_time_minimum', 0)) if proxy_list_reuse_time_minimum: proxy_last_used_time = proxy_last_called_time.get(watch_proxy, 0) time_since_proxy_used = int(time.time() - proxy_last_used_time) if time_since_proxy_used < proxy_list_reuse_time_minimum: # Not enough time difference reached, skip this watch logger.debug(f"> Skipped UUID {uuid} " f"using proxy '{watch_proxy}', not " f"enough time between proxy requests " f"{time_since_proxy_used}s/{proxy_list_reuse_time_minimum}s") continue else: # Record the last used time proxy_last_called_time[watch_proxy] = int(time.time()) # Use Epoch time as priority, so we get a "sorted" PriorityQueue, but we can still push a priority 1 into it. priority = int(time.time()) # Into the queue with you queued_successfully = worker_handler.queue_item_async_safe(update_q, queuedWatchMetaData.PrioritizedItem(priority=priority, item={'uuid': uuid}) ) if queued_successfully: logger.debug( f"> Queued watch UUID {uuid} " f"last checked at {watch['last_checked']} " f"queued at {now:0.2f} priority {priority} " f"jitter {watch.jitter_seconds:0.2f}s, " f"{now - watch['last_checked']:0.2f}s since last checked") else: logger.critical(f"CRITICAL: Failed to queue watch UUID {uuid} in ticker thread!") # Reset for next time watch.jitter_seconds = 0 # Should be low so we can break this out in testing app.config.exit.wait(1)