From 4d75b256c40a7704bdbbd56773beb69bebfbec76 Mon Sep 17 00:00:00 2001 From: guarzo Date: Tue, 1 Jul 2025 01:58:33 -0400 Subject: [PATCH] feat: support webhook and sse --- .env.example | 6 +- config/runtime.exs | 21 +- lib/wanderer_app/api/map.ex | 11 + lib/wanderer_app/application.ex | 41 +- .../external_events/event_filter.ex | 108 ++++++ .../external_events/external_events.ex | 10 +- .../external_events/map_event_relay.ex | 65 ++-- .../external_events/sse_stream_manager.ex | 353 ++++++++++++++++++ .../external_events/webhook_dispatcher.ex | 45 ++- .../channels/map_events_channel.ex | 247 ------------ lib/wanderer_app_web/channels/user_socket.ex | 64 ---- .../controllers/api/events_controller.ex | 210 +++++++++++ .../controllers/map_api_controller.ex | 99 +++++ .../map_webhooks_api_controller.ex | 11 +- lib/wanderer_app_web/endpoint.ex | 4 - lib/wanderer_app_web/router.ex | 6 + priv/posts/2025/06-21-webhooks.md | 217 +++++++---- ...0250701000000_add_map_webhooks_enabled.exs | 19 + 18 files changed, 1084 insertions(+), 453 deletions(-) create mode 100644 lib/wanderer_app/external_events/event_filter.ex create mode 100644 lib/wanderer_app/external_events/sse_stream_manager.ex delete mode 100644 lib/wanderer_app_web/channels/map_events_channel.ex delete mode 100644 lib/wanderer_app_web/channels/user_socket.ex create mode 100644 lib/wanderer_app_web/controllers/api/events_controller.ex create mode 100644 priv/repo/migrations/20250701000000_add_map_webhooks_enabled.exs diff --git a/.env.example b/.env.example index c457622b..08cab255 100644 --- a/.env.example +++ b/.env.example @@ -9,4 +9,8 @@ export WANDERER_INVITES="false" export WANDERER_PUBLIC_API_DISABLED="false" export WANDERER_CHARACTER_API_DISABLED="false" export WANDERER_KILLS_SERVICE_ENABLED="true" -export WANDERER_KILLS_BASE_URL="ws://host.docker.internal:4004" \ No newline at end of file +export WANDERER_KILLS_BASE_URL="ws://host.docker.internal:4004" +export WANDERER_SSE_ENABLED="true" +export WANDERER_WEBHOOKS_ENABLED="true" +export WANDERER_SSE_MAX_CONNECTIONS="1000" +export WANDERER_WEBHOOK_TIMEOUT_MS="15000" \ No newline at end of file diff --git a/config/runtime.exs b/config/runtime.exs index 6cd35655..4aa4f36a 100644 --- a/config/runtime.exs +++ b/config/runtime.exs @@ -72,11 +72,6 @@ map_subscriptions_enabled = |> get_var_from_path_or_env("WANDERER_MAP_SUBSCRIPTIONS_ENABLED", "false") |> String.to_existing_atom() -websocket_events_enabled = - config_dir - |> get_var_from_path_or_env("WANDERER_WEBSOCKET_EVENTS_ENABLED", "false") - |> String.to_existing_atom() - map_subscription_characters_limit = config_dir |> get_int_from_path_or_env("WANDERER_MAP_SUBSCRIPTION_CHARACTERS_LIMIT", 10_000) @@ -148,7 +143,6 @@ config :wanderer_app, wanderer_kills_service_enabled: wanderer_kills_service_enabled, wanderer_kills_base_url: wanderer_kills_base_url, map_subscriptions_enabled: map_subscriptions_enabled, - websocket_events_enabled: websocket_events_enabled, map_connection_auto_expire_hours: map_connection_auto_expire_hours, map_connection_auto_eol_hours: map_connection_auto_eol_hours, map_connection_eol_expire_timeout_mins: map_connection_eol_expire_timeout_mins, @@ -396,3 +390,18 @@ end config :wanderer_app, :license_manager, api_url: System.get_env("LM_API_URL", "http://localhost:4000"), auth_key: System.get_env("LM_AUTH_KEY") + +# SSE Configuration +config :wanderer_app, :sse, + max_connections_per_map: String.to_integer(System.get_env("SSE_MAX_CONNECTIONS_PER_MAP", "50")), + max_connections_per_api_key: + String.to_integer(System.get_env("SSE_MAX_CONNECTIONS_PER_API_KEY", "10")), + keepalive_interval: String.to_integer(System.get_env("SSE_KEEPALIVE_INTERVAL", "30000")), + connection_timeout: String.to_integer(System.get_env("SSE_CONNECTION_TIMEOUT", "300000")) + +# External Events Configuration +config :wanderer_app, :external_events, + sse_enabled: System.get_env("WANDERER_SSE_ENABLED", "true") == "true", + webhooks_enabled: System.get_env("WANDERER_WEBHOOKS_ENABLED", "true") == "true", + sse_max_connections: String.to_integer(System.get_env("WANDERER_SSE_MAX_CONNECTIONS", "1000")), + webhook_timeout_ms: String.to_integer(System.get_env("WANDERER_WEBHOOK_TIMEOUT_MS", "15000")) diff --git a/lib/wanderer_app/api/map.ex b/lib/wanderer_app/api/map.ex index 37f36a25..9b4cf75c 100644 --- a/lib/wanderer_app/api/map.ex +++ b/lib/wanderer_app/api/map.ex @@ -22,6 +22,7 @@ defmodule WandererApp.Api.Map do define(:assign_owner, action: :assign_owner) define(:mark_as_deleted, action: :mark_as_deleted) define(:update_api_key, action: :update_api_key) + define(:toggle_webhooks, action: :toggle_webhooks) define(:by_id, get_by: [:id], @@ -127,6 +128,10 @@ defmodule WandererApp.Api.Map do update :update_api_key do accept [:public_api_key] end + + update :toggle_webhooks do + accept [:webhooks_enabled] + end end attributes do @@ -185,6 +190,12 @@ defmodule WandererApp.Api.Map do allow_nil? true end + attribute :webhooks_enabled, :boolean do + default(false) + allow_nil?(false) + public?(true) + end + create_timestamp(:inserted_at) update_timestamp(:updated_at) end diff --git a/lib/wanderer_app/application.ex b/lib/wanderer_app/application.ex index 3bed8488..d1f147d1 100644 --- a/lib/wanderer_app/application.ex +++ b/lib/wanderer_app/application.ex @@ -61,7 +61,7 @@ defmodule WandererApp.Application do ] ++ maybe_start_corp_wallet_tracker(WandererApp.Env.map_subscriptions_enabled?()) ++ maybe_start_kills_services() ++ - maybe_start_websocket_services(WandererApp.Env.websocket_events_enabled?()) + maybe_start_external_events_services() opts = [strategy: :one_for_one, name: WandererApp.Supervisor] @@ -106,14 +106,37 @@ defmodule WandererApp.Application do end end - defp maybe_start_websocket_services(true) do - Logger.info("Starting WebSocket/Webhook services...") + defp maybe_start_external_events_services do + external_events_config = Application.get_env(:wanderer_app, :external_events, []) + sse_enabled = external_events_config[:sse_enabled] || false + webhooks_enabled = external_events_config[:webhooks_enabled] || false - [ - WandererApp.ExternalEvents.MapEventRelay, - WandererApp.ExternalEvents.WebhookDispatcher - ] + services = [] + + # Always include MapEventRelay if any external events are enabled + services = if sse_enabled || webhooks_enabled do + Logger.info("Starting external events system...") + [WandererApp.ExternalEvents.MapEventRelay | services] + else + services + end + + # Add WebhookDispatcher if webhooks are enabled + services = if webhooks_enabled do + Logger.info("Starting webhook dispatcher...") + [WandererApp.ExternalEvents.WebhookDispatcher | services] + else + services + end + + # Add SseStreamManager if SSE is enabled + services = if sse_enabled do + Logger.info("Starting SSE stream manager...") + [WandererApp.ExternalEvents.SseStreamManager | services] + else + services + end + + Enum.reverse(services) end - - defp maybe_start_websocket_services(_), do: [] end diff --git a/lib/wanderer_app/external_events/event_filter.ex b/lib/wanderer_app/external_events/event_filter.ex new file mode 100644 index 00000000..d11203e5 --- /dev/null +++ b/lib/wanderer_app/external_events/event_filter.ex @@ -0,0 +1,108 @@ +defmodule WandererApp.ExternalEvents.EventFilter do + @moduledoc """ + Event filtering logic for external event streams (WebSocket, SSE, webhooks). + + Handles parsing of event filters from client requests and matching events + against those filters. Supports wildcard ("*") and comma-separated event lists. + """ + + @supported_events [ + # System events + :add_system, + :deleted_system, + :system_renamed, + :system_metadata_changed, + # Connection events + :connection_added, + :connection_removed, + :connection_updated, + # Character events (existing) + :character_added, + :character_removed, + :character_updated, + # Character events (new for SSE) + :character_location_changed, + :character_online_status_changed, + :character_ship_changed, + :character_ready_status_changed, + # Signature events + :signature_added, + :signature_removed, + :signatures_updated, + # Kill events + :map_kill + ] + + @type event_type :: atom() + @type event_filter :: [event_type()] + + @doc """ + Parses event filter from client input. + + ## Examples + + iex> EventFilter.parse(nil) + [:add_system, :deleted_system, ...] # all events + + iex> EventFilter.parse("*") + [:add_system, :deleted_system, ...] # all events + + iex> EventFilter.parse("add_system,character_added") + [:add_system, :character_added] + + iex> EventFilter.parse("invalid,add_system") + [:add_system] # invalid events are filtered out + """ + @spec parse(nil | String.t()) :: event_filter() + def parse(nil), do: @supported_events + def parse("*"), do: @supported_events + def parse(""), do: @supported_events + + def parse(events) when is_binary(events) do + events + |> String.split(",") + |> Enum.map(&String.trim/1) + |> Enum.map(&to_event_atom/1) + |> Enum.filter(&(&1 in @supported_events)) + |> Enum.uniq() + end + + @doc """ + Checks if an event type matches the given filter. + + ## Examples + + iex> EventFilter.matches?(:add_system, [:add_system, :character_added]) + true + + iex> EventFilter.matches?(:map_kill, [:add_system, :character_added]) + false + """ + @spec matches?(event_type(), event_filter()) :: boolean() + def matches?(event_type, filter) when is_atom(event_type) and is_list(filter) do + event_type in filter + end + + @doc """ + Returns all supported event types. + """ + @spec supported_events() :: event_filter() + def supported_events, do: @supported_events + + @doc """ + Validates if an event type is supported. + """ + @spec valid_event?(event_type()) :: boolean() + def valid_event?(event_type) when is_atom(event_type) do + event_type in @supported_events + end + + # Helper to safely convert string to atom, returns nil for invalid atoms + defp to_event_atom(event_string) do + try do + String.to_existing_atom(event_string) + rescue + ArgumentError -> nil + end + end +end \ No newline at end of file diff --git a/lib/wanderer_app/external_events/external_events.ex b/lib/wanderer_app/external_events/external_events.ex index 8e0c8db0..f2e2441d 100644 --- a/lib/wanderer_app/external_events/external_events.ex +++ b/lib/wanderer_app/external_events/external_events.ex @@ -1,12 +1,12 @@ defmodule WandererApp.ExternalEvents do @moduledoc """ - External event system for webhook and WebSocket delivery. - - This system is completely separate from the internal Phoenix PubSub + External event system for SSE and webhook delivery. + + This system is completely separate from the internal Phoenix PubSub event system and does NOT modify any existing event flows. External events are delivered to: - - WebSocket clients via MapEventsChannel + - SSE clients via Server-Sent Events - HTTP webhooks via WebhookDispatcher ## Usage @@ -29,7 +29,7 @@ defmodule WandererApp.ExternalEvents do This does NOT affect internal PubSub or LiveView handlers. It only delivers events to: - - WebSocket clients connected to MapEventsChannel + - SSE clients via Server-Sent Events - Configured webhook endpoints ## Parameters diff --git a/lib/wanderer_app/external_events/map_event_relay.ex b/lib/wanderer_app/external_events/map_event_relay.ex index b5aa2f65..1ee976c9 100644 --- a/lib/wanderer_app/external_events/map_event_relay.ex +++ b/lib/wanderer_app/external_events/map_event_relay.ex @@ -1,13 +1,13 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do @moduledoc """ - GenServer that handles delivery of external events to WebSocket and webhook clients. + GenServer that handles delivery of external events to SSE and webhook clients. This system is completely separate from internal Phoenix PubSub and does NOT modify any existing event flows. It only handles external client delivery. Responsibilities: - Store events in ETS ring buffer for backfill - - Broadcast to external WebSocket clients (via separate topic) + - Broadcast to SSE clients - Dispatch to webhook endpoints - Provide event history for reconnecting clients @@ -36,6 +36,15 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do def get_events_since(map_id, since_datetime, limit \\ 100) do GenServer.call(__MODULE__, {:get_events_since, map_id, since_datetime, limit}) end + + @doc """ + Retrieves events since a given ULID for SSE backfill. + """ + @spec get_events_since_ulid(String.t(), String.t(), pos_integer()) :: + {:ok, [map()]} | {:error, term()} + def get_events_since_ulid(map_id, since_ulid, limit \\ 1_000) do + GenServer.call(__MODULE__, {:get_events_since_ulid, map_id, since_ulid, limit}) + end @impl true def init(_opts) do @@ -78,6 +87,32 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do events = get_events_from_ets(map_id, since_datetime, limit, state.ets_table) {:reply, events, state} end + + @impl true + def handle_call({:get_events_since_ulid, map_id, since_ulid}, from, state) do + handle_call({:get_events_since_ulid, map_id, since_ulid, 1_000}, from, state) + end + + @impl true + def handle_call({:get_events_since_ulid, map_id, since_ulid, limit}, _from, state) do + # Get all events for this map and filter by ULID + try do + # Events are stored as {event_id, map_id, json_data} + # Filter by map_id and event_id (ULID) > since_ulid + events = + :ets.select(state.ets_table, [ + {{:"$1", :"$2", :"$3"}, + [{:andalso, {:>, :"$1", since_ulid}, {:==, :"$2", map_id}}], + [:"$3"]} + ]) + |> Enum.take(limit) + + {:reply, {:ok, events}, state} + catch + _, reason -> + {:reply, {:error, reason}, state} + end + end @impl true def handle_info(:cleanup_events, state) do @@ -105,33 +140,15 @@ defmodule WandererApp.ExternalEvents.MapEventRelay do # 1. Store in ETS for backfill store_event(event, state.ets_table) - # 2. Broadcast to external WebSocket clients - # Use separate topic to avoid conflicts with internal PubSub + # 2. Convert event to JSON for delivery methods event_json = Event.to_json(event) - topic = "external_events:map:#{event.map_id}" - Logger.debug(fn -> "Broadcasting to PubSub topic: #{topic}" end) - - case Phoenix.PubSub.broadcast( - WandererApp.PubSub, - topic, - {:external_event, event_json} - ) do - :ok -> - Logger.debug(fn -> "Successfully broadcast event to topic: #{topic}" end) - - {:error, reason} -> - Logger.error("Failed to broadcast event to topic #{topic}: #{inspect(reason)}") - # Emit error telemetry - :telemetry.execute( - [:wanderer_app, :external_events, :relay, :broadcast_error], - %{count: 1}, - %{map_id: event.map_id, event_type: event.type, reason: reason} - ) - end # 3. Send to webhook subscriptions via WebhookDispatcher WebhookDispatcher.dispatch_event(event.map_id, event) + # 4. Broadcast to SSE clients + WandererApp.ExternalEvents.SseStreamManager.broadcast_event(event.map_id, event_json) + # Emit delivered telemetry :telemetry.execute( [:wanderer_app, :external_events, :relay, :delivered], diff --git a/lib/wanderer_app/external_events/sse_stream_manager.ex b/lib/wanderer_app/external_events/sse_stream_manager.ex new file mode 100644 index 00000000..6bc944fe --- /dev/null +++ b/lib/wanderer_app/external_events/sse_stream_manager.ex @@ -0,0 +1,353 @@ +defmodule WandererApp.ExternalEvents.SseStreamManager do + @moduledoc """ + Manages Server-Sent Events (SSE) connections for maps. + + This GenServer tracks active SSE connections, enforces connection limits, + and broadcasts events to connected clients. + + Connection state is stored as: + %{ + map_id => %{ + api_key => [%{pid: pid, event_filter: filter, connected_at: datetime}, ...] + } + } + """ + + use GenServer + require Logger + + @cleanup_interval :timer.minutes(5) + + def start_link(opts \\ []) do + GenServer.start_link(__MODULE__, opts, name: __MODULE__) + end + + @doc """ + Broadcasts an event to all SSE clients connected to a map. + """ + def broadcast_event(map_id, event_json) do + GenServer.cast(__MODULE__, {:broadcast_event, map_id, event_json}) + end + + @doc """ + Adds a new SSE client connection. + Returns {:ok, ref} on success, {:error, reason} on failure. + """ + def add_client(map_id, api_key, client_pid, event_filter \\ :all) do + GenServer.call(__MODULE__, {:add_client, map_id, api_key, client_pid, event_filter}) + end + + @doc """ + Removes a client connection. + """ + def remove_client(map_id, api_key, client_pid) do + GenServer.cast(__MODULE__, {:remove_client, map_id, api_key, client_pid}) + end + + @doc """ + Gets connection stats for monitoring. + """ + def get_stats do + GenServer.call(__MODULE__, :get_stats) + end + + # GenServer callbacks + + @impl true + def init(_opts) do + # Schedule periodic cleanup of dead connections + schedule_cleanup() + + state = %{ + connections: %{}, # map_id => %{api_key => [connection_info]} + monitors: %{} # pid => {map_id, api_key} + } + + Logger.info("SSE Stream Manager started") + {:ok, state} + end + + @impl true + def handle_call({:add_client, map_id, api_key, client_pid, event_filter}, _from, state) do + # Check if feature is enabled + unless Application.get_env(:wanderer_app, :external_events, [])[:sse_enabled] do + {:reply, {:error, :sse_disabled}, state} + else + # Check connection limits + max_connections = Application.get_env(:wanderer_app, :external_events, [])[:sse_max_connections] || 1000 + + case check_connection_limits(state, map_id, api_key, max_connections) do + :ok -> + # Monitor the client process + ref = Process.monitor(client_pid) + + # Add connection to state + connection_info = %{ + pid: client_pid, + event_filter: event_filter, + connected_at: DateTime.utc_now(), + ref: ref + } + + new_state = add_connection_to_state(state, map_id, api_key, connection_info) + + Logger.debug("SSE client added: map=#{map_id}, api_key=#{String.slice(api_key, 0..7)}..., pid=#{inspect(client_pid)}") + + {:reply, {:ok, ref}, new_state} + + {:error, reason} -> + {:reply, {:error, reason}, state} + end + end + end + + @impl true + def handle_call(:get_stats, _from, state) do + total_connections = + state.connections + |> Enum.flat_map(fn {_map_id, api_keys} -> + Enum.flat_map(api_keys, fn {_api_key, connections} -> connections end) + end) + |> length() + + stats = %{ + total_connections: total_connections, + maps_with_connections: map_size(state.connections), + connections_by_map: + state.connections + |> Enum.map(fn {map_id, api_keys} -> + count = api_keys |> Enum.flat_map(fn {_, conns} -> conns end) |> length() + {map_id, count} + end) + |> Enum.into(%{}) + } + + {:reply, stats, state} + end + + @impl true + def handle_cast({:broadcast_event, map_id, event_json}, state) do + # Get all connections for this map + connections = get_map_connections(state, map_id) + + # Send event to each connection that should receive it + Enum.each(connections, fn connection_info -> + if should_send_event?(event_json, connection_info.event_filter) do + send_sse_event(connection_info.pid, event_json) + end + end) + + Logger.debug("Broadcast SSE event to #{length(connections)} clients for map #{map_id}") + + {:noreply, state} + end + + @impl true + def handle_cast({:remove_client, map_id, api_key, client_pid}, state) do + new_state = remove_connection_from_state(state, map_id, api_key, client_pid) + Logger.debug("SSE client removed: map=#{map_id}, api_key=#{String.slice(api_key, 0..7)}..., pid=#{inspect(client_pid)}") + {:noreply, new_state} + end + + @impl true + def handle_info({:DOWN, _ref, :process, pid, _reason}, state) do + # Handle client process termination + case Map.get(state.monitors, pid) do + {map_id, api_key} -> + new_state = remove_connection_from_state(state, map_id, api_key, pid) + Logger.debug("SSE client process terminated: map=#{map_id}, pid=#{inspect(pid)}") + {:noreply, new_state} + + nil -> + {:noreply, state} + end + end + + @impl true + def handle_info(:cleanup_connections, state) do + new_state = cleanup_dead_connections(state) + schedule_cleanup() + {:noreply, new_state} + end + + @impl true + def handle_info(msg, state) do + Logger.warning("SSE Stream Manager received unexpected message: #{inspect(msg)}") + {:noreply, state} + end + + # Private helper functions + + defp check_connection_limits(state, map_id, api_key, max_total) do + # Check total server connections + total_connections = count_total_connections(state) + if total_connections >= max_total do + {:error, :max_connections_reached} + else + # Check per-map and per-API-key limits from existing SSE config + sse_config = Application.get_env(:wanderer_app, :sse, []) + max_per_map = sse_config[:max_connections_per_map] || 50 + max_per_key = sse_config[:max_connections_per_api_key] || 10 + + map_connections = count_map_connections(state, map_id) + key_connections = count_api_key_connections(state, map_id, api_key) + + cond do + map_connections >= max_per_map -> + {:error, :map_connection_limit_reached} + key_connections >= max_per_key -> + {:error, :api_key_connection_limit_reached} + true -> + :ok + end + end + end + + defp count_total_connections(state) do + state.connections + |> Enum.flat_map(fn {_map_id, api_keys} -> + Enum.flat_map(api_keys, fn {_api_key, connections} -> connections end) + end) + |> length() + end + + defp count_map_connections(state, map_id) do + case Map.get(state.connections, map_id) do + nil -> 0 + api_keys -> + api_keys + |> Enum.flat_map(fn {_api_key, connections} -> connections end) + |> length() + end + end + + defp count_api_key_connections(state, map_id, api_key) do + state.connections + |> get_in([map_id, api_key]) + |> case do + nil -> 0 + connections -> length(connections) + end + end + + defp add_connection_to_state(state, map_id, api_key, connection_info) do + # Add to monitors + monitors = Map.put(state.monitors, connection_info.pid, {map_id, api_key}) + + # Add to connections + connections = + state.connections + |> Map.put_new(map_id, %{}) + |> put_in([map_id, api_key], + get_in(state.connections, [map_id, api_key]) + |> case do + nil -> [connection_info] + existing -> [connection_info | existing] + end) + + %{state | connections: connections, monitors: monitors} + end + + defp remove_connection_from_state(state, map_id, api_key, client_pid) do + # Remove from monitors + monitors = Map.delete(state.monitors, client_pid) + + # Remove from connections + connections = + case get_in(state.connections, [map_id, api_key]) do + nil -> + state.connections + + existing_connections -> + updated_connections = Enum.reject(existing_connections, &(&1.pid == client_pid)) + + # Clean up empty structures + if updated_connections == [] do + api_keys = Map.delete(state.connections[map_id], api_key) + if api_keys == %{} do + Map.delete(state.connections, map_id) + else + Map.put(state.connections, map_id, api_keys) + end + else + put_in(state.connections, [map_id, api_key], updated_connections) + end + end + + %{state | connections: connections, monitors: monitors} + end + + defp get_map_connections(state, map_id) do + case Map.get(state.connections, map_id) do + nil -> [] + api_keys -> + api_keys + |> Enum.flat_map(fn {_api_key, connections} -> connections end) + end + end + + defp send_sse_event(client_pid, event_json) do + try do + send(client_pid, {:sse_event, event_json}) + catch + :error, :badarg -> + # Process is dead, ignore + :ok + end + end + + defp should_send_event?(_event_json, :all), do: true + defp should_send_event?(event_json, event_filter) when is_list(event_filter) do + # Extract event type from JSON + case event_json do + %{"type" => type} when is_binary(type) -> + try do + atom_type = String.to_existing_atom(type) + atom_type in event_filter + rescue + ArgumentError -> false + end + %{"type" => type} when is_atom(type) -> + type in event_filter + _ -> + false + end + end + defp should_send_event?(_event_json, _filter), do: true + + defp cleanup_dead_connections(state) do + # Remove connections for dead processes + alive_connections = + state.connections + |> Enum.map(fn {map_id, api_keys} -> + alive_api_keys = + api_keys + |> Enum.map(fn {api_key, connections} -> + alive_conns = Enum.filter(connections, &Process.alive?(&1.pid)) + {api_key, alive_conns} + end) + |> Enum.reject(fn {_api_key, connections} -> connections == [] end) + |> Enum.into(%{}) + + {map_id, alive_api_keys} + end) + |> Enum.reject(fn {_map_id, api_keys} -> api_keys == %{} end) + |> Enum.into(%{}) + + # Update monitors to match alive connections + alive_monitors = + alive_connections + |> Enum.flat_map(fn {map_id, api_keys} -> + Enum.flat_map(api_keys, fn {api_key, connections} -> + Enum.map(connections, fn conn -> {conn.pid, {map_id, api_key}} end) + end) + end) + |> Enum.into(%{}) + + %{state | connections: alive_connections, monitors: alive_monitors} + end + + defp schedule_cleanup do + Process.send_after(self(), :cleanup_connections, @cleanup_interval) + end +end \ No newline at end of file diff --git a/lib/wanderer_app/external_events/webhook_dispatcher.ex b/lib/wanderer_app/external_events/webhook_dispatcher.ex index e018dca9..0bb1d7b9 100644 --- a/lib/wanderer_app/external_events/webhook_dispatcher.ex +++ b/lib/wanderer_app/external_events/webhook_dispatcher.ex @@ -98,20 +98,29 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do end defp process_webhook_delivery(map_id, events, state) do - # Get active webhook subscriptions for this map - case get_active_subscriptions(map_id) do - {:ok, [_ | _] = subscriptions} -> - Logger.debug(fn -> "Found #{length(subscriptions)} active webhook subscriptions for map #{map_id}" end) - process_active_subscriptions(subscriptions, events, state) + # Check if webhooks are enabled globally and for this map + case webhooks_allowed?(map_id) do + :ok -> + # Get active webhook subscriptions for this map + case get_active_subscriptions(map_id) do + {:ok, [_ | _] = subscriptions} -> + Logger.debug(fn -> "Found #{length(subscriptions)} active webhook subscriptions for map #{map_id}" end) + process_active_subscriptions(subscriptions, events, state) + + {:ok, []} -> + Logger.debug(fn -> "No webhook subscriptions found for map #{map_id}" end) + state + + {:error, reason} -> + Logger.error("Failed to get webhook subscriptions for map #{map_id}: #{inspect(reason)}") + state + end - {:ok, []} -> - Logger.debug(fn -> "No webhook subscriptions found for map #{map_id}" end) - - {:error, reason} -> - Logger.error("Failed to get webhook subscriptions for map #{map_id}: #{inspect(reason)}") + {:error, :webhooks_disabled} -> + Logger.debug(fn -> "Webhooks disabled for map #{map_id}" end) + state end - - %{state | delivery_count: state.delivery_count + length(events)} + |> Map.update(:delivery_count, length(events), &(&1 + length(events))) end defp process_active_subscriptions(subscriptions, events, state) do @@ -349,4 +358,16 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do round(capped_delay + jitter) end + + defp webhooks_allowed?(map_id) do + with true <- Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled], + {:ok, map} <- WandererApp.Api.Map.by_id(map_id), + true <- map.webhooks_enabled do + :ok + else + false -> {:error, :webhooks_disabled} + {:error, :not_found} -> {:error, :webhooks_disabled} + _ -> {:error, :webhooks_disabled} + end + end end \ No newline at end of file diff --git a/lib/wanderer_app_web/channels/map_events_channel.ex b/lib/wanderer_app_web/channels/map_events_channel.ex deleted file mode 100644 index 5b098ed3..00000000 --- a/lib/wanderer_app_web/channels/map_events_channel.ex +++ /dev/null @@ -1,247 +0,0 @@ -defmodule WandererAppWeb.MapEventsChannel do - @moduledoc """ - WebSocket channel for external map events. - - This channel delivers events from the external event system to WebSocket clients. - It uses separate topics from the internal PubSub system to avoid conflicts. - - ## Topic Format - - Clients subscribe to: `external_events:map:MAP_ID` - - ## Usage - - ```javascript - // Connect with API key authentication - const socket = new Phoenix.Socket("/socket/websocket", { - params: { api_key: "your_map_api_key_here" } - }) - socket.connect() - - const channel = socket.channel("external_events:map:123", {}) - channel.join() - .receive("ok", resp => { console.log("Joined successfully", resp) }) - .receive("error", resp => { console.log("Unable to join", resp) }) - - channel.on("external_event", payload => { - console.log("Received event:", payload) - }) - ``` - """ - - use WandererAppWeb, :channel - - require Logger - - # Log when module is loaded - Logger.info("MapEventsChannel module loaded") - - @impl true - def join("external_events:map:" <> map_id, payload, socket) do - Logger.info("Attempting to join external events channel for map: #{map_id}") - - with {:ok, api_key} <- get_api_key(socket), - {:ok, map} <- validate_map_access(map_id, api_key) do - handle_successful_join(map_id, map, payload, socket) - else - {:error, :missing_api_key} -> - Logger.warning("WebSocket join failed: missing API key") - {:error, %{reason: "Authentication required. Provide api_key parameter."}} - - {:error, :map_not_found} -> - Logger.warning("WebSocket join failed: map not found - #{map_id}") - {:error, %{reason: "Map not found"}} - - {:error, :unauthorized} -> - Logger.warning("WebSocket join failed: unauthorized for map #{map_id}") - {:error, %{reason: "Unauthorized: Invalid API key for this map"}} - - error -> - Logger.error("WebSocket join failed: #{inspect(error)}") - {:error, %{reason: "Authentication failed"}} - end - end - - def join(topic, _payload, _socket) do - Logger.warning("Attempted to join invalid external events topic: #{topic}") - {:error, %{reason: "Invalid topic format. Use: external_events:map:MAP_ID"}} - end - - defp handle_successful_join(map_id, map, payload, socket) do - Logger.info("Client authenticated and joined external events for map #{map_id}") - - # Parse event filters from join payload - event_filter = parse_event_filter(payload) - Logger.debug(fn -> "Event filter: #{inspect(event_filter)}" end) - - # Subscribe to external events for this map - topic = "external_events:map:#{map_id}" - Phoenix.PubSub.subscribe(WandererApp.PubSub, topic) - Logger.debug(fn -> "Subscribed to PubSub topic: #{topic}" end) - - # Store map information and event filter in socket assigns - socket = socket - |> assign(:map_id, map_id) - |> assign(:map, map) - |> assign(:event_filter, event_filter) - - # Send initial connection acknowledgment - {:ok, %{status: "connected", map_id: map_id, map_name: map.name, event_filter: event_filter}, socket} - end - - @impl true - def handle_info({:external_event, event}, socket) do - # Check if this event should be sent based on the client's filter - if should_send_event?(event, socket.assigns[:event_filter]) do - # Forward external events to WebSocket clients - # The event is a map that needs to be sent directly - push(socket, "external_event", event) - end - {:noreply, socket} - end - - @impl true - def handle_info(_msg, socket) do - # Silently ignore other messages - this can happen when multiple - # channels are subscribed to the same PubSub topic - {:noreply, socket} - end - - @impl true - def handle_in("ping", payload, socket) do - # Simple ping/pong for client heartbeat testing - {:reply, {:ok, %{pong: payload}}, socket} - end - - @impl true - def handle_in(event, payload, socket) do - Logger.debug(fn -> "Unhandled incoming event: #{event} with payload: #{inspect(payload)}" end) - {:noreply, socket} - end - - @impl true - def terminate(reason, socket) do - map_id = socket.assigns[:map_id] - Logger.debug(fn -> "Client disconnected from external events for map #{map_id}, reason: #{inspect(reason)}" end) - :ok - end - - # Private helper functions for authentication - - defp get_api_key(socket) do - case socket.assigns[:api_key] do - api_key when is_binary(api_key) and api_key != "" -> - {:ok, api_key} - _ -> - {:error, :missing_api_key} - end - end - - defp validate_map_access(map_id, api_key) do - Logger.debug(fn -> "Validating map access for map_id: #{map_id}" end) - alias WandererApp.Api.Map, as: ApiMap - alias Plug.Crypto - - case resolve_map_identifier(map_id) do - {:ok, map} -> - Logger.info("Map found: #{map.name}, checking API key...") - Logger.info("Map public_api_key present: #{not is_nil(map.public_api_key)}") - Logger.info("Provided API key: #{String.slice(api_key, 0..7)}...") - - if is_binary(map.public_api_key) && - Crypto.secure_compare(map.public_api_key, api_key) do - Logger.info("API key matches, access granted") - {:ok, map} - else - Logger.info("API key mismatch or invalid") - Logger.info("Map has public_api_key: #{is_binary(map.public_api_key)}") - {:error, :unauthorized} - end - {:error, :not_found} -> - Logger.debug(fn -> "Map not found" end) - {:error, :map_not_found} - error -> - Logger.error("Map validation error: #{inspect(error)}") - {:error, :validation_failed} - end - end - - # Try to resolve map identifier - could be map_id or slug - defp resolve_map_identifier(identifier) do - Logger.debug(fn -> "Resolving map identifier: #{identifier}" end) - alias WandererApp.Api.Map, as: ApiMap - - # Try ID lookup first - Logger.debug(fn -> "Trying ID lookup..." end) - - case ApiMap.by_id(identifier) do - {:ok, map} -> - Logger.debug(fn -> "Found by ID: #{map.name}" end) - {:ok, map} - - error -> - Logger.debug(fn -> "ID lookup failed: #{inspect(error)}, trying slug lookup..." end) - resolve_by_slug(identifier) - end - end - - defp resolve_by_slug(identifier) do - alias WandererApp.Api.Map, as: ApiMap - - case ApiMap.get_map_by_slug(identifier) do - {:ok, map} -> - Logger.debug(fn -> "Found by slug: #{map.name}" end) - {:ok, map} - - error -> - Logger.debug(fn -> "Slug lookup failed: #{inspect(error)}" end) - {:error, :not_found} - end - end - - # Event filtering helper functions - - defp parse_event_filter(%{"events" => events}) when is_list(events) do - # Convert string event types to atoms and validate them - events - |> Enum.map(&parse_event_type/1) - |> Enum.filter(& &1) # Remove nil values from invalid event types - |> case do - [] -> :all # If no valid events specified, default to all - valid_events -> valid_events - end - end - - defp parse_event_filter(%{"events" => "*"}), do: :all - defp parse_event_filter(%{"events" => ["*"]}), do: :all - defp parse_event_filter(_), do: :all # Default to all events if no filter specified - - defp parse_event_type(event_type) when is_binary(event_type) do - alias WandererApp.ExternalEvents.Event - - # Convert string to atom if it's a valid event type - try do - atom = String.to_existing_atom(event_type) - if Event.valid_event_type?(atom), do: atom, else: nil - rescue - ArgumentError -> nil - end - end - - defp parse_event_type(_), do: nil - - defp should_send_event?(_event, :all), do: true - - defp should_send_event?(event, event_filter) when is_list(event_filter) do - # Extract event type from the event map - event_type = case event do - %{"type" => type} when is_binary(type) -> String.to_existing_atom(type) - %{"type" => type} when is_atom(type) -> type - _ -> nil - end - - event_type in event_filter - end - - defp should_send_event?(_event, _filter), do: true # Default to sending if filter format is unexpected -end \ No newline at end of file diff --git a/lib/wanderer_app_web/channels/user_socket.ex b/lib/wanderer_app_web/channels/user_socket.ex deleted file mode 100644 index b8cb8f0e..00000000 --- a/lib/wanderer_app_web/channels/user_socket.ex +++ /dev/null @@ -1,64 +0,0 @@ -defmodule WandererAppWeb.UserSocket do - use Phoenix.Socket - require Logger - - # External events channel for webhooks/WebSocket delivery - channel "external_events:map:*", WandererAppWeb.MapEventsChannel - - @impl true - def connect(params, socket, connect_info) do - # Check if websocket events are enabled - unless WandererApp.Env.websocket_events_enabled?() do - remote_ip = get_remote_ip(connect_info) - Logger.info("WebSocket connection rejected - websocket events disabled from #{remote_ip}") - :error - else - - # Extract API key from connection params - # Client should connect with: /socket/websocket?api_key= - - # Log connection attempt for security auditing - remote_ip = get_remote_ip(connect_info) - Logger.info("WebSocket connection attempt from #{remote_ip}") - - case params["api_key"] do - api_key when is_binary(api_key) and api_key != "" -> - # Store the API key in socket assigns for channel authentication - # Full validation happens in channel join where we have the map context - socket = socket - |> assign(:api_key, api_key) - |> assign(:remote_ip, remote_ip) - - Logger.info("WebSocket connection accepted from #{remote_ip}, pending channel authentication") - {:ok, socket} - - _ -> - # Require API key for external events - Logger.warning("WebSocket connection rejected - missing API key from #{remote_ip}") - :error - end - end - end - - # Extract remote IP from connection info - defp get_remote_ip(connect_info) do - case connect_info do - %{peer_data: %{address: {a, b, c, d}}} -> - "#{a}.#{b}.#{c}.#{d}" - - %{x_headers: headers} -> - # Check for X-Forwarded-For or X-Real-IP headers (for proxied connections) - Enum.find_value(headers, "unknown", fn - {"x-forwarded-for", ip} -> String.split(ip, ",") |> List.first() |> String.trim() - {"x-real-ip", ip} -> ip - _ -> nil - end) - - _ -> - "unknown" - end - end - - @impl true - def id(_socket), do: nil -end diff --git a/lib/wanderer_app_web/controllers/api/events_controller.ex b/lib/wanderer_app_web/controllers/api/events_controller.ex new file mode 100644 index 00000000..5a41e5a5 --- /dev/null +++ b/lib/wanderer_app_web/controllers/api/events_controller.ex @@ -0,0 +1,210 @@ +defmodule WandererAppWeb.Api.EventsController do + @moduledoc """ + Controller for Server-Sent Events (SSE) streaming. + + Provides real-time event streaming for map updates to external clients. + """ + + use WandererAppWeb, :controller + + alias WandererApp.ExternalEvents.{SseConnectionTracker, EventFilter, MapEventRelay} + alias WandererApp.Api.Map, as: ApiMap + alias WandererAppWeb.SSE + alias Plug.Crypto + + require Logger + + @doc """ + Establishes an SSE connection for streaming map events. + + Query parameters: + - events: Comma-separated list of event types to filter (optional) + - last_event_id: ULID of last received event for backfill (optional) + """ + def stream(conn, %{"map_identifier" => map_identifier} = params) do + Logger.info("SSE stream requested for map #{map_identifier}") + + # Check if SSE is enabled + unless Application.get_env(:wanderer_app, :external_events, [])[:sse_enabled] do + conn + |> put_status(:service_unavailable) + |> json(%{error: "Server-Sent Events are disabled on this server"}) + else + + # Validate API key and get map + case validate_api_key(conn, map_identifier) do + {:ok, map, api_key} -> + # Check connection limits + case SseConnectionTracker.check_limits(map.id, api_key) do + :ok -> + establish_sse_connection(conn, map.id, api_key, params) + + {:error, :map_limit_exceeded} -> + conn + |> put_status(:too_many_requests) + |> json(%{ + error: "Too many connections to this map", + code: "MAP_CONNECTION_LIMIT" + }) + + {:error, :api_key_limit_exceeded} -> + conn + |> put_status(:too_many_requests) + |> json(%{ + error: "Too many connections for this API key", + code: "API_KEY_CONNECTION_LIMIT" + }) + end + + {:error, status, message} -> + conn + |> put_status(status) + |> json(%{error: message}) + end + end + end + + defp establish_sse_connection(conn, map_id, api_key, params) do + # Parse event filter if provided + event_filter = + case Map.get(params, "events") do + nil -> :all + events -> EventFilter.parse(events) + end + + # Send SSE headers + conn = SSE.send_headers(conn) + + # Track the connection + :ok = SseConnectionTracker.track_connection(map_id, api_key, self()) + + # Send initial connection event + conn = SSE.send_event(conn, %{ + id: Ulid.generate(), + event: "connected", + data: %{ + map_id: map_id, + server_time: DateTime.utc_now() |> DateTime.to_iso8601() + } + }) + + # Handle backfill if last_event_id is provided + conn = + case Map.get(params, "last_event_id") do + nil -> + conn + + last_event_id -> + send_backfill_events(conn, map_id, last_event_id, event_filter) + end + + # Subscribe to map events + Phoenix.PubSub.subscribe(WandererApp.PubSub, "external_events:map:#{map_id}") + + # Start streaming loop + stream_events(conn, map_id, api_key, event_filter) + end + + defp send_backfill_events(conn, map_id, last_event_id, event_filter) do + case MapEventRelay.get_events_since_ulid(map_id, last_event_id) do + {:ok, events} -> + # Filter and send each event + Enum.reduce(events, conn, fn event_json, acc_conn -> + event = Jason.decode!(event_json) + + if EventFilter.matches?(event["type"], event_filter) do + SSE.send_event(acc_conn, event) + else + acc_conn + end + end) + + {:error, reason} -> + Logger.error("Failed to backfill events: #{inspect(reason)}") + conn + end + end + + defp stream_events(conn, map_id, api_key, event_filter) do + receive do + {:external_event, event_json} -> + # Parse and check if event matches filter + event = Jason.decode!(event_json) + + conn = + if EventFilter.matches?(event["type"], event_filter) do + SSE.send_event(conn, event) + else + conn + end + + # Continue streaming + stream_events(conn, map_id, api_key, event_filter) + + :keepalive -> + # Send keepalive + conn = SSE.send_keepalive(conn) + + # Continue streaming + stream_events(conn, map_id, api_key, event_filter) + + _ -> + # Unknown message, continue + stream_events(conn, map_id, api_key, event_filter) + + after + 30_000 -> + # Send keepalive every 30 seconds + conn = SSE.send_keepalive(conn) + stream_events(conn, map_id, api_key, event_filter) + end + rescue + _ -> + # Connection closed, cleanup + Logger.info("SSE connection closed for map #{map_id}") + SseConnectionTracker.remove_connection(map_id, api_key, self()) + conn + end + + defp validate_api_key(conn, map_identifier) do + with ["Bearer " <> token] <- get_req_header(conn, "authorization"), + {:ok, map} <- resolve_map(map_identifier), + true <- is_binary(map.public_api_key) && + Crypto.secure_compare(map.public_api_key, token) + do + {:ok, map, token} + else + [] -> + Logger.warning("Missing or invalid 'Bearer' token") + {:error, :unauthorized, "Missing or invalid 'Bearer' token"} + + {:error, :not_found} -> + Logger.warning("Map not found: #{map_identifier}") + {:error, :not_found, "Map not found"} + + false -> + Logger.warning("Unauthorized: invalid token for map #{map_identifier}") + {:error, :unauthorized, "Unauthorized (invalid token for map)"} + + error -> + Logger.error("Unexpected error validating API key: #{inspect(error)}") + {:error, :internal_server_error, "Unexpected error"} + end + end + + defp resolve_map(identifier) do + case ApiMap.by_id(identifier) do + {:ok, map} -> + {:ok, map} + + _ -> + case ApiMap.get_map_by_slug(identifier) do + {:ok, map} -> + {:ok, map} + + _ -> + {:error, :not_found} + end + end + end +end \ No newline at end of file diff --git a/lib/wanderer_app_web/controllers/map_api_controller.ex b/lib/wanderer_app_web/controllers/map_api_controller.ex index 506a0d2e..70d19f7d 100644 --- a/lib/wanderer_app_web/controllers/map_api_controller.ex +++ b/lib/wanderer_app_web/controllers/map_api_controller.ex @@ -833,4 +833,103 @@ defmodule WandererAppWeb.MapAPIController do |> json(%{error: "Could not fetch connections: #{APIUtils.format_error(reason)}"}) end end + + @doc """ + Toggle webhooks for a map. + """ + operation :toggle_webhooks, + summary: "Toggle webhooks for a map", + parameters: [ + map_id: [ + in: :path, + schema: %OpenApiSpex.Schema{type: :string}, + required: true, + description: "Map identifier (slug or ID)" + ] + ], + request_body: { + "Webhook toggle request", + "application/json", + %OpenApiSpex.Schema{ + type: :object, + properties: %{ + enabled: %OpenApiSpex.Schema{type: :boolean, description: "Enable or disable webhooks"} + }, + required: ["enabled"] + } + }, + responses: %{ + 200 => { + "Webhook status updated", + "application/json", + %OpenApiSpex.Schema{ + type: :object, + properties: %{ + webhooks_enabled: %OpenApiSpex.Schema{type: :boolean} + } + } + }, + 400 => ResponseSchemas.bad_request(), + 404 => ResponseSchemas.not_found(), + 503 => ResponseSchemas.internal_server_error("Service unavailable") + } + + def toggle_webhooks(conn, %{"map_id" => map_identifier, "enabled" => enabled}) do + with :ok <- check_global_webhooks_enabled(), + {:ok, map} <- resolve_map_identifier(map_identifier), + :ok <- check_map_owner(conn, map), + {:ok, updated_map} <- WandererApp.Api.Map.toggle_webhooks(map, %{webhooks_enabled: enabled}) do + json(conn, %{webhooks_enabled: updated_map.webhooks_enabled}) + else + {:error, :webhooks_disabled} -> + conn + |> put_status(:service_unavailable) + |> json(%{error: "Webhooks are disabled on this server"}) + + {:error, :map_not_found} -> + conn + |> put_status(:not_found) + |> json(%{error: "Map not found"}) + + {:error, :unauthorized} -> + conn + |> put_status(:forbidden) + |> json(%{error: "Only the map owner can toggle webhooks"}) + + {:error, reason} -> + conn + |> put_status(:bad_request) + |> json(%{error: "Failed to update webhook settings: #{APIUtils.format_error(reason)}"}) + end + end + + # Helper functions for webhook toggle + + defp check_global_webhooks_enabled do + if Application.get_env(:wanderer_app, :external_events)[:webhooks_enabled] do + :ok + else + {:error, :webhooks_disabled} + end + end + + defp resolve_map_identifier(identifier) do + case WandererApp.Api.Map.by_id(identifier) do + {:ok, map} -> {:ok, map} + {:error, _} -> + case WandererApp.Api.Map.get_map_by_slug(identifier) do + {:ok, map} -> {:ok, map} + {:error, _} -> {:error, :map_not_found} + end + end + end + + defp check_map_owner(conn, map) do + current_user = conn.assigns[:current_character] + if current_user && current_user.id == map.owner_id do + :ok + else + {:error, :unauthorized} + end + end end diff --git a/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex b/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex index 546b1082..edd5b450 100644 --- a/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex +++ b/lib/wanderer_app_web/controllers/map_webhooks_api_controller.ex @@ -340,8 +340,14 @@ defmodule WandererAppWeb.MapWebhooksAPIController do end def create(conn, %{"map_identifier" => map_identifier} = params) do - with {:ok, map} <- get_map(conn, map_identifier), - {:ok, webhook_params} <- validate_create_params(params, map.id) do + # Check if webhooks are enabled + unless Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled] do + conn + |> put_status(:service_unavailable) + |> json(%{error: "Webhooks are disabled on this server"}) + else + with {:ok, map} <- get_map(conn, map_identifier), + {:ok, webhook_params} <- validate_create_params(params, map.id) do case MapWebhookSubscription.create(webhook_params) do {:ok, webhook} -> @@ -378,6 +384,7 @@ defmodule WandererAppWeb.MapWebhooksAPIController do |> put_status(:internal_server_error) |> json(%{error: "Internal server error"}) end + end end def update(conn, %{"map_identifier" => map_identifier, "id" => webhook_id} = params) do diff --git a/lib/wanderer_app_web/endpoint.ex b/lib/wanderer_app_web/endpoint.ex index f9c6505d..425e6965 100644 --- a/lib/wanderer_app_web/endpoint.ex +++ b/lib/wanderer_app_web/endpoint.ex @@ -42,10 +42,6 @@ defmodule WandererAppWeb.Endpoint do socket "/live", Phoenix.LiveView.Socket, websocket: [compress: true, connect_info: [session: @session_options]] - socket "/socket", WandererAppWeb.UserSocket, - websocket: true, - longpoll: false - plug PhoenixDDoS # Serve at "/" the static files from "priv/static" directory. diff --git a/lib/wanderer_app_web/router.ex b/lib/wanderer_app_web/router.ex index 6a3c6f95..779dd3b6 100644 --- a/lib/wanderer_app_web/router.ex +++ b/lib/wanderer_app_web/router.ex @@ -230,6 +230,9 @@ defmodule WandererAppWeb.Router do scope "/api/maps/:map_identifier", WandererAppWeb do pipe_through [:api, :api_map] + # SSE endpoint for real-time events + get "/events/stream", Api.EventsController, :stream + patch "/connections", MapConnectionAPIController, :update delete "/connections", MapConnectionAPIController, :delete delete "/systems", MapSystemAPIController, :delete @@ -256,6 +259,9 @@ defmodule WandererAppWeb.Router do resources "/webhooks", MapWebhooksAPIController, except: [:new, :edit] do post "/rotate-secret", MapWebhooksAPIController, :rotate_secret end + + # Webhook control endpoint + put "/webhooks/toggle", MapAPIController, :toggle_webhooks end # diff --git a/priv/posts/2025/06-21-webhooks.md b/priv/posts/2025/06-21-webhooks.md index 9a448205..f295ecfb 100644 --- a/priv/posts/2025/06-21-webhooks.md +++ b/priv/posts/2025/06-21-webhooks.md @@ -1,26 +1,26 @@ %{ -title: "Real-Time Events API: WebSockets and Webhooks for Wanderer", +title: "Real-Time Events API: Server-Sent Events and Webhooks for Wanderer", author: "Wanderer Team", cover_image_uri: "/images/news/06-21-webhooks/webhooks-hero.png", -tags: ~w(api webhooks websocket real-time discord integration developer), -description: "Connect to Wanderer's real-time events using WebSockets or webhooks. Learn how to receive instant notifications for map changes, kills, and more - including a complete Discord integration guide." +tags: ~w(api webhooks sse server-sent-events real-time discord integration developer), +description: "Connect to Wanderer's real-time events using Server-Sent Events (SSE) or webhooks. Learn how to receive instant notifications for map changes, kills, and more - including a complete Discord integration guide." } --- -# Real-Time Events API: WebSockets and Webhooks for Wanderer +# Real-Time Events API: Server-Sent Events and Webhooks for Wanderer -We're excited to announce the launch of Wanderer's Real-Time Events API, giving developers and power users instant access to map events as they happen. Whether you're building a Discord bot, creating custom alerts, or integrating with external tools, our new API provides two powerful methods to receive real-time updates: WebSockets for persistent connections and webhooks for HTTP-based integrations. +We're excited to announce the launch of Wanderer's Real-Time Events API, giving developers and power users instant access to map events as they happen. Whether you're building a Discord bot, creating custom alerts, or integrating with external tools, our new API provides two powerful methods to receive real-time updates: Server-Sent Events (SSE) for persistent streaming connections and webhooks for HTTP-based integrations. In the dynamic world of EVE Online wormhole mapping, every second counts. When a new signature appears, when a hostile kill occurs in your chain, or when a scout reports a new connection - having this information delivered instantly to your tools and teams can make all the difference. Our Real-Time Events API eliminates the need for polling and provides sub-second delivery of critical map events. ## What's New? -### WebSocket Connections -- **Persistent real-time streaming** of map events +### Server-Sent Events (SSE) +- **Persistent real-time streaming** of map events over HTTP - **Event filtering** to receive only the events you care about -- **Automatic reconnection** support with event backfill -- **Lightweight protocol** using Phoenix Channels V2 +- **Automatic backfill** support using event IDs +- **Simple HTTP-based protocol** with built-in browser support ### Webhook Delivery - **HTTP POST notifications** to your endpoints @@ -42,64 +42,99 @@ In the dynamic world of EVE Online wormhole mapping, every second counts. When a - Basic programming knowledge for integration ### Authentication -Both WebSocket and webhook APIs use your existing map API token for authentication. This token should be kept secure and never exposed in client-side code. +Both SSE and webhook APIs use your existing map API token for authentication. This token should be kept secure and never exposed in client-side code. -## WebSocket Quick Start +## Server-Sent Events (SSE) Quick Start -Connect to Wanderer's WebSocket endpoint to receive a real-time stream of events: +Connect to Wanderer's SSE endpoint to receive a real-time stream of events: ### JavaScript Example ```javascript -import { Socket } from "phoenix"; +// Connect to SSE endpoint +const mapId = "your-map-id-or-slug"; +const apiToken = "your-map-api-token"; -// Initialize connection -const socket = new Socket("wss://wanderer.ltd/socket/events", { - params: { token: "your-map-api-token" } +// Optional: Filter specific events +const eventTypes = ["add_system", "map_kill"].join(","); +const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream?events=${eventTypes}`; + +const eventSource = new EventSource(url, { + headers: { + 'Authorization': `Bearer ${apiToken}` + } }); -socket.connect(); +// Handle connection opened +eventSource.onopen = () => { + console.log("Connected to events stream"); +}; -// Join your map's event channel -const channel = socket.channel(`events:map:${mapId}`, { - // Optional: Filter specific events - events: ["add_system", "map_kill"] -}); +// Handle incoming events +eventSource.onmessage = (event) => { + const eventData = JSON.parse(event.data); + console.log(`Received ${eventData.type} event:`, eventData); + + // Handle specific event types + switch(eventData.type) { + case 'add_system': + console.log("New system added:", eventData.payload); + break; + case 'map_kill': + console.log("Kill detected:", eventData.payload); + break; + } +}; -// Handle events -channel.on("add_system", (event) => { - console.log("New system added:", event); -}); +// Handle errors +eventSource.onerror = (error) => { + console.error("SSE connection error:", error); +}; -channel.on("map_kill", (event) => { - console.log("Kill detected:", event); -}); - -// Connect to the channel -channel.join() - .receive("ok", () => console.log("Connected to events")) - .receive("error", (err) => console.error("Connection failed:", err)); +// Cleanup when done +// eventSource.close(); ``` ### Event Filtering -You can subscribe to specific events or use `"*"` to receive all events: +You can subscribe to specific events or omit the `events` parameter to receive all events: ```javascript // Subscribe to specific events only -const channel = socket.channel(`events:map:${mapId}`, { - events: ["add_system", "connection_added", "map_kill"] -}); +const eventTypes = ["add_system", "connection_added", "map_kill"].join(","); +const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream?events=${eventTypes}`; -// Or subscribe to all events -const channel = socket.channel(`events:map:${mapId}`, { - events: "*" +// Or subscribe to all events (no events parameter) +const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream`; +``` + +### Event Backfill +SSE supports automatic backfill using the `Last-Event-ID` header: + +```javascript +// Reconnect with backfill from last received event +const eventSource = new EventSource(url, { + headers: { + 'Authorization': `Bearer ${apiToken}`, + 'Last-Event-ID': lastEventId // Will receive events since this ID + } }); ``` ## Webhook Setup -Webhooks provide an alternative to WebSockets, delivering events via HTTP POST to your endpoint: +Webhooks provide an alternative to SSE, delivering events via HTTP POST to your endpoint: -### 1. Create a Webhook Subscription +### 1. Enable Webhooks for Your Map + +First, enable webhooks for your map (map owners only): + +```bash +curl -X PUT https://wanderer.ltd/api/maps/${MAP_ID}/webhooks/toggle \ + -H "Authorization: Bearer ${API_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"enabled": true}' +``` + +### 2. Create a Webhook Subscription ```bash curl -X POST https://wanderer.ltd/api/maps/${MAP_ID}/webhooks \ @@ -112,7 +147,7 @@ curl -X POST https://wanderer.ltd/api/maps/${MAP_ID}/webhooks \ }' ``` -### 2. Handle Incoming Webhooks +### 3. Handle Incoming Webhooks Your endpoint will receive POST requests with events: @@ -139,7 +174,7 @@ app.post('/webhook', (req, res) => { }); ``` -### 3. Signature Verification +### 4. Signature Verification Verify webhook authenticity using HMAC-SHA256: @@ -352,9 +387,16 @@ services: ### Step 3: Register Your Webhook -Register your transformer service with Wanderer: +First, enable webhooks for your map, then register your transformer service: ```bash +# Enable webhooks for the map +curl -X PUT https://wanderer.ltd/api/maps/${MAP_ID}/webhooks/toggle \ + -H "Authorization: Bearer ${API_TOKEN}" \ + -H "Content-Type: application/json" \ + -d '{"enabled": true}' + +# Register webhook subscription curl -X POST https://wanderer.ltd/api/maps/${MAP_ID}/webhooks \ -H "Authorization: Bearer ${API_TOKEN}" \ -H "Content-Type: application/json" \ @@ -408,11 +450,12 @@ Your Discord channel will now receive formatted notifications for all map events ## Best Practices -### For WebSocket Connections +### For SSE Connections - **Implement reconnection logic** with exponential backoff -- **Handle connection drops** gracefully +- **Handle connection drops** gracefully using the `onerror` event - **Use event filtering** to reduce bandwidth -- **Process events asynchronously** to avoid blocking +- **Store the `Last-Event-ID`** for seamless reconnection with backfill +- **Process events asynchronously** to avoid blocking the event loop ### For Webhooks - **Respond quickly** (within 3 seconds) to webhook deliveries @@ -430,21 +473,23 @@ Your Discord channel will now receive formatted notifications for all map events ## API Reference -### WebSocket Endpoints -- **Connection URL**: `wss://wanderer.ltd/socket/events` -- **Channel Topic**: `events:map:{map_id}` -- **Authentication**: Bearer token in connection params +### SSE Endpoints +- **Stream URL**: `https://wanderer.ltd/api/maps/{map_id}/events/stream` +- **Authentication**: Bearer token in Authorization header +- **Query Parameters**: + - `events`: Comma-separated list of event types (optional) + - `last_event_id`: ULID for backfill (optional) ### REST API Endpoints +- **Enable/Disable Webhooks**: `PUT /api/maps/{map_id}/webhooks/toggle` - **List Webhooks**: `GET /api/maps/{map_id}/webhooks` - **Create Webhook**: `POST /api/maps/{map_id}/webhooks` - **Update Webhook**: `PUT /api/maps/{map_id}/webhooks/{id}` - **Delete Webhook**: `DELETE /api/maps/{map_id}/webhooks/{id}` - **Rotate Secret**: `POST /api/maps/{map_id}/webhooks/{id}/rotate-secret` -- **Get Events** (backfill): `GET /api/maps/{map_id}/events?since={timestamp}` ### Rate Limits -- **WebSocket Connections**: 10 per map +- **SSE Connections**: Configurable per server (default: 50 per map, 10 per API key) - **Webhook Subscriptions**: 5 per map - **Event Delivery**: No limit (all events delivered) - **API Requests**: 100 per minute @@ -456,19 +501,24 @@ Connect to multiple maps simultaneously: ```javascript const maps = ['map-id-1', 'map-id-2', 'map-id-3']; -const channels = {}; +const eventSources = {}; maps.forEach(mapId => { - const channel = socket.channel(`events:map:${mapId}`, { - events: "*" + const url = `https://wanderer.ltd/api/maps/${mapId}/events/stream`; + const eventSource = new EventSource(url, { + headers: { 'Authorization': `Bearer ${apiToken}` } }); - channel.on("*", (eventType, event) => { - console.log(`[${mapId}] ${eventType}:`, event); - }); + eventSource.onmessage = (event) => { + const eventData = JSON.parse(event.data); + console.log(`[${mapId}] ${eventData.type}:`, eventData); + }; - channel.join(); - channels[mapId] = channel; + eventSource.onerror = (error) => { + console.error(`[${mapId}] SSE error:`, error); + }; + + eventSources[mapId] = eventSource; }); ``` @@ -504,27 +554,34 @@ const activityTracker = { Create sophisticated alert conditions: ```javascript -// Alert on high-value kills -channel.on("map_kill", (event) => { - if (event.payload.value > 1000000000) { // 1B+ ISK +// Set up SSE connection for alerts +const eventSource = new EventSource(`https://wanderer.ltd/api/maps/${mapId}/events/stream`, { + headers: { 'Authorization': `Bearer ${apiToken}` } +}); + +eventSource.onmessage = (event) => { + const eventData = JSON.parse(event.data); + + // Alert on high-value kills + if (eventData.type === 'map_kill' && eventData.payload.value > 1000000000) { sendUrgentAlert({ title: "High Value Kill Detected!", - message: `${event.payload.victim.ship} worth ${event.payload.value / 1e9}B ISK destroyed`, + message: `${eventData.payload.victim.ship} worth ${eventData.payload.value / 1e9}B ISK destroyed`, priority: "high" }); } -}); - -// Alert on new connections to specific systems -channel.on("connection_added", (event) => { - const watchedSystems = ["J123456", "J234567"]; - if (watchedSystems.includes(event.payload.to_name)) { - sendAlert({ - title: "Connection to Watched System", - message: `New connection to ${event.payload.to_name} from ${event.payload.from_name}` - }); + + // Alert on new connections to specific systems + if (eventData.type === 'connection_added') { + const watchedSystems = ["J123456", "J234567"]; + if (watchedSystems.includes(eventData.payload.to_name)) { + sendAlert({ + title: "Connection to Watched System", + message: `New connection to ${eventData.payload.to_name} from ${eventData.payload.from_name}` + }); + } } -}); +}; ``` ## Coming Soon @@ -548,8 +605,10 @@ Need help with the Real-Time Events API? The Real-Time Events API opens up endless possibilities for integrating Wanderer with your tools and workflows. Whether you're sending notifications to Discord, building custom dashboards, or creating advanced alerting systems, you now have instant access to everything happening in your maps. +Server-Sent Events provide a simple, HTTP-based streaming solution that works in all modern browsers and environments, while webhooks offer reliable HTTP-based delivery for server-to-server integrations. With per-map webhook controls, map owners have fine-grained control over their integrations. + Start building with real-time events today and take your wormhole operations to the next level! --- -*The Real-Time Events API is available now for all Wanderer maps. No additional subscription required - if you have API access to a map, you can use webhooks and WebSockets.* \ No newline at end of file +*The Real-Time Events API is available now for all Wanderer maps. No additional subscription required - if you have API access to a map, you can use SSE and webhooks. Webhook delivery requires map owner activation.* \ No newline at end of file diff --git a/priv/repo/migrations/20250701000000_add_map_webhooks_enabled.exs b/priv/repo/migrations/20250701000000_add_map_webhooks_enabled.exs new file mode 100644 index 00000000..7dc8c017 --- /dev/null +++ b/priv/repo/migrations/20250701000000_add_map_webhooks_enabled.exs @@ -0,0 +1,19 @@ +defmodule WandererApp.Repo.Migrations.AddMapWebhooksEnabled do + @moduledoc """ + Add webhooks_enabled field to maps table for per-map webhook control. + """ + + use Ecto.Migration + + def up do + alter table(:maps_v1) do + add :webhooks_enabled, :boolean, null: false, default: false + end + end + + def down do + alter table(:maps_v1) do + remove :webhooks_enabled + end + end +end \ No newline at end of file