mirror of
https://github.com/wanderer-industries/wanderer
synced 2025-12-12 10:45:54 +00:00
fix: clean up SSE warnings
This commit is contained in:
@@ -9,7 +9,6 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
|
|
||||||
alias WandererApp.ExternalEvents.{SseStreamManager, EventFilter, MapEventRelay}
|
alias WandererApp.ExternalEvents.{SseStreamManager, EventFilter, MapEventRelay}
|
||||||
alias WandererApp.Api.Map, as: ApiMap
|
alias WandererApp.Api.Map, as: ApiMap
|
||||||
alias WandererAppWeb.SSE
|
|
||||||
alias Plug.Crypto
|
alias Plug.Crypto
|
||||||
|
|
||||||
require Logger
|
require Logger
|
||||||
@@ -54,13 +53,13 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
end
|
end
|
||||||
|
|
||||||
# Send SSE headers
|
# Send SSE headers
|
||||||
conn = SSE.send_headers(conn)
|
conn = send_headers(conn)
|
||||||
|
|
||||||
# Track the connection
|
# Track the connection
|
||||||
case SseStreamManager.add_client(map_id, api_key, self(), event_filter) do
|
case SseStreamManager.add_client(map_id, api_key, self(), event_filter) do
|
||||||
{:ok, _} ->
|
{:ok, _} ->
|
||||||
# Send initial connection event
|
# Send initial connection event
|
||||||
conn = SSE.send_event(conn, %{
|
conn = send_event(conn, %{
|
||||||
id: Ulid.generate(),
|
id: Ulid.generate(),
|
||||||
event: "connected",
|
event: "connected",
|
||||||
data: %{
|
data: %{
|
||||||
@@ -117,7 +116,7 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
case Jason.decode(event_json) do
|
case Jason.decode(event_json) do
|
||||||
{:ok, event} ->
|
{:ok, event} ->
|
||||||
if EventFilter.matches?(event["type"], event_filter) do
|
if EventFilter.matches?(event["type"], event_filter) do
|
||||||
SSE.send_event(acc_conn, event)
|
send_event(acc_conn, event)
|
||||||
else
|
else
|
||||||
acc_conn
|
acc_conn
|
||||||
end
|
end
|
||||||
@@ -142,7 +141,7 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
case Jason.decode(event_json) do
|
case Jason.decode(event_json) do
|
||||||
{:ok, event} ->
|
{:ok, event} ->
|
||||||
if EventFilter.matches?(event["type"], event_filter) do
|
if EventFilter.matches?(event["type"], event_filter) do
|
||||||
SSE.send_event(conn, event)
|
send_event(conn, event)
|
||||||
else
|
else
|
||||||
conn
|
conn
|
||||||
end
|
end
|
||||||
@@ -157,7 +156,7 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
|
|
||||||
:keepalive ->
|
:keepalive ->
|
||||||
# Send keepalive
|
# Send keepalive
|
||||||
conn = SSE.send_keepalive(conn)
|
conn = send_keepalive(conn)
|
||||||
|
|
||||||
# Continue streaming
|
# Continue streaming
|
||||||
stream_events(conn, map_id, api_key, event_filter)
|
stream_events(conn, map_id, api_key, event_filter)
|
||||||
@@ -169,7 +168,7 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
after
|
after
|
||||||
30_000 ->
|
30_000 ->
|
||||||
# Send keepalive every 30 seconds
|
# Send keepalive every 30 seconds
|
||||||
conn = SSE.send_keepalive(conn)
|
conn = send_keepalive(conn)
|
||||||
stream_events(conn, map_id, api_key, event_filter)
|
stream_events(conn, map_id, api_key, event_filter)
|
||||||
end
|
end
|
||||||
rescue
|
rescue
|
||||||
@@ -227,4 +226,80 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
end
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
|
# SSE helper functions
|
||||||
|
|
||||||
|
defp send_headers(conn) do
|
||||||
|
conn
|
||||||
|
|> put_resp_content_type("text/event-stream")
|
||||||
|
|> put_resp_header("cache-control", "no-cache")
|
||||||
|
|> put_resp_header("connection", "keep-alive")
|
||||||
|
|> put_resp_header("access-control-allow-origin", "*")
|
||||||
|
|> put_resp_header("access-control-allow-headers", "Cache-Control")
|
||||||
|
|> send_chunked(200)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp send_event(conn, event) when is_map(event) do
|
||||||
|
sse_data = format_sse_event(event)
|
||||||
|
|
||||||
|
case chunk(conn, sse_data) do
|
||||||
|
{:ok, conn} ->
|
||||||
|
conn
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("Failed to send SSE event: #{inspect(reason)}")
|
||||||
|
# Return the connection as-is since we can't recover from chunk errors
|
||||||
|
# The error will be caught by the stream_events rescue clause
|
||||||
|
conn
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp send_keepalive(conn) do
|
||||||
|
case chunk(conn, ": keepalive\n\n") do
|
||||||
|
{:ok, conn} ->
|
||||||
|
conn
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("Failed to send SSE keepalive: #{inspect(reason)}")
|
||||||
|
# Return the connection as-is since we can't recover from chunk errors
|
||||||
|
# The error will be caught by the stream_events rescue clause
|
||||||
|
conn
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
defp format_sse_event(event) do
|
||||||
|
data = []
|
||||||
|
|
||||||
|
# Add event type if present
|
||||||
|
data =
|
||||||
|
case Map.get(event, :event) do
|
||||||
|
nil -> data
|
||||||
|
event_type -> ["event: #{event_type}\n" | data]
|
||||||
|
end
|
||||||
|
|
||||||
|
# Add ID if present
|
||||||
|
data =
|
||||||
|
case Map.get(event, :id) do
|
||||||
|
nil -> data
|
||||||
|
id -> ["id: #{id}\n" | data]
|
||||||
|
end
|
||||||
|
|
||||||
|
# Add data (required)
|
||||||
|
data =
|
||||||
|
case Map.get(event, :data) do
|
||||||
|
nil ->
|
||||||
|
["data: \n" | data]
|
||||||
|
event_data when is_binary(event_data) ->
|
||||||
|
["data: #{event_data}\n" | data]
|
||||||
|
event_data ->
|
||||||
|
json_data = Jason.encode!(event_data)
|
||||||
|
["data: #{json_data}\n" | data]
|
||||||
|
end
|
||||||
|
|
||||||
|
# Reverse to get correct order and add final newline
|
||||||
|
data
|
||||||
|
|> Enum.reverse()
|
||||||
|
|> Enum.join("")
|
||||||
|
|> Kernel.<>("\n")
|
||||||
|
end
|
||||||
end
|
end
|
||||||
Reference in New Issue
Block a user