feat: use external services for kill data

This commit is contained in:
guarzo
2025-06-17 12:29:58 -04:00
parent a97cf25031
commit 41e77e8336
38 changed files with 2643 additions and 1776 deletions

View File

@@ -8,4 +8,5 @@ export GIT_SHA="1111"
export WANDERER_INVITES="false"
export WANDERER_PUBLIC_API_DISABLED="false"
export WANDERER_CHARACTER_API_DISABLED="false"
export WANDERER_ZKILL_PRELOAD_DISABLED="false"
export WANDERER_KILLS_SERVICE_ENABLED="true"
export WANDERER_KILLS_BASE_URL="ws://host.docker.internal:4004"

View File

@@ -17,67 +17,91 @@ import { TooltipPosition } from '@/hooks/Mapper/components/ui-kit';
import { WithClassName } from '@/hooks/Mapper/types/common.ts';
export type CompactKillRowProps = {
killDetails: DetailedKill;
killDetails?: DetailedKill | null;
systemName: string;
onlyOneSystem: boolean;
} & WithClassName;
export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, className }: CompactKillRowProps) => {
const {
killmail_id = 0,
killmail_id,
// Victim data
victim_char_name = 'Unknown Pilot',
victim_alliance_ticker = '',
victim_corp_ticker = '',
victim_ship_name = 'Unknown Ship',
victim_corp_name = '',
victim_alliance_name = '',
victim_char_id = 0,
victim_corp_id = 0,
victim_alliance_id = 0,
victim_ship_type_id = 0,
// Attacker data
final_blow_char_id = 0,
final_blow_char_name = '',
final_blow_alliance_ticker = '',
final_blow_alliance_name = '',
final_blow_alliance_id = 0,
final_blow_corp_ticker = '',
final_blow_corp_id = 0,
final_blow_corp_name = '',
final_blow_ship_type_id = 0,
kill_time = '',
total_value = 0,
} = killDetails || {};
const attackerIsNpc = final_blow_char_id === 0;
// Define victim affiliation ticker.
const victimAffiliationTicker = victim_alliance_ticker || victim_corp_ticker || 'No Ticker';
const killValueFormatted = total_value != null && total_value > 0 ? `${formatISK(total_value)} ISK` : null;
const killTimeAgo = kill_time ? formatTimeMixed(kill_time) : '0h ago';
const attackerSubscript = getAttackerSubscript(killDetails);
const { victimCorpLogoUrl, victimAllianceLogoUrl, victimShipUrl } = buildVictimImageUrls({
victim_char_name,
victim_alliance_ticker,
victim_corp_ticker,
victim_ship_name,
victim_corp_name,
victim_alliance_name,
victim_char_id,
victim_ship_type_id,
victim_corp_id,
victim_alliance_id,
victim_ship_type_id,
// Attacker data
final_blow_char_id,
final_blow_char_name,
final_blow_alliance_ticker,
final_blow_alliance_name,
final_blow_alliance_id,
final_blow_corp_ticker,
final_blow_corp_id,
final_blow_corp_name,
final_blow_ship_type_id,
kill_time,
total_value,
} = killDetails || {};
// Apply fallback values using nullish coalescing to handle both null and undefined
const safeKillmailId = killmail_id ?? 0;
const safeVictimCharName = victim_char_name ?? 'Unknown Pilot';
const safeVictimAllianceTicker = victim_alliance_ticker ?? '';
const safeVictimCorpTicker = victim_corp_ticker ?? '';
const safeVictimShipName = victim_ship_name ?? 'Unknown Ship';
const safeVictimCorpName = victim_corp_name ?? '';
const safeVictimAllianceName = victim_alliance_name ?? '';
const safeVictimCharId = victim_char_id ?? 0;
const safeVictimCorpId = victim_corp_id ?? 0;
const safeVictimAllianceId = victim_alliance_id ?? 0;
const safeVictimShipTypeId = victim_ship_type_id ?? 0;
const safeFinalBlowCharId = final_blow_char_id ?? 0;
const safeFinalBlowCharName = final_blow_char_name ?? '';
const safeFinalBlowAllianceTicker = final_blow_alliance_ticker ?? '';
const safeFinalBlowAllianceName = final_blow_alliance_name ?? '';
const safeFinalBlowAllianceId = final_blow_alliance_id ?? 0;
const safeFinalBlowCorpTicker = final_blow_corp_ticker ?? '';
const safeFinalBlowCorpId = final_blow_corp_id ?? 0;
const safeFinalBlowCorpName = final_blow_corp_name ?? '';
const safeFinalBlowShipTypeId = final_blow_ship_type_id ?? 0;
const safeKillTime = kill_time ?? '';
const safeTotalValue = total_value ?? 0;
const attackerIsNpc = safeFinalBlowCharId === 0;
// Define victim affiliation ticker.
const victimAffiliationTicker = safeVictimAllianceTicker || safeVictimCorpTicker || 'No Ticker';
const killValueFormatted = safeTotalValue != null && safeTotalValue > 0 ? `${formatISK(safeTotalValue)} ISK` : null;
const killTimeAgo = safeKillTime ? formatTimeMixed(safeKillTime) : '0h ago';
const attackerSubscript = killDetails ? getAttackerSubscript(killDetails) : undefined;
const { victimCorpLogoUrl, victimAllianceLogoUrl, victimShipUrl } = buildVictimImageUrls({
victim_char_id: safeVictimCharId,
victim_ship_type_id: safeVictimShipTypeId,
victim_corp_id: safeVictimCorpId,
victim_alliance_id: safeVictimAllianceId,
});
const { attackerCorpLogoUrl, attackerAllianceLogoUrl } = buildAttackerImageUrls({
final_blow_char_id,
final_blow_corp_id,
final_blow_alliance_id,
final_blow_char_id: safeFinalBlowCharId,
final_blow_corp_id: safeFinalBlowCorpId,
final_blow_alliance_id: safeFinalBlowAllianceId,
});
const { url: victimPrimaryLogoUrl, tooltip: victimPrimaryTooltip } = getPrimaryLogoAndTooltip(
victimAllianceLogoUrl,
victimCorpLogoUrl,
victim_alliance_name,
victim_corp_name,
safeVictimAllianceName,
safeVictimCorpName,
'Victim',
);
@@ -87,25 +111,25 @@ export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, classNam
attackerIsNpc,
attackerAllianceLogoUrl,
attackerCorpLogoUrl,
final_blow_alliance_name,
final_blow_corp_name,
final_blow_ship_type_id,
safeFinalBlowAllianceName,
safeFinalBlowCorpName,
safeFinalBlowShipTypeId,
),
[
attackerAllianceLogoUrl,
attackerCorpLogoUrl,
attackerIsNpc,
final_blow_alliance_name,
final_blow_corp_name,
final_blow_ship_type_id,
safeFinalBlowAllianceName,
safeFinalBlowCorpName,
safeFinalBlowShipTypeId,
],
);
// Define attackerTicker to use the alliance ticker if available, otherwise the corp ticker.
const attackerTicker = attackerIsNpc ? '' : final_blow_alliance_ticker || final_blow_corp_ticker || '';
const attackerTicker = attackerIsNpc ? '' : safeFinalBlowAllianceTicker || safeFinalBlowCorpTicker || '';
// For the attacker image link: if the attacker is not an NPC, link to the character page; otherwise, link to the kill page.
const attackerLink = attackerIsNpc ? zkillLink('kill', killmail_id) : zkillLink('character', final_blow_char_id);
const attackerLink = attackerIsNpc ? zkillLink('kill', safeKillmailId) : zkillLink('character', safeFinalBlowCharId);
return (
<div
@@ -121,7 +145,7 @@ export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, classNam
{victimShipUrl && (
<div className="relative shrink-0 w-8 h-8 overflow-hidden">
<a
href={zkillLink('kill', killmail_id)}
href={zkillLink('kill', safeKillmailId)}
target="_blank"
rel="noopener noreferrer"
className="block w-full h-full"
@@ -137,7 +161,7 @@ export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, classNam
{victimPrimaryLogoUrl && (
<WdTooltipWrapper content={victimPrimaryTooltip} position={TooltipPosition.top}>
<a
href={zkillLink('kill', killmail_id)}
href={zkillLink('kill', safeKillmailId)}
target="_blank"
rel="noopener noreferrer"
className="relative block shrink-0 w-8 h-8 overflow-hidden"
@@ -153,12 +177,12 @@ export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, classNam
</div>
<div className="flex flex-col ml-2 flex-1 min-w-0 overflow-hidden leading-[1rem]">
<div className="truncate text-stone-200">
{victim_char_name}
{safeVictimCharName}
<span className="text-stone-400"> / {victimAffiliationTicker}</span>
</div>
<div className="truncate text-stone-300 flex items-center gap-1">
<span className="text-stone-400 overflow-hidden text-ellipsis whitespace-nowrap max-w-[140px]">
{victim_ship_name}
{safeVictimShipName}
</span>
{killValueFormatted && (
<>
@@ -170,9 +194,9 @@ export const KillRowDetail = ({ killDetails, systemName, onlyOneSystem, classNam
</div>
<div className="flex items-center ml-auto gap-2">
<div className="flex flex-col items-end flex-1 min-w-0 overflow-hidden text-right leading-[1rem]">
{!attackerIsNpc && (final_blow_char_name || attackerTicker) && (
{!attackerIsNpc && (safeFinalBlowCharName || attackerTicker) && (
<div className="truncate text-stone-200">
{final_blow_char_name}
{safeFinalBlowCharName}
{!attackerIsNpc && attackerTicker && <span className="ml-1 text-stone-400">/ {attackerTicker}</span>}
</div>
)}

View File

@@ -33,7 +33,10 @@ export function formatISK(value: number): string {
return Math.round(value).toString();
}
export function getAttackerSubscript(kill: DetailedKill) {
export function getAttackerSubscript(kill: DetailedKill | undefined) {
if (!kill) {
return null;
}
if (kill.npc) {
return { label: 'npc', cssClass: 'text-purple-400' };
}

View File

@@ -27,7 +27,11 @@ config :wanderer_app,
generators: [timestamp_type: :utc_datetime],
ddrt: DDRT,
logger: Logger,
pubsub_client: Phoenix.PubSub
pubsub_client: Phoenix.PubSub,
wanderer_kills_base_url:
System.get_env("WANDERER_KILLS_BASE_URL", "ws://host.docker.internal:4004"),
wanderer_kills_service_enabled:
System.get_env("WANDERER_KILLS_SERVICE_ENABLED", "false") == "true"
config :wanderer_app, WandererAppWeb.Endpoint,
adapter: Bandit.PhoenixAdapter,

View File

@@ -84,3 +84,12 @@ config :swoosh, :api_client, false
config :logger, :console,
level: :info,
format: "$time $metadata[$level] $message\n"
# WandererKills service configuration (WebSocket-based)
config :wanderer_app,
# Enable WandererKills service integration
wanderer_kills_service_enabled: true,
# WebSocket connection URL for WandererKills service
wanderer_kills_base_url:
System.get_env("WANDERER_KILLS_BASE_URL", "ws://host.docker.internal:4004")

View File

@@ -53,6 +53,20 @@ public_api_disabled =
|> get_var_from_path_or_env("WANDERER_PUBLIC_API_DISABLED", "false")
|> String.to_existing_atom()
character_api_disabled =
config_dir
|> get_var_from_path_or_env("WANDERER_CHARACTER_API_DISABLED", "true")
|> String.to_existing_atom()
wanderer_kills_service_enabled =
config_dir
|> get_var_from_path_or_env("WANDERER_KILLS_SERVICE_ENABLED", "false")
|> String.to_existing_atom()
wanderer_kills_base_url =
config_dir
|> get_var_from_path_or_env("WANDERER_KILLS_BASE_URL", "ws://wanderer-kills:4004")
map_subscriptions_enabled =
config_dir
|> get_var_from_path_or_env("WANDERER_MAP_SUBSCRIPTIONS_ENABLED", "false")
@@ -122,10 +136,9 @@ config :wanderer_app,
character_tracking_pause_disabled:
System.get_env("WANDERER_CHARACTER_TRACKING_PAUSE_DISABLED", "true")
|> String.to_existing_atom(),
character_api_disabled:
System.get_env("WANDERER_CHARACTER_API_DISABLED", "true") |> String.to_existing_atom(),
zkill_preload_disabled:
System.get_env("WANDERER_ZKILL_PRELOAD_DISABLED", "false") |> String.to_existing_atom(),
character_api_disabled: character_api_disabled,
wanderer_kills_service_enabled: wanderer_kills_service_enabled,
wanderer_kills_base_url: wanderer_kills_base_url,
map_subscriptions_enabled: map_subscriptions_enabled,
map_connection_auto_expire_hours: map_connection_auto_expire_hours,
map_connection_auto_eol_hours: map_connection_auto_eol_hours,

View File

@@ -27,7 +27,7 @@ defmodule WandererApp.Application do
}
},
WandererApp.Cache,
Supervisor.child_spec({Cachex, name: :api_cache}, id: :api_cache_worker),
Supervisor.child_spec({Cachex, name: :api_cache, default_ttl: :timer.hours(1)}, id: :api_cache_worker),
Supervisor.child_spec({Cachex, name: :system_static_info_cache},
id: :system_static_info_cache_worker
),
@@ -56,7 +56,7 @@ defmodule WandererApp.Application do
WandererAppWeb.Endpoint
] ++
maybe_start_corp_wallet_tracker(WandererApp.Env.map_subscriptions_enabled?()) ++
maybe_start_zkb(WandererApp.Env.zkill_preload_disabled?())
maybe_start_kills_services()
opts = [strategy: :one_for_one, name: WandererApp.Supervisor]
@@ -77,11 +77,6 @@ defmodule WandererApp.Application do
:ok
end
defp maybe_start_zkb(false),
do: [WandererApp.Zkb.Supervisor, WandererApp.Map.ZkbDataFetcher]
defp maybe_start_zkb(_),
do: []
defp maybe_start_corp_wallet_tracker(true),
do: [
@@ -90,4 +85,20 @@ defmodule WandererApp.Application do
defp maybe_start_corp_wallet_tracker(_),
do: []
defp maybe_start_kills_services do
wanderer_kills_enabled =
Application.get_env(:wanderer_app, :wanderer_kills_service_enabled, false)
if wanderer_kills_enabled in [true, :true, "true"] do
Logger.info("Starting WandererKills service integration...")
[
WandererApp.Kills.Supervisor,
WandererApp.Map.ZkbDataFetcher
]
else
[]
end
end
end

View File

@@ -18,7 +18,7 @@ defmodule WandererApp.Env do
def public_api_disabled?, do: get_key(:public_api_disabled, false)
def character_tracking_pause_disabled?, do: get_key(:character_tracking_pause_disabled, true)
def character_api_disabled?, do: get_key(:character_api_disabled, false)
def zkill_preload_disabled?, do: get_key(:zkill_preload_disabled, false)
def wanderer_kills_service_enabled?, do: get_key(:wanderer_kills_service_enabled, false)
def wallet_tracking_enabled?, do: get_key(:wallet_tracking_enabled, false)
def admins, do: get_key(:admins, [])
def admin_username, do: get_key(:admin_username)
@@ -60,6 +60,7 @@ defmodule WandererApp.Env do
made available to react
"""
def to_client_env do
%{detailedKillsDisabled: zkill_preload_disabled?()}
%{detailedKillsDisabled: not wanderer_kills_service_enabled?()}
end
end

55
lib/wanderer_app/kills.ex Normal file
View File

@@ -0,0 +1,55 @@
defmodule WandererApp.Kills do
@moduledoc """
Main interface for the WandererKills integration subsystem.
Provides high-level functions for monitoring and managing the kills
data pipeline, including connection status, health metrics, and
system subscriptions.
"""
alias WandererApp.Kills.{Client, Storage}
@doc """
Gets comprehensive status of the kills subsystem.
"""
@spec get_status() :: {:ok, map()} | {:error, term()}
def get_status do
with {:ok, client_status} <- Client.get_status() do
{:ok, %{
enabled: Application.get_env(:wanderer_app, :wanderer_kills_service_enabled, false),
client: client_status,
websocket_url: Application.get_env(:wanderer_app, :wanderer_kills_base_url, "ws://wanderer-kills:4004")
}}
end
end
@doc """
Subscribes to killmail updates for specified systems.
"""
@spec subscribe_systems([integer()]) :: :ok | {:error, term()}
defdelegate subscribe_systems(system_ids), to: Client, as: :subscribe_to_systems
@doc """
Unsubscribes from killmail updates for specified systems.
"""
@spec unsubscribe_systems([integer()]) :: :ok | {:error, term()}
defdelegate unsubscribe_systems(system_ids), to: Client, as: :unsubscribe_from_systems
@doc """
Gets kill count for a specific system.
"""
@spec get_system_kill_count(integer()) :: {:ok, non_neg_integer()} | {:error, :not_found}
defdelegate get_system_kill_count(system_id), to: Storage, as: :get_kill_count
@doc """
Gets recent kills for a specific system.
"""
@spec get_system_kills(integer()) :: {:ok, list(map())} | {:error, :not_found}
defdelegate get_system_kills(system_id), to: Storage
@doc """
Manually triggers a reconnection attempt.
"""
@spec reconnect() :: :ok | {:error, term()}
defdelegate reconnect(), to: Client
end

View File

@@ -0,0 +1,457 @@
defmodule WandererApp.Kills.Client do
@moduledoc """
WebSocket client for WandererKills service.
Follows patterns established in the character and map modules.
"""
use GenServer
require Logger
alias WandererApp.Kills.{MessageHandler, Config}
alias WandererApp.Kills.Subscription.{Manager, MapIntegration}
alias Phoenix.Channels.GenSocketClient
# Simple retry configuration - inline like character module
@retry_delays [5_000, 10_000, 30_000, 60_000]
@max_retries 10
@health_check_interval :timer.minutes(2)
defstruct [
:socket_pid,
:retry_timer_ref,
connected: false,
connecting: false,
subscribed_systems: MapSet.new(),
retry_count: 0,
last_error: nil
]
# Client API
@spec start_link(keyword()) :: GenServer.on_start()
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@spec subscribe_to_systems([integer()]) :: :ok | {:error, atom()}
def subscribe_to_systems(system_ids) do
case validate_system_ids(system_ids) do
{:ok, valid_ids} ->
GenServer.cast(__MODULE__, {:subscribe_systems, valid_ids})
{:error, _} = error ->
Logger.error("[Client] Invalid system IDs: #{inspect(system_ids)}")
error
end
end
@spec unsubscribe_from_systems([integer()]) :: :ok | {:error, atom()}
def unsubscribe_from_systems(system_ids) do
case validate_system_ids(system_ids) do
{:ok, valid_ids} ->
GenServer.cast(__MODULE__, {:unsubscribe_systems, valid_ids})
{:error, _} = error ->
Logger.error("[Client] Invalid system IDs: #{inspect(system_ids)}")
error
end
end
@spec get_status() :: {:ok, map()} | {:error, term()}
def get_status do
GenServer.call(__MODULE__, :get_status)
catch
:exit, _ -> {:error, :not_running}
end
@spec reconnect() :: :ok | {:error, term()}
def reconnect do
GenServer.call(__MODULE__, :reconnect)
catch
:exit, _ -> {:error, :not_running}
end
# Server callbacks
@impl true
def init(_opts) do
if Config.enabled?() do
Logger.info("[Client] Starting kills WebSocket client")
send(self(), :connect)
schedule_health_check()
{:ok, %__MODULE__{}}
else
Logger.info("[Client] Kills integration disabled")
:ignore
end
end
@impl true
def handle_info(:connect, state) do
state = cancel_retry(state)
new_state = attempt_connection(%{state | connecting: true})
{:noreply, new_state}
end
def handle_info(:retry_connection, state) do
Logger.info("[Client] Retrying connection (attempt #{state.retry_count + 1}/#{@max_retries})")
state = %{state | retry_timer_ref: nil}
new_state = attempt_connection(state)
{:noreply, new_state}
end
def handle_info(:refresh_subscriptions, %{connected: true} = state) do
Logger.info("[Client] Refreshing subscriptions after connection")
case MapIntegration.get_all_map_systems() do
systems when is_struct(systems, MapSet) ->
system_list = MapSet.to_list(systems)
if system_list != [] do
subscribe_to_systems(system_list)
end
_ ->
Logger.error("[Client] Failed to refresh subscriptions, scheduling retry")
Process.send_after(self(), :refresh_subscriptions, 5000)
end
{:noreply, state}
end
def handle_info(:refresh_subscriptions, state) do
# Not connected yet, retry later
Process.send_after(self(), :refresh_subscriptions, 5000)
{:noreply, state}
end
def handle_info({:connected, socket_pid}, state) do
Logger.info("[Client] WebSocket connected")
new_state =
%{
state
| connected: true,
connecting: false,
socket_pid: socket_pid,
retry_count: 0,
last_error: nil
}
|> cancel_retry()
{:noreply, new_state}
end
def handle_info({:disconnected, reason}, state) do
Logger.warning("[Client] WebSocket disconnected: #{inspect(reason)}")
state = %{state | connected: false, connecting: false, socket_pid: nil, last_error: reason}
if should_retry?(state) do
{:noreply, schedule_retry(state)}
else
Logger.error("[Client] Max retry attempts reached. Giving up.")
{:noreply, state}
end
end
def handle_info(:health_check, state) do
# Simple health check like character module
case check_health(state) do
:healthy ->
Logger.debug("[Client] Connection healthy")
:needs_reconnect ->
Logger.warning("[Client] Connection unhealthy, triggering reconnect")
send(self(), :connect)
end
schedule_health_check()
{:noreply, state}
end
def handle_info(_msg, state), do: {:noreply, state}
@impl true
def handle_cast({:subscribe_systems, system_ids}, state) do
{updated_systems, to_subscribe} =
Manager.subscribe_systems(state.subscribed_systems, system_ids)
if length(to_subscribe) > 0 and state.socket_pid do
Manager.sync_with_server(state.socket_pid, to_subscribe, [])
end
{:noreply, %{state | subscribed_systems: updated_systems}}
end
def handle_cast({:unsubscribe_systems, system_ids}, state) do
{updated_systems, to_unsubscribe} =
Manager.unsubscribe_systems(state.subscribed_systems, system_ids)
if length(to_unsubscribe) > 0 and state.socket_pid do
Manager.sync_with_server(state.socket_pid, [], to_unsubscribe)
end
{:noreply, %{state | subscribed_systems: updated_systems}}
end
@impl true
def handle_call(:get_status, _from, state) do
status = %{
connected: state.connected,
connecting: state.connecting,
retry_count: state.retry_count,
last_error: state.last_error,
subscribed_systems: MapSet.size(state.subscribed_systems),
socket_alive: socket_alive?(state.socket_pid),
subscriptions: %{
subscribed_systems: MapSet.to_list(state.subscribed_systems)
}
}
{:reply, {:ok, status}, state}
end
def handle_call(:reconnect, _from, state) do
Logger.info("[Client] Manual reconnection requested")
state = cancel_retry(state)
disconnect_socket(state.socket_pid)
new_state = %{
state
| connected: false,
connecting: false,
socket_pid: nil,
retry_count: 0,
last_error: nil
}
send(self(), :connect)
{:reply, :ok, new_state}
end
# Private functions
defp attempt_connection(state) do
case connect_to_server() do
{:ok, socket_pid} ->
%{state | socket_pid: socket_pid, connecting: true}
{:error, reason} ->
Logger.error("[Client] Connection failed: #{inspect(reason)}")
schedule_retry(%{state | connecting: false, last_error: reason})
end
end
defp connect_to_server do
url = Config.server_url()
Logger.info("[Client] Connecting to: #{url}")
# Get systems for initial subscription
systems =
case MapIntegration.get_all_map_systems() do
systems when is_struct(systems, MapSet) ->
MapSet.to_list(systems)
_ ->
Logger.warning(
"[Client] Failed to get map systems for initial subscription, will retry after connection"
)
# Return empty list but schedule immediate refresh after connection
Process.send_after(self(), :refresh_subscriptions, 1000)
[]
end
handler_state = %{
server_url: url,
parent: self(),
subscribed_systems: systems
}
case GenSocketClient.start_link(
__MODULE__.Handler,
Phoenix.Channels.GenSocketClient.Transport.WebSocketClient,
handler_state
) do
{:ok, socket_pid} -> {:ok, socket_pid}
error -> error
end
end
defp should_retry?(%{retry_count: count}) when count >= @max_retries, do: false
defp should_retry?(_), do: true
defp schedule_retry(state) do
delay = Enum.at(@retry_delays, min(state.retry_count, length(@retry_delays) - 1))
Logger.info("[Client] Scheduling retry in #{delay}ms")
timer_ref = Process.send_after(self(), :retry_connection, delay)
%{state | retry_timer_ref: timer_ref, retry_count: state.retry_count + 1}
end
defp cancel_retry(%{retry_timer_ref: nil} = state), do: state
defp cancel_retry(%{retry_timer_ref: timer_ref} = state) do
Process.cancel_timer(timer_ref)
%{state | retry_timer_ref: nil}
end
defp check_health(%{connected: false}), do: :needs_reconnect
defp check_health(%{socket_pid: nil}), do: :needs_reconnect
defp check_health(%{socket_pid: pid}) do
if socket_alive?(pid), do: :healthy, else: :needs_reconnect
end
defp socket_alive?(nil), do: false
defp socket_alive?(pid), do: Process.alive?(pid)
defp disconnect_socket(nil), do: :ok
defp disconnect_socket(pid) when is_pid(pid) do
if Process.alive?(pid) do
GenServer.stop(pid, :normal)
end
end
defp schedule_health_check do
Process.send_after(self(), :health_check, @health_check_interval)
end
# Handler module for WebSocket events
defmodule Handler do
@moduledoc """
WebSocket handler for the kills client.
Handles Phoenix Channel callbacks for WebSocket communication.
"""
@behaviour Phoenix.Channels.GenSocketClient
require Logger
alias WandererApp.Kills.MessageHandler
@impl true
def init(state) do
ws_url = "#{state.server_url}/socket/websocket"
{:connect, ws_url, [{"vsn", "2.0.0"}], state}
end
@impl true
def handle_connected(transport, state) do
Logger.debug("[Client] Connected, joining channel...")
case Phoenix.Channels.GenSocketClient.join(transport, "killmails:lobby", %{
systems: state.subscribed_systems,
client_identifier: "wanderer_app"
}) do
{:ok, _response} ->
Logger.debug("[Client] Successfully joined killmails:lobby")
send(state.parent, {:connected, self()})
{:ok, state}
{:error, reason} ->
Logger.error("[Client] Failed to join channel: #{inspect(reason)}")
send(state.parent, {:disconnected, {:join_error, reason}})
{:ok, state}
end
end
@impl true
def handle_disconnected(reason, state) do
send(state.parent, {:disconnected, reason})
{:ok, state}
end
@impl true
def handle_channel_closed(topic, _payload, _transport, state) do
Logger.warning("[Client] Channel #{topic} closed")
send(state.parent, {:disconnected, {:channel_closed, topic}})
{:ok, state}
end
@impl true
def handle_message(topic, event, payload, _transport, state) do
case {topic, event} do
{"killmails:lobby", "killmail_update"} ->
# Use supervised task to handle failures gracefully
Task.Supervisor.start_child(
WandererApp.Kills.TaskSupervisor,
fn -> MessageHandler.process_killmail_update(payload) end
)
{"killmails:lobby", "kill_count_update"} ->
# Use supervised task to handle failures gracefully
Task.Supervisor.start_child(
WandererApp.Kills.TaskSupervisor,
fn -> MessageHandler.process_kill_count_update(payload) end
)
_ ->
:ok
end
{:ok, state}
end
@impl true
def handle_reply(_topic, _ref, _payload, _transport, state), do: {:ok, state}
@impl true
def handle_info(_msg, _transport, state), do: {:ok, state}
@impl true
def handle_call(_msg, _from, _transport, state),
do: {:reply, {:error, :not_implemented}, state}
@impl true
def handle_joined(_topic, _payload, _transport, state), do: {:ok, state}
@impl true
def handle_join_error(topic, payload, _transport, state) do
send(state.parent, {:disconnected, {:join_error, {topic, payload}}})
{:ok, state}
end
end
# Validation functions (inlined from Validation module)
@spec validate_system_id(any()) :: {:ok, integer()} | {:error, :invalid_system_id}
defp validate_system_id(system_id)
when is_integer(system_id) and system_id > 30_000_000 and system_id < 33_000_000 do
{:ok, system_id}
end
defp validate_system_id(system_id) when is_binary(system_id) do
case Integer.parse(system_id) do
{id, ""} when id > 30_000_000 and id < 33_000_000 ->
{:ok, id}
_ ->
{:error, :invalid_system_id}
end
end
defp validate_system_id(_), do: {:error, :invalid_system_id}
@spec validate_system_ids(list()) :: {:ok, [integer()]} | {:error, :invalid_system_ids}
defp validate_system_ids(system_ids) when is_list(system_ids) do
results = Enum.map(system_ids, &validate_system_id/1)
case Enum.all?(results, &match?({:ok, _}, &1)) do
true ->
valid_ids = Enum.map(results, fn {:ok, id} -> id end)
{:ok, valid_ids}
false ->
{:error, :invalid_system_ids}
end
end
defp validate_system_ids(_), do: {:error, :invalid_system_ids}
end

View File

@@ -0,0 +1,62 @@
defmodule WandererApp.Kills.Config do
@moduledoc """
Simple configuration helpers for the kills subsystem.
Following the pattern of other modules that use Application.get_env directly.
"""
def enabled? do
Application.get_env(:wanderer_app, :wanderer_kills_service_enabled, false)
end
def websocket_url do
Application.get_env(:wanderer_app, :wanderer_kills_base_url, "ws://wanderer-kills:4004")
end
def server_url do
# Remove /socket/websocket suffix if present for backward compatibility
websocket_url()
|> String.replace(~r/\/socket\/websocket$/, "")
end
def kill_list_limit do
Application.get_env(:wanderer_app, :kill_list_limit, 100)
|> to_integer()
end
def max_concurrent_tasks do
:wanderer_app
|> Application.get_env(:kills_max_concurrent_tasks, 50)
|> ensure_integer()
end
def max_task_queue_size do
:wanderer_app
|> Application.get_env(:kills_max_task_queue_size, 5000)
|> ensure_integer()
end
def killmail_ttl do
:timer.hours(24)
end
def kill_count_ttl do
:timer.hours(24)
end
# Simple conversion helper
defp to_integer(value) when is_binary(value), do: String.to_integer(value)
defp to_integer(value) when is_integer(value), do: value
defp to_integer(_), do: 100
defp ensure_integer(value) when is_integer(value), do: value
defp ensure_integer(value) when is_binary(value) do
case Integer.parse(value) do
{int, ""} -> int
# Default fallback
_ -> 50
end
end
defp ensure_integer(_), do: 50
end

View File

@@ -0,0 +1,191 @@
defmodule WandererApp.Kills.MapEventListener do
@moduledoc """
Listens for map events and updates kill subscriptions accordingly.
This module bridges the gap between map system changes and the kills
WebSocket subscription system.
"""
use GenServer
require Logger
alias WandererApp.Kills.Client
alias WandererApp.Kills.Subscription.MapIntegration
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
# Subscribe to map lifecycle events
Phoenix.PubSub.subscribe(WandererApp.PubSub, "maps")
# Subscribe to existing running maps
running_maps = WandererApp.Map.RegistryHelper.list_all_maps()
running_maps
|> Enum.each(fn %{id: map_id} ->
try do
Phoenix.PubSub.subscribe(WandererApp.PubSub, map_id)
rescue
e ->
Logger.error("[MapEventListener] Failed to subscribe to map #{map_id}: #{inspect(e)}")
end
end)
# Defer subscription update to avoid blocking init
send(self(), :initial_subscription_update)
# Also schedule a re-subscription after a delay in case maps start after us
Process.send_after(self(), :resubscribe_to_maps, 5000)
{:ok, %{last_update: nil, pending_update: nil, pending_removals: MapSet.new()}}
end
@impl true
def handle_info(:initial_subscription_update, state) do
{:noreply, do_update_subscriptions(state)}
end
@impl true
def handle_info(%{event: :map_server_started}, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info(:map_server_started, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info(%{event: :add_system, payload: _system}, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info({:add_system, _system}, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info(%{event: :systems_removed, payload: system_ids}, state) do
# Track pending removals so we can handle them immediately
new_pending_removals = MapSet.union(state.pending_removals, MapSet.new(system_ids))
{:noreply, schedule_subscription_update(%{state | pending_removals: new_pending_removals})}
end
def handle_info({:systems_removed, system_ids}, state) do
# Track pending removals so we can handle them immediately
new_pending_removals = MapSet.union(state.pending_removals, MapSet.new(system_ids))
{:noreply, schedule_subscription_update(%{state | pending_removals: new_pending_removals})}
end
def handle_info(%{event: :update_system, payload: _system}, state) do
# System updates might change visibility or other properties
{:noreply, schedule_subscription_update(state)}
end
def handle_info({:update_system, _system}, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info(%{event: :map_server_stopped}, state) do
{:noreply, schedule_subscription_update(state)}
end
def handle_info(:map_server_stopped, state) do
{:noreply, schedule_subscription_update(state)}
end
# Handle scheduled update
def handle_info(:perform_subscription_update, state) do
# Clear pending removals after processing
new_state = do_update_subscriptions(%{state | pending_update: nil})
{:noreply, %{new_state | pending_removals: MapSet.new()}}
end
# Handle re-subscription attempt
def handle_info(:resubscribe_to_maps, state) do
running_maps = WandererApp.Map.RegistryHelper.list_all_maps()
running_maps
|> Enum.each(fn %{id: map_id} ->
Phoenix.PubSub.subscribe(WandererApp.PubSub, map_id)
end)
{:noreply, state}
end
# Handle map creation - subscribe to new map
def handle_info({:map_created, map_id}, state) do
Phoenix.PubSub.subscribe(WandererApp.PubSub, map_id)
{:noreply, schedule_subscription_update(state)}
end
def handle_info(_msg, state) do
{:noreply, state}
end
# Debounce delay in milliseconds
@debounce_delay 500
defp schedule_subscription_update(state) do
# Cancel pending update if exists
if state.pending_update do
Process.cancel_timer(state.pending_update)
end
# Schedule new update
timer_ref = Process.send_after(self(), :perform_subscription_update, @debounce_delay)
%{state | pending_update: timer_ref}
end
defp do_update_subscriptions(state) do
Task.start(fn ->
try do
perform_subscription_update(state.pending_removals)
# Also refresh the system->map index
WandererApp.Kills.Subscription.SystemMapIndex.refresh()
rescue
e ->
Logger.error("[MapEventListener] Error updating subscriptions: #{inspect(e)}")
end
end)
%{state | last_update: System.monotonic_time(:millisecond)}
end
defp perform_subscription_update(pending_removals) do
case Client.get_status() do
{:ok, %{subscriptions: %{subscribed_systems: current_systems}}} ->
apply_subscription_changes(current_systems, pending_removals)
{:error, :not_running} ->
Logger.debug("[MapEventListener] Kills client not running yet")
error ->
Logger.error("[MapEventListener] Failed to get client status: #{inspect(error)}")
end
end
defp apply_subscription_changes(current_systems, pending_removals) do
current_set = MapSet.new(current_systems)
all_systems = MapIntegration.get_all_map_systems()
# Remove pending removals from all_systems since DB might not be updated yet
all_systems_adjusted = MapSet.difference(all_systems, pending_removals)
# Use the existing MapIntegration logic to determine changes
{:ok, to_subscribe, to_unsubscribe} =
MapIntegration.handle_map_systems_updated(
MapSet.to_list(all_systems_adjusted),
current_set
)
# Apply the changes
if to_subscribe != [] do
Client.subscribe_to_systems(to_subscribe)
end
if to_unsubscribe != [] do
Client.unsubscribe_from_systems(to_unsubscribe)
end
end
end

View File

@@ -0,0 +1,549 @@
defmodule WandererApp.Kills.MessageHandler do
@moduledoc """
Handles killmail message processing and broadcasting.
"""
require Logger
alias WandererApp.Kills.{Config, Storage}
alias WandererApp.Kills.Subscription.MapIntegration
@spec process_killmail_update(map()) :: :ok
def process_killmail_update(payload) do
case validate_killmail_payload(payload) do
{:ok, %{"system_id" => system_id, "killmails" => killmails}} ->
# Log each kill received
log_received_killmails(killmails, system_id)
process_valid_killmail_update(system_id, killmails, payload)
{:error, reason} ->
Logger.error("[MessageHandler] Invalid killmail payload: #{inspect(reason)}")
:ok
end
end
defp process_valid_killmail_update(system_id, killmails, payload) do
{valid_killmails, failed_adaptations} =
killmails
|> Enum.filter(&is_map/1)
|> Enum.with_index()
|> Enum.reduce({[], []}, &process_killmail_for_adaptation/2)
# Reverse to maintain original order
valid_killmails = Enum.reverse(valid_killmails)
failed_adaptations = Enum.reverse(failed_adaptations)
# Store failed adaptations for potential retry
if failed_adaptations != [] do
store_failed_adaptations(system_id, failed_adaptations)
end
Logger.debug(fn ->
"[MessageHandler] Valid killmails after adaptation: #{length(valid_killmails)}"
end)
if valid_killmails != [] do
store_and_broadcast_killmails(system_id, valid_killmails, payload)
else
:ok
end
end
defp store_and_broadcast_killmails(system_id, valid_killmails, payload) do
killmail_ttl = Config.killmail_ttl()
kill_count_ttl = Config.kill_count_ttl()
case Storage.store_killmails(system_id, valid_killmails, killmail_ttl) do
:ok ->
handle_stored_killmails(system_id, valid_killmails, kill_count_ttl, payload)
error ->
Logger.error(
"[MessageHandler] Failed to store killmails for system #{system_id}: #{inspect(error)}"
)
error
end
end
defp handle_stored_killmails(system_id, valid_killmails, kill_count_ttl, payload) do
case Storage.update_kill_count(system_id, length(valid_killmails), kill_count_ttl) do
:ok ->
broadcast_killmails(system_id, valid_killmails, payload)
:ok
error ->
Logger.error(
"[MessageHandler] Failed to update kill count for system #{system_id}: #{inspect(error)}"
)
error
end
end
@spec process_kill_count_update(map()) :: :ok | {:error, atom()} | {:error, term()}
def process_kill_count_update(payload) do
case validate_kill_count_payload(payload) do
{:ok, %{"system_id" => system_id, "count" => count}} ->
case Storage.store_kill_count(system_id, count) do
:ok ->
broadcast_kill_count(system_id, payload)
:ok
error ->
Logger.error(
"[MessageHandler] Failed to store kill count for system #{system_id}: #{inspect(error)}"
)
error
end
{:error, reason} ->
Logger.warning(
"[MessageHandler] Invalid kill count payload: #{inspect(reason)}, payload: #{inspect(payload)}"
)
{:error, :invalid_payload}
end
end
defp broadcast_kill_count(system_id, payload) do
case MapIntegration.broadcast_kill_to_maps(%{
"solar_system_id" => system_id,
"count" => payload["count"],
"type" => :kill_count
}) do
:ok ->
:ok
{:error, reason} ->
Logger.warning("[MessageHandler] Failed to broadcast kill count: #{inspect(reason)}")
:ok
end
end
defp broadcast_killmails(system_id, killmails, payload) do
case MapIntegration.broadcast_kill_to_maps(%{
"solar_system_id" => system_id,
"killmails" => killmails,
"timestamp" => payload["timestamp"],
"type" => :killmail_update
}) do
:ok ->
:ok
{:error, reason} ->
Logger.warning("[MessageHandler] Failed to broadcast killmails: #{inspect(reason)}")
:ok
end
end
defp store_failed_adaptations(system_id, failed_kills) do
# Store with a special key for retry processing
key = "kills:failed_adaptations:#{system_id}"
# Keep for 1 hour for potential retry
ttl = :timer.hours(1)
case WandererApp.Cache.insert_or_update(
key,
failed_kills,
fn existing ->
# Merge with existing failed kills, keeping newest
(failed_kills ++ existing)
|> Enum.uniq_by(& &1["killmail_id"])
# Limit to prevent unbounded growth
|> Enum.take(100)
end,
ttl: ttl
) do
:ok ->
Logger.debug(
"[MessageHandler] Stored #{length(failed_kills)} failed adaptations for system #{system_id}"
)
{:ok, _} ->
Logger.debug(
"[MessageHandler] Stored #{length(failed_kills)} failed adaptations for system #{system_id}"
)
error ->
Logger.error("[MessageHandler] Failed to store failed adaptations: #{inspect(error)}")
end
end
# Data adaptation functions (moved from DataAdapter module)
@type killmail :: map()
@type adapter_result :: {:ok, killmail()} | {:error, term()}
@spec adapt_kill_data(any()) :: adapter_result()
# Pattern match on zkillboard format - not supported
defp adapt_kill_data(%{"killID" => kill_id}) do
Logger.warning("[MessageHandler] Zkillboard format not supported: killID=#{kill_id}")
{:error, :zkillboard_format_not_supported}
end
# Pattern match on flat format - already adapted
defp adapt_kill_data(%{"victim_char_id" => _} = kill) do
validated_kill = validate_flat_format_kill(kill)
if map_size(validated_kill) > 0 do
{:ok, validated_kill}
else
Logger.warning(
"[MessageHandler] Invalid flat format kill: #{inspect(kill["killmail_id"])}"
)
{:error, :invalid_data}
end
end
# Pattern match on nested format with valid structure
defp adapt_kill_data(
%{
"killmail_id" => killmail_id,
"kill_time" => _kill_time,
"victim" => victim
} = kill
)
when is_map(victim) do
# Validate and normalize IDs first
with {:ok, valid_killmail_id} <- validate_killmail_id(killmail_id),
{:ok, valid_system_id} <- get_and_validate_system_id(kill) do
# Update kill with normalized IDs
normalized_kill =
kill
|> Map.put("killmail_id", valid_killmail_id)
|> Map.put("solar_system_id", valid_system_id)
# Remove alternate key
|> Map.delete("system_id")
adapted_kill = adapt_nested_format_kill(normalized_kill)
if map_size(adapted_kill) > 0 do
{:ok, adapted_kill}
else
Logger.warning("[MessageHandler] Invalid nested format kill: #{valid_killmail_id}")
{:error, :invalid_data}
end
else
{:error, reason} ->
Logger.warning("[MessageHandler] ID validation failed: #{inspect(reason)}")
{:error, reason}
end
end
# Invalid data type
defp adapt_kill_data(invalid_data) do
data_type = if(is_nil(invalid_data), do: "nil", else: "#{inspect(invalid_data)}")
Logger.warning("[MessageHandler] Invalid data type: #{data_type}")
{:error, :invalid_format}
end
# Validation and adaptation helper functions
@spec validate_flat_format_kill(map()) :: map()
defp validate_flat_format_kill(kill) do
required_fields = ["killmail_id", "kill_time", "solar_system_id"]
case validate_required_fields(kill, required_fields) do
:ok ->
kill
{:error, missing} ->
Logger.warning(
"[MessageHandler] Flat format kill missing required fields: #{inspect(missing)}"
)
%{}
end
end
@spec adapt_nested_format_kill(map()) :: map()
defp adapt_nested_format_kill(kill) do
victim = kill["victim"]
attackers = Map.get(kill, "attackers", [])
zkb = Map.get(kill, "zkb", %{})
# Validate attackers is a list
attackers_list = if is_list(attackers), do: attackers, else: []
final_blow_attacker = find_final_blow_attacker(attackers_list)
adapted_kill =
%{}
|> add_core_kill_data(kill, zkb)
|> add_victim_data(victim)
|> add_final_blow_attacker_data(final_blow_attacker)
|> add_kill_statistics(attackers_list, zkb)
# Validate that critical output fields are present
case validate_required_output_fields(adapted_kill) do
:ok ->
adapted_kill
{:error, missing_fields} ->
Logger.warning(
"[MessageHandler] Kill adaptation failed - missing required fields: #{inspect(missing_fields)}, killmail_id: #{inspect(kill["killmail_id"])}"
)
%{}
end
end
@spec add_core_kill_data(map(), map(), map()) :: map()
defp add_core_kill_data(acc, kill, zkb) do
# Handle both "solar_system_id" and "system_id"
solar_system_id = kill["solar_system_id"] || kill["system_id"]
Map.merge(acc, %{
"killmail_id" => kill["killmail_id"],
"kill_time" => kill["kill_time"],
"solar_system_id" => solar_system_id,
"zkb" => zkb
})
end
@spec add_victim_data(map(), map()) :: map()
defp add_victim_data(acc, victim) do
victim_data = %{
"victim_char_id" => victim["character_id"],
"victim_char_name" => get_character_name(victim),
"victim_corp_id" => victim["corporation_id"],
"victim_corp_ticker" => get_corp_ticker(victim),
"victim_corp_name" => get_corp_name(victim),
"victim_alliance_id" => victim["alliance_id"],
"victim_alliance_ticker" => get_alliance_ticker(victim),
"victim_alliance_name" => get_alliance_name(victim),
"victim_ship_type_id" => victim["ship_type_id"],
"victim_ship_name" => get_ship_name(victim)
}
Map.merge(acc, victim_data)
end
@spec add_final_blow_attacker_data(map(), map()) :: map()
defp add_final_blow_attacker_data(acc, attacker) do
attacker_data = %{
"final_blow_char_id" => attacker["character_id"],
"final_blow_char_name" => get_character_name(attacker),
"final_blow_corp_id" => attacker["corporation_id"],
"final_blow_corp_ticker" => get_corp_ticker(attacker),
"final_blow_corp_name" => get_corp_name(attacker),
"final_blow_alliance_id" => attacker["alliance_id"],
"final_blow_alliance_ticker" => get_alliance_ticker(attacker),
"final_blow_alliance_name" => get_alliance_name(attacker),
"final_blow_ship_type_id" => attacker["ship_type_id"],
"final_blow_ship_name" => get_ship_name(attacker)
}
Map.merge(acc, attacker_data)
end
@spec add_kill_statistics(map(), list(), map()) :: map()
defp add_kill_statistics(acc, attackers_list, zkb) do
Map.merge(acc, %{
"attacker_count" => length(attackers_list),
"total_value" => zkb["total_value"] || zkb["totalValue"] || 0,
"npc" => zkb["npc"] || false
})
end
# Critical fields that the frontend expects to be present in killmail data
@required_output_fields [
"killmail_id",
"kill_time",
"solar_system_id",
"victim_ship_type_id",
"attacker_count",
"total_value"
]
@spec validate_required_output_fields(map()) :: :ok | {:error, list(String.t())}
defp validate_required_output_fields(adapted_kill) do
validate_required_fields(adapted_kill, @required_output_fields)
end
@spec validate_required_fields(map(), list(String.t())) :: :ok | {:error, list(String.t())}
defp validate_required_fields(data, fields) do
missing = Enum.filter(fields, &(not Map.has_key?(data, &1)))
case missing do
[] -> :ok
_ -> {:error, missing}
end
end
@spec find_final_blow_attacker(list(map()) | any()) :: map()
defp find_final_blow_attacker(attackers) when is_list(attackers) do
final_blow =
Enum.find(attackers, %{}, fn
%{"final_blow" => true} = attacker -> attacker
_ -> false
end)
if final_blow == %{} and length(attackers) > 0 do
Logger.debug(fn ->
"[MessageHandler] No final blow attacker found in #{length(attackers)} attackers"
end)
end
final_blow
end
defp find_final_blow_attacker(_), do: %{}
# Generic field extraction with multiple possible field names
@spec extract_field(map(), list(String.t())) :: String.t() | nil
defp extract_field(data, field_names) when is_map(data) and is_list(field_names) do
Enum.find_value(field_names, fn field_name ->
case Map.get(data, field_name) do
value when is_binary(value) and value != "" -> value
_ -> nil
end
end)
end
defp extract_field(_data, _field_names), do: nil
# Specific field extractors using the generic function
@spec get_character_name(map() | any()) :: String.t() | nil
defp get_character_name(data) when is_map(data) do
# Try multiple possible field names
field_names = ["attacker_name", "victim_name", "character_name", "name"]
extract_field(data, field_names) ||
case Map.get(data, "character") do
%{"name" => name} when is_binary(name) -> name
_ -> nil
end
end
defp get_character_name(_), do: nil
@spec get_corp_ticker(map() | any()) :: String.t() | nil
defp get_corp_ticker(data) when is_map(data) do
extract_field(data, ["corporation_ticker", "corp_ticker"])
end
defp get_corp_ticker(_), do: nil
@spec get_corp_name(map() | any()) :: String.t() | nil
defp get_corp_name(data) when is_map(data) do
extract_field(data, ["corporation_name", "corp_name"])
end
defp get_corp_name(_), do: nil
@spec get_alliance_ticker(map() | any()) :: String.t() | nil
defp get_alliance_ticker(data) when is_map(data) do
extract_field(data, ["alliance_ticker"])
end
defp get_alliance_ticker(_), do: nil
@spec get_alliance_name(map() | any()) :: String.t() | nil
defp get_alliance_name(data) when is_map(data) do
extract_field(data, ["alliance_name"])
end
defp get_alliance_name(_), do: nil
@spec get_ship_name(map() | any()) :: String.t() | nil
defp get_ship_name(data) when is_map(data) do
extract_field(data, ["ship_name", "ship_type_name"])
end
defp get_ship_name(_), do: nil
defp get_and_validate_system_id(kill) do
system_id = kill["solar_system_id"] || kill["system_id"]
validate_system_id(system_id)
end
# Validation functions (inlined from Validation module)
@spec validate_system_id(any()) :: {:ok, integer()} | {:error, :invalid_system_id}
defp validate_system_id(system_id)
when is_integer(system_id) and system_id > 30_000_000 and system_id < 33_000_000 do
{:ok, system_id}
end
defp validate_system_id(system_id) when is_binary(system_id) do
case Integer.parse(system_id) do
{id, ""} when id > 30_000_000 and id < 33_000_000 ->
{:ok, id}
_ ->
{:error, :invalid_system_id}
end
end
defp validate_system_id(_), do: {:error, :invalid_system_id}
@spec validate_killmail_id(any()) :: {:ok, integer()} | {:error, :invalid_killmail_id}
defp validate_killmail_id(killmail_id) when is_integer(killmail_id) and killmail_id > 0 do
{:ok, killmail_id}
end
defp validate_killmail_id(killmail_id) when is_binary(killmail_id) do
case Integer.parse(killmail_id) do
{id, ""} when id > 0 ->
{:ok, id}
_ ->
{:error, :invalid_killmail_id}
end
end
defp validate_killmail_id(_), do: {:error, :invalid_killmail_id}
@spec validate_killmail_payload(map()) :: {:ok, map()} | {:error, atom()}
defp validate_killmail_payload(%{"system_id" => system_id, "killmails" => killmails} = payload)
when is_list(killmails) do
with {:ok, valid_system_id} <- validate_system_id(system_id) do
{:ok, %{payload | "system_id" => valid_system_id}}
end
end
defp validate_killmail_payload(_), do: {:error, :invalid_payload}
@spec validate_kill_count_payload(map()) :: {:ok, map()} | {:error, atom()}
defp validate_kill_count_payload(%{"system_id" => system_id, "count" => count} = payload)
when is_integer(count) and count >= 0 do
with {:ok, valid_system_id} <- validate_system_id(system_id) do
{:ok, %{payload | "system_id" => valid_system_id}}
end
end
defp validate_kill_count_payload(_), do: {:error, :invalid_kill_count_payload}
# Helper functions to reduce nesting
defp log_received_killmails(killmails, system_id) do
Enum.each(killmails, fn kill ->
killmail_id = kill["killmail_id"] || "unknown"
kill_system_id = kill["solar_system_id"] || kill["system_id"] || system_id
Logger.debug(fn ->
"[MessageHandler] Received kill: killmail_id=#{killmail_id}, system_id=#{kill_system_id}"
end)
end)
end
defp process_killmail_for_adaptation({kill, index}, {valid, failed}) do
# Log raw kill data
Logger.debug(fn ->
"[MessageHandler] Raw kill ##{index}: #{inspect(kill, pretty: true, limit: :infinity)}"
end)
# Adapt and log result
case adapt_kill_data(kill) do
{:ok, adapted} ->
Logger.debug(fn ->
"[MessageHandler] Adapted kill ##{index}: #{inspect(adapted, pretty: true, limit: :infinity)}"
end)
{[adapted | valid], failed}
{:error, reason} ->
Logger.warning("[MessageHandler] Failed to adapt kill ##{index}: #{inspect(reason)}")
# Store raw kill for potential retry
failed_kill = Map.put(kill, "_adaptation_error", to_string(reason))
{valid, [failed_kill | failed]}
end
end
end

View File

@@ -0,0 +1,296 @@
defmodule WandererApp.Kills.Storage do
@moduledoc """
Manages caching and storage of killmail data.
Provides a centralized interface for storing and retrieving kill-related data
using Cachex for distributed caching.
"""
require Logger
alias WandererApp.Kills.Config
@doc """
Stores killmails for a specific system.
Stores both individual killmails by ID and a list of kills for the system.
"""
@spec store_killmails(integer(), list(map()), pos_integer()) :: :ok | {:error, term()}
def store_killmails(system_id, killmails, ttl) do
result1 = store_individual_killmails(killmails, ttl)
require Logger
Logger.debug("[Storage] store_individual_killmails returned: #{inspect(result1)}")
result2 = update_system_kill_list(system_id, killmails, ttl)
Logger.debug("[Storage] update_system_kill_list returned: #{inspect(result2)}")
case {result1, result2} do
{:ok, :ok} ->
:ok
{{:error, reason}, _} ->
Logger.error("[Storage] Failed to store individual killmails: #{inspect(reason)}")
{:error, reason}
{_, {:error, reason}} ->
Logger.error("[Storage] Failed to update system kill list: #{inspect(reason)}")
{:error, reason}
other ->
Logger.error("[Storage] Unexpected results: #{inspect(other)}")
{:error, {:unexpected_results, other}}
end
end
@doc """
Stores or updates the kill count for a system.
This should only be used for kill count updates from the WebSocket service.
"""
@spec store_kill_count(integer(), non_neg_integer()) :: :ok | {:error, any()}
def store_kill_count(system_id, count) do
key = "zkb:kills:#{system_id}"
ttl = Config.kill_count_ttl()
metadata_key = "zkb:kills:metadata:#{system_id}"
# Store both the count and metadata about when it was set
# This helps detect if we should trust incremental updates or the absolute count
timestamp = System.system_time(:millisecond)
with :ok <- WandererApp.Cache.insert(key, count, ttl: ttl),
:ok <-
WandererApp.Cache.insert(
metadata_key,
%{
"source" => "websocket",
"timestamp" => timestamp,
"absolute_count" => count
},
ttl: ttl
) do
:ok
else
# Nebulex might return true instead of :ok
true -> :ok
error -> error
end
end
@doc """
Updates the kill count by adding to the existing count.
This is used when processing incoming killmails.
"""
@spec update_kill_count(integer(), non_neg_integer(), pos_integer()) :: :ok | {:error, any()}
def update_kill_count(system_id, additional_kills, ttl) do
key = "zkb:kills:#{system_id}"
metadata_key = "zkb:kills:metadata:#{system_id}"
# Check metadata to see if we should trust incremental updates
metadata = WandererApp.Cache.get(metadata_key)
current_time = System.system_time(:millisecond)
# If we have recent websocket data (within 5 seconds), don't increment
# This prevents double counting when both killmail and count updates arrive
should_increment =
case metadata do
%{"source" => "websocket", "timestamp" => ws_timestamp} ->
current_time - ws_timestamp > 5000
_ ->
true
end
if should_increment do
# Use atomic update operation
result =
WandererApp.Cache.insert_or_update(
key,
additional_kills,
fn current_count -> current_count + additional_kills end,
ttl: ttl
)
case result do
:ok ->
# Update metadata to indicate this was an incremental update
WandererApp.Cache.insert(
metadata_key,
%{
"source" => "incremental",
"timestamp" => current_time,
"last_increment" => additional_kills
},
ttl: ttl
)
:ok
{:ok, _} ->
:ok
true ->
:ok
error ->
error
end
else
# Skip increment as we have recent absolute count from websocket
Logger.debug(
"[Storage] Skipping kill count increment for system #{system_id} due to recent websocket update"
)
:ok
end
end
@doc """
Retrieves the kill count for a system.
"""
@spec get_kill_count(integer()) :: {:ok, non_neg_integer()} | {:error, :not_found}
def get_kill_count(system_id) do
key = "zkb:kills:#{system_id}"
case WandererApp.Cache.get(key) do
nil -> {:error, :not_found}
count -> {:ok, count}
end
end
@doc """
Retrieves a specific killmail by ID.
"""
@spec get_killmail(integer()) :: {:ok, map()} | {:error, :not_found}
def get_killmail(killmail_id) do
key = "zkb:killmail:#{killmail_id}"
case WandererApp.Cache.get(key) do
nil -> {:error, :not_found}
killmail -> {:ok, killmail}
end
end
@doc """
Retrieves all kills for a specific system.
"""
@spec get_system_kills(integer()) :: {:ok, list(map())} | {:error, :not_found}
def get_system_kills(system_id) do
# Get the list of killmail IDs for this system
kill_ids = WandererApp.Cache.get("zkb:kills:list:#{system_id}") || []
if kill_ids == [] do
{:error, :not_found}
else
# Fetch details for each killmail
kills =
kill_ids
|> Enum.map(&WandererApp.Cache.get("zkb:killmail:#{&1}"))
|> Enum.reject(&is_nil/1)
{:ok, kills}
end
end
@doc """
Reconciles kill count with actual kill list length.
This can be called periodically to ensure consistency.
"""
@spec reconcile_kill_count(integer()) :: :ok | {:error, term()}
def reconcile_kill_count(system_id) do
key = "zkb:kills:#{system_id}"
list_key = "zkb:kills:list:#{system_id}"
metadata_key = "zkb:kills:metadata:#{system_id}"
ttl = Config.kill_count_ttl()
# Get actual kill list length
actual_count =
case WandererApp.Cache.get(list_key) do
nil -> 0
list when is_list(list) -> length(list)
_ -> 0
end
# Update the count to match reality
with :ok <- WandererApp.Cache.insert(key, actual_count, ttl: ttl),
:ok <-
WandererApp.Cache.insert(
metadata_key,
%{
"source" => "reconciliation",
"timestamp" => System.system_time(:millisecond),
"actual_count" => actual_count
},
ttl: ttl
) do
:ok
else
true -> :ok
error -> error
end
end
# Private functions
defp store_individual_killmails(killmails, ttl) do
results =
Enum.map(killmails, fn killmail ->
killmail_id = Map.get(killmail, "killmail_id") || Map.get(killmail, :killmail_id)
if killmail_id do
key = "zkb:killmail:#{killmail_id}"
# Capture the result of cache insert
WandererApp.Cache.insert(key, killmail, ttl: ttl)
else
{:error, :missing_killmail_id}
end
end)
# Check if any failed
case Enum.find(results, &match?({:error, _}, &1)) do
nil -> :ok
error -> error
end
end
defp update_system_kill_list(system_id, new_killmails, ttl) do
# Store as a list of killmail IDs for compatibility with ZkbDataFetcher
key = "zkb:kills:list:#{system_id}"
kill_list_limit = Config.kill_list_limit()
# Extract killmail IDs from new kills
new_ids =
new_killmails
|> Enum.map(fn kill ->
Map.get(kill, "killmail_id") || Map.get(kill, :killmail_id)
end)
|> Enum.reject(&is_nil/1)
# Use atomic update to prevent race conditions
case WandererApp.Cache.insert_or_update(
key,
new_ids,
fn existing_ids ->
# Merge with existing, keeping unique IDs and newest first
(new_ids ++ existing_ids)
|> Enum.uniq()
|> Enum.take(kill_list_limit)
end,
ttl: ttl
) do
:ok ->
:ok
{:ok, _} ->
:ok
true ->
:ok
error ->
Logger.error(
"[Storage] Failed to update system kill list for system #{system_id}: #{inspect(error)}"
)
{:error, :cache_update_failed}
end
end
end

View File

@@ -0,0 +1,71 @@
defmodule WandererApp.Kills.Subscription.Manager do
@moduledoc """
Manages system subscriptions for kills WebSocket service.
"""
@type subscriptions :: MapSet.t(integer())
@spec subscribe_systems(subscriptions(), [integer()]) :: {subscriptions(), [integer()]}
def subscribe_systems(current_systems, system_ids) when is_list(system_ids) do
system_set = MapSet.new(system_ids)
new_systems = MapSet.difference(system_set, current_systems)
new_list = MapSet.to_list(new_systems)
{MapSet.union(current_systems, new_systems), new_list}
end
@spec unsubscribe_systems(subscriptions(), [integer()]) :: {subscriptions(), [integer()]}
def unsubscribe_systems(current_systems, system_ids) when is_list(system_ids) do
system_set = MapSet.new(system_ids)
systems_to_remove = MapSet.intersection(current_systems, system_set)
removed_list = MapSet.to_list(systems_to_remove)
{MapSet.difference(current_systems, systems_to_remove), removed_list}
end
@spec sync_with_server(pid() | nil, [integer()], [integer()]) :: :ok
def sync_with_server(nil, _to_subscribe, _to_unsubscribe), do: :ok
def sync_with_server(socket_pid, to_subscribe, to_unsubscribe) do
if to_unsubscribe != [], do: send(socket_pid, {:unsubscribe_systems, to_unsubscribe})
if to_subscribe != [], do: send(socket_pid, {:subscribe_systems, to_subscribe})
:ok
end
@spec resubscribe_all(pid(), subscriptions()) :: :ok
def resubscribe_all(socket_pid, subscribed_systems) do
system_list = MapSet.to_list(subscribed_systems)
if system_list != [] do
send(socket_pid, {:subscribe_systems, system_list})
end
:ok
end
@spec get_stats(subscriptions()) :: map()
def get_stats(subscribed_systems) do
%{
total_subscribed: MapSet.size(subscribed_systems),
subscribed_systems: MapSet.to_list(subscribed_systems) |> Enum.sort()
}
end
@spec cleanup_subscriptions(subscriptions()) :: {subscriptions(), [integer()]}
def cleanup_subscriptions(subscribed_systems) do
systems_to_check = MapSet.to_list(subscribed_systems)
# Use MapIntegration's system_in_active_map? to avoid duplication
valid_systems =
Enum.filter(
systems_to_check,
&WandererApp.Kills.Subscription.MapIntegration.system_in_active_map?/1
)
invalid_systems = systems_to_check -- valid_systems
if invalid_systems != [] do
{MapSet.new(valid_systems), invalid_systems}
else
{subscribed_systems, []}
end
end
end

View File

@@ -0,0 +1,271 @@
defmodule WandererApp.Kills.Subscription.MapIntegration do
@moduledoc """
Handles integration between the kills WebSocket service and the map system.
Manages automatic subscription updates when maps change and provides
utilities for syncing kill data with map systems.
"""
require Logger
@doc """
Handles updates when map systems change.
Determines which systems to subscribe/unsubscribe based on the update.
"""
@spec handle_map_systems_updated([integer()], MapSet.t(integer())) ::
{:ok, [integer()], [integer()]}
def handle_map_systems_updated(system_ids, current_subscriptions) when is_list(system_ids) do
# Find all unique systems across all maps
all_map_systems = get_all_map_systems()
# Systems to subscribe: in the update and in active maps but not currently subscribed
new_systems =
system_ids
|> Enum.filter(&(&1 in all_map_systems))
|> Enum.reject(&MapSet.member?(current_subscriptions, &1))
# Systems to unsubscribe: currently subscribed but no longer in any active map
obsolete_systems =
current_subscriptions
|> MapSet.to_list()
|> Enum.reject(&(&1 in all_map_systems))
{:ok, new_systems, obsolete_systems}
end
@doc """
Gets all unique system IDs across all active maps.
This function queries the DATABASE for all persisted maps and their systems,
regardless of whether those maps have active GenServer processes running.
This is different from `get_tracked_system_ids/0` which only returns systems
from maps with live processes in the Registry.
Use this function when you need a complete view of all systems across all
stored maps (e.g., for bulk operations or reporting).
This replaces the duplicate functionality from SystemTracker.
"""
@spec get_all_map_systems() :: MapSet.t(integer())
def get_all_map_systems do
{:ok, maps} = WandererApp.Maps.get_available_maps()
# Get all map IDs
map_ids = Enum.map(maps, & &1.id)
# Batch query all systems for all maps at once
all_systems = WandererApp.MapSystemRepo.get_all_by_maps(map_ids)
# Handle direct list return from repo
all_systems
|> Enum.map(& &1.solar_system_id)
|> MapSet.new()
end
@doc """
Gets all system IDs that should be tracked for kills.
Returns a list of unique system IDs from all active maps.
This function returns systems from LIVE MAP PROCESSES only - maps that are currently
running in the system. It uses the Registry to find active map GenServers.
This is different from `get_all_map_systems/0` which queries the database for ALL
persisted maps regardless of whether they have an active process.
Use this function when you need to know which systems are actively being tracked
by running map processes (e.g., for real-time updates).
This consolidates functionality from SystemTracker.
"""
@spec get_tracked_system_ids() :: {:ok, list(integer())} | {:error, term()}
def get_tracked_system_ids do
try do
# Get systems from currently running maps
system_ids =
WandererApp.Map.RegistryHelper.list_all_maps()
|> Enum.flat_map(fn %{id: map_id} ->
case WandererApp.MapSystemRepo.get_all_by_map(map_id) do
{:ok, systems} -> Enum.map(systems, & &1.solar_system_id)
_ -> []
end
end)
|> Enum.reject(&is_nil/1)
|> Enum.uniq()
{:ok, system_ids}
rescue
error ->
Logger.error("[MapIntegration] Failed to get tracked systems: #{inspect(error)}")
{:error, error}
end
end
@doc """
Gets all system IDs for a specific map.
"""
@spec get_map_system_ids(String.t()) :: {:ok, [integer()]} | {:error, term()}
def get_map_system_ids(map_id) do
case WandererApp.MapSystemRepo.get_all_by_map(map_id) do
{:ok, systems} ->
system_ids = Enum.map(systems, & &1.solar_system_id)
{:ok, system_ids}
error ->
Logger.error(
"[MapIntegration] Failed to get systems for map #{map_id}: #{inspect(error)}"
)
error
end
end
@doc """
Checks if a system is in any active map.
"""
@spec system_in_active_map?(integer()) :: boolean()
def system_in_active_map?(system_id) do
{:ok, maps} = WandererApp.Maps.get_available_maps()
Enum.any?(maps, &system_in_map?(&1, system_id))
end
@doc """
Broadcasts kill data to relevant map servers.
"""
@spec broadcast_kill_to_maps(map()) :: :ok | {:error, term()}
def broadcast_kill_to_maps(kill_data) when is_map(kill_data) do
case Map.get(kill_data, "solar_system_id") do
system_id when is_integer(system_id) ->
# Use the index to find maps containing this system
map_ids = WandererApp.Kills.Subscription.SystemMapIndex.get_maps_for_system(system_id)
# Broadcast to each relevant map
Enum.each(map_ids, fn map_id ->
Phoenix.PubSub.broadcast(
WandererApp.PubSub,
"map:#{map_id}",
{:map_kill, kill_data}
)
end)
:ok
system_id when is_binary(system_id) ->
Logger.warning(
"[MapIntegration] Invalid solar_system_id format (string): #{inspect(system_id)}"
)
{:error, {:invalid_system_id_format, system_id}}
nil ->
Logger.warning(
"[MapIntegration] Missing solar_system_id in kill data: #{inspect(Map.keys(kill_data))}"
)
{:error, {:missing_solar_system_id, kill_data}}
invalid_id ->
Logger.warning("[MapIntegration] Invalid solar_system_id type: #{inspect(invalid_id)}")
{:error, {:invalid_system_id_type, invalid_id}}
end
end
def broadcast_kill_to_maps(invalid_data) do
Logger.warning(
"[MapIntegration] Invalid kill_data type (expected map): #{inspect(invalid_data)}"
)
{:error, {:invalid_kill_data_type, invalid_data}}
end
@doc """
Gets subscription statistics grouped by map.
"""
@spec get_map_subscription_stats(MapSet.t(integer())) :: map()
def get_map_subscription_stats(subscribed_systems) do
{:ok, maps} = WandererApp.Maps.get_available_maps()
stats = Enum.map(maps, &get_map_stats(&1, subscribed_systems))
%{
maps: stats,
total_subscribed: MapSet.size(subscribed_systems),
total_maps: length(maps)
}
end
@doc """
Handles map deletion by returning systems to unsubscribe.
"""
@spec handle_map_deleted(String.t(), MapSet.t(integer())) :: [integer()]
def handle_map_deleted(map_id, current_subscriptions) do
# Get systems from the deleted map
case get_map_system_ids(map_id) do
{:ok, deleted_systems} ->
# Precompute all active systems to avoid O(N×M) queries
active_systems = get_all_active_systems_set()
# Only unsubscribe systems that aren't in other maps
deleted_systems
|> Enum.filter(&MapSet.member?(current_subscriptions, &1))
|> Enum.reject(&MapSet.member?(active_systems, &1))
_ ->
[]
end
end
# Helper functions to reduce nesting
defp get_all_active_systems_set do
{:ok, maps} = WandererApp.Maps.get_available_maps()
maps
|> Enum.flat_map(&get_map_systems_or_empty/1)
|> MapSet.new()
end
defp get_map_systems_or_empty(map) do
case get_map_system_ids(map.id) do
{:ok, system_ids} -> system_ids
_ -> []
end
end
defp system_in_map?(map, system_id) do
case WandererApp.MapSystemRepo.get_by_map_and_solar_system_id(map.id, system_id) do
{:ok, _system} -> true
_ -> false
end
end
defp get_map_stats(map, subscribed_systems) do
case get_map_system_ids(map.id) do
{:ok, system_ids} ->
subscribed_count =
system_ids
|> Enum.filter(&MapSet.member?(subscribed_systems, &1))
|> length()
%{
map_id: map.id,
map_name: map.name,
total_systems: length(system_ids),
subscribed_systems: subscribed_count,
subscription_rate:
if(length(system_ids) > 0,
do: subscribed_count / length(system_ids) * 100,
else: 0
)
}
_ ->
%{
map_id: map.id,
map_name: map.name,
error: "Failed to load systems"
}
end
end
end

View File

@@ -0,0 +1,130 @@
defmodule WandererApp.Kills.Subscription.SystemMapIndex do
@moduledoc """
Maintains an in-memory index of system_id -> [map_ids] for efficient kill broadcasting.
This index prevents N+1 queries when broadcasting kills to relevant maps.
"""
use GenServer
require Logger
@table_name :kills_system_map_index
@refresh_interval :timer.minutes(5)
# Client API
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Gets all map IDs that contain the given system.
"""
@spec get_maps_for_system(integer()) :: [String.t()]
def get_maps_for_system(system_id) do
case :ets.lookup(@table_name, system_id) do
[{^system_id, map_ids}] -> map_ids
[] -> []
end
end
@doc """
Refreshes the index immediately.
"""
@spec refresh() :: :ok
def refresh do
GenServer.cast(__MODULE__, :refresh)
end
# Server callbacks
@impl true
def init(_opts) do
# Create ETS table for fast lookups
:ets.new(@table_name, [:set, :protected, :named_table, read_concurrency: true])
# Initial build
send(self(), :build_index)
# Schedule periodic refresh
schedule_refresh()
{:ok, %{}}
end
@impl true
def handle_info(:build_index, state) do
build_index()
{:noreply, state}
end
def handle_info(:refresh, state) do
build_index()
schedule_refresh()
{:noreply, state}
end
@impl true
def handle_cast(:refresh, state) do
build_index()
{:noreply, state}
end
# Private functions
defp build_index do
Logger.debug("[SystemMapIndex] Building system->maps index")
case fetch_all_map_systems() do
{:ok, index_data} ->
# Clear and rebuild the table
:ets.delete_all_objects(@table_name)
# Insert all entries
Enum.each(index_data, fn {system_id, map_ids} ->
:ets.insert(@table_name, {system_id, map_ids})
end)
Logger.debug("[SystemMapIndex] Index built with #{map_size(index_data)} systems")
{:error, reason} ->
Logger.error("[SystemMapIndex] Failed to build index: #{inspect(reason)}")
end
end
defp fetch_all_map_systems do
try do
{:ok, maps} = WandererApp.Maps.get_available_maps()
# Build the index: system_id -> [map_ids]
index =
maps
|> Enum.reduce(%{}, fn map, acc ->
case WandererApp.MapSystemRepo.get_all_by_map(map.id) do
{:ok, systems} ->
# Add this map to each system's list
Enum.reduce(systems, acc, fn system, acc2 ->
Map.update(acc2, system.solar_system_id, [map.id], &[map.id | &1])
end)
_ ->
acc
end
end)
|> Enum.map(fn {system_id, map_ids} ->
# Remove duplicates and convert to list
{system_id, Enum.uniq(map_ids)}
end)
|> Map.new()
{:ok, index}
rescue
e ->
{:error, e}
end
end
defp schedule_refresh do
Process.send_after(self(), :refresh, @refresh_interval)
end
end

View File

@@ -0,0 +1,23 @@
defmodule WandererApp.Kills.Supervisor do
@moduledoc """
Supervisor for the kills subsystem.
"""
use Supervisor
@spec start_link(keyword()) :: Supervisor.on_start()
def start_link(opts) do
Supervisor.start_link(__MODULE__, opts, name: __MODULE__)
end
@impl true
def init(_opts) do
children = [
{Task.Supervisor, name: WandererApp.Kills.TaskSupervisor},
{WandererApp.Kills.Subscription.SystemMapIndex, []},
{WandererApp.Kills.Client, []},
{WandererApp.Kills.MapEventListener, []}
]
Supervisor.init(children, strategy: :one_for_one)
end
end

View File

@@ -1,20 +1,18 @@
defmodule WandererApp.Map.ZkbDataFetcher do
@moduledoc """
Refreshes the map zKillboard data every 15 seconds.
Refreshes and broadcasts map kill data every 15 seconds.
Works with cache data populated by the WandererKills WebSocket service.
"""
use GenServer
require Logger
alias WandererApp.Zkb.KillsProvider.KillsCache
@interval :timer.seconds(15)
@store_map_kills_timeout :timer.hours(1)
@killmail_ttl_hours 24
@logger Application.compile_env(:wanderer_app, :logger)
# This means 120 “ticks” of 15s each → ~30 minutes
@preload_cycle_ticks 120
def start_link(_) do
GenServer.start_link(__MODULE__, %{}, name: __MODULE__)
end
@@ -22,25 +20,25 @@ defmodule WandererApp.Map.ZkbDataFetcher do
@impl true
def init(_arg) do
{:ok, _timer_ref} = :timer.send_interval(@interval, :fetch_data)
{:ok, %{iteration: 0}}
{:ok, %{}}
end
@impl true
def handle_info(:fetch_data, %{iteration: iteration} = state) do
zkill_preload_disabled = WandererApp.Env.zkill_preload_disabled?()
def handle_info(:fetch_data, state) do
kills_enabled = Application.get_env(:wanderer_app, :wanderer_kills_service_enabled, true)
if kills_enabled do
WandererApp.Map.RegistryHelper.list_all_maps()
|> Task.async_stream(
fn %{id: map_id, pid: _server_pid} ->
try do
if WandererApp.Map.Server.map_pid(map_id) do
# Always update kill counts
update_map_kills(map_id)
# Update detailed kills for maps with active subscriptions
{:ok, is_subscription_active} = map_id |> WandererApp.Map.is_subscription_active?()
can_preload_zkill = not zkill_preload_disabled && is_subscription_active
if can_preload_zkill do
if is_subscription_active do
update_detailed_map_kills(map_id)
end
end
@@ -53,22 +51,9 @@ defmodule WandererApp.Map.ZkbDataFetcher do
on_timeout: :kill_task
)
|> Enum.each(fn _ -> :ok end)
new_iteration = iteration + 1
cond do
zkill_preload_disabled ->
# If preload is disabled, just update iteration
{:noreply, %{state | iteration: new_iteration}}
new_iteration >= @preload_cycle_ticks ->
Logger.info("[ZkbDataFetcher] Triggering a fresh kill preload pass ...")
WandererApp.Zkb.KillsPreloader.run_preload_now()
{:noreply, %{state | iteration: 0}}
true ->
{:noreply, %{state | iteration: new_iteration}}
end
{:noreply, state}
end
# Catch any async task results we aren't explicitly pattern-matching
@@ -84,7 +69,8 @@ defmodule WandererApp.Map.ZkbDataFetcher do
|> WandererApp.Map.get_map!()
|> Map.get(:systems, %{})
|> Enum.into(%{}, fn {solar_system_id, _system} ->
kills_count = WandererApp.Cache.get("zkb_kills_#{solar_system_id}") || 0
# Read kill counts from cache (populated by WebSocket)
kills_count = WandererApp.Cache.get("zkb:kills:#{solar_system_id}") || 0
{solar_system_id, kills_count}
end)
|> maybe_broadcast_map_kills(map_id)
@@ -98,16 +84,32 @@ defmodule WandererApp.Map.ZkbDataFetcher do
|> WandererApp.Map.get_map!()
|> Map.get(:systems, %{})
# Old cache data
old_ids_map = WandererApp.Cache.get("map_#{map_id}:zkb_ids") || %{}
old_details_map = WandererApp.Cache.get("map_#{map_id}:zkb_detailed_kills") || %{}
# Get existing cached data - ensure it's a map
cache_key_ids = "map:#{map_id}:zkb:ids"
cache_key_details = "map:#{map_id}:zkb:detailed_kills"
old_ids_map = case WandererApp.Cache.get(cache_key_ids) do
map when is_map(map) -> map
_ -> %{}
end
old_details_map = case WandererApp.Cache.get(cache_key_details) do
map when is_map(map) -> map
_ ->
# Initialize with empty map and store it
WandererApp.Cache.insert(cache_key_details, %{}, ttl: :timer.hours(@killmail_ttl_hours))
%{}
end
# Build current killmail ID map from cache
new_ids_map =
Enum.into(systems, %{}, fn {solar_system_id, _} ->
ids = KillsCache.get_system_killmail_ids(solar_system_id) |> MapSet.new()
{solar_system_id, ids}
# Get killmail IDs from cache (populated by WebSocket)
ids = WandererApp.Cache.get("zkb:kills:list:#{solar_system_id}") || []
{solar_system_id, MapSet.new(ids)}
end)
# Find systems with changed killmail lists
changed_systems =
new_ids_map
|> Enum.filter(fn {system_id, new_ids_set} ->
@@ -121,6 +123,16 @@ defmodule WandererApp.Map.ZkbDataFetcher do
"[ZkbDataFetcher] No changes in detailed kills for map_id=#{map_id}"
end)
# Don't overwrite existing cache data when there are no changes
# Only initialize if cache doesn't exist
if old_details_map == %{} do
# First time initialization - create empty structure
empty_map = systems
|> Enum.into(%{}, fn {system_id, _} -> {system_id, []} end)
WandererApp.Cache.insert(cache_key_details, empty_map, ttl: :timer.hours(@killmail_ttl_hours))
end
:ok
else
# Build new details for each changed system
@@ -131,30 +143,34 @@ defmodule WandererApp.Map.ZkbDataFetcher do
|> Map.fetch!(system_id)
|> MapSet.to_list()
# Get killmail details from cache (populated by WebSocket)
kill_details =
kill_ids
|> Enum.map(&KillsCache.get_killmail/1)
|> Enum.map(&WandererApp.Cache.get("zkb:killmail:#{&1}"))
|> Enum.reject(&is_nil/1)
# Ensure system_id is an integer key
Map.put(acc, system_id, kill_details)
end)
# Update the ID map cache
updated_ids_map =
Enum.reduce(changed_systems, old_ids_map, fn system_id, acc ->
new_ids_list = new_ids_map[system_id] |> MapSet.to_list()
Map.put(acc, system_id, new_ids_list)
end)
WandererApp.Cache.put("map_#{map_id}:zkb_ids", updated_ids_map,
ttl: :timer.hours(KillsCache.killmail_ttl())
# Store updated caches
WandererApp.Cache.insert(cache_key_ids, updated_ids_map,
ttl: :timer.hours(@killmail_ttl_hours)
)
WandererApp.Cache.put("map_#{map_id}:zkb_detailed_kills", updated_details_map,
ttl: :timer.hours(KillsCache.killmail_ttl())
WandererApp.Cache.insert(cache_key_details, updated_details_map,
ttl: :timer.hours(@killmail_ttl_hours)
)
# Broadcast changes
changed_data = Map.take(updated_details_map, changed_systems)
WandererApp.Map.Server.Impl.broadcast!(map_id, :detailed_kills_updated, changed_data)
:ok
@@ -163,7 +179,7 @@ defmodule WandererApp.Map.ZkbDataFetcher do
end
defp maybe_broadcast_map_kills(new_kills_map, map_id) do
{:ok, old_kills_map} = WandererApp.Cache.lookup("map_#{map_id}:zkb_kills", %{})
{:ok, old_kills_map} = WandererApp.Cache.lookup("map:#{map_id}:zkb:kills", %{})
# Use the union of keys from both the new and old maps
all_system_ids = Map.keys(Map.merge(new_kills_map, old_kills_map))
@@ -181,7 +197,7 @@ defmodule WandererApp.Map.ZkbDataFetcher do
:ok
else
:ok =
WandererApp.Cache.put("map_#{map_id}:zkb_kills", new_kills_map,
WandererApp.Cache.insert("map:#{map_id}:zkb:kills", new_kills_map,
ttl: @store_map_kills_timeout
)

View File

@@ -98,6 +98,10 @@ defmodule WandererApp.Map.Server.Impl do
WandererApp.Cache.insert("map_#{map_id}:started", true)
# Initialize zkb cache structure to prevent timing issues
cache_key = "map:#{map_id}:zkb:detailed_kills"
WandererApp.Cache.insert(cache_key, %{}, ttl: :timer.hours(24))
broadcast!(map_id, :map_server_started)
:telemetry.execute([:wanderer_app, :map, :started], %{count: 1})

View File

@@ -20,6 +20,18 @@ defmodule WandererApp.MapSystemRepo do
WandererApp.Api.MapSystem.read_all_by_map(%{map_id: map_id})
end
def get_all_by_maps(map_ids) when is_list(map_ids) do
# Since there's no bulk query, we need to query each map individually
map_ids
|> Enum.flat_map(fn map_id ->
case get_all_by_map(map_id) do
{:ok, systems} -> systems
_ -> []
end
end)
|> Enum.uniq_by(& &1.solar_system_id)
end
def get_visible_by_map(map_id) do
WandererApp.Api.MapSystem.read_visible_by_map(%{map_id: map_id})
end

View File

@@ -1,291 +0,0 @@
defmodule WandererApp.Zkb.KillsPreloader do
@moduledoc """
On startup, kicks off two passes (quick and expanded) to preload kills data.
There is also a `run_preload_now/0` function for manual triggering of the same logic.
"""
use GenServer
require Logger
alias WandererApp.Zkb.KillsProvider
alias WandererApp.Zkb.KillsProvider.KillsCache
# ----------------
# Configuration
# ----------------
# (1) Quick pass
@quick_limit 1
@quick_hours 1
# (2) Expanded pass
@expanded_limit 25
@expanded_hours 24
# How many minutes back we look for “last active” maps
@last_active_cutoff 30
# Default concurrency if not provided
@default_max_concurrency 2
@doc """
Starts the GenServer with optional opts (like `max_concurrency`).
"""
def start_link(opts \\ []) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
@doc """
Public helper to explicitly request a fresh preload pass (both quick & expanded).
"""
def run_preload_now() do
send(__MODULE__, :start_preload)
end
@impl true
def init(opts) do
state = %{
phase: :idle,
calls_count: 0,
max_concurrency: Keyword.get(opts, :max_concurrency, @default_max_concurrency)
}
# Kick off the preload passes once at startup
send(self(), :start_preload)
{:ok, state}
end
@impl true
def handle_info(:start_preload, state) do
# Gather last-active maps (or fallback).
cutoff_time =
DateTime.utc_now()
|> DateTime.add(-@last_active_cutoff, :minute)
last_active_maps_result = WandererApp.Api.MapState.get_last_active(cutoff_time)
last_active_maps = resolve_last_active_maps(last_active_maps_result)
active_maps_with_subscription = get_active_maps_with_subscription(last_active_maps)
# Gather systems from those maps
system_tuples = gather_visible_systems(active_maps_with_subscription)
unique_systems = Enum.uniq(system_tuples)
Logger.debug(fn -> "
[KillsPreloader] Found #{length(unique_systems)} unique systems \
across #{length(last_active_maps)} map(s)
" end)
# ---- QUICK PASS ----
state_quick = %{state | phase: :quick_pass}
{time_quick_ms, state_after_quick} =
measure_execution_time(fn ->
do_pass(unique_systems, :quick, @quick_hours, @quick_limit, state_quick)
end)
Logger.info(
"[KillsPreloader] Phase 1 (quick) done => calls_count=#{state_after_quick.calls_count}, elapsed=#{time_quick_ms}ms"
)
# ---- EXPANDED PASS ----
state_expanded = %{state_after_quick | phase: :expanded_pass}
{time_expanded_ms, final_state} =
measure_execution_time(fn ->
do_pass(unique_systems, :expanded, @quick_hours, @expanded_limit, state_expanded)
end)
Logger.info(
"[KillsPreloader] Phase 2 (expanded) done => calls_count=#{final_state.calls_count}, elapsed=#{time_expanded_ms}ms"
)
# Reset phase to :idle
{:noreply, %{final_state | phase: :idle}}
end
@impl true
def handle_info(_other, state), do: {:noreply, state}
defp resolve_last_active_maps({:ok, []}) do
Logger.warning("[KillsPreloader] No last-active maps found. Using fallback logic...")
case WandererApp.Maps.get_available_maps() do
{:ok, []} ->
Logger.error("[KillsPreloader] Fallback: get_available_maps returned zero maps!")
[]
{:ok, maps} ->
# pick the newest map by updated_at
fallback_map = Enum.max_by(maps, & &1.updated_at, fn -> nil end)
if fallback_map, do: [fallback_map], else: []
end
end
defp resolve_last_active_maps({:ok, maps}) when is_list(maps),
do: maps
defp resolve_last_active_maps({:error, reason}) do
Logger.error("[KillsPreloader] Could not load last-active maps => #{inspect(reason)}")
[]
end
defp get_active_maps_with_subscription(maps) do
maps
|> Enum.filter(fn map ->
{:ok, is_subscription_active} = map.id |> WandererApp.Map.is_subscription_active?()
is_subscription_active
end)
end
defp gather_visible_systems(maps) do
maps
|> Enum.flat_map(fn map_record ->
the_map_id = Map.get(map_record, :map_id) || Map.get(map_record, :id)
case WandererApp.MapSystemRepo.get_visible_by_map(the_map_id) do
{:ok, systems} ->
Enum.map(systems, fn sys -> {the_map_id, sys.solar_system_id} end)
{:error, reason} ->
Logger.warning(
"[KillsPreloader] get_visible_by_map failed => map_id=#{inspect(the_map_id)}, reason=#{inspect(reason)}"
)
[]
end
end)
end
defp do_pass(unique_systems, pass_type, hours, limit, state) do
Logger.info(
"[KillsPreloader] Starting #{pass_type} pass => #{length(unique_systems)} systems"
)
{final_state, _kills_map} =
unique_systems
|> Task.async_stream(
fn {_map_id, system_id} ->
fetch_kills_for_system(system_id, pass_type, hours, limit, state)
end,
max_concurrency: state.max_concurrency,
timeout: pass_timeout_ms(pass_type)
)
|> Enum.reduce({state, %{}}, fn task_result, {acc_state, acc_map} ->
reduce_task_result(pass_type, task_result, acc_state, acc_map)
end)
final_state
end
defp fetch_kills_for_system(system_id, :quick, hours, limit, state) do
Logger.debug(fn -> "[KillsPreloader] Quick fetch => system=#{system_id}, hours=#{hours}, limit=#{limit}" end)
case KillsProvider.Fetcher.fetch_kills_for_system(system_id, hours, state,
limit: limit,
force: false
) do
{:ok, kills, updated_state} ->
{:ok, system_id, kills, updated_state}
{:error, reason, updated_state} ->
Logger.warning(
"[KillsPreloader] Quick fetch failed => system=#{system_id}, reason=#{inspect(reason)}"
)
{:error, reason, updated_state}
end
end
defp fetch_kills_for_system(system_id, :expanded, hours, limit, state) do
Logger.debug(fn -> "[KillsPreloader] Expanded fetch => system=#{system_id}, hours=#{hours}, limit=#{limit} (forcing refresh)" end)
with {:ok, kills_1h, updated_state} <-
KillsProvider.Fetcher.fetch_kills_for_system(system_id, hours, state,
limit: limit,
force: true
),
{:ok, final_kills, final_state} <-
maybe_fetch_more_if_needed(system_id, kills_1h, limit, updated_state) do
{:ok, system_id, final_kills, final_state}
else
{:error, reason, updated_state} ->
Logger.warning(
"[KillsPreloader] Expanded fetch (#{hours}h) failed => system=#{system_id}, reason=#{inspect(reason)}"
)
{:error, reason, updated_state}
end
end
# If we got fewer kills than `limit` from the 1h fetch, top up from 24h
defp maybe_fetch_more_if_needed(system_id, kills_1h, limit, state) do
if length(kills_1h) < limit do
needed = limit - length(kills_1h)
Logger.debug(fn -> "[KillsPreloader] Expanding to #{@expanded_hours}h => system=#{system_id}, need=#{needed} more kills" end)
case KillsProvider.Fetcher.fetch_kills_for_system(system_id, @expanded_hours, state,
limit: needed,
force: true
) do
{:ok, _kills_24h, updated_state2} ->
final_kills =
KillsCache.fetch_cached_kills(system_id)
|> Enum.take(limit)
{:ok, final_kills, updated_state2}
{:error, reason2, updated_state2} ->
Logger.warning(
"[KillsPreloader] #{@expanded_hours}h fetch failed => system=#{system_id}, reason=#{inspect(reason2)}"
)
{:error, reason2, updated_state2}
end
else
{:ok, kills_1h, state}
end
end
defp reduce_task_result(pass_type, task_result, acc_state, acc_map) do
case task_result do
{:ok, {:ok, sys_id, kills, updated_state}} ->
# Merge calls count from updated_state into acc_state
new_state = merge_calls_count(acc_state, updated_state)
new_map = Map.put(acc_map, sys_id, kills)
{new_state, new_map}
{:ok, {:error, reason, updated_state}} ->
log_failed_task(pass_type, reason)
new_state = merge_calls_count(acc_state, updated_state)
{new_state, acc_map}
{:error, reason} ->
Logger.error("[KillsPreloader] #{pass_type} fetch task crashed => #{inspect(reason)}")
{acc_state, acc_map}
end
end
defp log_failed_task(:quick, reason),
do: Logger.warning("[KillsPreloader] Quick fetch task failed => #{inspect(reason)}")
defp log_failed_task(:expanded, reason),
do: Logger.error("[KillsPreloader] Expanded fetch task failed => #{inspect(reason)}")
defp merge_calls_count(%{calls_count: c1} = st1, %{calls_count: c2}),
do: %{st1 | calls_count: c1 + c2}
defp merge_calls_count(st1, _other),
do: st1
defp pass_timeout_ms(:quick), do: :timer.minutes(2)
defp pass_timeout_ms(:expanded), do: :timer.minutes(5)
defp measure_execution_time(fun) when is_function(fun, 0) do
start = System.monotonic_time()
result = fun.()
finish = System.monotonic_time()
ms = System.convert_time_unit(finish - start, :native, :millisecond)
{ms, result}
end
end

View File

@@ -1,29 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider do
use Fresh
require Logger
alias WandererApp.Zkb.KillsProvider.Websocket
defstruct [:connected]
def handle_connect(status, headers, state),
do: Websocket.handle_connect(status, headers, state)
def handle_in(frame, state),
do: Websocket.handle_in(frame, state)
def handle_control(msg, state),
do: Websocket.handle_control(msg, state)
def handle_info(msg, state),
do: Websocket.handle_info(msg, state)
def handle_disconnect(code, reason, state),
do: Websocket.handle_disconnect(code, reason, state)
def handle_error(err, state),
do: Websocket.handle_error(err, state)
def handle_terminate(reason, state),
do: Websocket.handle_terminate(reason, state)
end

View File

@@ -1,37 +0,0 @@
defmodule WandererApp.Zkb.Supervisor do
use Supervisor
@name __MODULE__
def start_link(opts \\ []) do
Supervisor.start_link(@name, opts, name: @name)
end
def init(_init_args) do
preloader_child =
unless WandererApp.Env.zkill_preload_disabled?() do
{WandererApp.Zkb.KillsPreloader, []}
end
children =
[
{
WandererApp.Zkb.KillsProvider,
uri: "wss://zkillboard.com/websocket/",
state: %WandererApp.Zkb.KillsProvider{
connected: false
},
opts: [
name: {:local, :zkb_kills_provider},
reconnect: true,
reconnect_after: 5_000,
max_reconnects: :infinity
]
},
preloader_child
]
|> Enum.reject(&is_nil/1)
Supervisor.init(children, strategy: :one_for_one)
end
end

View File

@@ -1,164 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider.KillsCache do
@moduledoc """
Provides helper functions for putting/fetching kill data
"""
require Logger
alias WandererApp.Cache
@killmail_ttl :timer.hours(24)
@system_kills_ttl :timer.hours(1)
# Base (average) expiry of 15 minutes for "recently fetched" systems
@base_full_fetch_expiry_ms 900_000
@jitter_percent 0.1
def killmail_ttl, do: @killmail_ttl
def system_kills_ttl, do: @system_kills_ttl
@doc """
Store the killmail data, keyed by killmail_id, with a 24h TTL.
"""
def put_killmail(killmail_id, kill_data) do
Logger.debug(fn -> "[KillsCache] Storing killmail => killmail_id=#{killmail_id}" end)
Cache.put(killmail_key(killmail_id), kill_data, ttl: @killmail_ttl)
end
@doc """
Fetch kills for `system_id` from the local cache only.
Returns a list of killmail maps (could be empty).
"""
def fetch_cached_kills(system_id) do
killmail_ids = get_system_killmail_ids(system_id)
Logger.debug(fn -> "[KillsCache] fetch_cached_kills => system_id=#{system_id}, count=#{length(killmail_ids)}" end)
killmail_ids
|> Enum.map(&get_killmail/1)
|> Enum.reject(&is_nil/1)
end
@doc """
Fetch cached kills for multiple solar system IDs.
Returns a map of `%{ solar_system_id => list_of_kills }`.
"""
def fetch_cached_kills_for_systems(system_ids) when is_list(system_ids) do
Enum.reduce(system_ids, %{}, fn sid, acc ->
kills_list = fetch_cached_kills(sid)
Map.put(acc, sid, kills_list)
end)
end
@doc """
Fetch the killmail data (if any) from the cache, by killmail_id.
"""
def get_killmail(killmail_id) do
Cache.get(killmail_key(killmail_id))
end
@doc """
Adds `killmail_id` to the list of killmail IDs for the system
if its not already present. The TTL is 24 hours.
"""
def add_killmail_id_to_system_list(solar_system_id, killmail_id) do
Cache.update(
system_kills_list_key(solar_system_id),
[],
fn existing_list ->
existing_list = existing_list || []
if killmail_id in existing_list do
existing_list
else
existing_list ++ [killmail_id]
end
end,
ttl: @killmail_ttl
)
end
@doc """
Returns a list of killmail IDs for the given system, or [] if none.
"""
def get_system_killmail_ids(solar_system_id) do
Cache.get(system_kills_list_key(solar_system_id)) || []
end
@doc """
Increments the kill count for a system by `amount`. The TTL is 1 hour.
"""
def incr_system_kill_count(solar_system_id, amount \\ 1) do
Cache.incr(
system_kills_key(solar_system_id),
amount,
default: 0,
ttl: @system_kills_ttl
)
end
@doc """
Returns the integer count of kills for this system in the last hour, or 0.
"""
def get_system_kill_count(solar_system_id) do
Cache.get(system_kills_key(solar_system_id)) || 0
end
@doc """
Check if the system is still in its "recently fetched" window.
We store an `expires_at` timestamp (in ms). If `now < expires_at`,
this system is still considered "recently fetched".
"""
def recently_fetched?(system_id) do
case Cache.lookup(fetched_timestamp_key(system_id)) do
{:ok, expires_at_ms} when is_integer(expires_at_ms) ->
now_ms = current_time_ms()
now_ms < expires_at_ms
_ ->
false
end
end
@doc """
Puts a jittered `expires_at` in the cache for `system_id`,
marking it as fully fetched for ~15 minutes (+/- 10%).
"""
def put_full_fetched_timestamp(system_id) do
now_ms = current_time_ms()
max_jitter = round(@base_full_fetch_expiry_ms * @jitter_percent)
# random offset in range [-max_jitter..+max_jitter]
offset = :rand.uniform(2 * max_jitter + 1) - (max_jitter + 1)
final_expiry_ms = max(@base_full_fetch_expiry_ms + offset, 60_000)
expires_at_ms = now_ms + final_expiry_ms
Logger.debug(fn -> "[KillsCache] Marking system=#{system_id} recently_fetched? until #{expires_at_ms} (ms)" end)
Cache.put(fetched_timestamp_key(system_id), expires_at_ms)
end
@doc """
Returns how many ms remain until this system's "recently fetched" window ends.
If it's already expired (or doesn't exist), returns -1.
"""
def fetch_age_ms(system_id) do
now_ms = current_time_ms()
case Cache.lookup(fetched_timestamp_key(system_id)) do
{:ok, expires_at_ms} when is_integer(expires_at_ms) ->
if now_ms < expires_at_ms do
expires_at_ms - now_ms
else
-1
end
_ ->
-1
end
end
defp killmail_key(killmail_id), do: "zkb_killmail_#{killmail_id}"
defp system_kills_key(solar_system_id), do: "zkb_kills_#{solar_system_id}"
defp system_kills_list_key(solar_system_id), do: "zkb_kills_list_#{solar_system_id}"
defp fetched_timestamp_key(system_id), do: "zkb_system_fetched_at_#{system_id}"
defp current_time_ms() do
DateTime.utc_now() |> DateTime.to_unix(:millisecond)
end
end

View File

@@ -1,278 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider.Fetcher do
@moduledoc """
Low-level API for fetching killmails from zKillboard + ESI.
"""
require Logger
use Retry
alias WandererApp.Zkb.KillsProvider.{Parser, KillsCache, ZkbApi}
alias WandererApp.Utils.HttpUtil
@page_size 200
@max_pages 2
@doc """
Fetch killmails for multiple systems, returning a map of system_id => kills.
"""
def fetch_kills_for_systems(system_ids, since_hours, state, _opts \\ [])
when is_list(system_ids) do
zkill_preload_disabled = WandererApp.Env.zkill_preload_disabled?()
if not zkill_preload_disabled do
try do
{final_map, final_state} =
Enum.reduce(system_ids, {%{}, state}, fn sid, {acc_map, acc_st} ->
case fetch_kills_for_system(sid, since_hours, acc_st) do
{:ok, kills, new_st} ->
{Map.put(acc_map, sid, kills), new_st}
{:error, reason, new_st} ->
Logger.debug(fn -> "[Fetcher] system=#{sid} => error=#{inspect(reason)}" end)
{Map.put(acc_map, sid, {:error, reason}), new_st}
end
end)
Logger.debug(fn ->
"[Fetcher] fetch_kills_for_systems => done, final_map_size=#{map_size(final_map)} calls=#{final_state.calls_count}"
end)
{:ok, final_map}
rescue
e ->
Logger.error(
"[Fetcher] EXCEPTION in fetch_kills_for_systems => #{Exception.message(e)}"
)
{:error, e}
end
else
{:error, :kills_disabled}
end
end
@doc """
Fetch killmails for a single system within `since_hours` cutoff.
Options:
- `:limit` => integer limit on how many kills to fetch (optional).
If `limit` is nil (or not set), we fetch until we exhaust pages or older kills.
- `:force` => if true, ignore the "recently fetched" check and forcibly refetch.
Returns `{:ok, kills, updated_state}` on success, or `{:error, reason, updated_state}`.
"""
def fetch_kills_for_system(system_id, since_hours, state, opts \\ []) do
zkill_preload_disabled = WandererApp.Env.zkill_preload_disabled?()
if not zkill_preload_disabled do
limit = Keyword.get(opts, :limit, nil)
force? = Keyword.get(opts, :force, false)
log_prefix = "[Fetcher] fetch_kills_for_system => system=#{system_id}"
# Check the "recently fetched" cache if not forced
if not force? and KillsCache.recently_fetched?(system_id) do
cached_kills = KillsCache.fetch_cached_kills(system_id)
final = maybe_take(cached_kills, limit)
Logger.debug(fn ->
"#{log_prefix}, recently_fetched?=true => returning #{length(final)} cached kills"
end)
{:ok, final, state}
else
Logger.debug(fn ->
"#{log_prefix}, hours=#{since_hours}, limit=#{inspect(limit)}, force=#{force?}"
end)
cutoff_dt = hours_ago(since_hours)
result =
retry with:
exponential_backoff(300)
|> randomize()
|> cap(5_000)
|> expiry(120_000) do
case do_multi_page_fetch(system_id, cutoff_dt, 1, 0, limit, state) do
{:ok, new_st, total_fetched} ->
# Mark system as fully fetched (to prevent repeated calls).
KillsCache.put_full_fetched_timestamp(system_id)
final_kills = KillsCache.fetch_cached_kills(system_id) |> maybe_take(limit)
Logger.debug(fn ->
"#{log_prefix}, total_fetched=#{total_fetched}, final_cached=#{length(final_kills)}, calls_count=#{new_st.calls_count}"
end)
{:ok, final_kills, new_st}
{:error, :rate_limited, _new_st} ->
raise ":rate_limited"
{:error, reason, _new_st} ->
raise "#{log_prefix}, reason=#{inspect(reason)}"
end
end
case result do
{:ok, kills, new_st} ->
{:ok, kills, new_st}
error ->
Logger.error("#{log_prefix}, EXHAUSTED => error=#{inspect(error)}")
{:error, error, state}
end
end
else
raise ":kills_disabled"
end
rescue
e ->
Logger.error("[Fetcher] EXCEPTION in fetch_kills_for_system => #{Exception.message(e)}")
{:error, e, state}
end
defp do_multi_page_fetch(_system_id, _cutoff_dt, page, total_so_far, _limit, state)
when page > @max_pages do
# No more pages
{:ok, state, total_so_far}
end
defp do_multi_page_fetch(system_id, cutoff_dt, page, total_so_far, limit, state) do
Logger.debug(
"[Fetcher] do_multi_page_fetch => system=#{system_id}, page=#{page}, total_so_far=#{total_so_far}, limit=#{inspect(limit)}"
)
with {:ok, st1} <- increment_calls_count(state),
{:ok, st2, partials} <- ZkbApi.fetch_and_parse_page(system_id, page, st1) do
Logger.debug(fn ->
"[Fetcher] system=#{system_id}, page=#{page}, partials_count=#{length(partials)}"
end)
{_count_stored, older_found?, total_now} =
Enum.reduce_while(partials, {0, false, total_so_far}, fn partial,
{acc_count, had_older, acc_total} ->
# If we have a limit and reached it, stop immediately
if reached_limit?(limit, acc_total) do
{:halt, {acc_count, had_older, acc_total}}
else
case parse_partial(partial, cutoff_dt) do
:older ->
# Found an older kill => we can halt the entire multi-page fetch
{:halt, {acc_count, true, acc_total}}
:ok ->
{:cont, {acc_count + 1, false, acc_total + 1}}
:skip ->
{:cont, {acc_count, had_older, acc_total}}
end
end
end)
cond do
# If we found older kills, stop now
older_found? ->
{:ok, st2, total_now}
# If we have a limit and just reached or exceeded it
reached_limit?(limit, total_now) ->
{:ok, st2, total_now}
# If partials < @page_size, no more kills are left
length(partials) < @page_size ->
{:ok, st2, total_now}
# Otherwise, keep going to next page
true ->
do_multi_page_fetch(system_id, cutoff_dt, page + 1, total_now, limit, st2)
end
else
{:error, :rate_limited, stx} ->
{:error, :rate_limited, stx}
{:error, reason, stx} ->
{:error, reason, stx}
other ->
Logger.warning("[Fetcher] Unexpected result => #{inspect(other)}")
{:error, :unexpected, state}
end
end
defp parse_partial(
%{"killmail_id" => kill_id, "zkb" => %{"hash" => kill_hash}} = partial,
cutoff_dt
) do
# If we've already cached this kill, skip
if KillsCache.get_killmail(kill_id) do
:skip
else
# Actually fetch the full kill from ESI
case fetch_full_killmail(kill_id, kill_hash) do
{:ok, full_km} ->
# Delegate the time check & storing to Parser
Parser.parse_full_and_store(full_km, partial, cutoff_dt)
{:error, reason} ->
Logger.warning("[Fetcher] ESI fail => kill_id=#{kill_id}, reason=#{inspect(reason)}")
:skip
end
end
end
defp parse_partial(_other, _cutoff_dt), do: :skip
defp fetch_full_killmail(k_id, k_hash) do
retry with: exponential_backoff(300) |> randomize() |> cap(5_000) |> expiry(30_000),
rescue_only: [RuntimeError] do
case WandererApp.Esi.get_killmail(k_id, k_hash) do
{:ok, full_km} ->
{:ok, full_km}
{:error, :timeout} ->
Logger.warning("[Fetcher] ESI get_killmail timeout => kill_id=#{k_id}, retrying...")
raise "ESI timeout, will retry"
{:error, :not_found} ->
Logger.warning("[Fetcher] ESI get_killmail not_found => kill_id=#{k_id}")
{:error, :not_found}
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.warning(
"[Fetcher] ESI get_killmail retriable error => kill_id=#{k_id}, reason=#{inspect(reason)}"
)
raise "ESI error: #{inspect(reason)}, will retry"
else
Logger.warning(
"[Fetcher] ESI get_killmail failed => kill_id=#{k_id}, reason=#{inspect(reason)}"
)
{:error, reason}
end
error ->
Logger.warning(
"[Fetcher] ESI get_killmail failed => kill_id=#{k_id}, reason=#{inspect(error)}"
)
error
end
end
end
defp hours_ago(h),
do: DateTime.utc_now() |> DateTime.add(-h * 3600, :second)
defp increment_calls_count(%{calls_count: c} = st),
do: {:ok, %{st | calls_count: c + 1}}
defp reached_limit?(nil, _count_so_far), do: false
defp reached_limit?(limit, count_so_far) when is_integer(limit),
do: count_so_far >= limit
defp maybe_take(kills, nil), do: kills
defp maybe_take(kills, limit), do: Enum.take(kills, limit)
end

View File

@@ -1,505 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider.Parser do
@moduledoc """
Helper for parsing & storing a killmail from the ESI data (plus zKB partial).
Responsible for:
- Parsing the raw JSON structures,
- Combining partial & full kill data,
- Checking whether kills are 'too old',
- Storing in KillsCache, etc.
"""
require Logger
alias WandererApp.Zkb.KillsProvider.KillsCache
alias WandererApp.Utils.HttpUtil
use Retry
# Maximum retries for enrichment calls
@doc """
Merges the 'partial' from zKB and the 'full' killmail from ESI, checks its time
vs. `cutoff_dt`.
Returns:
- `:ok` if we parsed & stored successfully,
- `:older` if killmail time is older than `cutoff_dt`,
- `:skip` if we cannot parse or store for some reason.
"""
def parse_full_and_store(full_km, partial_zkb, cutoff_dt) when is_map(full_km) do
# Attempt to parse the killmail_time
case parse_killmail_time(full_km) do
{:ok, km_dt} ->
if older_than_cutoff?(km_dt, cutoff_dt) do
:older
else
# Merge the "zkb" portion from the partial into the full killmail
enriched = Map.merge(full_km, %{"zkb" => partial_zkb["zkb"]})
parse_and_store_killmail(enriched)
end
_ ->
:skip
end
end
def parse_full_and_store(_full_km, _partial_zkb, _cutoff_dt),
do: :skip
@doc """
Parse a raw killmail (`full_km`) and store it if valid.
Returns:
- `:ok` if successfully parsed & stored,
- `:skip` otherwise
"""
def parse_and_store_killmail(%{"killmail_id" => _kill_id} = full_km) do
parsed_map = do_parse(full_km)
if is_nil(parsed_map) or is_nil(parsed_map["kill_time"]) do
:skip
else
store_killmail(parsed_map)
:ok
end
end
def parse_and_store_killmail(_),
do: :skip
defp do_parse(%{"killmail_id" => kill_id} = km) do
victim = Map.get(km, "victim", %{})
attackers = Map.get(km, "attackers", [])
kill_time_dt =
case DateTime.from_iso8601("#{Map.get(km, "killmail_time", "")}") do
{:ok, dt, _off} -> dt
_ -> nil
end
npc_flag = get_in(km, ["zkb", "npc"]) || false
%{
"killmail_id" => kill_id,
"kill_time" => kill_time_dt,
"solar_system_id" => km["solar_system_id"],
"zkb" => Map.get(km, "zkb", %{}),
"attacker_count" => length(attackers),
"total_value" => get_in(km, ["zkb", "totalValue"]) || 0,
"victim" => victim,
"attackers" => attackers,
"npc" => npc_flag
}
end
defp do_parse(_),
do: nil
@doc """
Extracts & returns {:ok, DateTime} from the "killmail_time" field, or :skip on failure.
"""
def parse_killmail_time(full_km) do
killmail_time_str = Map.get(full_km, "killmail_time", "")
case DateTime.from_iso8601(killmail_time_str) do
{:ok, dt, _offset} ->
{:ok, dt}
_ ->
:skip
end
end
defp older_than_cutoff?(%DateTime{} = dt, %DateTime{} = cutoff_dt),
do: DateTime.compare(dt, cutoff_dt) == :lt
defp store_killmail(%{"killmail_id" => nil}), do: :ok
defp store_killmail(%{"killmail_id" => kill_id} = parsed) do
final = build_kill_data(parsed)
if final do
enriched = maybe_enrich_killmail(final)
KillsCache.put_killmail(kill_id, enriched)
system_id = enriched["solar_system_id"]
KillsCache.add_killmail_id_to_system_list(system_id, kill_id)
if within_last_hour?(enriched["kill_time"]) do
KillsCache.incr_system_kill_count(system_id)
end
else
Logger.warning(
"[Parser] store_killmail => build_kill_data returned nil for kill_id=#{kill_id}"
)
end
end
defp store_killmail(_),
do: :ok
defp build_kill_data(%{
"killmail_id" => kill_id,
"kill_time" => kill_time_dt,
"solar_system_id" => sys_id,
"zkb" => zkb,
"victim" => victim,
"attackers" => attackers,
"attacker_count" => attacker_count,
"total_value" => total_value,
"npc" => npc
}) do
victim_map = extract_victim_fields(victim)
final_blow_map = extract_final_blow_fields(attackers)
%{
"killmail_id" => kill_id,
"kill_time" => kill_time_dt,
"solar_system_id" => sys_id,
"zkb" => zkb,
"victim_char_id" => victim_map.char_id,
"victim_corp_id" => victim_map.corp_id,
"victim_alliance_id" => victim_map.alliance_id,
"victim_ship_type_id" => victim_map.ship_type_id,
"final_blow_char_id" => final_blow_map.char_id,
"final_blow_corp_id" => final_blow_map.corp_id,
"final_blow_alliance_id" => final_blow_map.alliance_id,
"final_blow_ship_type_id" => final_blow_map.ship_type_id,
"attacker_count" => attacker_count,
"total_value" => total_value,
"npc" => npc
}
end
defp build_kill_data(_),
do: nil
defp extract_victim_fields(%{
"character_id" => cid,
"corporation_id" => corp,
"alliance_id" => alli,
"ship_type_id" => st_id
}),
do: %{char_id: cid, corp_id: corp, alliance_id: alli, ship_type_id: st_id}
defp extract_victim_fields(%{
"character_id" => cid,
"corporation_id" => corp,
"ship_type_id" => st_id
}),
do: %{char_id: cid, corp_id: corp, alliance_id: nil, ship_type_id: st_id}
defp extract_victim_fields(_),
do: %{char_id: nil, corp_id: nil, alliance_id: nil, ship_type_id: nil}
defp extract_final_blow_fields(attackers) when is_list(attackers) do
final = Enum.find(attackers, fn a -> a["final_blow"] == true end)
extract_attacker_fields(final)
end
defp extract_final_blow_fields(_),
do: %{char_id: nil, corp_id: nil, alliance_id: nil, ship_type_id: nil}
defp extract_attacker_fields(nil),
do: %{char_id: nil, corp_id: nil, alliance_id: nil, ship_type_id: nil}
defp extract_attacker_fields(%{
"character_id" => cid,
"corporation_id" => corp,
"alliance_id" => alli,
"ship_type_id" => st_id
}),
do: %{char_id: cid, corp_id: corp, alliance_id: alli, ship_type_id: st_id}
defp extract_attacker_fields(%{
"character_id" => cid,
"corporation_id" => corp,
"ship_type_id" => st_id
}),
do: %{char_id: cid, corp_id: corp, alliance_id: nil, ship_type_id: st_id}
defp extract_attacker_fields(%{"ship_type_id" => st_id} = attacker) do
%{
char_id: Map.get(attacker, "character_id"),
corp_id: Map.get(attacker, "corporation_id"),
alliance_id: Map.get(attacker, "alliance_id"),
ship_type_id: st_id
}
end
defp extract_attacker_fields(_),
do: %{char_id: nil, corp_id: nil, alliance_id: nil, ship_type_id: nil}
defp maybe_enrich_killmail(km) do
km
|> enrich_victim()
|> enrich_final_blow()
end
defp enrich_victim(km) do
km
|> maybe_put_character_name("victim_char_id", "victim_char_name")
|> maybe_put_corp_info("victim_corp_id", "victim_corp_ticker", "victim_corp_name")
|> maybe_put_alliance_info(
"victim_alliance_id",
"victim_alliance_ticker",
"victim_alliance_name"
)
|> maybe_put_ship_name("victim_ship_type_id", "victim_ship_name")
end
defp enrich_final_blow(km) do
km
|> maybe_put_character_name("final_blow_char_id", "final_blow_char_name")
|> maybe_put_corp_info("final_blow_corp_id", "final_blow_corp_ticker", "final_blow_corp_name")
|> maybe_put_alliance_info(
"final_blow_alliance_id",
"final_blow_alliance_ticker",
"final_blow_alliance_name"
)
|> maybe_put_ship_name("final_blow_ship_type_id", "final_blow_ship_name")
end
defp maybe_put_character_name(km, id_key, name_key) do
case Map.get(km, id_key) do
nil ->
km
0 ->
km
eve_id ->
result =
retry with: exponential_backoff(200) |> randomize() |> cap(2_000) |> expiry(10_000),
rescue_only: [RuntimeError] do
case WandererApp.Esi.get_character_info(eve_id) do
{:ok, %{"name" => char_name}} ->
{:ok, char_name}
{:error, :timeout} ->
Logger.debug(fn -> "[Parser] Character info timeout, retrying => id=#{eve_id}" end)
raise "Character info timeout, will retry"
{:error, :not_found} ->
Logger.debug(fn -> "[Parser] Character not found => id=#{eve_id}" end)
:skip
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.debug(fn ->
"[Parser] Character info retriable error => id=#{eve_id}, reason=#{inspect(reason)}"
end)
raise "Character info error: #{inspect(reason)}, will retry"
else
Logger.debug(fn ->
"[Parser] Character info failed => id=#{eve_id}, reason=#{inspect(reason)}"
end)
:skip
end
error ->
Logger.debug(fn ->
"[Parser] Character info failed => id=#{eve_id}, reason=#{inspect(error)}"
end)
:skip
end
end
case result do
{:ok, char_name} -> Map.put(km, name_key, char_name)
_ -> km
end
end
end
defp maybe_put_corp_info(km, id_key, ticker_key, name_key) do
case Map.get(km, id_key) do
nil ->
km
0 ->
km
corp_id ->
result =
retry with: exponential_backoff(200) |> randomize() |> cap(2_000) |> expiry(10_000),
rescue_only: [RuntimeError] do
case WandererApp.Esi.get_corporation_info(corp_id) do
{:ok, %{"ticker" => ticker, "name" => corp_name}} ->
{:ok, {ticker, corp_name}}
{:error, :timeout} ->
Logger.debug(fn ->
"[Parser] Corporation info timeout, retrying => id=#{corp_id}"
end)
raise "Corporation info timeout, will retry"
{:error, :not_found} ->
Logger.debug(fn -> "[Parser] Corporation not found => id=#{corp_id}" end)
:skip
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.debug(fn ->
"[Parser] Corporation info retriable error => id=#{corp_id}, reason=#{inspect(reason)}"
end)
raise "Corporation info error: #{inspect(reason)}, will retry"
else
Logger.warning(
"[Parser] Failed to fetch corp info: ID=#{corp_id}, reason=#{inspect(reason)}"
)
:skip
end
error ->
Logger.warning(
"[Parser] Failed to fetch corp info: ID=#{corp_id}, reason=#{inspect(error)}"
)
:skip
end
end
case result do
{:ok, {ticker, corp_name}} ->
km
|> Map.put(ticker_key, ticker)
|> Map.put(name_key, corp_name)
_ ->
km
end
end
end
defp maybe_put_alliance_info(km, id_key, ticker_key, name_key) do
case Map.get(km, id_key) do
nil ->
km
0 ->
km
alliance_id ->
result =
retry with: exponential_backoff(200) |> randomize() |> cap(2_000) |> expiry(10_000),
rescue_only: [RuntimeError] do
case WandererApp.Esi.get_alliance_info(alliance_id) do
{:ok, %{"ticker" => alliance_ticker, "name" => alliance_name}} ->
{:ok, {alliance_ticker, alliance_name}}
{:error, :timeout} ->
Logger.debug(fn ->
"[Parser] Alliance info timeout, retrying => id=#{alliance_id}"
end)
raise "Alliance info timeout, will retry"
{:error, :not_found} ->
Logger.debug(fn -> "[Parser] Alliance not found => id=#{alliance_id}" end)
:skip
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.debug(fn ->
"[Parser] Alliance info retriable error => id=#{alliance_id}, reason=#{inspect(reason)}"
end)
raise "Alliance info error: #{inspect(reason)}, will retry"
else
Logger.debug(fn ->
"[Parser] Alliance info failed => id=#{alliance_id}, reason=#{inspect(reason)}"
end)
:skip
end
error ->
Logger.debug(fn ->
"[Parser] Alliance info failed => id=#{alliance_id}, reason=#{inspect(error)}"
end)
:skip
end
end
case result do
{:ok, {alliance_ticker, alliance_name}} ->
km
|> Map.put(ticker_key, alliance_ticker)
|> Map.put(name_key, alliance_name)
_ ->
km
end
end
end
defp maybe_put_ship_name(km, id_key, name_key) do
case Map.get(km, id_key) do
nil ->
km
0 ->
km
type_id ->
result =
retry with: exponential_backoff(200) |> randomize() |> cap(2_000) |> expiry(10_000),
rescue_only: [RuntimeError] do
case WandererApp.CachedInfo.get_ship_type(type_id) do
{:ok, nil} ->
:skip
{:ok, %{name: ship_name}} ->
{:ok, ship_name}
{:error, :timeout} ->
Logger.debug(fn -> "[Parser] Ship type timeout, retrying => id=#{type_id}" end)
raise "Ship type timeout, will retry"
{:error, :not_found} ->
Logger.debug(fn -> "[Parser] Ship type not found => id=#{type_id}" end)
:skip
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.debug(fn ->
"[Parser] Ship type retriable error => id=#{type_id}, reason=#{inspect(reason)}"
end)
raise "Ship type error: #{inspect(reason)}, will retry"
else
Logger.warning(
"[Parser] Failed to fetch ship type: ID=#{type_id}, reason=#{inspect(reason)}"
)
:skip
end
error ->
Logger.warning(
"[Parser] Failed to fetch ship type: ID=#{type_id}, reason=#{inspect(error)}"
)
:skip
end
end
case result do
{:ok, ship_name} -> Map.put(km, name_key, ship_name)
_ -> km
end
end
end
# Utility
defp within_last_hour?(nil), do: false
defp within_last_hour?(%DateTime{} = dt),
do: DateTime.diff(DateTime.utc_now(), dt, :minute) < 60
end

View File

@@ -1,148 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider.Websocket do
@moduledoc """
Handles real-time kills from zKillboard WebSocket.
Always fetches from ESI to get killmail_time, victim, attackers, etc.
"""
require Logger
alias WandererApp.Zkb.KillsProvider.Parser
alias WandererApp.Esi
alias WandererApp.Utils.HttpUtil
use Retry
@heartbeat_interval 1_000
# Called by `KillsProvider.handle_connect`
def handle_connect(_status, _headers, %{connected: _} = state) do
Logger.info("[KillsProvider.Websocket] Connected => killstream")
new_state = Map.put(state, :connected, true)
handle_subscribe("killstream", new_state)
end
# Called by `KillsProvider.handle_in`
def handle_in({:text, frame}, state) do
Logger.debug(fn -> "[KillsProvider.Websocket] Received frame => #{frame}" end)
partial = Jason.decode!(frame)
parse_and_store_zkb_partial(partial)
{:ok, state}
end
# Called for control frames
def handle_control({:pong, _msg}, state),
do: {:ok, state}
def handle_control({:ping, _}, state) do
Process.send_after(self(), :heartbeat, @heartbeat_interval)
{:ok, state}
end
# Called by the process mailbox
def handle_info(:heartbeat, state) do
payload = Jason.encode!(%{"action" => "pong"})
{:reply, {:text, payload}, state}
end
def handle_info(_other, state), do: {:ok, state}
# Called on disconnect
def handle_disconnect(code, reason, _old_state) do
Logger.warning(
"[KillsProvider.Websocket] Disconnected => code=#{code}, reason=#{inspect(reason)} => reconnecting"
)
:reconnect
end
# Called on errors
def handle_error({err, _reason}, state) when err in [:encoding_failed, :casting_failed],
do: {:ignore, state}
def handle_error(_error, _state),
do: :reconnect
# Called on terminate
def handle_terminate(reason, _state) do
Logger.warning("[KillsProvider.Websocket] Terminating => #{inspect(reason)}")
end
defp handle_subscribe(channel, state) do
Logger.debug(fn -> "[KillsProvider.Websocket] Subscribing to #{channel}" end)
payload = Jason.encode!(%{"action" => "sub", "channel" => channel})
{:reply, {:text, payload}, state}
end
# The partial from zKillboard has killmail_id + zkb.hash, but no time/victim/attackers
defp parse_and_store_zkb_partial(
%{"killmail_id" => kill_id, "zkb" => %{"hash" => kill_hash}} = partial
) do
Logger.debug(fn ->
"[KillsProvider.Websocket] parse_and_store_zkb_partial => kill_id=#{kill_id}"
end)
result =
retry with: exponential_backoff(300) |> randomize() |> cap(5_000) |> expiry(30_000),
rescue_only: [RuntimeError] do
case Esi.get_killmail(kill_id, kill_hash) do
{:ok, full_esi_data} ->
# Merge partial zKB fields (like totalValue) onto ESI data
enriched = Map.merge(full_esi_data, %{"zkb" => partial["zkb"]})
Parser.parse_and_store_killmail(enriched)
:ok
{:error, :timeout} ->
Logger.warning(
"[KillsProvider.Websocket] ESI get_killmail timeout => kill_id=#{kill_id}, retrying..."
)
raise "ESI timeout, will retry"
{:error, :not_found} ->
Logger.warning(
"[KillsProvider.Websocket] ESI get_killmail not_found => kill_id=#{kill_id}"
)
:skip
{:error, reason} ->
if HttpUtil.retriable_error?(reason) do
Logger.warning(
"[KillsProvider.Websocket] ESI get_killmail retriable error => kill_id=#{kill_id}, reason=#{inspect(reason)}"
)
raise "ESI error: #{inspect(reason)}, will retry"
else
Logger.warning(
"[KillsProvider.Websocket] ESI get_killmail failed => kill_id=#{kill_id}, reason=#{inspect(reason)}"
)
:skip
end
error ->
Logger.warning(
"[KillsProvider.Websocket] ESI get_killmail failed => kill_id=#{kill_id}, reason=#{inspect(error)}"
)
:skip
end
end
case result do
:ok ->
:ok
:skip ->
:skip
{:error, reason} ->
Logger.error(
"[KillsProvider.Websocket] ESI get_killmail exhausted retries => kill_id=#{kill_id}, reason=#{inspect(reason)}"
)
:skip
end
end
defp parse_and_store_zkb_partial(_),
do: :skip
end

View File

@@ -1,80 +0,0 @@
defmodule WandererApp.Zkb.KillsProvider.ZkbApi do
@moduledoc """
A small module for making HTTP requests to zKillboard and
parsing JSON responses, separate from the multi-page logic.
"""
require Logger
alias ExRated
# 5 calls per second allowed
@exrated_bucket :zkb_preloader_provider
@exrated_interval_ms 1_000
@exrated_max_requests 5
@zkillboard_api "https://zkillboard.com/api"
@doc """
Perform rate-limit check before fetching a single page from zKillboard and parse the response.
Returns:
- `{:ok, updated_state, partials_list}` on success
- `{:error, reason, updated_state}` if error
"""
def fetch_and_parse_page(system_id, page, %{calls_count: _} = state) do
with :ok <- check_rate(),
{:ok, resp} <- do_req_get(system_id, page),
partials when is_list(partials) <- parse_response_body(resp) do
{:ok, state, partials}
else
{:error, :rate_limited} ->
{:error, :rate_limited, state}
{:error, reason} ->
{:error, reason, state}
_other ->
{:error, :unexpected, state}
end
end
defp do_req_get(system_id, page) do
url = "#{@zkillboard_api}/kills/systemID/#{system_id}/page/#{page}/"
Logger.debug(fn -> "[ZkbApi] GET => system=#{system_id}, page=#{page}, url=#{url}" end)
try do
resp = Req.get!(url, decode_body: :json)
if resp.status == 200 do
{:ok, resp}
else
{:error, {:http_status, resp.status}}
end
rescue
e ->
Logger.error("""
[ZkbApi] do_req_get => exception: #{Exception.message(e)}
#{Exception.format_stacktrace(__STACKTRACE__)}
""")
{:error, :exception}
end
end
defp parse_response_body(%{status: 200, body: body}) when is_list(body),
do: body
defp parse_response_body(_),
do: :not_list
defp check_rate do
case ExRated.check_rate(@exrated_bucket, @exrated_interval_ms, @exrated_max_requests) do
{:ok, _count} ->
:ok
{:error, limit} ->
Logger.debug(fn -> "[ZkbApi] RATE_LIMIT => limit=#{inspect(limit)}" end)
{:error, :rate_limited}
end
end
end

View File

@@ -9,7 +9,6 @@ defmodule WandererAppWeb.MapAPIController do
alias WandererApp.MapSystemRepo
alias WandererApp.MapCharacterSettingsRepo
alias WandererApp.MapConnectionRepo
alias WandererApp.Zkb.KillsProvider.KillsCache
alias WandererAppWeb.Helpers.APIUtils
alias WandererAppWeb.Schemas.{ApiSchemas, ResponseSchemas}
@@ -423,7 +422,20 @@ defmodule WandererAppWeb.MapAPIController do
|| params["hour_ago"] # legacy typo
) do
solar_ids = Enum.map(systems, & &1.solar_system_id)
kills_map = KillsCache.fetch_cached_kills_for_systems(solar_ids)
# Fetch cached kills for each system from cache
kills_map =
Enum.reduce(solar_ids, %{}, fn sid, acc ->
kill_list_key = "zkb:kills:list:#{sid}"
kill_ids = WandererApp.Cache.get(kill_list_key) || []
kills_list =
kill_ids
|> Enum.map(fn kill_id ->
killmail_key = "zkb:killmail:#{kill_id}"
WandererApp.Cache.get(killmail_key)
end)
|> Enum.reject(&is_nil/1)
Map.put(acc, sid, kills_list)
end)
data =
Enum.map(systems, fn sys ->

View File

@@ -4,7 +4,7 @@ defmodule WandererAppWeb.Plugs.CheckKillsDisabled do
def init(opts), do: opts
def call(conn, _opts) do
if WandererApp.Env.zkill_preload_disabled?() do
if not WandererApp.Env.wanderer_kills_service_enabled?() do
conn
|> send_resp(403, "Map kill feed is disabled")
|> halt()

View File

@@ -134,15 +134,16 @@ defmodule WandererAppWeb.MapCoreEventHandler do
is_version_valid? = to_string(version) == to_string(app_version)
if is_version_valid? do
assigns
|> Map.get(:map_id)
|> case do
map_id = Map.get(assigns, :map_id)
case map_id do
map_id when not is_nil(map_id) ->
maybe_start_map(map_id)
_ ->
WandererApp.Cache.insert("map_#{map_slug}:ui_loaded", true)
end
else
end
{:noreply, socket |> assign(:is_version_valid?, is_version_valid?)}
@@ -479,9 +480,24 @@ defmodule WandererAppWeb.MapCoreEventHandler do
events
end
# Load initial kill counts
kills_data = case WandererApp.Map.get_map(map_id) do
{:ok, %{systems: systems}} ->
systems
|> Enum.map(fn {solar_system_id, _system} ->
kills_count = case WandererApp.Cache.get("zkb:kills:#{solar_system_id}") do
count when is_integer(count) and count >= 0 -> count
_ -> 0
end
%{solar_system_id: solar_system_id, kills: kills_count}
end)
_ ->
nil
end
initial_data =
%{
kills: nil,
kills: kills_data,
present_characters:
present_character_ids
|> WandererApp.Character.get_character_eve_ids!(),

View File

@@ -1,41 +1,93 @@
defmodule WandererAppWeb.MapKillsEventHandler do
@moduledoc """
Handles kills-related UI/server events.
Uses cache data populated by the WandererKills WebSocket service.
"""
use WandererAppWeb, :live_component
require Logger
alias WandererAppWeb.{MapEventHandler, MapCoreEventHandler}
alias WandererApp.Zkb.KillsProvider
alias WandererApp.Zkb.KillsProvider.KillsCache
def handle_server_event(
%{event: :init_kills},
%{
assigns: %{
map_id: map_id
}
} = socket
%{assigns: %{map_id: map_id} = assigns} = socket
) do
{:ok, kills} = WandererApp.Cache.lookup("map_#{map_id}:zkb_kills", Map.new())
socket
|> MapEventHandler.push_map_event(
"map_updated",
%{
kills:
kills
|> Enum.filter(fn {_, kills} -> kills > 0 end)
|> Enum.map(&map_ui_kill/1)
}
# Get kill counts from cache
case WandererApp.Map.get_map(map_id) do
{:ok, %{systems: systems}} ->
kill_counts =
systems
|> Enum.into(%{}, fn {solar_system_id, _system} ->
# Use explicit cache lookup with validation from WandererApp.Cache
kills_count =
case WandererApp.Cache.get("zkb:kills:#{solar_system_id}") do
count when is_integer(count) and count >= 0 ->
count
nil ->
0
invalid_data ->
Logger.warning(
"[#{__MODULE__}] Invalid kill count data for system #{solar_system_id}: #{inspect(invalid_data)}"
)
0
end
{solar_system_id, kills_count}
end)
|> Enum.filter(fn {_system_id, count} -> count > 0 end)
|> Enum.into(%{})
kills_payload = kill_counts
|> Enum.map(fn {system_id, kills} ->
%{solar_system_id: system_id, kills: kills}
end)
MapEventHandler.push_map_event(
socket,
"kills_updated",
kills_payload
)
error ->
Logger.warning("[#{__MODULE__}] Failed to get map #{map_id}: #{inspect(error)}")
socket
end
end
def handle_server_event(%{event: :update_system_kills, payload: solar_system_id}, socket) do
# Get kill count for the specific system
kills_count = case WandererApp.Cache.get("zkb:kills:#{solar_system_id}") do
count when is_integer(count) and count >= 0 ->
count
nil ->
0
invalid_data ->
Logger.warning("[#{__MODULE__}] Invalid kill count data for new system #{solar_system_id}: #{inspect(invalid_data)}")
0
end
# Only send update if there are kills
if kills_count > 0 do
MapEventHandler.push_map_event(socket, "kills_updated", [%{solar_system_id: solar_system_id, kills: kills_count}])
else
socket
end
end
def handle_server_event(%{event: :kills_updated, payload: kills}, socket) do
kills =
kills
|> Enum.map(&map_ui_kill/1)
|> Enum.map(fn {system_id, count} ->
%{solar_system_id: system_id, kills: count}
end)
socket
|> MapEventHandler.push_map_event(
@@ -100,22 +152,53 @@ defmodule WandererAppWeb.MapKillsEventHandler do
%{"system_id" => sid, "since_hours" => sh} = payload,
socket
) do
with {:ok, system_id} <- parse_id(sid),
{:ok, since_hours} <- parse_id(sh) do
kills_from_cache = KillsCache.fetch_cached_kills(system_id)
reply_payload = %{"system_id" => system_id, "kills" => kills_from_cache}
Task.async(fn ->
case KillsProvider.Fetcher.fetch_kills_for_system(system_id, since_hours, %{
calls_count: 0
}) do
{:ok, fresh_kills, _new_state} ->
{:detailed_kills_updated, %{system_id => fresh_kills}}
{:error, reason, _new_state} ->
Logger.warning("[#{__MODULE__}] fetch_kills_for_system => error=#{inspect(reason)}")
{:system_kills_error, {system_id, reason}}
handle_get_system_kills(sid, sh, payload, socket)
end
def handle_ui_event(
"get_systems_kills",
%{"system_ids" => sids, "since_hours" => sh} = payload,
socket
) do
handle_get_systems_kills(sids, sh, payload, socket)
end
def handle_ui_event(event, payload, socket) do
MapCoreEventHandler.handle_ui_event(event, payload, socket)
end
defp handle_get_system_kills(sid, sh, payload, socket) do
with {:ok, system_id} <- parse_id(sid),
{:ok, _since_hours} <- parse_id(sh) do
cache_key = "map:#{socket.assigns.map_id}:zkb:detailed_kills"
# Get from WandererApp.Cache (not Cachex)
kills_data =
case WandererApp.Cache.get(cache_key) do
cached_map when is_map(cached_map) ->
# Validate cache structure and extract system kills
case Map.get(cached_map, system_id) do
kills when is_list(kills) -> kills
_ -> []
end
nil ->
[]
invalid_data ->
Logger.warning(
"[#{__MODULE__}] Invalid cache data structure for key: #{cache_key}, got: #{inspect(invalid_data)}"
)
# Clear invalid cache entry
WandererApp.Cache.delete(cache_key)
[]
end
reply_payload = %{"system_id" => system_id, "kills" => kills_data}
Logger.debug(fn ->
"[#{__MODULE__}] get_system_kills => system_id=#{system_id}, cached_kills=#{length(kills_data)}"
end)
{:reply, reply_payload, socket}
@@ -126,74 +209,43 @@ defmodule WandererAppWeb.MapKillsEventHandler do
end
end
def handle_ui_event(
"get_systems_kills",
%{"system_ids" => sids, "since_hours" => sh} = payload,
socket
) do
with {:ok, since_hours} <- parse_id(sh),
defp handle_get_systems_kills(sids, sh, payload, socket) do
with {:ok, _since_hours} <- parse_id(sh),
{:ok, parsed_ids} <- parse_system_ids(sids) do
Logger.debug(fn ->
"[#{__MODULE__}] get_systems_kills => system_ids=#{inspect(parsed_ids)}, since_hours=#{since_hours}"
end)
# Get the cutoff time based on since_hours
cutoff = DateTime.utc_now() |> DateTime.add(-since_hours * 3600, :second)
cache_key = "map:#{socket.assigns.map_id}:zkb:detailed_kills"
Logger.debug(fn ->
"[#{__MODULE__}] get_systems_kills => cutoff=#{DateTime.to_iso8601(cutoff)}"
end)
# Fetch and filter kills for each system
cached_map =
Enum.reduce(parsed_ids, %{}, fn sid, acc ->
# Get all cached kills for this system
all_kills = KillsCache.fetch_cached_kills(sid)
# Filter kills based on the cutoff time
filtered_kills =
Enum.filter(all_kills, fn kill ->
kill_time = kill["kill_time"]
case kill_time do
%DateTime{} = dt ->
# Keep kills that occurred after the cutoff
DateTime.compare(dt, cutoff) != :lt
time when is_binary(time) ->
# Try to parse the string time
case DateTime.from_iso8601(time) do
{:ok, dt, _} -> DateTime.compare(dt, cutoff) != :lt
_ -> false
end
# If it's something else (nil, or a weird format), skip
_ ->
false
# Get from WandererApp.Cache (not Cachex)
filtered_data =
case WandererApp.Cache.get(cache_key) do
cached_map when is_map(cached_map) ->
# Validate and filter cached data
parsed_ids
|> Enum.reduce(%{}, fn system_id, acc ->
case Map.get(cached_map, system_id) do
kills when is_list(kills) -> Map.put(acc, system_id, kills)
_ -> acc
end
end)
Logger.debug(fn ->
"[#{__MODULE__}] get_systems_kills => system_id=#{sid}, all_kills=#{length(all_kills)}, filtered_kills=#{length(filtered_kills)}"
end)
nil ->
%{}
Map.put(acc, sid, filtered_kills)
end)
invalid_data ->
Logger.warning(
"[#{__MODULE__}] Invalid cache data structure for key: #{cache_key}, got: #{inspect(invalid_data)}"
)
reply_payload = %{"systems_kills" => cached_map}
Task.async(fn ->
case KillsProvider.Fetcher.fetch_kills_for_systems(parsed_ids, since_hours, %{
calls_count: 0
}) do
{:ok, systems_map} ->
{:detailed_kills_updated, systems_map}
{:error, reason} ->
Logger.warning("[#{__MODULE__}] fetch_kills_for_systems => error=#{inspect(reason)}")
{:systems_kills_error, {parsed_ids, reason}}
# Clear invalid cache entry
WandererApp.Cache.delete(cache_key)
%{}
end
end)
# filtered_data is already the final result, not wrapped in a tuple
systems_data = filtered_data
reply_payload = %{"systems_kills" => systems_data}
{:reply, reply_payload, socket}
else
@@ -203,10 +255,6 @@ defmodule WandererAppWeb.MapKillsEventHandler do
end
end
def handle_ui_event(event, payload, socket) do
MapCoreEventHandler.handle_ui_event(event, payload, socket)
end
defp parse_id(value) when is_binary(value) do
case Integer.parse(value) do
{int, ""} -> {:ok, int}
@@ -233,9 +281,4 @@ defmodule WandererAppWeb.MapKillsEventHandler do
end
defp parse_system_ids(_), do: :error
defp map_ui_kill({solar_system_id, kills}),
do: %{solar_system_id: solar_system_id, kills: kills}
defp map_ui_kill(_kill), do: %{}
end

View File

@@ -5,12 +5,15 @@ defmodule WandererAppWeb.MapSystemsEventHandler do
alias WandererAppWeb.{MapEventHandler, MapCoreEventHandler}
def handle_server_event(%{event: :add_system, payload: system}, socket),
do:
def handle_server_event(%{event: :add_system, payload: system}, socket) do
# Schedule kill update for the new system after a short delay to allow subscription
Process.send_after(self(), %{event: :update_system_kills, payload: system.solar_system_id}, 2000)
socket
|> MapEventHandler.push_map_event("add_systems", [
MapEventHandler.map_ui_system(system)
])
end
def handle_server_event(%{event: :update_system, payload: system}, socket),
do:

View File

@@ -142,7 +142,8 @@ defmodule WandererAppWeb.MapEventHandler do
@map_kills_events [
:init_kills,
:kills_updated,
:detailed_kills_updated
:detailed_kills_updated,
:update_system_kills
]
@map_kills_ui_events [

View File

@@ -65,6 +65,8 @@ defmodule WandererApp.MixProject do
{:phoenix_live_reload, "~> 1.5.3", only: :dev},
{:phoenix_live_view, "~> 1.0.0-rc.7", override: true},
{:phoenix_pubsub, "~> 2.1"},
{:phoenix_gen_socket_client, "~> 4.0"},
{:websocket_client, "~> 1.5"},
{:floki, ">= 0.30.0", only: :test},
{:phoenix_live_dashboard, "~> 0.8.3"},
{:phoenix_ddos, "~> 1.1"},

View File

@@ -93,6 +93,7 @@
"phoenix": {:hex, :phoenix, "1.7.20", "6bababaf27d59f5628f9b608de902a021be2cecefb8231e1dbdc0a2e2e480e9b", [:mix], [{:castore, ">= 0.0.0", [hex: :castore, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: true]}, {:phoenix_pubsub, "~> 2.1", [hex: :phoenix_pubsub, repo: "hexpm", optional: false]}, {:phoenix_template, "~> 1.0", [hex: :phoenix_template, repo: "hexpm", optional: false]}, {:phoenix_view, "~> 2.0", [hex: :phoenix_view, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.7", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:plug_crypto, "~> 1.2 or ~> 2.0", [hex: :plug_crypto, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:websock_adapter, "~> 0.5.3", [hex: :websock_adapter, repo: "hexpm", optional: false]}], "hexpm", "6be2ab98302e8784a31829e0d50d8bdfa81a23cd912c395bafd8b8bfb5a086c2"},
"phoenix_ddos": {:hex, :phoenix_ddos, "1.1.19", "4a15054480627e437d02b4ab9d6316a3755db1275ff2699a8b9a5aeed751be50", [:mix], [{:cachex, ">= 3.0.0", [hex: :cachex, repo: "hexpm", optional: false]}, {:phoenix, ">= 1.5.0", [hex: :phoenix, repo: "hexpm", optional: false]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, ">= 0.0.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "9c6c39893644fd8bd7363e890e8d2c5981238224678db1d3e62f7fc94cac3ee6"},
"phoenix_ecto": {:hex, :phoenix_ecto, "4.6.2", "3b83b24ab5a2eb071a20372f740d7118767c272db386831b2e77638c4dcc606d", [:mix], [{:ecto, "~> 3.5", [hex: :ecto, repo: "hexpm", optional: false]}, {:phoenix_html, "~> 2.14.2 or ~> 3.0 or ~> 4.1", [hex: :phoenix_html, repo: "hexpm", optional: true]}, {:plug, "~> 1.9", [hex: :plug, repo: "hexpm", optional: false]}, {:postgrex, "~> 0.16 or ~> 1.0", [hex: :postgrex, repo: "hexpm", optional: true]}], "hexpm", "3f94d025f59de86be00f5f8c5dd7b5965a3298458d21ab1c328488be3b5fcd59"},
"phoenix_gen_socket_client": {:hex, :phoenix_gen_socket_client, "4.0.0", "ab49bb8ef3d48d721f3381d758b78ba148b5eb77d8d8780df0b8c364fe2b2449", [:mix], [{:jason, "~> 1.1", [hex: :jason, repo: "hexpm", optional: true]}, {:websocket_client, "~> 1.2", [hex: :websocket_client, repo: "hexpm", optional: true]}], "hexpm", "f6a8266c4f287e082216ab8e3ebf4f42e1a43dd81c8de3ae45cfd9489d4f1f8f"},
"phoenix_html": {:hex, :phoenix_html, "4.2.1", "35279e2a39140068fc03f8874408d58eef734e488fc142153f055c5454fd1c08", [:mix], [], "hexpm", "cff108100ae2715dd959ae8f2a8cef8e20b593f8dfd031c9cba92702cf23e053"},
"phoenix_html_helpers": {:hex, :phoenix_html_helpers, "1.0.1", "7eed85c52eff80a179391036931791ee5d2f713d76a81d0d2c6ebafe1e11e5ec", [:mix], [{:phoenix_html, "~> 4.0", [hex: :phoenix_html, repo: "hexpm", optional: false]}, {:plug, "~> 1.5", [hex: :plug, repo: "hexpm", optional: true]}], "hexpm", "cffd2385d1fa4f78b04432df69ab8da63dc5cf63e07b713a4dcf36a3740e3090"},
"phoenix_live_dashboard": {:hex, :phoenix_live_dashboard, "0.8.4", "4508e481f791ce62ec6a096e13b061387158cbeefacca68c6c1928e1305e23ed", [:mix], [{:ecto, "~> 3.6.2 or ~> 3.7", [hex: :ecto, repo: "hexpm", optional: true]}, {:ecto_mysql_extras, "~> 0.5", [hex: :ecto_mysql_extras, repo: "hexpm", optional: true]}, {:ecto_psql_extras, "~> 0.7", [hex: :ecto_psql_extras, repo: "hexpm", optional: true]}, {:ecto_sqlite3_extras, "~> 1.1.7 or ~> 1.2.0", [hex: :ecto_sqlite3_extras, repo: "hexpm", optional: true]}, {:mime, "~> 1.6 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:phoenix_live_view, "~> 0.19 or ~> 1.0", [hex: :phoenix_live_view, repo: "hexpm", optional: false]}, {:telemetry_metrics, "~> 0.6 or ~> 1.0", [hex: :telemetry_metrics, repo: "hexpm", optional: false]}], "hexpm", "2984aae96994fbc5c61795a73b8fb58153b41ff934019cfb522343d2d3817d59"},
@@ -145,6 +146,7 @@
"version_tasks": {:hex, :version_tasks, "0.12.0", "df384f454369f5f922a541cdc21da2db643c7424c03994986dab2b1702a5b724", [:mix], [{:git_cli, "~> 0.2", [hex: :git_cli, repo: "hexpm", optional: false]}], "hexpm", "c85e0ec9ad498795609ad849b6dbc668876cecb993fce1f4073016a5b87ee430"},
"websock": {:hex, :websock, "0.5.3", "2f69a6ebe810328555b6fe5c831a851f485e303a7c8ce6c5f675abeb20ebdadc", [:mix], [], "hexpm", "6105453d7fac22c712ad66fab1d45abdf049868f253cf719b625151460b8b453"},
"websock_adapter": {:hex, :websock_adapter, "0.5.8", "3b97dc94e407e2d1fc666b2fb9acf6be81a1798a2602294aac000260a7c4a47d", [:mix], [{:bandit, ">= 0.6.0", [hex: :bandit, repo: "hexpm", optional: true]}, {:plug, "~> 1.14", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.6", [hex: :plug_cowboy, repo: "hexpm", optional: true]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "315b9a1865552212b5f35140ad194e67ce31af45bcee443d4ecb96b5fd3f3782"},
"websocket_client": {:hex, :websocket_client, "1.5.0", "e825f23c51a867681a222148ed5200cc4a12e4fb5ff0b0b35963e916e2b5766b", [:rebar3], [], "hexpm", "2b9b201cc5c82b9d4e6966ad8e605832eab8f4ddb39f57ac62f34cb208b68de9"},
"x509": {:hex, :x509, "0.8.9", "03c47e507171507d3d3028d802f48dd575206af2ef00f764a900789dfbe17476", [:mix], [], "hexpm", "ea3fb16a870a199cb2c45908a2c3e89cc934f0434173dc0c828136f878f11661"},
"yamerl": {:hex, :yamerl, "0.10.0", "4ff81fee2f1f6a46f1700c0d880b24d193ddb74bd14ef42cb0bcf46e81ef2f8e", [:rebar3], [], "hexpm", "346adb2963f1051dc837a2364e4acf6eb7d80097c0f53cbdc3046ec8ec4b4e6e"},
"yaml_elixir": {:hex, :yaml_elixir, "2.9.0", "9a256da867b37b8d2c1ffd5d9de373a4fda77a32a45b452f1708508ba7bbcb53", [:mix], [{:yamerl, "~> 0.10", [hex: :yamerl, repo: "hexpm", optional: false]}], "hexpm", "0cb0e7d4c56f5e99a6253ed1a670ed0e39c13fc45a6da054033928607ac08dfc"},

View File

@@ -0,0 +1,117 @@
defmodule WandererApp.Kills.StorageTest do
use ExUnit.Case
alias WandererApp.Kills.{Storage, CacheKeys}
setup do
# Clear cache before each test
WandererApp.Cache.delete_all()
:ok
end
describe "kill count race condition handling" do
test "incremental updates are skipped when recent websocket update exists" do
system_id = 30000142
# Simulate websocket update
assert :ok = Storage.store_kill_count(system_id, 100)
# Immediately try incremental update (within 5 seconds)
assert :ok = Storage.update_kill_count(system_id, 5, :timer.minutes(5))
# Count should still be 100, not 105
assert {:ok, 100} = Storage.get_kill_count(system_id)
end
test "incremental updates work after websocket update timeout" do
system_id = 30000143
# Simulate websocket update
assert :ok = Storage.store_kill_count(system_id, 100)
# Manually update metadata to simulate old timestamp
metadata_key = CacheKeys.kill_count_metadata(system_id)
old_timestamp = System.system_time(:millisecond) - 10_000 # 10 seconds ago
WandererApp.Cache.insert(metadata_key, %{
"source" => "websocket",
"timestamp" => old_timestamp,
"absolute_count" => 100
}, ttl: :timer.minutes(5))
# Try incremental update (after timeout)
assert :ok = Storage.update_kill_count(system_id, 5, :timer.minutes(5))
# Count should now be 105
assert {:ok, 105} = Storage.get_kill_count(system_id)
end
test "incremental updates work when no metadata exists" do
system_id = 30000144
# Set initial count without metadata (simulating old data)
key = CacheKeys.system_kill_count(system_id)
WandererApp.Cache.insert(key, 50, ttl: :timer.minutes(5))
# Try incremental update
assert :ok = Storage.update_kill_count(system_id, 5, :timer.minutes(5))
# Count should be 55
assert {:ok, 55} = Storage.get_kill_count(system_id)
end
test "reconcile_kill_count fixes discrepancies" do
system_id = 30000145
# Set up mismatched count and list
count_key = CacheKeys.system_kill_count(system_id)
list_key = CacheKeys.system_kill_list(system_id)
# Count says 100, but list only has 50
WandererApp.Cache.insert(count_key, 100, ttl: :timer.minutes(5))
WandererApp.Cache.insert(list_key, Enum.to_list(1..50), ttl: :timer.minutes(5))
# Reconcile
assert :ok = Storage.reconcile_kill_count(system_id)
# Count should now match list length
assert {:ok, 50} = Storage.get_kill_count(system_id)
end
end
describe "store_killmails/3" do
test "stores individual killmails and updates system list" do
system_id = 30000146
killmails = [
%{"killmail_id" => 123, "kill_time" => "2024-01-01T12:00:00Z"},
%{"killmail_id" => 124, "kill_time" => "2024-01-01T12:01:00Z"}
]
assert :ok = Storage.store_killmails(system_id, killmails, :timer.minutes(5))
# Check individual killmails are stored
assert {:ok, %{"killmail_id" => 123}} = Storage.get_killmail(123)
assert {:ok, %{"killmail_id" => 124}} = Storage.get_killmail(124)
# Check system list is updated
list_key = CacheKeys.system_kill_list(system_id)
assert [124, 123] = WandererApp.Cache.get(list_key)
end
test "handles missing killmail_id gracefully" do
system_id = 30000147
killmails = [
%{"kill_time" => "2024-01-01T12:00:00Z"}, # Missing killmail_id
%{"killmail_id" => 125, "kill_time" => "2024-01-01T12:01:00Z"}
]
# Should still store the valid killmail
assert :ok = Storage.store_killmails(system_id, killmails, :timer.minutes(5))
# Only the valid killmail is stored
assert {:ok, %{"killmail_id" => 125}} = Storage.get_killmail(125)
# System list only contains valid ID
list_key = CacheKeys.system_kill_list(system_id)
assert [125] = WandererApp.Cache.get(list_key)
end
end
end