mirror of
https://github.com/wanderer-industries/wanderer
synced 2025-12-12 10:45:54 +00:00
fix: update env variable usage for sse
This commit is contained in:
@@ -393,7 +393,7 @@ config :wanderer_app, :license_manager,
|
|||||||
|
|
||||||
# SSE Configuration
|
# SSE Configuration
|
||||||
config :wanderer_app, :sse,
|
config :wanderer_app, :sse,
|
||||||
enabled: System.get_env("WANDERER_SSE_ENABLED", "true") == "true",
|
enabled: config_dir |> get_var_from_path_or_env("WANDERER_SSE_ENABLED", "true") |> String.to_existing_atom(),
|
||||||
max_connections_total: config_dir |> get_int_from_path_or_env("WANDERER_SSE_MAX_CONNECTIONS", 1000),
|
max_connections_total: config_dir |> get_int_from_path_or_env("WANDERER_SSE_MAX_CONNECTIONS", 1000),
|
||||||
max_connections_per_map: config_dir |> get_int_from_path_or_env("SSE_MAX_CONNECTIONS_PER_MAP", 50),
|
max_connections_per_map: config_dir |> get_int_from_path_or_env("SSE_MAX_CONNECTIONS_PER_MAP", 50),
|
||||||
max_connections_per_api_key:
|
max_connections_per_api_key:
|
||||||
@@ -403,5 +403,5 @@ config :wanderer_app, :sse,
|
|||||||
|
|
||||||
# External Events Configuration
|
# External Events Configuration
|
||||||
config :wanderer_app, :external_events,
|
config :wanderer_app, :external_events,
|
||||||
webhooks_enabled: System.get_env("WANDERER_WEBHOOKS_ENABLED", "true") == "true",
|
webhooks_enabled: config_dir |> get_var_from_path_or_env("WANDERER_WEBHOOKS_ENABLED", "true") |> String.to_existing_atom(),
|
||||||
webhook_timeout_ms: config_dir |> get_int_from_path_or_env("WANDERER_WEBHOOK_TIMEOUT_MS", 15000)
|
webhook_timeout_ms: config_dir |> get_int_from_path_or_env("WANDERER_WEBHOOK_TIMEOUT_MS", 15000)
|
||||||
|
|||||||
@@ -48,6 +48,26 @@ defmodule WandererApp.Env do
|
|||||||
)
|
)
|
||||||
def restrict_maps_creation?, do: get_key(:restrict_maps_creation, false)
|
def restrict_maps_creation?, do: get_key(:restrict_maps_creation, false)
|
||||||
|
|
||||||
|
def sse_enabled? do
|
||||||
|
Application.get_env(@app, :sse, [])
|
||||||
|
|> Keyword.get(:enabled, false)
|
||||||
|
|> case do
|
||||||
|
:true -> true
|
||||||
|
:false -> false
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
|
def webhooks_enabled? do
|
||||||
|
Application.get_env(@app, :external_events, [])
|
||||||
|
|> Keyword.get(:webhooks_enabled, false)
|
||||||
|
|> case do
|
||||||
|
:true -> true
|
||||||
|
:false -> false
|
||||||
|
_ -> false
|
||||||
|
end
|
||||||
|
end
|
||||||
|
|
||||||
@decorate cacheable(
|
@decorate cacheable(
|
||||||
cache: WandererApp.Cache,
|
cache: WandererApp.Cache,
|
||||||
key: "map-connection-auto-expire-hours"
|
key: "map-connection-auto-expire-hours"
|
||||||
|
|||||||
@@ -58,9 +58,20 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do
|
|||||||
# Schedule periodic cleanup of dead connections
|
# Schedule periodic cleanup of dead connections
|
||||||
schedule_cleanup()
|
schedule_cleanup()
|
||||||
|
|
||||||
|
# Read configuration once during initialization
|
||||||
|
sse_config = Application.get_env(:wanderer_app, :sse, [])
|
||||||
|
|
||||||
state = %{
|
state = %{
|
||||||
connections: %{}, # map_id => %{api_key => [connection_info]}
|
connections: %{}, # map_id => %{api_key => [connection_info]}
|
||||||
monitors: %{} # pid => {map_id, api_key}
|
monitors: %{}, # pid => {map_id, api_key}
|
||||||
|
# Configuration
|
||||||
|
enabled: WandererApp.Env.sse_enabled?() |> then(fn
|
||||||
|
true -> :true
|
||||||
|
false -> :false
|
||||||
|
end),
|
||||||
|
max_connections_total: Keyword.get(sse_config, :max_connections_total, 1000),
|
||||||
|
max_connections_per_map: Keyword.get(sse_config, :max_connections_per_map, 50),
|
||||||
|
max_connections_per_api_key: Keyword.get(sse_config, :max_connections_per_api_key, 10)
|
||||||
}
|
}
|
||||||
|
|
||||||
Logger.info("SSE Stream Manager started")
|
Logger.info("SSE Stream Manager started")
|
||||||
@@ -70,13 +81,11 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do
|
|||||||
@impl true
|
@impl true
|
||||||
def handle_call({:add_client, map_id, api_key, client_pid, event_filter}, _from, state) do
|
def handle_call({:add_client, map_id, api_key, client_pid, event_filter}, _from, state) do
|
||||||
# Check if feature is enabled
|
# Check if feature is enabled
|
||||||
unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do
|
unless state.enabled == :true do
|
||||||
{:reply, {:error, :sse_disabled}, state}
|
{:reply, {:error, :sse_disabled}, state}
|
||||||
else
|
else
|
||||||
# Check connection limits
|
# Check connection limits
|
||||||
max_connections = Application.get_env(:wanderer_app, :sse, [])[:max_connections_total] || 1000
|
case check_connection_limits(state, map_id, api_key, state.max_connections_total) do
|
||||||
|
|
||||||
case check_connection_limits(state, map_id, api_key, max_connections) do
|
|
||||||
:ok ->
|
:ok ->
|
||||||
# Monitor the client process
|
# Monitor the client process
|
||||||
ref = Process.monitor(client_pid)
|
ref = Process.monitor(client_pid)
|
||||||
@@ -184,18 +193,14 @@ defmodule WandererApp.ExternalEvents.SseStreamManager do
|
|||||||
if total_connections >= max_total do
|
if total_connections >= max_total do
|
||||||
{:error, :max_connections_reached}
|
{:error, :max_connections_reached}
|
||||||
else
|
else
|
||||||
# Check per-map and per-API-key limits from existing SSE config
|
# Check per-map and per-API-key limits from state
|
||||||
sse_config = Application.get_env(:wanderer_app, :sse, [])
|
|
||||||
max_per_map = sse_config[:max_connections_per_map] || 50
|
|
||||||
max_per_key = sse_config[:max_connections_per_api_key] || 10
|
|
||||||
|
|
||||||
map_connections = count_map_connections(state, map_id)
|
map_connections = count_map_connections(state, map_id)
|
||||||
key_connections = count_api_key_connections(state, map_id, api_key)
|
key_connections = count_api_key_connections(state, map_id, api_key)
|
||||||
|
|
||||||
cond do
|
cond do
|
||||||
map_connections >= max_per_map ->
|
map_connections >= state.max_connections_per_map ->
|
||||||
{:error, :map_connection_limit_reached}
|
{:error, :map_connection_limit_reached}
|
||||||
key_connections >= max_per_key ->
|
key_connections >= state.max_connections_per_api_key ->
|
||||||
{:error, :api_key_connection_limit_reached}
|
{:error, :api_key_connection_limit_reached}
|
||||||
true ->
|
true ->
|
||||||
:ok
|
:ok
|
||||||
|
|||||||
@@ -55,9 +55,16 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do
|
|||||||
# Extract the pid from the tuple returned by start_link
|
# Extract the pid from the tuple returned by start_link
|
||||||
{:ok, task_supervisor_pid} = Task.Supervisor.start_link(name: WebhookDispatcher.TaskSupervisor)
|
{:ok, task_supervisor_pid} = Task.Supervisor.start_link(name: WebhookDispatcher.TaskSupervisor)
|
||||||
|
|
||||||
|
# Read configuration once during initialization
|
||||||
|
webhooks_enabled = WandererApp.Env.webhooks_enabled?() |> then(fn
|
||||||
|
true -> :true
|
||||||
|
false -> :false
|
||||||
|
end)
|
||||||
|
|
||||||
{:ok, %{
|
{:ok, %{
|
||||||
task_supervisor: task_supervisor_pid,
|
task_supervisor: task_supervisor_pid,
|
||||||
delivery_count: 0
|
delivery_count: 0,
|
||||||
|
webhooks_enabled: webhooks_enabled
|
||||||
}}
|
}}
|
||||||
end
|
end
|
||||||
|
|
||||||
@@ -99,7 +106,7 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do
|
|||||||
|
|
||||||
defp process_webhook_delivery(map_id, events, state) do
|
defp process_webhook_delivery(map_id, events, state) do
|
||||||
# Check if webhooks are enabled globally and for this map
|
# Check if webhooks are enabled globally and for this map
|
||||||
case webhooks_allowed?(map_id) do
|
case webhooks_allowed?(map_id, state.webhooks_enabled) do
|
||||||
:ok ->
|
:ok ->
|
||||||
# Get active webhook subscriptions for this map
|
# Get active webhook subscriptions for this map
|
||||||
case get_active_subscriptions(map_id) do
|
case get_active_subscriptions(map_id) do
|
||||||
@@ -116,9 +123,17 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do
|
|||||||
state
|
state
|
||||||
end
|
end
|
||||||
|
|
||||||
{:error, :webhooks_disabled} ->
|
{:error, :webhooks_globally_disabled} ->
|
||||||
|
Logger.debug(fn -> "Webhooks globally disabled" end)
|
||||||
|
state
|
||||||
|
|
||||||
|
{:error, :webhooks_disabled_for_map} ->
|
||||||
Logger.debug(fn -> "Webhooks disabled for map #{map_id}" end)
|
Logger.debug(fn -> "Webhooks disabled for map #{map_id}" end)
|
||||||
state
|
state
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.debug(fn -> "Webhooks not allowed for map #{map_id}: #{inspect(reason)}" end)
|
||||||
|
state
|
||||||
end
|
end
|
||||||
|> Map.update(:delivery_count, length(events), &(&1 + length(events)))
|
|> Map.update(:delivery_count, length(events), &(&1 + length(events)))
|
||||||
end
|
end
|
||||||
@@ -359,13 +374,14 @@ defmodule WandererApp.ExternalEvents.WebhookDispatcher do
|
|||||||
round(capped_delay + jitter)
|
round(capped_delay + jitter)
|
||||||
end
|
end
|
||||||
|
|
||||||
defp webhooks_allowed?(map_id) do
|
defp webhooks_allowed?(map_id, webhooks_globally_enabled) do
|
||||||
with true <- Application.get_env(:wanderer_app, :external_events, [])[:webhooks_enabled],
|
with :true <- webhooks_globally_enabled,
|
||||||
{:ok, map} <- WandererApp.Api.Map.by_id(map_id),
|
{:ok, map} <- WandererApp.Api.Map.by_id(map_id),
|
||||||
true <- map.webhooks_enabled do
|
true <- map.webhooks_enabled do
|
||||||
:ok
|
:ok
|
||||||
else
|
else
|
||||||
false -> {:error, :webhooks_globally_disabled}
|
:false -> {:error, :webhooks_globally_disabled}
|
||||||
|
nil -> {:error, :webhooks_globally_disabled}
|
||||||
{:error, :not_found} -> {:error, :map_not_found}
|
{:error, :not_found} -> {:error, :map_not_found}
|
||||||
%{webhooks_enabled: false} -> {:error, :webhooks_disabled_for_map}
|
%{webhooks_enabled: false} -> {:error, :webhooks_disabled_for_map}
|
||||||
{:error, reason} -> {:error, reason}
|
{:error, reason} -> {:error, reason}
|
||||||
|
|||||||
@@ -25,7 +25,7 @@ defmodule WandererAppWeb.Api.EventsController do
|
|||||||
Logger.info("SSE stream requested for map #{map_identifier}")
|
Logger.info("SSE stream requested for map #{map_identifier}")
|
||||||
|
|
||||||
# Check if SSE is enabled
|
# Check if SSE is enabled
|
||||||
unless Application.get_env(:wanderer_app, :sse, [])[:enabled] do
|
unless WandererApp.Env.sse_enabled?() do
|
||||||
conn
|
conn
|
||||||
|> put_status(:service_unavailable)
|
|> put_status(:service_unavailable)
|
||||||
|> put_resp_content_type("text/plain")
|
|> put_resp_content_type("text/plain")
|
||||||
|
|||||||
Reference in New Issue
Block a user