diff --git a/lib/wanderer_app_web/controllers/api/events_controller.ex b/lib/wanderer_app_web/controllers/api/events_controller.ex index 3e7f84d5..f68311ea 100644 --- a/lib/wanderer_app_web/controllers/api/events_controller.ex +++ b/lib/wanderer_app_web/controllers/api/events_controller.ex @@ -9,7 +9,6 @@ defmodule WandererAppWeb.Api.EventsController do alias WandererApp.ExternalEvents.{SseStreamManager, EventFilter, MapEventRelay} alias WandererApp.Api.Map, as: ApiMap - alias WandererAppWeb.SSE alias Plug.Crypto require Logger @@ -54,13 +53,13 @@ defmodule WandererAppWeb.Api.EventsController do end # Send SSE headers - conn = SSE.send_headers(conn) + conn = send_headers(conn) # Track the connection case SseStreamManager.add_client(map_id, api_key, self(), event_filter) do {:ok, _} -> # Send initial connection event - conn = SSE.send_event(conn, %{ + conn = send_event(conn, %{ id: Ulid.generate(), event: "connected", data: %{ @@ -117,7 +116,7 @@ defmodule WandererAppWeb.Api.EventsController do case Jason.decode(event_json) do {:ok, event} -> if EventFilter.matches?(event["type"], event_filter) do - SSE.send_event(acc_conn, event) + send_event(acc_conn, event) else acc_conn end @@ -142,7 +141,7 @@ defmodule WandererAppWeb.Api.EventsController do case Jason.decode(event_json) do {:ok, event} -> if EventFilter.matches?(event["type"], event_filter) do - SSE.send_event(conn, event) + send_event(conn, event) else conn end @@ -157,7 +156,7 @@ defmodule WandererAppWeb.Api.EventsController do :keepalive -> # Send keepalive - conn = SSE.send_keepalive(conn) + conn = send_keepalive(conn) # Continue streaming stream_events(conn, map_id, api_key, event_filter) @@ -169,7 +168,7 @@ defmodule WandererAppWeb.Api.EventsController do after 30_000 -> # Send keepalive every 30 seconds - conn = SSE.send_keepalive(conn) + conn = send_keepalive(conn) stream_events(conn, map_id, api_key, event_filter) end rescue @@ -227,4 +226,80 @@ defmodule WandererAppWeb.Api.EventsController do end end end + + # SSE helper functions + + defp send_headers(conn) do + conn + |> put_resp_content_type("text/event-stream") + |> put_resp_header("cache-control", "no-cache") + |> put_resp_header("connection", "keep-alive") + |> put_resp_header("access-control-allow-origin", "*") + |> put_resp_header("access-control-allow-headers", "Cache-Control") + |> send_chunked(200) + end + + defp send_event(conn, event) when is_map(event) do + sse_data = format_sse_event(event) + + case chunk(conn, sse_data) do + {:ok, conn} -> + conn + + {:error, reason} -> + Logger.error("Failed to send SSE event: #{inspect(reason)}") + # Return the connection as-is since we can't recover from chunk errors + # The error will be caught by the stream_events rescue clause + conn + end + end + + defp send_keepalive(conn) do + case chunk(conn, ": keepalive\n\n") do + {:ok, conn} -> + conn + + {:error, reason} -> + Logger.error("Failed to send SSE keepalive: #{inspect(reason)}") + # Return the connection as-is since we can't recover from chunk errors + # The error will be caught by the stream_events rescue clause + conn + end + end + + defp format_sse_event(event) do + data = [] + + # Add event type if present + data = + case Map.get(event, :event) do + nil -> data + event_type -> ["event: #{event_type}\n" | data] + end + + # Add ID if present + data = + case Map.get(event, :id) do + nil -> data + id -> ["id: #{id}\n" | data] + end + + # Add data (required) + data = + case Map.get(event, :data) do + nil -> + ["data: \n" | data] + event_data when is_binary(event_data) -> + ["data: #{event_data}\n" | data] + event_data -> + json_data = Jason.encode!(event_data) + ["data: #{json_data}\n" | data] + end + + # Reverse to get correct order and add final newline + data + |> Enum.reverse() + |> Enum.join("") + |> Kernel.<>("\n") + end end \ No newline at end of file