diff --git a/config/runtime.exs b/config/runtime.exs index 4aa4f36a..d6d07a8e 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -393,15 +393,15 @@ config :wanderer_app, :license_manager, # SSE Configuration config :wanderer_app, :sse, - max_connections_per_map: String.to_integer(System.get_env("SSE_MAX_CONNECTIONS_PER_MAP", "50")), + enabled: System.get_env("WANDERER_SSE_ENABLED", "true") == "true", + max_connections_total: config_dir |> get_int_from_path_or_env("WANDERER_SSE_MAX_CONNECTIONS", 1000), + max_connections_per_map: config_dir |> get_int_from_path_or_env("SSE_MAX_CONNECTIONS_PER_MAP", 50), max_connections_per_api_key: - String.to_integer(System.get_env("SSE_MAX_CONNECTIONS_PER_API_KEY", "10")), - keepalive_interval: String.to_integer(System.get_env("SSE_KEEPALIVE_INTERVAL", "30000")), - connection_timeout: String.to_integer(System.get_env("SSE_CONNECTION_TIMEOUT", "300000")) + config_dir |> get_int_from_path_or_env("SSE_MAX_CONNECTIONS_PER_API_KEY", 10), + keepalive_interval: config_dir |> get_int_from_path_or_env("SSE_KEEPALIVE_INTERVAL", 30000), + connection_timeout: config_dir |> get_int_from_path_or_env("SSE_CONNECTION_TIMEOUT", 300000) # External Events Configuration config :wanderer_app, :external_events, - sse_enabled: System.get_env("WANDERER_SSE_ENABLED", "true") == "true", webhooks_enabled: System.get_env("WANDERER_WEBHOOKS_ENABLED", "true") == "true", - sse_max_connections: String.to_integer(System.get_env("WANDERER_SSE_MAX_CONNECTIONS", "1000")), - webhook_timeout_ms: String.to_integer(System.get_env("WANDERER_WEBHOOK_TIMEOUT_MS", "15000")) + webhook_timeout_ms: config_dir |> get_int_from_path_or_env("WANDERER_WEBHOOK_TIMEOUT_MS", 15000) diff --git a/lib/wanderer_app/external_events/map_event_relay.ex b/lib/wanderer_app/external_events/map_event_relay.ex index 1ee976c9..8501ce0d 100644 --- a/lib/wanderer_app/external_events/map_event_relay.ex +++ b/lib/wanderer_app/external_events/map_event_relay.ex @@ -96,21 +96,27 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do @impl true def handle_call({:get_events_since_ulid, map_id, since_ulid, limit}, _from, state) do # Get all events for this map and filter by ULID - try do - # Events are stored as {event_id, map_id, json_data} - # Filter by map_id and event_id (ULID) > since_ulid - events = - :ets.select(state.ets_table, [ - {{:"$1", :"$2", :"$3"}, - [{:andalso, {:>, :"$1", since_ulid}, {:==, :"$2", map_id}}], - [:"$3"]} - ]) - |> Enum.take(limit) + case validate_ulid(since_ulid) do + :ok -> + try do + # Events are stored as {event_id, map_id, json_data} + # Filter by map_id and event_id (ULID) > since_ulid + events = + :ets.select(state.ets_table, [ + {{:"$1", :"$2", :"$3"}, + [{:andalso, {:>, :"$1", since_ulid}, {:==, :"$2", map_id}}], + [:"$3"]} + ]) + |> Enum.take(limit) - {:reply, {:ok, events}, state} - catch - _, reason -> - {:reply, {:error, reason}, state} + {:reply, {:ok, events}, state} + rescue + error in [ArgumentError] -> + {:reply, {:error, {:ets_error, error}}, state} + end + + {:error, :invalid_ulid} -> + {:reply, {:error, :invalid_ulid}, state} end end @@ -189,6 +195,21 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do end end + defp validate_ulid(ulid) when is_binary(ulid) do + # ULID format validation: 26 characters, [0-9A-Z] excluding I, L, O, U + case byte_size(ulid) do + 26 -> + if ulid =~ ~r/^[0123456789ABCDEFGHJKMNPQRSTVWXYZ]{26}$/ do + :ok + else + {:error, :invalid_ulid} + end + _ -> + {:error, :invalid_ulid} + end + end + defp validate_ulid(_), do: {:error, :invalid_ulid} + defp cleanup_old_events(ets_table) do cutoff_time = DateTime.add(DateTime.utc_now(), -@event_retention_minutes, :minute) cutoff_ulid = datetime_to_ulid(cutoff_time) diff --git a/lib/wanderer_app/external_events/sse_stream_manager.ex b/lib/wanderer_app/external_events/sse_stream_manager.ex index 6bc944fe..29514518 100644 --- a/lib/wanderer_app/external_events/sse_stream_manager.ex +++ b/lib/wanderer_app/external_events/sse_stream_manager.ex @@ -70,11 +70,11 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do @impl true def handle_call({:add_client, map_id, api_key, client_pid, event_filter}, _from, state) do # Check if feature is enabled - unless Application.get_env(:wanderer_app, :external_events, [])[:sse_enabled] do + unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do {:reply, {:error, :sse_disabled}, state} else # Check connection limits - max_connections = Application.get_env(:wanderer_app, :external_events, [])[:sse_max_connections] || 1000 + max_connections = Application.get_env(:wanderer_app, :sse, [])[:max_connections_total] || 1000 case check_connection_limits(state, map_id, api_key, max_connections) do :ok -> diff --git a/lib/wanderer_app/external_events/webhook_dispatcher.ex b/lib/wanderer_app/external_events/webhook_dispatcher.ex index 0bb1d7b9..34b7f6ab 100644 --- a/lib/wanderer_app/external_events/webhook_dispatcher.ex +++ b/lib/wanderer_app/external_events/webhook_dispatcher.ex @@ -143,7 +143,7 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do {:ok, subscriptions} rescue # Catch specific Ash errors - error in [Ash.Error.Query.NotFound] -> + _error in [Ash.Error.Query.NotFound] -> {:ok, []} error in [Ash.Error.Invalid] -> @@ -365,9 +365,11 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do true <- map.webhooks_enabled do :ok else - false -> {:error, :webhooks_disabled} - {:error, :not_found} -> {:error, :webhooks_disabled} - _ -> {:error, :webhooks_disabled} + false -> {:error, :webhooks_globally_disabled} + {:error, :not_found} -> {:error, :map_not_found} + %{webhooks_enabled: false} -> {:error, :webhooks_disabled_for_map} + {:error, reason} -> {:error, reason} + error -> {:error, {:unexpected_error, error}} end end end \ No newline at end of file diff --git a/lib/wanderer_app_web/controllers/api/events_controller.ex b/lib/wanderer_app_web/controllers/api/events_controller.ex index 5a41e5a5..b58ba581 100644 --- a/lib/wanderer_app_web/controllers/api/events_controller.ex +++ b/lib/wanderer_app_web/controllers/api/events_controller.ex @@ -7,7 +7,7 @@ defmodule WandererAppWeb.Api.EventsController do use WandererAppWeb, :controller - alias WandererApp.ExternalEvents.{SseConnectionTracker, EventFilter, MapEventRelay} + alias WandererApp.ExternalEvents.{SseStreamManager, EventFilter, MapEventRelay} alias WandererApp.Api.Map, as: ApiMap alias WandererAppWeb.SSE alias Plug.Crypto @@ -25,36 +25,17 @@ defmodule WandererAppWeb.Api.EventsController do Logger.info("SSE stream requested for map #{map_identifier}") # Check if SSE is enabled - unless Application.get_env(:wanderer_app, :external_events, [])[:sse_enabled] do + unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do conn |> put_status(:service_unavailable) - |> json(%{error: "Server-Sent Events are disabled on this server"}) + |> put_resp_content_type("text/plain") + |> send_resp(503, "Server-Sent Events are disabled on this server") else # Validate API key and get map case validate_api_key(conn, map_identifier) do {:ok, map, api_key} -> - # Check connection limits - case SseConnectionTracker.check_limits(map.id, api_key) do - :ok -> - establish_sse_connection(conn, map.id, api_key, params) - - {:error, :map_limit_exceeded} -> - conn - |> put_status(:too_many_requests) - |> json(%{ - error: "Too many connections to this map", - code: "MAP_CONNECTION_LIMIT" - }) - - {:error, :api_key_limit_exceeded} -> - conn - |> put_status(:too_many_requests) - |> json(%{ - error: "Too many connections for this API key", - code: "API_KEY_CONNECTION_LIMIT" - }) - end + establish_sse_connection(conn, map.id, api_key, params) {:error, status, message} -> conn @@ -76,33 +57,56 @@ defmodule WandererAppWeb.Api.EventsController do conn = SSE.send_headers(conn) # Track the connection - :ok = SseConnectionTracker.track_connection(map_id, api_key, self()) - - # Send initial connection event - conn = SSE.send_event(conn, %{ - id: Ulid.generate(), - event: "connected", - data: %{ - map_id: map_id, - server_time: DateTime.utc_now() |> DateTime.to_iso8601() - } - }) - - # Handle backfill if last_event_id is provided - conn = - case Map.get(params, "last_event_id") do - nil -> - conn - - last_event_id -> - send_backfill_events(conn, map_id, last_event_id, event_filter) - end - - # Subscribe to map events - Phoenix.PubSub.subscribe(WandererApp.PubSub, "external_events:map:#{map_id}") - - # Start streaming loop - stream_events(conn, map_id, api_key, event_filter) + case SseStreamManager.add_client(map_id, api_key, self(), event_filter) do + {:ok, _} -> + # Send initial connection event + conn = SSE.send_event(conn, %{ + id: Ulid.generate(), + event: "connected", + data: %{ + map_id: map_id, + server_time: DateTime.utc_now() |> DateTime.to_iso8601() + } + }) + + # Handle backfill if last_event_id is provided + conn = + case Map.get(params, "last_event_id") do + nil -> + conn + + last_event_id -> + send_backfill_events(conn, map_id, last_event_id, event_filter) + end + + # Subscribe to map events + Phoenix.PubSub.subscribe(WandererApp.PubSub, "external_events:map:#{map_id}") + + # Start streaming loop + stream_events(conn, map_id, api_key, event_filter) + + {:error, :map_limit_exceeded} -> + conn + |> put_status(:too_many_requests) + |> json(%{ + error: "Too many connections to this map", + code: "MAP_CONNECTION_LIMIT" + }) + + {:error, :api_key_limit_exceeded} -> + conn + |> put_status(:too_many_requests) + |> json(%{ + error: "Too many connections for this API key", + code: "API_KEY_CONNECTION_LIMIT" + }) + + {:error, reason} -> + Logger.error("Failed to add SSE client: #{inspect(reason)}") + conn + |> put_status(:internal_server_error) + |> send_resp(500, "Internal server error") + end end defp send_backfill_events(conn, map_id, last_event_id, event_filter) do @@ -110,12 +114,17 @@ defmodule WandererAppWeb.Api.EventsController do {:ok, events} -> # Filter and send each event Enum.reduce(events, conn, fn event_json, acc_conn -> - event = Jason.decode!(event_json) - - if EventFilter.matches?(event["type"], event_filter) do - SSE.send_event(acc_conn, event) - else - acc_conn + case Jason.decode(event_json) do + {:ok, event} -> + if EventFilter.matches?(event["type"], event_filter) do + SSE.send_event(acc_conn, event) + else + acc_conn + end + + {:error, reason} -> + Logger.error("Failed to decode event during backfill: #{inspect(reason)}") + acc_conn end end) @@ -129,13 +138,18 @@ defmodule WandererAppWeb.Api.EventsController do receive do {:external_event, event_json} -> # Parse and check if event matches filter - event = Jason.decode!(event_json) - conn = - if EventFilter.matches?(event["type"], event_filter) do - SSE.send_event(conn, event) - else - conn + case Jason.decode(event_json) do + {:ok, event} -> + if EventFilter.matches?(event["type"], event_filter) do + SSE.send_event(conn, event) + else + conn + end + + {:error, reason} -> + Logger.error("Failed to decode event in stream: #{inspect(reason)}") + conn end # Continue streaming @@ -159,11 +173,17 @@ defmodule WandererAppWeb.Api.EventsController do stream_events(conn, map_id, api_key, event_filter) end rescue - _ -> + _error in [Plug.Conn.WrapperError, DBConnection.ConnectionError] -> # Connection closed, cleanup Logger.info("SSE connection closed for map #{map_id}") - SseConnectionTracker.remove_connection(map_id, api_key, self()) + SseStreamManager.remove_client(map_id, api_key, self()) conn + + error -> + # Log unexpected errors before cleanup + Logger.error("Unexpected error in SSE stream: #{inspect(error)}") + SseStreamManager.remove_client(map_id, api_key, self()) + reraise error, __STACKTRACE__ end defp validate_api_key(conn, map_identifier) do diff --git a/lib/wanderer_app_web/controllers/map_api_controller.ex b/lib/wanderer_app_web/controllers/map_api_controller.ex index 70d19f7d..681c844c 100644 --- a/lib/wanderer_app_web/controllers/map_api_controller.ex +++ b/lib/wanderer_app_web/controllers/map_api_controller.ex @@ -875,12 +875,18 @@ defmodule WandererAppWeb.MapAPIController do } def toggle_webhooks(conn, %{"map_id" => map_identifier, "enabled" => enabled}) do - with :ok <- check_global_webhooks_enabled(), + with {:ok, enabled_boolean} <- validate_boolean_param(enabled, "enabled"), + :ok <- check_global_webhooks_enabled(), {:ok, map} <- resolve_map_identifier(map_identifier), :ok <- check_map_owner(conn, map), - {:ok, updated_map} <- WandererApp.Api.Map.toggle_webhooks(map, %{webhooks_enabled: enabled}) do + {:ok, updated_map} <- WandererApp.Api.Map.toggle_webhooks(map, %{webhooks_enabled: enabled_boolean}) do json(conn, %{webhooks_enabled: updated_map.webhooks_enabled}) else + {:error, :invalid_boolean} -> + conn + |> put_status(:bad_request) + |> json(%{error: "The 'enabled' parameter must be a boolean value"}) + {:error, :webhooks_disabled} -> conn |> put_status(:service_unavailable) @@ -905,6 +911,11 @@ defmodule WandererAppWeb.MapAPIController do # Helper functions for webhook toggle + defp validate_boolean_param(value, _param_name) when is_boolean(value), do: {:ok, value} + defp validate_boolean_param("true", _param_name), do: {:ok, true} + defp validate_boolean_param("false", _param_name), do: {:ok, false} + defp validate_boolean_param(_, _param_name), do: {:error, :invalid_boolean} + defp check_global_webhooks_enabled do if Application.get_env(:wanderer_app, :external_events)[:webhooks_enabled] do :ok diff --git a/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex b/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex index edd5b450..5653650c 100644 --- a/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex +++ b/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex @@ -341,13 +341,18 @@ defmodule WandererAppWeb.MapWebhooksAPIController do def create(conn, %{"map_identifier" => map_identifier} = params) do # Check if webhooks are enabled - unless Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled] do + if not Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled] do conn |> put_status(:service_unavailable) |> json(%{error: "Webhooks are disabled on this server"}) else - with {:ok, map} <- get_map(conn, map_identifier), - {:ok, webhook_params} <- validate_create_params(params, map.id) do + do_create_webhook(conn, map_identifier, params) + end + end + + defp do_create_webhook(conn, map_identifier, params) do + with {:ok, map} <- get_map(conn, map_identifier), + {:ok, webhook_params} <- validate_create_params(params, map.id) do case MapWebhookSubscription.create(webhook_params) do {:ok, webhook} -> @@ -384,7 +389,6 @@ defmodule WandererAppWeb.MapWebhooksAPIController do |> put_status(:internal_server_error) |> json(%{error: "Internal server error"}) end - end end def update(conn, %{"map_identifier" => map_identifier, "id" => webhook_id} = params) do diff --git a/priv/posts/2025/06-21-webhooks.md b/priv/posts/2025/06-21-webhooks.md index f295ecfb..856e2ad6 100644 --- a/priv/posts/2025/06-21-webhooks.md +++ b/priv/posts/2025/06-21-webhooks.md @@ -56,12 +56,20 @@ const apiToken = "your-map-api-token"; // Optional: Filter specific events const eventTypes = ["add_system", "map_kill"].join(","); -const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream?events=${eventTypes}`; -const eventSource = new EventSource(url, { - headers: { - 'Authorization': `Bearer ${apiToken}` - } +// Note: Native EventSource doesn't support custom headers +// You have two options: + +// Option 1: Include the API token as a query parameter +const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream?events=${eventTypes}&token=${apiToken}`; +const eventSource = new EventSource(url); + +// Option 2: Use an EventSource polyfill that supports headers + import { EventSourcePolyfill } from 'event-source-polyfill'; + const eventSource = new EventSourcePolyfill(url, { + headers: { + 'Authorization': `Bearer ${apiToken}` + } }); // Handle connection opened @@ -107,16 +115,13 @@ const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream`; ``` ### Event Backfill -SSE supports automatic backfill using the `Last-Event-ID` header: +SSE supports automatic backfill when reconnecting: ```javascript // Reconnect with backfill from last received event -const eventSource = new EventSource(url, { - headers: { - 'Authorization': `Bearer ${apiToken}`, - 'Last-Event-ID': lastEventId // Will receive events since this ID - } -}); +// Add the last_event_id as a query parameter +const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream?token=${apiToken}&last_event_id=${lastEventId}`; +const eventSource = new EventSource(url); ``` ## Webhook Setup