diff --git a/config/runtime.exs b/config/runtime.exs index d6d07a8e..3880b9f1 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -393,7 +393,7 @@ config :wanderer_app, :license_manager, # SSE Configuration config :wanderer_app, :sse, - enabled: System.get_env("WANDERER_SSE_ENABLED", "true") == "true", + enabled: config_dir |> get_var_from_path_or_env("WANDERER_SSE_ENABLED", "true") |> String.to_existing_atom(), max_connections_total: config_dir |> get_int_from_path_or_env("WANDERER_SSE_MAX_CONNECTIONS", 1000), max_connections_per_map: config_dir |> get_int_from_path_or_env("SSE_MAX_CONNECTIONS_PER_MAP", 50), max_connections_per_api_key: @@ -403,5 +403,5 @@ config :wanderer_app, :sse, # External Events Configuration config :wanderer_app, :external_events, - webhooks_enabled: System.get_env("WANDERER_WEBHOOKS_ENABLED", "true") == "true", + webhooks_enabled: config_dir |> get_var_from_path_or_env("WANDERER_WEBHOOKS_ENABLED", "true") |> String.to_existing_atom(), webhook_timeout_ms: config_dir |> get_int_from_path_or_env("WANDERER_WEBHOOK_TIMEOUT_MS", 15000) diff --git a/lib/wanderer_app/env.ex b/lib/wanderer_app/env.ex index 1cc8b22f..85651755 100644 --- a/lib/wanderer_app/env.ex +++ b/lib/wanderer_app/env.ex @@ -48,6 +48,26 @@ defmodule WandererApp.Env do ) def restrict_maps_creation?, do: get_key(:restrict_maps_creation, false) + def sse_enabled? do + Application.get_env(@app, :sse, []) + |> Keyword.get(:enabled, false) + |> case do + :true -> true + :false -> false + _ -> false + end + end + + def webhooks_enabled? do + Application.get_env(@app, :external_events, []) + |> Keyword.get(:webhooks_enabled, false) + |> case do + :true -> true + :false -> false + _ -> false + end + end + @decorate cacheable( cache: WandererApp.Cache, key: "map-connection-auto-expire-hours" diff --git a/lib/wanderer_app/external_events/sse_stream_manager.ex b/lib/wanderer_app/external_events/sse_stream_manager.ex index 29514518..f0f0a7e1 100644 --- a/lib/wanderer_app/external_events/sse_stream_manager.ex +++ b/lib/wanderer_app/external_events/sse_stream_manager.ex @@ -58,9 +58,20 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do # Schedule periodic cleanup of dead connections schedule_cleanup() + # Read configuration once during initialization + sse_config = Application.get_env(:wanderer_app, :sse, []) + state = %{ connections: %{}, # map_id => %{api_key => [connection_info]} - monitors: %{} # pid => {map_id, api_key} + monitors: %{}, # pid => {map_id, api_key} + # Configuration + enabled: WandererApp.Env.sse_enabled?() |> then(fn + true -> :true + false -> :false + end), + max_connections_total: Keyword.get(sse_config, :max_connections_total, 1000), + max_connections_per_map: Keyword.get(sse_config, :max_connections_per_map, 50), + max_connections_per_api_key: Keyword.get(sse_config, :max_connections_per_api_key, 10) } Logger.info("SSE Stream Manager started") @@ -70,13 +81,11 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do @impl true def handle_call({:add_client, map_id, api_key, client_pid, event_filter}, _from, state) do # Check if feature is enabled - unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do + unless state.enabled == :true do {:reply, {:error, :sse_disabled}, state} else # Check connection limits - max_connections = Application.get_env(:wanderer_app, :sse, [])[:max_connections_total] || 1000 - - case check_connection_limits(state, map_id, api_key, max_connections) do + case check_connection_limits(state, map_id, api_key, state.max_connections_total) do :ok -> # Monitor the client process ref = Process.monitor(client_pid) @@ -184,18 +193,14 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do if total_connections >= max_total do {:error, :max_connections_reached} else - # Check per-map and per-API-key limits from existing SSE config - sse_config = Application.get_env(:wanderer_app, :sse, []) - max_per_map = sse_config[:max_connections_per_map] || 50 - max_per_key = sse_config[:max_connections_per_api_key] || 10 - + # Check per-map and per-API-key limits from state map_connections = count_map_connections(state, map_id) key_connections = count_api_key_connections(state, map_id, api_key) cond do - map_connections >= max_per_map -> + map_connections >= state.max_connections_per_map -> {:error, :map_connection_limit_reached} - key_connections >= max_per_key -> + key_connections >= state.max_connections_per_api_key -> {:error, :api_key_connection_limit_reached} true -> :ok diff --git a/lib/wanderer_app/external_events/webhook_dispatcher.ex b/lib/wanderer_app/external_events/webhook_dispatcher.ex index 34b7f6ab..b55a00c4 100644 --- a/lib/wanderer_app/external_events/webhook_dispatcher.ex +++ b/lib/wanderer_app/external_events/webhook_dispatcher.ex @@ -55,9 +55,16 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do # Extract the pid from the tuple returned by start_link {:ok, task_supervisor_pid} = Task.Supervisor.start_link(name: WebhookDispatcher.TaskSupervisor) + # Read configuration once during initialization + webhooks_enabled = WandererApp.Env.webhooks_enabled?() |> then(fn + true -> :true + false -> :false + end) + {:ok, %{ task_supervisor: task_supervisor_pid, - delivery_count: 0 + delivery_count: 0, + webhooks_enabled: webhooks_enabled }} end @@ -99,7 +106,7 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do defp process_webhook_delivery(map_id, events, state) do # Check if webhooks are enabled globally and for this map - case webhooks_allowed?(map_id) do + case webhooks_allowed?(map_id, state.webhooks_enabled) do :ok -> # Get active webhook subscriptions for this map case get_active_subscriptions(map_id) do @@ -116,9 +123,17 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do state end - {:error, :webhooks_disabled} -> + {:error, :webhooks_globally_disabled} -> + Logger.debug(fn -> "Webhooks globally disabled" end) + state + + {:error, :webhooks_disabled_for_map} -> Logger.debug(fn -> "Webhooks disabled for map #{map_id}" end) state + + {:error, reason} -> + Logger.debug(fn -> "Webhooks not allowed for map #{map_id}: #{inspect(reason)}" end) + state end |> Map.update(:delivery_count, length(events), &(&1 + length(events))) end @@ -359,13 +374,14 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do round(capped_delay + jitter) end - defp webhooks_allowed?(map_id) do - with true <- Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled], + defp webhooks_allowed?(map_id, webhooks_globally_enabled) do + with :true <- webhooks_globally_enabled, {:ok, map} <- WandererApp.Api.Map.by_id(map_id), true <- map.webhooks_enabled do :ok else - false -> {:error, :webhooks_globally_disabled} + :false -> {:error, :webhooks_globally_disabled} + nil -> {:error, :webhooks_globally_disabled} {:error, :not_found} -> {:error, :map_not_found} %{webhooks_enabled: false} -> {:error, :webhooks_disabled_for_map} {:error, reason} -> {:error, reason} diff --git a/lib/wanderer_app_web/controllers/api/events_controller.ex b/lib/wanderer_app_web/controllers/api/events_controller.ex index b58ba581..3e7f84d5 100644 --- a/lib/wanderer_app_web/controllers/api/events_controller.ex +++ b/lib/wanderer_app_web/controllers/api/events_controller.ex @@ -25,7 +25,7 @@ defmodule WandererAppWeb.Api.EventsController do Logger.info("SSE stream requested for map #{map_identifier}") # Check if SSE is enabled - unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do + unless WandererApp.Env.sse_enabled?() do conn |> put_status(:service_unavailable) |> put_resp_content_type("text/plain")