Mirror of https://github.com/wanderer-industries/wanderer (synced 2025-11-14 21:26:10 +00:00)

Compare commits: 19 commits, fix-error-... → develop
| Author | SHA1 | Date |
|---|---|---|
| | 7613ca78da | |
| | 5ec97d74ca | |
| | 74359a5542 | |
| | 0020f46dd8 | |
| | c8631708b9 | |
| | a6751b45c6 | |
| | f48aeb5cec | |
| | a5f25646c9 | |
| | 23cf1fd96f | |
| | 6f15521069 | |
| | 9d41e57c06 | |
| | ea9a22df09 | |
| | 0d4fd6f214 | |
| | 87a6c20545 | |
| | c375f4e4ce | |
| | 843a6d7320 | |
| | 63ca473113 | |
| | 7df8284124 | |
| | 21ca630abd | |
CHANGELOG.md (36 lines changed)
@@ -2,6 +2,42 @@
 <!-- changelog -->
 
+## [v1.84.17](https://github.com/wanderer-industries/wanderer/compare/v1.84.16...v1.84.17) (2025-11-14)
+
+### Bug Fixes:
+
+* core: fixed activity tracking issues
+
+## [v1.84.16](https://github.com/wanderer-industries/wanderer/compare/v1.84.15...v1.84.16) (2025-11-13)
+
+### Bug Fixes:
+
+* core: removed maps auto-start logic
+
+## [v1.84.15](https://github.com/wanderer-industries/wanderer/compare/v1.84.14...v1.84.15) (2025-11-13)
+
+### Bug Fixes:
+
+* core: fixed maps start/stop logic, added server downtime period support
+
+## [v1.84.14](https://github.com/wanderer-industries/wanderer/compare/v1.84.13...v1.84.14) (2025-11-13)
+
+### Bug Fixes:
+
+* Map: fixed an error that crashed the mapper when settings were removed; fixed settings reset.
+
 ## [v1.84.13](https://github.com/wanderer-industries/wanderer/compare/v1.84.12...v1.84.13) (2025-11-13)
@@ -27,11 +27,7 @@ config :wanderer_app,
   generators: [timestamp_type: :utc_datetime],
   ddrt: WandererApp.Map.CacheRTree,
   logger: Logger,
-  pubsub_client: Phoenix.PubSub,
-  wanderer_kills_base_url:
-    System.get_env("WANDERER_KILLS_BASE_URL", "ws://host.docker.internal:4004"),
-  wanderer_kills_service_enabled:
-    System.get_env("WANDERER_KILLS_SERVICE_ENABLED", "false") == "true"
+  pubsub_client: Phoenix.PubSub
 
 config :wanderer_app, WandererAppWeb.Endpoint,
   adapter: Bandit.PhoenixAdapter,
@@ -4,7 +4,7 @@ import Config
 config :wanderer_app, WandererApp.Repo,
   username: "postgres",
   password: "postgres",
-  hostname: System.get_env("DB_HOST", "localhost"),
+  hostname: "localhost",
   database: "wanderer_dev",
   stacktrace: true,
   show_sensitive_data_on_connection_error: true,
@@ -8,7 +8,7 @@ defmodule WandererApp.Character.TrackerPool do
     :tracked_ids,
     :uuid,
     :characters,
-    server_online: true
+    server_online: false
   ]
 
   @name __MODULE__
@@ -12,7 +12,7 @@ defmodule WandererApp.Character.TransactionsTracker.Impl do
     total_balance: 0,
     transactions: [],
     retries: 5,
-    server_online: true,
+    server_online: false,
     status: :started
   ]

@@ -75,7 +75,7 @@ defmodule WandererApp.Character.TransactionsTracker.Impl do
 
   def handle_event(
         :update_corp_wallets,
-        %{character: character} = state
+        %{character: character, server_online: true} = state
       ) do
     Process.send_after(self(), :update_corp_wallets, @update_interval)
@@ -88,26 +88,26 @@ defmodule WandererApp.Character.TransactionsTracker.Impl do
         :update_corp_wallets,
         state
       ) do
-    Process.send_after(self(), :update_corp_wallets, :timer.seconds(15))
+    Process.send_after(self(), :update_corp_wallets, @update_interval)
 
     state
   end
 
   def handle_event(
         :check_wallets,
-        %{wallets: []} = state
+        %{character: character, wallets: wallets, server_online: true} = state
       ) do
-    Process.send_after(self(), :check_wallets, :timer.seconds(5))
+    Process.send_after(self(), :check_wallets, @update_interval)
 
     state
   end
 
   def handle_event(
         :check_wallets,
         %{character: character, wallets: wallets} = state
       ) do
     check_wallets(wallets, character)
 
     state
   end
 
   def handle_event(
         :check_wallets,
         state
       ) do
     Process.send_after(self(), :check_wallets, @update_interval)
 
     state
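Taken together, these hunks make the transactions tracker assume the EVE server is offline at startup (`server_online: false`) and gate the ESI wallet polling on `server_online: true`; the remaining clauses simply reschedule themselves at `@update_interval` instead of retrying on the old 5- and 15-second timers.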
@@ -9,8 +9,8 @@ defmodule WandererApp.Map.Manager do
 
   alias WandererApp.Map.Server
 
-  @maps_start_per_second 10
-  @maps_start_interval 1000
+  @maps_start_chunk_size 20
+  @maps_start_interval 500
   @maps_queue :maps_queue
   @check_maps_queue_interval :timer.seconds(1)

@@ -58,9 +58,9 @@ defmodule WandererApp.Map.Manager do
     {:ok, pings_cleanup_timer} =
       :timer.send_interval(@pings_cleanup_interval, :cleanup_pings)
 
-    safe_async_task(fn ->
-      start_last_active_maps()
-    end)
+    # safe_async_task(fn ->
+    #   start_last_active_maps()
+    # end)
 
     {:ok,
      %{

@@ -153,7 +153,7 @@ defmodule WandererApp.Map.Manager do
     @maps_queue
     |> WandererApp.Queue.to_list!()
     |> Enum.uniq()
-    |> Enum.chunk_every(@maps_start_per_second)
+    |> Enum.chunk_every(@maps_start_chunk_size)
 
     WandererApp.Queue.clear(@maps_queue)
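With the new constants, map startup is paced at up to `@maps_start_chunk_size` (20) maps per `@maps_start_interval` (500 ms) tick, i.e. roughly 40 maps/s versus 10 maps/s before, while the boot-time `start_last_active_maps()` task is commented out, matching the v1.84.16 changelog entry ("core: removed maps auto-start logic").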
@@ -15,6 +15,7 @@ defmodule WandererApp.Map.MapPool do
   @cache :map_pool_cache
   @registry :map_pool_registry
   @unique_registry :unique_map_pool_registry
+  @map_pool_limit 20
 
   @garbage_collection_interval :timer.hours(4)
   @systems_cleanup_timeout :timer.minutes(30)

@@ -64,11 +65,21 @@ defmodule WandererApp.Map.MapPool do
   def handle_continue({:start, map_ids}, state) do
     Logger.info("#{@name} started")
 
-    map_ids
-    |> Enum.each(fn map_id ->
-      GenServer.cast(self(), {:start_map, map_id})
-    end)
+    # Start maps synchronously and accumulate state changes
+    new_state =
+      map_ids
+      |> Enum.reduce(state, fn map_id, current_state ->
+        case do_start_map(map_id, current_state) do
+          {:ok, updated_state} ->
+            updated_state
+
+          {:error, reason} ->
+            Logger.error("[Map Pool] Failed to start map #{map_id}: #{reason}")
+            current_state
+        end
+      end)
 
+    # Schedule periodic tasks
     Process.send_after(self(), :backup_state, @backup_state_timeout)
     Process.send_after(self(), :cleanup_systems, 15_000)
     Process.send_after(self(), :cleanup_characters, @characters_cleanup_timeout)

@@ -77,46 +88,247 @@ defmodule WandererApp.Map.MapPool do
     # Start message queue monitoring
     Process.send_after(self(), :monitor_message_queue, :timer.seconds(30))
 
-    {:noreply, state}
+    {:noreply, new_state}
   end
 
   @impl true
   def handle_cast(:stop, state), do: {:stop, :normal, state}
 
   @impl true
-  def handle_cast({:start_map, map_id}, %{map_ids: map_ids, uuid: uuid} = state) do
-    if map_id not in map_ids do
-      Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
-        [map_id | r_map_ids]
-      end)
-
-      Cachex.put(@cache, map_id, uuid)
-
-      map_id
-      |> WandererApp.Map.get_map_state!()
-      |> Server.Impl.start_map()
-
-      {:noreply, %{state | map_ids: [map_id | map_ids]}}
-    else
-      {:noreply, state}
-    end
-  end
+  def handle_call({:start_map, map_id}, _from, %{map_ids: map_ids, uuid: uuid} = state) do
+    # Enforce capacity limit to prevent pool overload due to race conditions
+    if length(map_ids) >= @map_pool_limit do
+      Logger.warning(
+        "[Map Pool #{uuid}] Pool at capacity (#{length(map_ids)}/#{@map_pool_limit}), " <>
+          "rejecting map #{map_id} and triggering new pool creation"
+      )
+
+      # Trigger a new pool creation attempt asynchronously
+      # This allows the system to create a new pool for this map
+      spawn(fn ->
+        WandererApp.Map.MapPoolDynamicSupervisor.start_map(map_id)
+      end)
+
+      {:reply, :ok, state}
+    else
+      case do_start_map(map_id, state) do
+        {:ok, new_state} ->
+          {:reply, :ok, new_state}
+
+        {:error, _reason} ->
+          # Error already logged in do_start_map
+          {:reply, :ok, state}
+      end
+    end
+  end
 
   @impl true
-  def handle_cast(
+  def handle_call(
         {:stop_map, map_id},
-        %{map_ids: map_ids, uuid: uuid} = state
+        _from,
+        state
       ) do
-    Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
-      r_map_ids |> Enum.reject(fn id -> id == map_id end)
-    end)
-
-    Cachex.del(@cache, map_id)
-
-    map_id
-    |> Server.Impl.stop_map()
-
-    {:noreply, %{state | map_ids: map_ids |> Enum.reject(fn id -> id == map_id end)}}
+    case do_stop_map(map_id, state) do
+      {:ok, new_state} ->
+        {:reply, :ok, new_state}
+
+      {:error, reason} ->
+        {:reply, {:error, reason}, state}
+    end
   end
+
+  defp do_start_map(map_id, %{map_ids: map_ids, uuid: uuid} = state) do
+    if map_id in map_ids do
+      # Map already started
+      {:ok, state}
+    else
+      # Track what operations succeeded for potential rollback
+      completed_operations = []
+
+      try do
+        # Step 1: Update Registry (most critical, do first)
+        registry_result =
+          Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
+            [map_id | r_map_ids]
+          end)
+
+        completed_operations = [:registry | completed_operations]
+
+        case registry_result do
+          {new_value, _old_value} when is_list(new_value) ->
+            :ok
+
+          :error ->
+            raise "Failed to update registry for pool #{uuid}"
+        end
+
+        # Step 2: Add to cache
+        case Cachex.put(@cache, map_id, uuid) do
+          {:ok, _} ->
+            completed_operations = [:cache | completed_operations]
+
+          {:error, reason} ->
+            raise "Failed to add to cache: #{inspect(reason)}"
+        end
+
+        # Step 3: Start the map server
+        map_id
+        |> WandererApp.Map.get_map_state!()
+        |> Server.Impl.start_map()
+
+        completed_operations = [:map_server | completed_operations]
+
+        # Step 4: Update GenServer state (last, as this is in-memory and fast)
+        new_state = %{state | map_ids: [map_id | map_ids]}
+
+        Logger.debug("[Map Pool] Successfully started map #{map_id} in pool #{uuid}")
+        {:ok, new_state}
+      rescue
+        e ->
+          Logger.error("""
+          [Map Pool] Failed to start map #{map_id} (completed: #{inspect(completed_operations)}): #{Exception.message(e)}
+          #{Exception.format_stacktrace(__STACKTRACE__)}
+          """)
+
+          # Attempt rollback of completed operations
+          rollback_start_map_operations(map_id, uuid, completed_operations)
+
+          {:error, Exception.message(e)}
+      end
+    end
+  end
+
+  defp rollback_start_map_operations(map_id, uuid, completed_operations) do
+    Logger.warning("[Map Pool] Attempting to rollback start_map operations for #{map_id}")
+
+    # Rollback in reverse order
+    if :map_server in completed_operations do
+      Logger.debug("[Map Pool] Rollback: Stopping map server for #{map_id}")
+
+      try do
+        Server.Impl.stop_map(map_id)
+      rescue
+        e ->
+          Logger.error("[Map Pool] Rollback failed to stop map server: #{Exception.message(e)}")
+      end
+    end
+
+    if :cache in completed_operations do
+      Logger.debug("[Map Pool] Rollback: Removing #{map_id} from cache")
+
+      case Cachex.del(@cache, map_id) do
+        {:ok, _} ->
+          :ok
+
+        {:error, reason} ->
+          Logger.error("[Map Pool] Rollback failed for cache: #{inspect(reason)}")
+      end
+    end
+
+    if :registry in completed_operations do
+      Logger.debug("[Map Pool] Rollback: Removing #{map_id} from registry")
+
+      try do
+        Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
+          r_map_ids |> Enum.reject(fn id -> id == map_id end)
+        end)
+      rescue
+        e ->
+          Logger.error("[Map Pool] Rollback failed for registry: #{Exception.message(e)}")
+      end
+    end
+  end
+
+  defp do_stop_map(map_id, %{map_ids: map_ids, uuid: uuid} = state) do
+    # Track what operations succeeded for potential rollback
+    completed_operations = []
+
+    try do
+      # Step 1: Update Registry (most critical, do first)
+      registry_result =
+        Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
+          r_map_ids |> Enum.reject(fn id -> id == map_id end)
+        end)
+
+      completed_operations = [:registry | completed_operations]
+
+      case registry_result do
+        {new_value, _old_value} when is_list(new_value) ->
+          :ok
+
+        :error ->
+          raise "Failed to update registry for pool #{uuid}"
+      end
+
+      # Step 2: Delete from cache
+      case Cachex.del(@cache, map_id) do
+        {:ok, _} ->
+          completed_operations = [:cache | completed_operations]
+
+        {:error, reason} ->
+          raise "Failed to delete from cache: #{inspect(reason)}"
+      end
+
+      # Step 3: Stop the map server (clean up all map resources)
+      map_id
+      |> Server.Impl.stop_map()
+
+      completed_operations = [:map_server | completed_operations]
+
+      # Step 4: Update GenServer state (last, as this is in-memory and fast)
+      new_state = %{state | map_ids: map_ids |> Enum.reject(fn id -> id == map_id end)}
+
+      Logger.debug("[Map Pool] Successfully stopped map #{map_id} from pool #{uuid}")
+      {:ok, new_state}
+    rescue
+      e ->
+        Logger.error("""
+        [Map Pool] Failed to stop map #{map_id} (completed: #{inspect(completed_operations)}): #{Exception.message(e)}
+        #{Exception.format_stacktrace(__STACKTRACE__)}
+        """)
+
+        # Attempt rollback of completed operations
+        rollback_stop_map_operations(map_id, uuid, completed_operations)
+
+        {:error, Exception.message(e)}
+    end
+  end
+
+  defp rollback_stop_map_operations(map_id, uuid, completed_operations) do
+    Logger.warning("[Map Pool] Attempting to rollback stop_map operations for #{map_id}")
+
+    # Rollback in reverse order
+    if :cache in completed_operations do
+      Logger.debug("[Map Pool] Rollback: Re-adding #{map_id} to cache")
+
+      case Cachex.put(@cache, map_id, uuid) do
+        {:ok, _} ->
+          :ok
+
+        {:error, reason} ->
+          Logger.error("[Map Pool] Rollback failed for cache: #{inspect(reason)}")
+      end
+    end
+
+    if :registry in completed_operations do
+      Logger.debug("[Map Pool] Rollback: Re-adding #{map_id} to registry")
+
+      try do
+        Registry.update_value(@unique_registry, Module.concat(__MODULE__, uuid), fn r_map_ids ->
+          if map_id in r_map_ids do
+            r_map_ids
+          else
+            [map_id | r_map_ids]
+          end
+        end)
+      rescue
+        e ->
+          Logger.error("[Map Pool] Rollback failed for registry: #{Exception.message(e)}")
+      end
+    end
+
+    # Note: We don't rollback map_server stop as Server.Impl.stop_map() is idempotent
+    # and the cleanup operations are safe to leave in a "stopped" state
+  end
 
   @impl true

@@ -231,25 +443,38 @@ defmodule WandererApp.Map.MapPool do
     Process.send_after(self(), :garbage_collect, @garbage_collection_interval)
 
     try do
-      map_ids
-      |> Enum.each(fn map_id ->
-        # presence_character_ids =
-        #   WandererApp.Cache.lookup!("map_#{map_id}:presence_character_ids", [])
-
-        # if presence_character_ids |> Enum.empty?() do
-        Logger.info(
-          "#{uuid}: No more characters present on: #{map_id}, shutting down map server..."
-        )
-
-        GenServer.cast(self(), {:stop_map, map_id})
-        # end
-      end)
+      # Process each map and accumulate state changes
+      new_state =
+        map_ids
+        |> Enum.reduce(state, fn map_id, current_state ->
+          presence_character_ids =
+            WandererApp.Cache.lookup!("map_#{map_id}:presence_character_ids", [])
+
+          if presence_character_ids |> Enum.empty?() do
+            Logger.info(
+              "#{uuid}: No more characters present on: #{map_id}, shutting down map server..."
+            )
+
+            case do_stop_map(map_id, current_state) do
+              {:ok, updated_state} ->
+                Logger.debug("#{uuid}: Successfully stopped map #{map_id}")
+                updated_state
+
+              {:error, reason} ->
+                Logger.error("#{uuid}: Failed to stop map #{map_id}: #{reason}")
+                current_state
+            end
+          else
+            current_state
+          end
+        end)
+
+      {:noreply, new_state}
     rescue
       e ->
-        Logger.error(Exception.message(e))
+        Logger.error("#{uuid}: Garbage collection error: #{Exception.message(e)}")
+        {:noreply, state}
     end
-
-    {:noreply, state}
   end
 
   @impl true
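Two notes on this hunk. First, `{:start_map, _}` and `{:stop_map, _}` move from `handle_cast` to `handle_call`, so callers now block until the registry, cache, and pool state are actually updated (the dynamic supervisor below switches to `GenServer.call` to match). Second, a caveat on the rollback bookkeeping: in Elixir, a rebinding like `completed_operations = [:cache | completed_operations]` inside a `case` arm does not escape that arm, and bindings made inside a `try` body are not visible in its `rescue` clause, so the `rescue` blocks above will always see the initial `completed_operations = []`. A minimal demo of the scoping rule (illustrative, not part of the commit):

```elixir
completed = []

try do
  # Rebinding inside `try` is local to the block
  completed = [:registry | completed]
  IO.inspect(completed, label: "inside try")      # [:registry]
  raise "boom"
rescue
  _e ->
    # `rescue` sees the binding from before the `try`
    IO.inspect(completed, label: "inside rescue")  # []
end
```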
@@ -7,7 +7,7 @@ defmodule WandererApp.Map.MapPoolDynamicSupervisor do
   @cache :map_pool_cache
   @registry :map_pool_registry
   @unique_registry :unique_map_pool_registry
-  @map_pool_limit 10
+  @map_pool_limit 20
 
   @name __MODULE__

@@ -30,23 +30,84 @@ defmodule WandererApp.Map.MapPoolDynamicSupervisor do
         start_child([map_id], pools |> Enum.count())
 
       pid ->
-        GenServer.cast(pid, {:start_map, map_id})
+        GenServer.call(pid, {:start_map, map_id})
     end
   end
 
   def stop_map(map_id) do
-    {:ok, pool_uuid} = Cachex.get(@cache, map_id)
-
-    case Registry.lookup(
-           @unique_registry,
-           Module.concat(WandererApp.Map.MapPool, pool_uuid)
-         ) do
-      [] ->
-        :ok
-
-      [{pool_pid, _}] ->
-        GenServer.cast(pool_pid, {:stop_map, map_id})
-    end
-  end
+    case Cachex.get(@cache, map_id) do
+      {:ok, nil} ->
+        # Cache miss - try to find the pool by scanning the registry
+        Logger.warning(
+          "Cache miss for map #{map_id}, scanning registry for pool containing this map"
+        )
+
+        find_pool_by_scanning_registry(map_id)
+
+      {:ok, pool_uuid} ->
+        # Cache hit - use the pool_uuid to lookup the pool
+        case Registry.lookup(
+               @unique_registry,
+               Module.concat(WandererApp.Map.MapPool, pool_uuid)
+             ) do
+          [] ->
+            Logger.warning(
+              "Pool with UUID #{pool_uuid} not found in registry for map #{map_id}, scanning registry"
+            )
+
+            find_pool_by_scanning_registry(map_id)
+
+          [{pool_pid, _}] ->
+            GenServer.call(pool_pid, {:stop_map, map_id})
+        end
+
+      {:error, reason} ->
+        Logger.error("Failed to lookup map #{map_id} in cache: #{inspect(reason)}")
+        :ok
+    end
+  end
+
+  defp find_pool_by_scanning_registry(map_id) do
+    case Registry.lookup(@registry, WandererApp.Map.MapPool) do
+      [] ->
+        Logger.debug("No map pools found in registry for map #{map_id}")
+        :ok
+
+      pools ->
+        # Scan all pools to find the one containing this map_id
+        found_pool =
+          Enum.find_value(pools, fn {_pid, uuid} ->
+            case Registry.lookup(
+                   @unique_registry,
+                   Module.concat(WandererApp.Map.MapPool, uuid)
+                 ) do
+              [{pool_pid, map_ids}] ->
+                if map_id in map_ids do
+                  {pool_pid, uuid}
+                else
+                  nil
+                end
+
+              _ ->
+                nil
+            end
+          end)
+
+        case found_pool do
+          {pool_pid, pool_uuid} ->
+            Logger.info(
+              "Found map #{map_id} in pool #{pool_uuid} via registry scan, updating cache"
+            )
+
+            # Update the cache to fix the inconsistency
+            Cachex.put(@cache, map_id, pool_uuid)
+            GenServer.call(pool_pid, {:stop_map, map_id})
+
+          nil ->
+            Logger.debug("Map #{map_id} not found in any pool registry")
+            :ok
+        end
+    end
+  end
@@ -14,7 +14,8 @@ defmodule WandererApp.Map.MapPoolSupervisor do
     children = [
       {Registry, [keys: :unique, name: @unique_registry]},
      {Registry, [keys: :duplicate, name: @registry]},
-      {WandererApp.Map.MapPoolDynamicSupervisor, []}
+      {WandererApp.Map.MapPoolDynamicSupervisor, []},
+      {WandererApp.Map.Reconciler, []}
     ]
 
     Supervisor.init(children, strategy: :rest_for_one, max_restarts: 10)
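Because the supervisor uses the `:rest_for_one` strategy, a crash in either registry restarts the dynamic supervisor and the new `WandererApp.Map.Reconciler` along with it, while a crash of the Reconciler (last in the list) restarts only the Reconciler itself.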
lib/wanderer_app/map/map_reconciler.ex (new file, 280 lines)
@@ -0,0 +1,280 @@
defmodule WandererApp.Map.Reconciler do
  @moduledoc """
  Periodically reconciles map state across different stores (Cache, Registry, GenServer state)
  to detect and fix inconsistencies that may prevent map servers from restarting.
  """
  use GenServer

  require Logger

  @cache :map_pool_cache
  @registry :map_pool_registry
  @unique_registry :unique_map_pool_registry
  @reconciliation_interval :timer.minutes(5)

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  @impl true
  def init(_opts) do
    Logger.info("Starting Map Reconciler")
    schedule_reconciliation()
    {:ok, %{}}
  end

  @impl true
  def handle_info(:reconcile, state) do
    schedule_reconciliation()

    try do
      reconcile_state()
    rescue
      e ->
        Logger.error("""
        [Map Reconciler] reconciliation error: #{Exception.message(e)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end

    {:noreply, state}
  end

  @doc """
  Manually trigger a reconciliation (useful for testing or manual cleanup)
  """
  def trigger_reconciliation do
    GenServer.cast(__MODULE__, :reconcile_now)
  end

  @impl true
  def handle_cast(:reconcile_now, state) do
    try do
      reconcile_state()
    rescue
      e ->
        Logger.error("""
        [Map Reconciler] manual reconciliation error: #{Exception.message(e)}
        #{Exception.format_stacktrace(__STACKTRACE__)}
        """)
    end

    {:noreply, state}
  end

  defp schedule_reconciliation do
    Process.send_after(self(), :reconcile, @reconciliation_interval)
  end

  defp reconcile_state do
    Logger.debug("[Map Reconciler] Starting state reconciliation")

    # Get started_maps from cache
    {:ok, started_maps} = WandererApp.Cache.lookup("started_maps", [])

    # Get all maps from registries
    registry_maps = get_all_registry_maps()

    # Detect zombie maps (in started_maps but not in any registry)
    zombie_maps = started_maps -- registry_maps
    # Detect orphan maps (in registry but not in started_maps)
    orphan_maps = registry_maps -- started_maps

    # Detect cache inconsistencies (map_pool_cache pointing to wrong or non-existent pools)
    cache_inconsistencies = find_cache_inconsistencies(registry_maps)

    stats = %{
      total_started_maps: length(started_maps),
      total_registry_maps: length(registry_maps),
      zombie_maps: length(zombie_maps),
      orphan_maps: length(orphan_maps),
      cache_inconsistencies: length(cache_inconsistencies)
    }

    Logger.info("[Map Reconciler] Reconciliation stats: #{inspect(stats)}")

    # Emit telemetry
    :telemetry.execute(
      [:wanderer_app, :map, :reconciliation],
      stats,
      %{}
    )

    # Clean up zombie maps
    cleanup_zombie_maps(zombie_maps)

    # Fix orphan maps
    fix_orphan_maps(orphan_maps)

    # Fix cache inconsistencies
    fix_cache_inconsistencies(cache_inconsistencies)

    Logger.debug("[Map Reconciler] State reconciliation completed")
  end

  defp get_all_registry_maps do
    case Registry.lookup(@registry, WandererApp.Map.MapPool) do
      [] ->
        []

      pools ->
        pools
        |> Enum.flat_map(fn {_pid, uuid} ->
          case Registry.lookup(
                 @unique_registry,
                 Module.concat(WandererApp.Map.MapPool, uuid)
               ) do
            [{_pool_pid, map_ids}] -> map_ids
            _ -> []
          end
        end)
        |> Enum.uniq()
    end
  end

  defp find_cache_inconsistencies(registry_maps) do
    registry_maps
    |> Enum.filter(fn map_id ->
      case Cachex.get(@cache, map_id) do
        {:ok, nil} ->
          # Map in registry but not in cache
          true

        {:ok, pool_uuid} ->
          # Check if the pool_uuid actually exists in registry
          case Registry.lookup(
                 @unique_registry,
                 Module.concat(WandererApp.Map.MapPool, pool_uuid)
               ) do
            [] ->
              # Cache points to non-existent pool
              true

            [{_pool_pid, map_ids}] ->
              # Check if this map is actually in the pool's map_ids
              map_id not in map_ids

            _ ->
              false
          end

        {:error, _} ->
          true
      end
    end)
  end

  defp cleanup_zombie_maps([]), do: :ok

  defp cleanup_zombie_maps(zombie_maps) do
    Logger.warning("[Map Reconciler] Found #{length(zombie_maps)} zombie maps: #{inspect(zombie_maps)}")

    Enum.each(zombie_maps, fn map_id ->
      Logger.info("[Map Reconciler] Cleaning up zombie map: #{map_id}")

      # Remove from started_maps cache
      WandererApp.Cache.insert_or_update(
        "started_maps",
        [],
        fn started_maps ->
          started_maps |> Enum.reject(fn started_map_id -> started_map_id == map_id end)
        end
      )

      # Clean up any stale map_pool_cache entries
      Cachex.del(@cache, map_id)

      # Clean up map-specific caches
      WandererApp.Cache.delete("map_#{map_id}:started")
      WandererApp.Cache.delete("map_characters-#{map_id}")
      WandererApp.Map.CacheRTree.clear_tree("rtree_#{map_id}")
      WandererApp.Map.delete_map_state(map_id)

      :telemetry.execute(
        [:wanderer_app, :map, :reconciliation, :zombie_cleanup],
        %{count: 1},
        %{map_id: map_id}
      )
    end)
  end

  defp fix_orphan_maps([]), do: :ok

  defp fix_orphan_maps(orphan_maps) do
    Logger.warning("[Map Reconciler] Found #{length(orphan_maps)} orphan maps: #{inspect(orphan_maps)}")

    Enum.each(orphan_maps, fn map_id ->
      Logger.info("[Map Reconciler] Fixing orphan map: #{map_id}")

      # Add to started_maps cache
      WandererApp.Cache.insert_or_update(
        "started_maps",
        [map_id],
        fn existing ->
          [map_id | existing] |> Enum.uniq()
        end
      )

      :telemetry.execute(
        [:wanderer_app, :map, :reconciliation, :orphan_fixed],
        %{count: 1},
        %{map_id: map_id}
      )
    end)
  end

  defp fix_cache_inconsistencies([]), do: :ok

  defp fix_cache_inconsistencies(inconsistent_maps) do
    Logger.warning(
      "[Map Reconciler] Found #{length(inconsistent_maps)} cache inconsistencies: #{inspect(inconsistent_maps)}"
    )

    Enum.each(inconsistent_maps, fn map_id ->
      Logger.info("[Map Reconciler] Fixing cache inconsistency for map: #{map_id}")

      # Find the correct pool for this map
      case find_pool_for_map(map_id) do
        {:ok, pool_uuid} ->
          Logger.info("[Map Reconciler] Updating cache: #{map_id} -> #{pool_uuid}")
          Cachex.put(@cache, map_id, pool_uuid)

          :telemetry.execute(
            [:wanderer_app, :map, :reconciliation, :cache_fixed],
            %{count: 1},
            %{map_id: map_id, pool_uuid: pool_uuid}
          )

        :error ->
          Logger.warning("[Map Reconciler] Could not find pool for map #{map_id}, removing from cache")
          Cachex.del(@cache, map_id)
      end
    end)
  end

  defp find_pool_for_map(map_id) do
    case Registry.lookup(@registry, WandererApp.Map.MapPool) do
      [] ->
        :error

      pools ->
        pools
        |> Enum.find_value(:error, fn {_pid, uuid} ->
          case Registry.lookup(
                 @unique_registry,
                 Module.concat(WandererApp.Map.MapPool, uuid)
               ) do
            [{_pool_pid, map_ids}] ->
              if map_id in map_ids do
                {:ok, uuid}
              else
                nil
              end

            _ ->
              nil
          end
        end)
    end
  end
end
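Every reconciliation pass emits the stats map as a `[:wanderer_app, :map, :reconciliation]` telemetry event. A minimal sketch of consuming it (the handler id and alert threshold are illustrative):

```elixir
require Logger

:telemetry.attach(
  "reconciliation-monitor",
  [:wanderer_app, :map, :reconciliation],
  fn _event, measurements, _metadata, _config ->
    # measurements carries: total_started_maps, total_registry_maps,
    # zombie_maps, orphan_maps, cache_inconsistencies
    if measurements.zombie_maps > 0 or measurements.cache_inconsistencies > 0 do
      Logger.warning("Map state drift detected: #{inspect(measurements)}")
    end
  end,
  nil
)

# Force a pass without waiting for the 5-minute timer:
WandererApp.Map.Reconciler.trigger_reconciliation()
```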
@@ -537,6 +537,12 @@ defmodule WandererApp.Map.Server.ConnectionsImpl do
 
     Impl.broadcast!(map_id, :add_connection, connection)
 
+    Impl.broadcast!(map_id, :maybe_link_signature, %{
+      character_id: character_id,
+      solar_system_source: old_location.solar_system_id,
+      solar_system_target: location.solar_system_id
+    })
+
     # ADDITIVE: Also broadcast to external event system (webhooks/WebSocket)
     WandererApp.ExternalEvents.broadcast(map_id, :connection_added, %{
       connection_id: connection.id,

@@ -548,19 +554,12 @@ defmodule WandererApp.Map.Server.ConnectionsImpl do
       time_status: connection.time_status
     })
 
-    {:ok, _} =
-      WandererApp.User.ActivityTracker.track_map_event(:map_connection_added, %{
-        character_id: character_id,
-        user_id: character.user_id,
-        map_id: map_id,
-        solar_system_source_id: old_location.solar_system_id,
-        solar_system_target_id: location.solar_system_id
-      })
-
-    Impl.broadcast!(map_id, :maybe_link_signature, %{
-      character_id: character_id,
-      solar_system_source: old_location.solar_system_id,
-      solar_system_target: location.solar_system_id
-    })
+    WandererApp.User.ActivityTracker.track_map_event(:map_connection_added, %{
+      character_id: character_id,
+      user_id: character.user_id,
+      map_id: map_id,
+      solar_system_source_id: old_location.solar_system_id,
+      solar_system_target_id: location.solar_system_id
+    })
 
     :ok
@@ -642,13 +642,12 @@ defmodule WandererApp.Map.Server.SystemsImpl do
       position_y: system.position_y
     })
 
-    {:ok, _} =
-      WandererApp.User.ActivityTracker.track_map_event(:system_added, %{
-        character_id: character_id,
-        user_id: user_id,
-        map_id: map_id,
-        solar_system_id: solar_system_id
-      })
+    WandererApp.User.ActivityTracker.track_map_event(:system_added, %{
+      character_id: character_id,
+      user_id: user_id,
+      map_id: map_id,
+      solar_system_id: solar_system_id
+    })
   end
 
   defp maybe_update_extra_info(system, nil), do: system
@@ -501,13 +501,16 @@ defmodule WandererApp.SecurityAudit do
     # Ensure event_type is properly formatted
     event_type = normalize_event_type(audit_entry.event_type)
 
+    # Generate unique entity_id to avoid constraint violations
+    entity_id = generate_entity_id(audit_entry.session_id)
+
     attrs = %{
-      user_id: audit_entry.user_id,
-      character_id: nil,
-      entity_id: hash_identifier(audit_entry.session_id),
+      entity_id: entity_id,
       entity_type: :security_event,
       event_type: event_type,
-      event_data: encode_event_data(audit_entry)
+      event_data: encode_event_data(audit_entry),
+      user_id: audit_entry.user_id,
+      character_id: nil
     }
 
     case UserActivity.new(attrs) do

@@ -619,8 +622,13 @@ defmodule WandererApp.SecurityAudit do
   defp convert_datetime(%NaiveDateTime{} = dt), do: NaiveDateTime.to_iso8601(dt)
   defp convert_datetime(value), do: value
 
-  defp generate_entity_id do
-    "audit_#{DateTime.utc_now() |> DateTime.to_unix(:microsecond)}_#{System.unique_integer([:positive])}"
+  defp generate_entity_id(session_id \\ nil) do
+    if session_id do
+      # Include high-resolution timestamp and unique component for guaranteed uniqueness
+      "#{hash_identifier(session_id)}_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
+    else
+      "audit_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
+    end
   end
 
   defp async_enabled? do
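With a session id, `generate_entity_id/1` now yields values of the form `<hashed-session>_<microsecond-timestamp>_<unique-integer>` (and `audit_<microsecond-timestamp>_<unique-integer>` without one), so two security events from the same session can no longer collide on the `entity_id` uniqueness constraint the way the plain `hash_identifier(session_id)` value could.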
@@ -88,20 +88,21 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
   def handle_cast({:log_event, audit_entry}, state) do
     # Add to buffer
     buffer = [audit_entry | state.buffer]
+    buf_len = length(buffer)
 
     # Update stats
     stats = Map.update!(state.stats, :events_processed, &(&1 + 1))
 
     # Check if we need to flush
     cond do
-      length(buffer) >= state.batch_size ->
+      buf_len >= state.batch_size ->
         # Flush immediately if batch size reached
         {:noreply, do_flush(%{state | buffer: buffer, stats: stats})}
 
-      length(buffer) >= @max_buffer_size ->
+      buf_len >= @max_buffer_size ->
         # Force flush if max buffer size reached
         Logger.warning("Security audit buffer overflow, forcing flush",
-          buffer_size: length(buffer),
+          buffer_size: buf_len,
           max_size: @max_buffer_size
         )

@@ -186,23 +187,66 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
         # Clear buffer
         %{state | buffer: [], stats: stats}
 
-      {:error, reason} ->
-        Logger.error("Failed to flush security audit events",
-          reason: inspect(reason),
-          event_count: length(events)
-        )
+      {:partial, success_count, failed_events} ->
+        failed_count = length(failed_events)
+
+        Logger.warning(
+          "Partial flush: stored #{success_count}, failed #{failed_count} audit events",
+          success_count: success_count,
+          failed_count: failed_count,
+          buffer_size: length(state.buffer)
+        )
+
+        # Emit telemetry for monitoring
+        :telemetry.execute(
+          [:wanderer_app, :security_audit, :async_flush_partial],
+          %{success_count: success_count, failed_count: failed_count},
+          %{}
+        )
+
+        # Update stats - count partial flush as both success and error
+        stats =
+          state.stats
+          |> Map.update!(:batches_flushed, &(&1 + 1))
+          |> Map.update!(:errors, &(&1 + 1))
+          |> Map.put(:last_flush, DateTime.utc_now())
+
+        # Extract just the events from failed_events tuples
+        failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
+
+        remaining_buffer = Enum.reject(state.buffer, fn ev -> ev in failed_only end)
+
+        # Re-buffer failed events at the front, preserving newest-first ordering
+        # Reverse failed_only since flush reversed the buffer to oldest-first
+        new_buffer = Enum.reverse(failed_only) ++ remaining_buffer
+        buffer = handle_buffer_overflow(new_buffer, @max_buffer_size)
+
+        %{state | buffer: buffer, stats: stats}
+
+      {:error, failed_events} ->
+        failed_count = length(failed_events)
+
+        Logger.error("Failed to flush all #{failed_count} security audit events",
+          failed_count: failed_count,
+          buffer_size: length(state.buffer)
+        )
 
         # Emit telemetry for monitoring
         :telemetry.execute(
           [:wanderer_app, :security_audit, :async_flush_failure],
           %{count: 1, event_count: failed_count},
           %{}
        )
 
         # Update error stats
         stats = Map.update!(state.stats, :errors, &(&1 + 1))
 
-        # Implement backoff - keep events in buffer but don't grow indefinitely
-        buffer =
-          if length(state.buffer) > @max_buffer_size do
-            Logger.warning("Dropping oldest audit events due to repeated flush failures")
-            Enum.take(state.buffer, @max_buffer_size)
-          else
-            state.buffer
-          end
+        # Extract just the events from failed_events tuples
+        failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
+
+        # Since ALL events failed, the new buffer should only contain the failed events
+        # Reverse to maintain newest-first ordering (flush reversed to oldest-first)
+        buffer = handle_buffer_overflow(Enum.reverse(failed_only), @max_buffer_size)
 
         %{state | buffer: buffer, stats: stats}

@@ -213,34 +257,100 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
     events
     # Ash bulk operations work better with smaller chunks
     |> Enum.chunk_every(50)
-    |> Enum.reduce_while({:ok, 0}, fn chunk, {:ok, count} ->
+    |> Enum.reduce({0, []}, fn chunk, {total_success, all_failed} ->
       case store_event_chunk(chunk) do
         {:ok, chunk_count} ->
-          {:cont, {:ok, count + chunk_count}}
+          {total_success + chunk_count, all_failed}
 
-        {:error, _} = error ->
-          {:halt, error}
+        {:partial, chunk_count, failed_events} ->
+          {total_success + chunk_count, all_failed ++ failed_events}
+
+        {:error, failed_events} ->
+          {total_success, all_failed ++ failed_events}
       end
     end)
+    |> then(fn {success_count, failed_events_list} ->
+      # Derive the final return shape based on results
+      cond do
+        failed_events_list == [] ->
+          {:ok, success_count}
+
+        success_count == 0 ->
+          {:error, failed_events_list}
+
+        true ->
+          {:partial, success_count, failed_events_list}
+      end
+    end)
   end
 
+  defp handle_buffer_overflow(buffer, max_size) when length(buffer) > max_size do
+    dropped = length(buffer) - max_size
+
+    Logger.warning(
+      "Dropping #{dropped} oldest audit events due to buffer overflow",
+      buffer_size: length(buffer),
+      max_size: max_size
+    )
+
+    # Emit telemetry for dropped events
+    :telemetry.execute(
+      [:wanderer_app, :security_audit, :events_dropped],
+      %{count: dropped},
+      %{}
+    )
+
+    # Keep the newest events (take from the front since buffer is newest-first)
+    Enum.take(buffer, max_size)
+  end
+
+  defp handle_buffer_overflow(buffer, _max_size), do: buffer
+
   defp store_event_chunk(events) do
-    # Transform events to Ash attributes
-    records =
-      Enum.map(events, fn event ->
-        SecurityAudit.do_store_audit_entry(event)
-      end)
-
-    # Count successful stores
-    successful =
-      Enum.count(records, fn
-        :ok -> true
-        _ -> false
-      end)
-
-    {:ok, successful}
-  rescue
-    error ->
-      {:error, error}
+    # Process each event and partition results
+    {successes, failures} =
+      events
+      |> Enum.map(fn event ->
+        case SecurityAudit.do_store_audit_entry(event) do
+          :ok ->
+            {:ok, event}
+
+          {:error, reason} ->
+            Logger.error("Failed to store individual audit event",
+              error: inspect(reason),
+              event_type: Map.get(event, :event_type),
+              user_id: Map.get(event, :user_id)
+            )
+
+            {:error, {event, reason}}
+        end
+      end)
+      |> Enum.split_with(fn
+        {:ok, _} -> true
+        {:error, _} -> false
+      end)
+
+    successful_count = length(successes)
+    failed_count = length(failures)
+
+    # Extract failed events with reasons
+    failed_events = Enum.map(failures, fn {:error, event_reason} -> event_reason end)
+
+    # Log if some events failed (telemetry will be emitted at flush level)
+    if failed_count > 0 do
+      Logger.debug("Chunk processing: #{failed_count} of #{length(events)} events failed")
+    end
+
+    # Return richer result shape
+    cond do
+      successful_count == 0 ->
+        {:error, failed_events}
+
+      failed_count > 0 ->
+        {:partial, successful_count, failed_events}
+
+      true ->
+        {:ok, successful_count}
    end
  end
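For reference, the result contract that `do_flush/1` now branches on can be written as a typespec (illustrative; the committed code does not declare this type):

```elixir
@type flush_result ::
        {:ok, stored_count :: non_neg_integer()}
        | {:partial, stored_count :: non_neg_integer(), failed :: [{event :: map(), reason :: term()}]}
        | {:error, failed :: [{event :: map(), reason :: term()}]}
```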
@@ -11,7 +11,9 @@ defmodule WandererApp.Server.ServerStatusTracker do
     :server_version,
     :start_time,
     :vip,
-    :retries
+    :retries,
+    :in_forced_downtime,
+    :downtime_notified
   ]
 
   @retries_count 3

@@ -21,9 +23,17 @@ defmodule WandererApp.Server.ServerStatusTracker do
     retries: @retries_count,
     server_version: "0",
     start_time: "0",
-    vip: true
+    vip: true,
+    in_forced_downtime: false,
+    downtime_notified: false
   }
 
+  # EVE Online daily downtime period (UTC/GMT)
+  @downtime_start_hour 10
+  @downtime_start_minute 58
+  @downtime_end_hour 11
+  @downtime_end_minute 2
+
   @refresh_interval :timer.minutes(1)
 
   @logger Application.compile_env(:wanderer_app, :logger)

@@ -57,13 +67,51 @@ defmodule WandererApp.Server.ServerStatusTracker do
   def handle_info(
         :refresh_status,
         %{
-          retries: retries
+          retries: retries,
+          in_forced_downtime: was_in_downtime
         } = state
       ) do
     Process.send_after(self(), :refresh_status, @refresh_interval)
-    Task.async(fn -> get_server_status(retries) end)
 
-    {:noreply, state}
+    in_downtime = in_forced_downtime?()
+
+    cond do
+      # Entering downtime period - broadcast offline status immediately
+      in_downtime and not was_in_downtime ->
+        @logger.info("#{__MODULE__} entering forced downtime period (10:58-11:02 GMT)")
+
+        downtime_status = %{
+          players: 0,
+          server_version: "downtime",
+          start_time: DateTime.utc_now() |> DateTime.to_iso8601(),
+          vip: true
+        }
+
+        Phoenix.PubSub.broadcast(
+          WandererApp.PubSub,
+          "server_status",
+          {:server_status, downtime_status}
+        )
+
+        {:noreply,
+         %{state | in_forced_downtime: true, downtime_notified: true}
+         |> Map.merge(downtime_status)}
+
+      # Currently in downtime - skip API call
+      in_downtime ->
+        {:noreply, state}
+
+      # Exiting downtime period - resume normal operations
+      not in_downtime and was_in_downtime ->
+        @logger.info("#{__MODULE__} exiting forced downtime period, resuming normal operations")
+        Task.async(fn -> get_server_status(retries) end)
+        {:noreply, %{state | in_forced_downtime: false, downtime_notified: false}}
+
+      # Normal operation
+      true ->
+        Task.async(fn -> get_server_status(retries) end)
+        {:noreply, state}
+    end
   end
 
   @impl true

@@ -155,4 +203,19 @@ defmodule WandererApp.Server.ServerStatusTracker do
       vip: false
     }
   end
+
+  # Checks if the current UTC time falls within the forced downtime period (10:58-11:02 GMT).
+  defp in_forced_downtime? do
+    now = DateTime.utc_now()
+    current_hour = now.hour
+    current_minute = now.minute
+
+    # Convert times to minutes since midnight for easier comparison
+    current_time_minutes = current_hour * 60 + current_minute
+    downtime_start_minutes = @downtime_start_hour * 60 + @downtime_start_minute
+    downtime_end_minutes = @downtime_end_hour * 60 + @downtime_end_minute
+
+    current_time_minutes >= downtime_start_minutes and
+      current_time_minutes < downtime_end_minutes
+  end
 end
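Sanity-checking the window arithmetic: 10:58 UTC is 10 × 60 + 58 = 658 minutes after midnight and 11:02 UTC is 11 × 60 + 2 = 662, so the half-open test `658 <= t and t < 662` holds for 10:58 through 11:01 and is false again at 11:02, at which point the tracker resumes polling ESI.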
@@ -1,16 +1,57 @@
 defmodule WandererApp.User.ActivityTracker do
-  @moduledoc false
+  @moduledoc """
+  Activity tracking wrapper that ensures audit logging never crashes application logic.
+
+  Activity tracking is best-effort and errors are logged but not propagated to callers.
+  This prevents race conditions (e.g., duplicate activity records) from affecting
+  critical business operations like character tracking or connection management.
+  """
+  require Logger
 
-  def track_map_event(
-        event_type,
-        metadata
-      ),
-      do: WandererApp.Map.Audit.track_map_event(event_type, metadata)
+  @doc """
+  Track a map-related event. Always returns `{:ok, result}` even on error.
+
+  Errors (such as unique constraint violations from concurrent operations)
+  are logged but do not propagate to prevent crashing critical application logic.
+  """
+  def track_map_event(event_type, metadata) do
+    case WandererApp.Map.Audit.track_map_event(event_type, metadata) do
+      {:ok, result} ->
+        {:ok, result}
+
+      {:error, error} ->
+        Logger.warning("Failed to track map event (non-critical)",
+          event_type: event_type,
+          map_id: metadata[:map_id],
+          error: inspect(error),
+          reason: :best_effort_tracking
+        )
+
+        # Return success to prevent crashes - activity tracking is best-effort
+        {:ok, nil}
+    end
+  end
 
-  def track_acl_event(
-        event_type,
-        metadata
-      ),
-      do: WandererApp.Map.Audit.track_acl_event(event_type, metadata)
+  @doc """
+  Track an ACL-related event. Always returns `{:ok, result}` even on error.
+
+  Errors are logged but do not propagate to prevent crashing critical application logic.
+  """
+  def track_acl_event(event_type, metadata) do
+    case WandererApp.Map.Audit.track_acl_event(event_type, metadata) do
+      {:ok, result} ->
+        {:ok, result}
+
+      {:error, error} ->
+        Logger.warning("Failed to track ACL event (non-critical)",
+          event_type: event_type,
+          acl_id: metadata[:acl_id],
+          error: inspect(error),
+          reason: :best_effort_tracking
+        )
+
+        # Return success to prevent crashes - activity tracking is best-effort
+        {:ok, nil}
+    end
+  end
 end
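Since the wrapper now always returns `{:ok, result}` (with `result` set to `nil` on a swallowed error), call sites no longer need to pattern-match the return, which is exactly what the event-handler hunks below do when they drop the `{:ok, _} =` assertions. An illustrative before/after at a call site:

```elixir
# Before: a failed audit insert raised a MatchError and crashed the caller
{:ok, _} =
  WandererApp.User.ActivityTracker.track_map_event(:hub_added, %{map_id: map_id})

# After: tracking is best-effort; failures are logged inside the wrapper
WandererApp.User.ActivityTracker.track_map_event(:hub_added, %{map_id: map_id})
```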
@@ -59,14 +59,13 @@ defmodule WandererAppWeb.MapConnectionsEventHandler do
       character_id: main_character_id
     })
 
-    {:ok, _} =
-      WandererApp.User.ActivityTracker.track_map_event(:map_connection_added, %{
-        character_id: main_character_id,
-        user_id: current_user_id,
-        map_id: map_id,
-        solar_system_source_id: "#{solar_system_source_id}" |> String.to_integer(),
-        solar_system_target_id: "#{solar_system_target_id}" |> String.to_integer()
-      })
+    WandererApp.User.ActivityTracker.track_map_event(:map_connection_added, %{
+      character_id: main_character_id,
+      user_id: current_user_id,
+      map_id: map_id,
+      solar_system_source_id: "#{solar_system_source_id}" |> String.to_integer(),
+      solar_system_target_id: "#{solar_system_target_id}" |> String.to_integer()
+    })
 
     {:noreply, socket}
   end

@@ -149,7 +148,6 @@ defmodule WandererAppWeb.MapConnectionsEventHandler do
       end
     end
 
-    {:ok, _} =
       WandererApp.User.ActivityTracker.track_map_event(:map_connection_removed, %{
         character_id: main_character_id,
         user_id: current_user_id,

@@ -202,7 +200,6 @@ defmodule WandererAppWeb.MapConnectionsEventHandler do
       _ -> nil
     end
 
-    {:ok, _} =
       WandererApp.User.ActivityTracker.track_map_event(:map_connection_updated, %{
         character_id: main_character_id,
         user_id: current_user_id,
@@ -165,13 +165,12 @@ defmodule WandererAppWeb.MapRoutesEventHandler do
       solar_system_id: solar_system_id
     })
 
-    {:ok, _} =
-      WandererApp.User.ActivityTracker.track_map_event(:hub_added, %{
-        character_id: main_character_id,
-        user_id: current_user.id,
-        map_id: map_id,
-        solar_system_id: solar_system_id
-      })
+    WandererApp.User.ActivityTracker.track_map_event(:hub_added, %{
+      character_id: main_character_id,
+      user_id: current_user.id,
+      map_id: map_id,
+      solar_system_id: solar_system_id
+    })
 
     {:noreply, socket}
   else

@@ -204,13 +203,12 @@ defmodule WandererAppWeb.MapRoutesEventHandler do
       solar_system_id: solar_system_id
     })
 
-    {:ok, _} =
-      WandererApp.User.ActivityTracker.track_map_event(:hub_removed, %{
-        character_id: main_character_id,
-        user_id: current_user.id,
-        map_id: map_id,
-        solar_system_id: solar_system_id
-      })
+    WandererApp.User.ActivityTracker.track_map_event(:hub_removed, %{
+      character_id: main_character_id,
+      user_id: current_user.id,
+      map_id: map_id,
+      solar_system_id: solar_system_id
+    })
 
     {:noreply, socket}
   end
@@ -250,15 +250,14 @@ defmodule WandererAppWeb.MapSystemsEventHandler do
         |> Map.put_new(key_atom, value)
       ])
 
-      {:ok, _} =
-        WandererApp.User.ActivityTracker.track_map_event(:system_updated, %{
-          character_id: main_character_id,
-          user_id: current_user.id,
-          map_id: map_id,
-          solar_system_id: "#{solar_system_id}" |> String.to_integer(),
-          key: key_atom,
-          value: value
-        })
+      WandererApp.User.ActivityTracker.track_map_event(:system_updated, %{
+        character_id: main_character_id,
+        user_id: current_user.id,
+        map_id: map_id,
+        solar_system_id: "#{solar_system_id}" |> String.to_integer(),
+        key: key_atom,
+        value: value
+      })
     end
 
     {:noreply, socket}
@@ -383,24 +383,22 @@ defmodule WandererAppWeb.MapsLive do
 
     added_acls
     |> Enum.each(fn acl_id ->
-      {:ok, _} =
-        WandererApp.User.ActivityTracker.track_map_event(:map_acl_added, %{
-          character_id: first_tracked_character_id,
-          user_id: current_user.id,
-          map_id: map.id,
-          acl_id: acl_id
-        })
+      WandererApp.User.ActivityTracker.track_map_event(:map_acl_added, %{
+        character_id: first_tracked_character_id,
+        user_id: current_user.id,
+        map_id: map.id,
+        acl_id: acl_id
+      })
     end)
 
     removed_acls
     |> Enum.each(fn acl_id ->
-      {:ok, _} =
-        WandererApp.User.ActivityTracker.track_map_event(:map_acl_removed, %{
-          character_id: first_tracked_character_id,
-          user_id: current_user.id,
-          map_id: map.id,
-          acl_id: acl_id
-        })
+      WandererApp.User.ActivityTracker.track_map_event(:map_acl_removed, %{
+        character_id: first_tracked_character_id,
+        user_id: current_user.id,
+        map_id: map.id,
+        acl_id: acl_id
+      })
     end)
 
     {:noreply,
mix.exs (2 lines changed)

@@ -3,7 +3,7 @@ defmodule WandererApp.MixProject do
 
   @source_url "https://github.com/wanderer-industries/wanderer"
 
-  @version "1.84.13"
+  @version "1.84.17"
 
   def project do
     [
343
test/unit/map/map_pool_test.exs
Normal file
343
test/unit/map/map_pool_test.exs
Normal file
@@ -0,0 +1,343 @@
|
||||
defmodule WandererApp.Map.MapPoolTest do
|
||||
use ExUnit.Case, async: false
|
||||
|
||||
alias WandererApp.Map.{MapPool, MapPoolDynamicSupervisor, Reconciler}
|
||||
|
||||
@cache :map_pool_cache
|
||||
@registry :map_pool_registry
|
||||
@unique_registry :unique_map_pool_registry
|
||||
|
||||
setup do
|
||||
# Clean up any existing test data
|
||||
cleanup_test_data()
|
||||
|
||||
# Check if required infrastructure is running
|
||||
registries_running? =
|
||||
try do
|
||||
Registry.keys(@registry, self()) != :error
|
||||
rescue
|
||||
_ -> false
|
||||
end
|
||||
|
||||
reconciler_running? = Process.whereis(Reconciler) != nil
|
||||
|
||||
on_exit(fn ->
|
||||
cleanup_test_data()
|
||||
end)
|
||||
|
||||
{:ok, registries_running: registries_running?, reconciler_running: reconciler_running?}
|
||||
end
|
||||
|
||||
defp cleanup_test_data do
|
||||
# Clean up test caches
|
||||
WandererApp.Cache.delete("started_maps")
|
||||
Cachex.clear(@cache)
|
||||
end
|
||||
|
||||
describe "garbage collection with synchronous stop" do
|
||||
@tag :skip
|
||||
test "garbage collector successfully stops map with synchronous call" do
|
||||
# This test would require setting up a full map pool with a test map
|
||||
# Skipping for now as it requires more complex setup with actual map data
|
||||
:ok
|
||||
end
|
||||
|
||||
@tag :skip
|
||||
test "garbage collector handles stop failures gracefully" do
|
||||
# This test would verify error handling when stop fails
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
describe "cache lookup with registry fallback" do
|
||||
test "stop_map handles cache miss by scanning registry", %{registries_running: registries_running?} do
|
||||
if registries_running? do
|
||||
# Setup: Create a map_id that's not in cache but will be found in registry scan
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Verify cache is empty for this map
|
||||
assert {:ok, nil} = Cachex.get(@cache, map_id)
|
||||
|
||||
# Call stop_map - should handle gracefully with fallback
|
||||
assert :ok = MapPoolDynamicSupervisor.stop_map(map_id)
|
||||
else
|
||||
# Skip test if registries not running
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
test "stop_map handles non-existent pool_uuid in registry", %{registries_running: registries_running?} do
|
||||
if registries_running? do
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
fake_uuid = "fake_uuid_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Put fake uuid in cache that doesn't exist in registry
|
||||
Cachex.put(@cache, map_id, fake_uuid)
|
||||
|
||||
# Call stop_map - should handle gracefully with fallback
|
||||
assert :ok = MapPoolDynamicSupervisor.stop_map(map_id)
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
test "stop_map updates cache when found via registry scan", %{registries_running: registries_running?} do
|
||||
if registries_running? do
|
||||
# This test would require a running pool with registered maps
|
||||
# For now, we verify the fallback logic doesn't crash
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
assert :ok = MapPoolDynamicSupervisor.stop_map(map_id)
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "state cleanup atomicity" do
|
||||
@tag :skip
|
||||
test "rollback occurs when registry update fails" do
|
||||
# This would require mocking Registry.update_value to fail
|
||||
# Skipping for now as it requires more complex mocking setup
|
||||
:ok
|
||||
end
|
||||
|
||||
@tag :skip
|
||||
test "rollback occurs when cache delete fails" do
|
||||
# This would require mocking Cachex.del to fail
|
||||
:ok
|
||||
end
|
||||
|
||||
@tag :skip
|
||||
test "successful cleanup updates all three state stores" do
|
||||
# This would verify Registry, Cache, and GenServer state are all updated
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
describe "Reconciler - zombie map detection and cleanup" do
|
||||
test "reconciler detects zombie maps in started_maps cache", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
# Setup: Add maps to started_maps that aren't in any registry
|
||||
zombie_map_id = "zombie_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
WandererApp.Cache.insert_or_update(
|
||||
"started_maps",
|
||||
[zombie_map_id],
|
||||
fn existing -> [zombie_map_id | existing] |> Enum.uniq() end
|
||||
)
|
||||
|
||||
# Get started_maps
|
||||
{:ok, started_maps} = WandererApp.Cache.lookup("started_maps", [])
|
||||
assert zombie_map_id in started_maps
|
||||
|
||||
# Trigger reconciliation
|
||||
send(Reconciler, :reconcile)
|
||||
# Give it time to process
|
||||
Process.sleep(200)
|
||||
|
||||
# Verify zombie was cleaned up
|
||||
{:ok, started_maps_after} = WandererApp.Cache.lookup("started_maps", [])
|
||||
refute zombie_map_id in started_maps_after
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
test "reconciler cleans up zombie map caches", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
zombie_map_id = "zombie_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Setup zombie state
|
||||
WandererApp.Cache.insert_or_update(
|
||||
"started_maps",
|
||||
[zombie_map_id],
|
||||
fn existing -> [zombie_map_id | existing] |> Enum.uniq() end
|
||||
)
|
||||
|
||||
WandererApp.Cache.insert("map_#{zombie_map_id}:started", true)
|
||||
Cachex.put(@cache, zombie_map_id, "fake_uuid")
|
||||
|
||||
# Trigger reconciliation
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
# Verify all caches cleaned
|
||||
{:ok, started_maps} = WandererApp.Cache.lookup("started_maps", [])
|
||||
refute zombie_map_id in started_maps
|
||||
|
||||
{:ok, cache_entry} = Cachex.get(@cache, zombie_map_id)
|
||||
assert cache_entry == nil
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Reconciler - orphan map detection and fix" do
|
||||
@tag :skip
|
||||
test "reconciler detects orphan maps in registry" do
|
||||
# This would require setting up a pool with maps in registry
|
||||
# but not in started_maps cache
|
||||
:ok
|
||||
end
|
||||
|
||||
@tag :skip
|
||||
test "reconciler adds orphan maps to started_maps cache" do
|
||||
# This would verify orphan maps get added to the cache
|
||||
:ok
|
||||
end
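
    # A hedged sketch of the missing setup: register the test process in the
    # pool registry as if it owned the orphan map. The registry name
    # (WandererApp.MapPoolRegistry) and the stored value shape are assumptions:
    @tag :skip
    test "sketch: orphan map in registry but absent from started_maps" do
      orphan_map_id = "orphan_map_#{:rand.uniform(1_000_000)}"

      {:ok, _} = Registry.register(WandererApp.MapPoolRegistry, "fake_pool_uuid", [orphan_map_id])

      Reconciler.trigger_reconciliation()
      Process.sleep(200)

      # The reconciler should adopt the orphan into started_maps
      {:ok, started_maps} = WandererApp.Cache.lookup("started_maps", [])
      assert orphan_map_id in started_maps
    end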
  end

describe "Reconciler - cache inconsistency detection and fix" do
|
||||
test "reconciler detects map with missing cache entry", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
# This test verifies the reconciler can detect when a map
|
||||
# is in the registry but has no cache entry
|
||||
# Since we can't easily set up a full pool, we test the detection logic
|
||||
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Ensure no cache entry
|
||||
Cachex.del(@cache, map_id)
|
||||
|
||||
# The reconciler would detect this if the map was in a registry
|
||||
# For now, we just verify the logic doesn't crash
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
# No assertions needed - just verifying no crashes
|
||||
end
|
||||
end
|
||||
|
||||
test "reconciler detects cache pointing to non-existent pool", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
fake_uuid = "fake_uuid_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Put fake uuid in cache
|
||||
Cachex.put(@cache, map_id, fake_uuid)
|
||||
|
||||
# Trigger reconciliation
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
# Cache entry should be removed since pool doesn't exist
|
||||
{:ok, cache_entry} = Cachex.get(@cache, map_id)
|
||||
assert cache_entry == nil
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Reconciler - stats and telemetry" do
|
||||
test "reconciler emits telemetry events", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
# Setup telemetry handler
|
||||
test_pid = self()
|
||||
|
||||
:telemetry.attach(
|
||||
"test-reconciliation",
|
||||
[:wanderer_app, :map, :reconciliation],
|
||||
fn _event, measurements, _metadata, _config ->
|
||||
send(test_pid, {:telemetry, measurements})
|
||||
end,
|
||||
nil
|
||||
)
|
||||
|
||||
# Trigger reconciliation
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
# Should receive telemetry event
|
||||
assert_receive {:telemetry, measurements}, 500
|
||||
|
||||
assert is_integer(measurements.total_started_maps)
|
||||
assert is_integer(measurements.total_registry_maps)
|
||||
assert is_integer(measurements.zombie_maps)
|
||||
assert is_integer(measurements.orphan_maps)
|
||||
assert is_integer(measurements.cache_inconsistencies)
|
||||
|
||||
# Cleanup
|
||||
:telemetry.detach("test-reconciliation")
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "Reconciler - manual trigger" do
|
||||
test "trigger_reconciliation runs reconciliation immediately", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
zombie_map_id = "zombie_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Setup zombie state
|
||||
WandererApp.Cache.insert_or_update(
|
||||
"started_maps",
|
||||
[zombie_map_id],
|
||||
fn existing -> [zombie_map_id | existing] |> Enum.uniq() end
|
||||
)
|
||||
|
||||
# Verify it exists
|
||||
{:ok, started_maps_before} = WandererApp.Cache.lookup("started_maps", [])
|
||||
assert zombie_map_id in started_maps_before
|
||||
|
||||
# Trigger manual reconciliation
|
||||
Reconciler.trigger_reconciliation()
|
||||
Process.sleep(200)
|
||||
|
||||
# Verify zombie was cleaned up
|
||||
{:ok, started_maps_after} = WandererApp.Cache.lookup("started_maps", [])
|
||||
refute zombie_map_id in started_maps_after
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
|
||||
describe "edge cases and error handling" do
|
||||
test "stop_map with cache error returns ok", %{registries_running: registries_running?} do
|
||||
if registries_running? do
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Even if cache operations fail, should return :ok
|
||||
assert :ok = MapPoolDynamicSupervisor.stop_map(map_id)
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
test "reconciler handles empty registries gracefully", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
# Clear everything
|
||||
cleanup_test_data()
|
||||
|
||||
# Should not crash even with empty data
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
# No assertions - just verifying no crash
|
||||
assert true
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
|
||||
test "reconciler handles nil values in caches", %{reconciler_running: reconciler_running?} do
|
||||
if reconciler_running? do
|
||||
map_id = "test_map_#{:rand.uniform(1_000_000)}"
|
||||
|
||||
# Explicitly set nil
|
||||
Cachex.put(@cache, map_id, nil)
|
||||
|
||||
# Should handle gracefully
|
||||
send(Reconciler, :reconcile)
|
||||
Process.sleep(200)
|
||||
|
||||
assert true
|
||||
else
|
||||
:ok
|
||||
end
|
||||
end
|
||||
end
|
||||
end
|
||||

84
test/unit/user/user_activity_tracker_test.exs
Normal file
@@ -0,0 +1,84 @@
defmodule WandererApp.User.ActivityTrackerTest do
  use WandererApp.DataCase, async: false

  alias WandererApp.User.ActivityTracker

  describe "track_map_event/2" do
    test "returns {:ok, result} on success" do
      # This test verifies the happy path
      # In real scenarios, this would succeed when creating a new activity record
      assert {:ok, _} = ActivityTracker.track_map_event(:test_event, %{})
    end

    test "returns {:ok, nil} on error without crashing" do
      # This simulates the scenario where tracking fails (e.g., unique constraint violation)
      # The function should handle the error gracefully and return {:ok, nil}

      # Note: In the actual implementation, this would catch errors from:
      # - Unique constraint violations
      # - Database connection issues
      # - Invalid data

      # The key requirement is that it NEVER crashes the calling code
      result = ActivityTracker.track_map_event(:map_connection_added, %{
        character_id: nil, # This will cause the function to skip tracking
        user_id: nil,
        map_id: nil
      })

      # Should return success even when input is incomplete
      assert {:ok, _} = result
    end

    test "handles errors gracefully and logs them" do
      # Verify that errors are logged for observability
      # This is important for monitoring and debugging

      # The function should complete without raising even with incomplete data
      assert {:ok, _} = ActivityTracker.track_map_event(:map_connection_added, %{
        character_id: nil,
        user_id: nil,
        map_id: nil
      })
    end
  end

  describe "track_acl_event/2" do
    test "returns {:ok, result} on success" do
      assert {:ok, _} = ActivityTracker.track_acl_event(:test_event, %{})
    end

    test "returns {:ok, nil} on error without crashing" do
      result = ActivityTracker.track_acl_event(:map_acl_added, %{
        user_id: nil,
        acl_id: nil
      })

      assert {:ok, _} = result
    end
  end

  describe "error resilience" do
    test "always returns success tuple even on internal errors" do
      # The key guarantee is that activity tracking never crashes calling code
      # Even if the internal tracking fails (e.g., unique constraint violation),
      # the wrapper ensures a success tuple is returned

      # This test verifies that the function signature guarantees {:ok, _}
      # regardless of internal errors

      # Test with nil values (which will fail validation)
      assert {:ok, _} = ActivityTracker.track_map_event(:test_event, %{
        character_id: nil,
        user_id: nil,
        map_id: nil
      })

      # Test with empty map (which will fail validation)
      assert {:ok, _} = ActivityTracker.track_map_event(:test_event, %{})

      # The guarantee is: no matter what, it returns {:ok, _}
      # This prevents MatchError crashes in calling code
    end
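
    # For context, a minimal sketch of the kind of wrapper that provides this
    # guarantee; the function body and the internal do_track_map_event/2 call
    # are illustrative assumptions, not the app's actual implementation:
    #
    #   def track_map_event(event_type, metadata) do
    #     case do_track_map_event(event_type, metadata) do
    #       {:ok, result} ->
    #         {:ok, result}
    #
    #       {:error, reason} ->
    #         Logger.warning("Failed to track map event: #{inspect(reason)}")
    #         {:ok, nil}
    #     end
    #   end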
  end
end