mirror of
https://github.com/wanderer-industries/wanderer
synced 2026-05-01 23:10:30 +00:00
528 lines
14 KiB
Elixir
528 lines
14 KiB
Elixir
defmodule WandererApp.Character.TrackerPool do
|
|
@moduledoc false

use GenServer, restart: :transient

require Logger

# State held by one tracker pool process.
#   * `:uuid`                   - identifier generated for the pool in `start_link/1`
#   * `:characters`             - ids of the characters this pool polls
#   * `:server_online`          - toggled by `{:server_status, status}` messages
#   * `:last_location_duration` - duration (ms) of the latest location pass,
#                                 used as a backpressure signal by other jobs
#   * `:tracked_ids`            - declared, but not assigned by the code visible here
defstruct tracked_ids: nil,
          uuid: nil,
          characters: nil,
          server_online: false,
          last_location_duration: 0
|
|
|
|
# Module identity plus the names of the cache and registries the pool talks to.
@name __MODULE__
@cache :tracked_characters
@registry :tracker_pool_registry
@unique_registry :unique_tracker_pool_registry

# Re-scheduling interval of every periodic job (milliseconds), fastest first.
@update_location_interval :timer.seconds(1)
@update_ship_interval :timer.seconds(2)
@update_online_interval :timer.seconds(30)
@update_info_interval :timer.minutes(2)
@check_offline_characters_interval :timer.minutes(5)
@update_wallet_interval :timer.minutes(10)
|
|
|
|
# Per-operation concurrency limits.
#
# Location polling is the critical job, so it gets the widest fan-out. The
# value is looked up on every call (not stored in a module attribute) because
# it is configured via runtime.exs; when unset it falls back to twelve tasks
# per online scheduler.
defp location_concurrency do
  fallback = System.schedulers_online() * 12

  Application.get_env(:wanderer_app, :location_concurrency, fallback)
end
|
|
|
|
# Logger module, resolved from application config at compile time.
@logger Application.compile_env(:wanderer_app, :logger)

# Every job other than location polling uses this lower concurrency.
# NOTE(review): a module attribute is evaluated at compile time, so this is the
# scheduler count of the machine that built the release - confirm that is intended.
@standard_concurrency System.schedulers_online() * 2
|
|
|
|
# Builds a pool state struct with every field at its default.
def new, do: %__MODULE__{}

# Builds a pool state struct, overriding defaults with the given fields.
def new(args), do: struct!(__MODULE__, args)
|
|
|
|
# Starts a pool process for `tracked_ids` under a freshly generated UUID.
#
# The process is registered under `Module.concat(__MODULE__, uuid)`.
# NOTE(review): this mints a new atom per pool start and atoms are never
# garbage collected - worth confirming pools are not restarted without bound.
def start_link(tracked_ids) do
  pool_uuid = UUID.uuid1()
  process_name = Module.concat(__MODULE__, pool_uuid)

  GenServer.start_link(@name, {pool_uuid, tracked_ids}, name: process_name)
end
|
|
|
|
# Registers the pool in both registries, records the owning pool uuid for
# every character in the cache, and defers timer setup to `handle_continue/2`.
@impl true
def init({uuid, tracked_ids}) do
  pool_key = Module.concat(__MODULE__, uuid)

  {:ok, _} = Registry.register(@unique_registry, pool_key, tracked_ids)
  {:ok, _} = Registry.register(@registry, __MODULE__, uuid)

  Enum.each(tracked_ids, &Cachex.put(@cache, &1, uuid))

  initial_state = new(%{uuid: uuid, characters: tracked_ids})

  {:ok, initial_state, {:continue, :start}}
end
|
|
|
|
# No cleanup is performed on shutdown.
@impl true
def terminate(_reason, _state), do: :ok
|
|
|
|
# Stops the pool with reason `:normal`.
@impl true
def handle_cast(:stop, state) do
  {:stop, :normal, state}
end
|
|
|
|
# Starts tracking `tracked_id` in this pool.
#
# Updates the id list stored in the unique registry, records the owning pool
# uuid in the cache and adds the id to the pool state. The add is idempotent:
# an id that is already tracked is not prepended a second time, because every
# entry in `characters` is polled once per cycle and a duplicate would double
# the work for that character.
#
# Returns `{:noreply, new_state}`.
@impl true
def handle_cast({:add_tracked_id, tracked_id}, %{characters: characters, uuid: uuid} = state) do
  prepend_once = fn ids ->
    if tracked_id in ids, do: ids, else: [tracked_id | ids]
  end

  Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), prepend_once)

  # Re-putting the cache entry is harmless and keeps it fresh for repeated adds.
  Cachex.put(@cache, tracked_id, uuid)

  {:noreply, %{state | characters: prepend_once.(characters)}}
end
|
|
|
|
# Stops tracking `tracked_id`: drops it from the registry value, deletes its
# cache entry and removes every occurrence from the pool state.
@impl true
def handle_cast({:remove_tracked_id, tracked_id}, %{characters: characters, uuid: uuid} = state) do
  other_id? = fn id -> id != tracked_id end
  pool_key = Module.concat(__MODULE__, uuid)

  Registry.update_value(@unique_registry, pool_key, &Enum.filter(&1, other_id?))

  Cachex.del(@cache, tracked_id)

  remaining = Enum.filter(characters, other_id?)

  {:noreply, %{state | characters: remaining}}
end
|
|
|
|
# Replies `:ok` and then stops the pool with reason `:error`.
@impl true
def handle_call(:error, _from, state) do
  {:stop, :error, :ok, state}
end
|
|
|
|
# Post-init setup: starts queue monitoring, subscribes to server status
# broadcasts and schedules the first run of every periodic job.
@impl true
def handle_continue(:start, state) do
  Logger.info("#{@name} started")

  # Message queue monitoring runs every 30 seconds.
  Process.send_after(self(), :monitor_message_queue, :timer.seconds(30))

  Phoenix.PubSub.subscribe(WandererApp.PubSub, "server_status")

  # Each job starts after a fixed offset plus a random stagger so that several
  # pools booting together do not all hit the backend at once. Location polling
  # is critical and gets the smallest stagger (up to 500 ms); online and ship
  # get up to 10 s, info up to 60 s.
  location_delay = 300 + :rand.uniform(500)
  online_delay = 100 + :rand.uniform(10_000)
  ship_delay = 500 + :rand.uniform(10_000)
  info_delay = 1500 + :rand.uniform(60_000)

  Process.send_after(self(), :update_online, online_delay)
  Process.send_after(self(), :update_location, location_delay)
  Process.send_after(self(), :update_ship, ship_delay)
  Process.send_after(self(), :update_info, info_delay)
  Process.send_after(self(), :check_offline_characters, @check_offline_characters_interval)

  # Wallet polling is optional and gets the widest stagger (up to 120 s).
  if WandererApp.Env.wallet_tracking_enabled?() do
    wallet_delay = 1000 + :rand.uniform(120_000)
    Process.send_after(self(), :update_wallet, wallet_delay)
  end

  {:noreply, state}
end
|
|
|
|
# Periodic self-check: inspects this process' mailbox, then re-arms the timer.
@impl true
def handle_info(:monitor_message_queue, state) do
  monitor_message_queue(state)

  # Next check in 30 seconds.
  next_check_in = :timer.seconds(30)
  Process.send_after(self(), :monitor_message_queue, next_check_in)

  {:noreply, state}
end
|
|
|
|
# Handles a `{ref, result}` reply: drops the monitor for `ref` (flushing any
# pending :DOWN message) and logs the result when it is an error tuple.
def handle_info({ref, result}, state) when is_reference(ref) do
  Process.demonitor(ref, [:flush])

  with {:error, reason} <- result do
    @logger.error("#{__MODULE__} failed to process: #{inspect(reason)}")
  end

  {:noreply, state}
end
|
|
|
|
# Server status broadcast: the server counts as online unless `status.vip` is set.
def handle_info({:server_status, status}, state) do
  online? = not status.vip

  {:noreply, %{state | server_online: online?}}
end
|
|
|
|
# :update_online while the server is online: refreshes the online status of
# every tracked character concurrently, then waits for the next tick.
def handle_info(:update_online, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_online, @update_online_interval)

  try do
    online_checks =
      Task.async_stream(
        characters,
        &WandererApp.Character.Tracker.update_online/1,
        max_concurrency: @standard_concurrency,
        on_timeout: :kill_task,
        timeout: :timer.seconds(5)
      )

    # Results are not inspected; the stream only has to be driven to completion.
    Stream.run(online_checks)
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_online => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)

      ErrorTracker.report(exception, __STACKTRACE__)
  end

  {:noreply, state}
end

# :update_online while the server is NOT online: marks every tracked character
# as offline, both on the character record and in the character state.
def handle_info(:update_online, %{characters: characters} = state) do
  Process.send_after(self(), :update_online, @update_online_interval)

  try do
    for character_id <- characters do
      WandererApp.Character.update_character(character_id, %{online: false})
      WandererApp.Character.update_character_state(character_id, %{is_online: false})
    end
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_online => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end
|
|
|
|
# :check_offline_characters: runs the offline check for every tracked character
# (regardless of server status) and logs each task result that is not `{:ok, _}`.
def handle_info(:check_offline_characters, %{characters: characters} = state) do
  Process.send_after(self(), :check_offline_characters, @check_offline_characters_interval)

  try do
    characters
    |> Task.async_stream(
      &WandererApp.Character.Tracker.check_offline/1,
      timeout: :timer.seconds(15),
      max_concurrency: @standard_concurrency,
      on_timeout: :kill_task
    )
    |> Stream.reject(&match?({:ok, _result}, &1))
    |> Enum.each(fn failure ->
      @logger.error("Error in check_offline: #{inspect(failure)}")
    end)
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] check_offline => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end
|
|
|
|
# :update_location while the server is online: polls the location of every
# tracked character concurrently, emits timing telemetry and stores the pass
# duration in the state so other jobs can apply backpressure.
def handle_info(:update_location, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_location, @update_location_interval)

  started_at = System.monotonic_time(:millisecond)

  try do
    location_polls =
      Task.async_stream(
        characters,
        &WandererApp.Character.Tracker.update_location/1,
        max_concurrency: location_concurrency(),
        on_timeout: :kill_task,
        timeout: :timer.seconds(5)
      )

    Stream.run(location_polls)

    duration = System.monotonic_time(:millisecond) - started_at
    character_count = length(characters)
    measurements = %{duration: duration, character_count: character_count}
    metadata = %{pool_uuid: state.uuid}

    # Performance telemetry for every location pass.
    :telemetry.execute([:wanderer_app, :tracker_pool, :location_update], measurements, metadata)

    # A pass that takes longer than 2000 ms is reported as lagging.
    if duration > 2000 do
      Logger.warning(
        "[Tracker Pool] Location updates falling behind: #{duration}ms for #{character_count} chars (pool: #{state.uuid})"
      )

      :telemetry.execute([:wanderer_app, :tracker_pool, :location_lag], measurements, metadata)
    end

    {:noreply, %{state | last_location_duration: duration}}
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_location => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)

      # The previously recorded duration is kept when a pass raises.
      {:noreply, state}
  end
end

# :update_location in any other state (server not online): only re-arm the timer.
def handle_info(:update_location, state) do
  Process.send_after(self(), :update_location, @update_location_interval)

  {:noreply, state}
end
|
|
|
|
# :update_ship while the server is online: refreshes the ship of every tracked
# character, unless the latest location pass took longer than 1000 ms, in
# which case this tick is skipped (backpressure) and a telemetry event is emitted.
def handle_info(
      :update_ship,
      %{characters: characters, server_online: true, last_location_duration: location_duration} =
        state
    ) do
  Process.send_after(self(), :update_ship, @update_ship_interval)

  if location_duration <= 1000 do
    try do
      ship_polls =
        Task.async_stream(
          characters,
          &WandererApp.Character.Tracker.update_ship/1,
          max_concurrency: @standard_concurrency,
          on_timeout: :kill_task,
          timeout: :timer.seconds(5)
        )

      Stream.run(ship_polls)
    rescue
      exception ->
        Logger.error("""
        [Tracker Pool] update_ship => exception: #{Exception.message(exception)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end

    {:noreply, state}
  else
    # Location polling is lagging: give it priority and skip this ship pass.
    Logger.debug(
      "[Tracker Pool] Skipping ship update due to location lag (#{location_duration}ms)"
    )

    :telemetry.execute(
      [:wanderer_app, :tracker_pool, :ship_skipped],
      %{count: 1},
      %{pool_uuid: state.uuid, reason: :location_lag}
    )

    {:noreply, state}
  end
end

# :update_ship in any other state (server not online): only re-arm the timer.
def handle_info(:update_ship, state) do
  Process.send_after(self(), :update_ship, @update_ship_interval)

  {:noreply, state}
end
|
|
|
|
# :update_info while the server is online: refreshes character info for every
# tracked character and logs each task result that is not `{:ok, _}`. The tick
# is skipped when the latest location pass took longer than 1500 ms.
def handle_info(
      :update_info,
      %{characters: characters, server_online: true, last_location_duration: location_duration} =
        state
    ) do
  Process.send_after(self(), :update_info, @update_info_interval)

  if location_duration <= 1500 do
    try do
      characters
      |> Task.async_stream(
        &WandererApp.Character.Tracker.update_info/1,
        timeout: :timer.seconds(15),
        max_concurrency: @standard_concurrency,
        on_timeout: :kill_task
      )
      |> Stream.reject(&match?({:ok, _result}, &1))
      |> Enum.each(fn failure ->
        Logger.error("Error in update_info: #{inspect(failure)}")
      end)
    rescue
      exception ->
        Logger.error("""
        [Tracker Pool] update_info => exception: #{Exception.message(exception)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end

    {:noreply, state}
  else
    # Location polling is severely lagging: skip this info pass.
    Logger.debug(
      "[Tracker Pool] Skipping info update due to location lag (#{location_duration}ms)"
    )

    :telemetry.execute(
      [:wanderer_app, :tracker_pool, :info_skipped],
      %{count: 1},
      %{pool_uuid: state.uuid, reason: :location_lag}
    )

    {:noreply, state}
  end
end

# :update_info in any other state (server not online): only re-arm the timer.
def handle_info(:update_info, state) do
  Process.send_after(self(), :update_info, @update_info_interval)

  {:noreply, state}
end
|
|
|
|
# :update_wallet while the server is online: refreshes the wallet of every
# tracked character (each task may run up to five minutes) and logs each task
# result that is not `{:ok, _}`.
def handle_info(:update_wallet, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_wallet, @update_wallet_interval)

  try do
    characters
    |> Task.async_stream(
      &WandererApp.Character.Tracker.update_wallet/1,
      timeout: :timer.minutes(5),
      max_concurrency: @standard_concurrency,
      on_timeout: :kill_task
    )
    |> Stream.reject(&match?({:ok, _result}, &1))
    |> Enum.each(fn failure ->
      Logger.error("Error in update_wallet: #{inspect(failure)}")
    end)
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_wallet => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end

# :update_wallet in any other state (server not online): only re-arm the timer.
def handle_info(:update_wallet, state) do
  Process.send_after(self(), :update_wallet, @update_wallet_interval)

  {:noreply, state}
end
|
|
|
|
# Inspects this process' mailbox length and memory use. When more than 50
# messages are queued it logs a warning and emits a `:queue_buildup` telemetry
# event. Any exception raised along the way is logged at debug level and
# swallowed, so monitoring never takes the pool down.
defp monitor_message_queue(state) do
  {_, queue_length} = Process.info(self(), :message_queue_len)
  {_, memory_bytes} = Process.info(self(), :memory)

  if queue_length > 50 do
    Logger.warning("GENSERVER_QUEUE_HIGH: Character tracker pool message queue buildup",
      pool_id: state.uuid,
      message_queue_length: queue_length,
      memory_bytes: memory_bytes,
      tracked_characters: length(state.characters)
    )

    measurements = %{message_queue_length: queue_length, memory_bytes: memory_bytes}
    metadata = %{pool_id: state.uuid, tracked_characters: length(state.characters)}

    :telemetry.execute(
      [:wanderer_app, :character, :tracker_pool, :queue_buildup],
      measurements,
      metadata
    )
  end
rescue
  error ->
    Logger.debug("Failed to monitor message queue: #{inspect(error)}")
end
|
|
end
|