Files

528 lines
14 KiB
Elixir

defmodule WandererApp.Character.TrackerPool do
@moduledoc false
use GenServer, restart: :transient
require Logger
# State carried by each tracker pool process.
defstruct [
# NOTE(review): not set or read by any code visible in this module — confirm whether it is still needed.
:tracked_ids,
# Identifier of this pool, generated in start_link/1.
:uuid,
# Ids of the characters this pool currently tracks (updated by the add/remove casts).
:characters,
# Set from {:server_status, status} messages as `not status.vip`; gates the periodic updates.
server_online: false,
# Duration in milliseconds of the most recent completed location update pass;
# read by the ship and info updates to decide whether to skip a cycle.
last_location_duration: 0
]
# Callback module passed to GenServer.start_link/3 and used as a log prefix.
@name __MODULE__
# Cachex cache that maps a tracked character id to the uuid of the pool tracking it.
@cache :tracked_characters
# Registry in which every pool registers under the key __MODULE__ with its uuid as value.
@registry :tracker_pool_registry
# Registry in which every pool registers under its per-uuid name with its tracked ids as value.
@unique_registry :unique_tracker_pool_registry
# Delays (milliseconds) used to re-schedule each periodic handle_info/2 message.
@update_location_interval :timer.seconds(1)
@update_online_interval :timer.seconds(30)
@check_offline_characters_interval :timer.minutes(5)
@update_ship_interval :timer.seconds(2)
@update_info_interval :timer.minutes(2)
@update_wallet_interval :timer.minutes(10)
# Per-operation concurrency limits.
# Location updates are the most time-sensitive operation, so they get the
# highest task concurrency. The value is looked up on every call because it is
# configured via runtime.exs; when unset it falls back to 12 tasks per scheduler.
defp location_concurrency do
  fallback = System.schedulers_online() * 12
  Application.get_env(:wanderer_app, :location_concurrency, fallback)
end
# Other operations can use lower concurrency
# NOTE(review): module attributes are evaluated when the module is compiled, so this
# captures the scheduler count of the build machine, unlike location_concurrency/0
# which is resolved at runtime — confirm this is intended.
@standard_concurrency System.schedulers_online() * 2
# Logger module taken from compile-time application config; used for some error
# reports in this module alongside the standard Logger.
@logger Application.compile_env(:wanderer_app, :logger)
# Builds a pool state struct with every field at its default value.
def new() do
  %__MODULE__{}
end

# Builds a pool state struct from `args`, an enumerable of field/value pairs.
# Raises if `args` contains a key that is not a field of the struct.
def new(args) do
  struct!(__MODULE__, args)
end
# Starts a pool process for `tracked_ids`.
# A fresh uuid is generated per pool; it is handed to init/1 together with the
# ids and is also used to derive the process name.
def start_link(tracked_ids) do
  uuid = UUID.uuid1()
  pool_name = Module.concat(__MODULE__, uuid)

  GenServer.start_link(@name, {uuid, tracked_ids}, name: pool_name)
end
# Registers the pool in both registries, records every tracked id in the cache
# as belonging to this pool, and defers the remaining startup work to
# handle_continue(:start, state).
@impl true
def init({uuid, tracked_ids}) do
  pool_name = Module.concat(__MODULE__, uuid)

  {:ok, _} = Registry.register(@unique_registry, pool_name, tracked_ids)
  {:ok, _} = Registry.register(@registry, __MODULE__, uuid)

  for character_id <- tracked_ids do
    Cachex.put(@cache, character_id, uuid)
  end

  {:ok, new(uuid: uuid, characters: tracked_ids), {:continue, :start}}
end
# No cleanup is performed when the pool stops.
@impl true
def terminate(_reason, _state), do: :ok
# Stops the pool with reason :normal.
@impl true
def handle_cast(:stop, state) do
  {:stop, :normal, state}
end
# Starts tracking `tracked_id` in this pool.
#
# The cache entry is always (re)written so the id -> pool mapping stays current.
# The id is only added to the registry value and to `characters` when it is not
# already tracked: a repeated add would otherwise leave the id in the list twice
# and every periodic update would spawn two concurrent tasks for that character.
@impl true
def handle_cast({:add_tracked_id, tracked_id}, %{characters: characters, uuid: uuid} = state) do
  Cachex.put(@cache, tracked_id, uuid)

  if tracked_id in characters do
    {:noreply, state}
  else
    Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_tracked_ids ->
      [tracked_id | r_tracked_ids]
    end)

    {:noreply, %{state | characters: [tracked_id | characters]}}
  end
end
# Stops tracking `tracked_id`: every occurrence is dropped from the registry
# value and from `characters`, and its cache entry is deleted.
@impl true
def handle_cast({:remove_tracked_id, tracked_id}, %{characters: characters, uuid: uuid} = state) do
  without_id = fn ids -> Enum.filter(ids, &(&1 != tracked_id)) end

  Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), without_id)
  Cachex.del(@cache, tracked_id)

  {:noreply, %{state | characters: without_id.(characters)}}
end
# Replies :ok to the caller and then stops the pool with reason :error.
@impl true
def handle_call(:error, _from, state) do
  {:stop, :error, :ok, state}
end
# Post-init startup: begins queue monitoring, subscribes to server status
# broadcasts and schedules the first run of every periodic update.
@impl true
def handle_continue(:start, state) do
  Logger.info("#{@name} started")

  # Start message queue monitoring
  Process.send_after(self(), :monitor_message_queue, :timer.seconds(30))

  Phoenix.PubSub.subscribe(WandererApp.PubSub, "server_status")

  # Each pool adds a random delay to its first runs so that several pools
  # starting together do not all fire at the same instant. Location updates
  # get the smallest window; info updates get the widest.
  location_stagger = :rand.uniform(500)
  online_stagger = :rand.uniform(10_000)
  ship_stagger = :rand.uniform(10_000)
  info_stagger = :rand.uniform(60_000)

  first_runs = [
    {:update_online, 100 + online_stagger},
    {:update_location, 300 + location_stagger},
    {:update_ship, 500 + ship_stagger},
    {:update_info, 1500 + info_stagger},
    {:check_offline_characters, @check_offline_characters_interval}
  ]

  Enum.each(first_runs, fn {message, delay} ->
    Process.send_after(self(), message, delay)
  end)

  # Wallet updates are only scheduled when wallet tracking is enabled.
  if WandererApp.Env.wallet_tracking_enabled?() do
    wallet_stagger = :rand.uniform(120_000)
    Process.send_after(self(), :update_wallet, 1000 + wallet_stagger)
  end

  {:noreply, state}
end
# Periodic self-check of this process' mailbox; re-arms itself every 30 seconds.
@impl true
def handle_info(:monitor_message_queue, state) do
  monitor_message_queue(state)

  next_check_in = :timer.seconds(30)
  Process.send_after(self(), :monitor_message_queue, next_check_in)

  {:noreply, state}
end
# Handles a `{ref, result}` reply message: the monitor for `ref` is removed
# (flushing any pending :DOWN message) and an `{:error, _}` result is logged.
# Any other result is ignored.
def handle_info({ref, result}, state) when is_reference(ref) do
  Process.demonitor(ref, [:flush])

  with {:error, error} <- result do
    @logger.error("#{__MODULE__} failed to process: #{inspect(error)}")
  end

  {:noreply, state}
end
# Server status broadcast: the server counts as online whenever `status.vip` is false.
def handle_info({:server_status, status}, state) do
  online? = not status.vip
  {:noreply, %{state | server_online: online?}}
end
# Periodic online-status refresh while the server is online.
# Re-arms the timer first, then calls Tracker.update_online/1 for every tracked
# character with bounded concurrency; tasks exceeding 5 seconds are killed.
# Exceptions raised in this process are logged and reported, never propagated.
def handle_info(:update_online, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_online, @update_online_interval)

  stream_opts = [
    max_concurrency: @standard_concurrency,
    on_timeout: :kill_task,
    timeout: :timer.seconds(5)
  ]

  try do
    characters
    |> Task.async_stream(&WandererApp.Character.Tracker.update_online/1, stream_opts)
    |> Stream.run()
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_online => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)

      ErrorTracker.report(exception, __STACKTRACE__)
  end

  {:noreply, state}
end
# Periodic online-status refresh while the server is NOT online:
# every tracked character is marked offline, both on the character record and
# in its character state. Exceptions are logged and swallowed.
def handle_info(:update_online, %{characters: characters} = state) do
  Process.send_after(self(), :update_online, @update_online_interval)

  try do
    for character_id <- characters do
      WandererApp.Character.update_character(character_id, %{online: false})
      WandererApp.Character.update_character_state(character_id, %{is_online: false})
    end
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_online => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end
# Periodic offline check, run regardless of server status.
# Calls Tracker.check_offline/1 for every tracked character (15 second timeout
# per task, timed-out tasks are killed) and logs every stream result that is
# not `{:ok, _}`.
def handle_info(:check_offline_characters, %{characters: characters} = state) do
  Process.send_after(self(), :check_offline_characters, @check_offline_characters_interval)

  stream_opts = [
    timeout: :timer.seconds(15),
    max_concurrency: @standard_concurrency,
    on_timeout: :kill_task
  ]

  try do
    characters
    |> Task.async_stream(&WandererApp.Character.Tracker.check_offline/1, stream_opts)
    |> Stream.reject(&match?({:ok, _}, &1))
    |> Enum.each(fn failure ->
      @logger.error("Error in check_offline: #{inspect(failure)}")
    end)
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] check_offline => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end
# Periodic location refresh while the server is online.
#
# Re-arms the timer, runs Tracker.update_location/1 for every tracked character
# (concurrency from location_concurrency/0, 5 second timeout per task, timed-out
# tasks are killed), then records how long the whole pass took:
#   * a :location_update telemetry event is emitted on every pass;
#   * a warning and a :location_lag telemetry event are emitted when the pass
#     took more than 2000ms;
#   * the duration is stored in `last_location_duration`, which the ship and
#     info updates read to decide whether to skip a cycle.
# If an exception is raised in this process it is logged and the state is
# returned unchanged.
def handle_info(:update_location, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_location, @update_location_interval)
  start_time = System.monotonic_time(:millisecond)

  try do
    characters
    |> Task.async_stream(
      fn character_id ->
        WandererApp.Character.Tracker.update_location(character_id)
      end,
      max_concurrency: location_concurrency(),
      on_timeout: :kill_task,
      timeout: :timer.seconds(5)
    )
    |> Enum.each(fn _result -> :ok end)

    # Emit telemetry for location update performance
    duration = System.monotonic_time(:millisecond) - start_time
    # length/1 is O(n); compute it once and reuse it below.
    character_count = length(characters)

    :telemetry.execute(
      [:wanderer_app, :tracker_pool, :location_update],
      %{duration: duration, character_count: character_count},
      %{pool_uuid: state.uuid}
    )

    # Warn if location updates are falling behind (a full pass taking > 2000ms)
    if duration > 2000 do
      Logger.warning(
        "[Tracker Pool] Location updates falling behind: #{duration}ms for #{character_count} chars (pool: #{state.uuid})"
      )

      :telemetry.execute(
        [:wanderer_app, :tracker_pool, :location_lag],
        %{duration: duration, character_count: character_count},
        %{pool_uuid: state.uuid}
      )
    end

    {:noreply, %{state | last_location_duration: duration}}
  rescue
    e ->
      Logger.error("""
      [Tracker Pool] update_location => exception: #{Exception.message(e)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)

      {:noreply, state}
  end
end
# Location tick while the server is not online: nothing to do except keep the timer running.
def handle_info(:update_location, state) do
  _timer = Process.send_after(self(), :update_location, @update_location_interval)
  {:noreply, state}
end
# Periodic ship refresh while the server is online.
# Acts as backpressure for location tracking: when the last location pass took
# more than 1000ms this cycle is skipped (debug log + :ship_skipped telemetry);
# otherwise Tracker.update_ship/1 runs for every tracked character.
def handle_info(
      :update_ship,
      %{
        characters: characters,
        server_online: true,
        last_location_duration: location_duration
      } = state
    ) do
  Process.send_after(self(), :update_ship, @update_ship_interval)

  if location_duration <= 1000 do
    stream_opts = [
      max_concurrency: @standard_concurrency,
      on_timeout: :kill_task,
      timeout: :timer.seconds(5)
    ]

    try do
      characters
      |> Task.async_stream(&WandererApp.Character.Tracker.update_ship/1, stream_opts)
      |> Stream.run()
    rescue
      exception ->
        Logger.error("""
        [Tracker Pool] update_ship => exception: #{Exception.message(exception)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end
  else
    Logger.debug(
      "[Tracker Pool] Skipping ship update due to location lag (#{location_duration}ms)"
    )

    :telemetry.execute(
      [:wanderer_app, :tracker_pool, :ship_skipped],
      %{count: 1},
      %{pool_uuid: state.uuid, reason: :location_lag}
    )
  end

  {:noreply, state}
end
# Ship tick while the server is not online: only re-arm the timer.
def handle_info(:update_ship, state) do
  _timer = Process.send_after(self(), :update_ship, @update_ship_interval)
  {:noreply, state}
end
# Periodic character-info refresh while the server is online.
# Skipped (debug log + :info_skipped telemetry) when the last location pass took
# more than 1500ms; otherwise Tracker.update_info/1 runs for every tracked
# character and every stream result other than `{:ok, _}` is logged.
def handle_info(
      :update_info,
      %{
        characters: characters,
        server_online: true,
        last_location_duration: location_duration
      } = state
    ) do
  Process.send_after(self(), :update_info, @update_info_interval)

  if location_duration <= 1500 do
    stream_opts = [
      timeout: :timer.seconds(15),
      max_concurrency: @standard_concurrency,
      on_timeout: :kill_task
    ]

    try do
      characters
      |> Task.async_stream(&WandererApp.Character.Tracker.update_info/1, stream_opts)
      |> Stream.reject(&match?({:ok, _}, &1))
      |> Enum.each(fn failure ->
        Logger.error("Error in update_info: #{inspect(failure)}")
      end)
    rescue
      exception ->
        Logger.error("""
        [Tracker Pool] update_info => exception: #{Exception.message(exception)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end
  else
    Logger.debug(
      "[Tracker Pool] Skipping info update due to location lag (#{location_duration}ms)"
    )

    :telemetry.execute(
      [:wanderer_app, :tracker_pool, :info_skipped],
      %{count: 1},
      %{pool_uuid: state.uuid, reason: :location_lag}
    )
  end

  {:noreply, state}
end
# Info tick while the server is not online: only re-arm the timer.
def handle_info(:update_info, state) do
  _timer = Process.send_after(self(), :update_info, @update_info_interval)
  {:noreply, state}
end
# Periodic wallet refresh while the server is online.
# Runs Tracker.update_wallet/1 for every tracked character (5 minute timeout per
# task, timed-out tasks are killed) and logs every stream result other than
# `{:ok, _}`. Exceptions raised in this process are logged and swallowed.
def handle_info(:update_wallet, %{characters: characters, server_online: true} = state) do
  Process.send_after(self(), :update_wallet, @update_wallet_interval)

  stream_opts = [
    timeout: :timer.minutes(5),
    max_concurrency: @standard_concurrency,
    on_timeout: :kill_task
  ]

  try do
    characters
    |> Task.async_stream(&WandererApp.Character.Tracker.update_wallet/1, stream_opts)
    |> Stream.reject(&match?({:ok, _}, &1))
    |> Enum.each(fn failure ->
      Logger.error("Error in update_wallet: #{inspect(failure)}")
    end)
  rescue
    exception ->
      Logger.error("""
      [Tracker Pool] update_wallet => exception: #{Exception.message(exception)}
      #{Exception.format_stacktrace(__STACKTRACE__)}
      """)
  end

  {:noreply, state}
end
# Wallet tick while the server is not online: only re-arm the timer.
def handle_info(:update_wallet, state) do
  Process.send_after(self(), :update_wallet, @update_wallet_interval)
  {:noreply, state}
end

# Catch-all, must stay the last handle_info/2 clause.
# Because this module defines its own handle_info/2 clauses, a message matching
# none of them would raise FunctionClauseError and crash the pool. Such messages
# are logged and dropped instead so one stray message cannot take the pool down.
def handle_info(message, state) do
  Logger.warning("[Tracker Pool] Ignoring unexpected message: #{inspect(message)}")
  {:noreply, state}
end
# Inspects this process' own mailbox length and memory use. When more than 50
# messages are queued, a warning is logged and a :queue_buildup telemetry event
# is emitted. Any exception raised while doing so is logged at debug level and
# swallowed so monitoring can never crash the pool.
defp monitor_message_queue(state) do
  {_, queue_len} = Process.info(self(), :message_queue_len)
  {_, memory_bytes} = Process.info(self(), :memory)

  # Alert on high message queue
  if queue_len > 50 do
    tracked_count = length(state.characters)

    Logger.warning("GENSERVER_QUEUE_HIGH: Character tracker pool message queue buildup",
      pool_id: state.uuid,
      message_queue_length: queue_len,
      memory_bytes: memory_bytes,
      tracked_characters: tracked_count
    )

    # Emit telemetry
    :telemetry.execute(
      [:wanderer_app, :character, :tracker_pool, :queue_buildup],
      %{message_queue_length: queue_len, memory_bytes: memory_bytes},
      %{pool_id: state.uuid, tracked_characters: tracked_count}
    )
  end
rescue
  error ->
    Logger.debug("Failed to monitor message queue: #{inspect(error)}")
end
end