Files
wanderer/lib/wanderer_app/map/server/map_server_impl.ex

561 lines
17 KiB
Elixir

defmodule WandererApp.Map.Server.Impl do
@moduledoc """
Holds state for a map and exposes an interface for managing the map instance.
"""
require Logger
alias WandererApp.Map.Server.{AclsImpl, CharactersImpl, ConnectionsImpl, SystemsImpl}
# `:map_id` is the only key that must be supplied when the struct is built.
@enforce_keys [
:map_id
]
# Server state:
#   * `:map_id`     - id of the map this server owns
#   * `:rtree_name` - name derived from the map id in `init/1`; looked up in `maybe_stop_rtree/1`
#   * `:map`        - in-memory map built by `load_state/1` (nil until then)
#   * `:map_opts`   - keyword list produced by `map_options/1`
defstruct [
:map_id,
:rtree_name,
map: nil,
map_opts: []
]
# Intervals used by the `handle_event/2` clauses to re-arm their own timers.
@systems_cleanup_timeout :timer.minutes(30)
@characters_cleanup_timeout :timer.minutes(1)
@connections_cleanup_timeout :timer.minutes(2)
# PubSub client module, read from the application config at compile time.
@pubsub_client Application.compile_env(:wanderer_app, :pubsub_client)
@backup_state_timeout :timer.minutes(1)
@update_presence_timeout :timer.seconds(1)
@update_characters_timeout :timer.seconds(1)
@update_tracked_characters_timeout :timer.seconds(1)
@doc """
Returns a server state struct with every field set to its default.
"""
def new, do: struct(__MODULE__)

@doc """
Returns a server state struct populated from `args` (a map or keyword list
of struct fields).
"""
def new(args), do: struct!(__MODULE__, args)
@doc """
Builds the initial server state for the map given as `args[:map_id]`.

Logs the start, sets the error tracker context to the map id, records the
map as not started in the cache, and returns a fresh state struct holding
the map id and the rtree name derived from it.
"""
def init(args) do
  map_id = args[:map_id]

  Logger.info("Starting map server for #{map_id}")
  ErrorTracker.set_context(%{map_id: map_id})
  WandererApp.Cache.insert("map_#{map_id}:started", false)

  # NOTE(review): this creates a module-name atom per map id — confirm ids are bounded.
  rtree_name = Module.concat([map_id, DDRT.DynamicRtree])

  new(%{map_id: map_id, rtree_name: rtree_name})
end
@doc """
Loads the map, its visible systems, its connections and its active
subscription settings, then builds the in-memory map state from them.

If any of the lookups does not return `{:ok, _}`, the failure is logged and
`state` is returned unchanged.
"""
def load_state(%__MODULE__{map_id: map_id} = state) do
  preloads = [
    :owner,
    :characters,
    acls: [
      :owner_id,
      members: [:role, :eve_character_id, :eve_corporation_id, :eve_alliance_id]
    ]
  ]

  with {:ok, map} <- WandererApp.MapRepo.get(map_id, preloads),
       {:ok, systems} <- WandererApp.MapSystemRepo.get_visible_by_map(map_id),
       {:ok, connections} <- WandererApp.MapConnectionRepo.get_by_map(map_id),
       {:ok, subscription_settings} <-
         WandererApp.Map.SubscriptionManager.get_active_map_subscription(map_id) do
    initialized = init_map(state, map, subscription_settings, systems, connections)

    initialized
    |> SystemsImpl.init_map_systems(systems)
    |> init_map_cache()
  else
    failure ->
      Logger.error("Failed to load map state: #{inspect(failure, pretty: true)}")
      state
  end
end
@doc """
Starts the map: tracks its ACLs, subscribes to the `"maps:<map_id>"` topic,
schedules the first run of every recurring job, marks the map as started in
the cache, broadcasts `:map_server_started` and emits a telemetry event.

If ACL tracking does not return `:ok`, the failure is logged and nothing else
is done. `state` is returned unchanged in both cases.
"""
def start_map(%__MODULE__{map: map, map_id: map_id} = state) do
  acl_ids = Enum.map(map.acls, & &1.id)

  case AclsImpl.track_acls(acl_ids) do
    :ok ->
      @pubsub_client.subscribe(WandererApp.PubSub, "maps:#{map_id}")

      # Initial delay for each recurring message; the handlers re-arm themselves.
      [
        {:update_characters, @update_characters_timeout},
        {:update_tracked_characters, 100},
        {:update_presence, @update_presence_timeout},
        {:cleanup_connections, 5000},
        {:cleanup_systems, 10_000},
        {:cleanup_characters, :timer.minutes(5)},
        {:backup_state, @backup_state_timeout}
      ]
      |> Enum.each(fn {message, delay} -> Process.send_after(self(), message, delay) end)

      WandererApp.Cache.insert("map_#{map_id}:started", true)
      broadcast!(map_id, :map_server_started)
      :telemetry.execute([:wanderer_app, :map, :started], %{count: 1})
      state

    failure ->
      Logger.error("Failed to start map: #{inspect(failure, pretty: true)}")
      state
  end
end
@doc """
Stops the map: removes the "started" cache entry, emits a telemetry event and
stops the process registered under the state's rtree name, if there is one.
Returns `state`.
"""
def stop_map(%{map_id: map_id} = state) do
  Logger.debug(fn -> "Stopping map server for #{map_id}" end)

  WandererApp.Cache.delete("map_#{map_id}:started")
  :telemetry.execute([:wanderer_app, :map, :stopped], %{count: 1})

  maybe_stop_rtree(state)
end
@doc """
Returns `{:ok, map}` with the map currently held in the server state.
"""
def get_map(%{map: current_map}) do
  {:ok, current_map}
end
# Character queries and additions are forwarded unchanged to CharactersImpl.
defdelegate get_characters(state), to: CharactersImpl
defdelegate add_character(state, character, track_character), to: CharactersImpl
@doc """
Removes `character_id` from the map via `CharactersImpl` and returns `state`
unchanged.
"""
def remove_character(%{map_id: id} = state, character_id) do
  _result = CharactersImpl.remove_character(id, character_id)
  state
end
@doc """
Stops tracking the given `characters_ids` on the map via `CharactersImpl` and
returns `state` unchanged.
"""
def untrack_characters(%{map_id: id} = state, characters_ids) do
  _result = CharactersImpl.untrack_characters(id, characters_ids)
  state
end
# System operations (add, delete, per-field updates, hub management) are
# forwarded unchanged to SystemsImpl; each takes the server state first.
defdelegate add_system(state, system_info, user_id, character_id), to: SystemsImpl
defdelegate delete_systems(
state,
removed_ids,
user_id,
character_id
),
to: SystemsImpl
defdelegate update_system_name(state, update), to: SystemsImpl
defdelegate update_system_description(state, update), to: SystemsImpl
defdelegate update_system_status(state, update), to: SystemsImpl
defdelegate update_system_tag(state, update), to: SystemsImpl
defdelegate update_system_temporary_name(state, update), to: SystemsImpl
defdelegate update_system_locked(state, update), to: SystemsImpl
defdelegate update_system_labels(state, update), to: SystemsImpl
defdelegate update_system_linked_sig_eve_id(state, update), to: SystemsImpl
defdelegate update_system_position(state, update), to: SystemsImpl
defdelegate add_hub(state, hub_info), to: SystemsImpl
defdelegate remove_hub(state, hub_info), to: SystemsImpl
# Connection operations are forwarded unchanged to ConnectionsImpl; each takes
# the server state first.
defdelegate add_connection(state, connection_info), to: ConnectionsImpl
defdelegate delete_connection(state, connection_info), to: ConnectionsImpl
defdelegate get_connection_info(state, connection_info), to: ConnectionsImpl
defdelegate update_connection_time_status(state, connection_update), to: ConnectionsImpl
defdelegate update_connection_type(state, connection_update), to: ConnectionsImpl
defdelegate update_connection_mass_status(state, connection_update), to: ConnectionsImpl
defdelegate update_connection_ship_size_type(state, connection_update), to: ConnectionsImpl
defdelegate update_connection_locked(state, connection_update), to: ConnectionsImpl
defdelegate update_connection_custom_info(state, connection_update), to: ConnectionsImpl
@doc """
Imports systems, connections and hubs from `settings` into the map and
returns the resulting state.

While the import runs, the `"map_<map_id>:importing"` cache flag is set; that
flag makes `can_broadcast?/1` return false, so no events are broadcast for
the individual import steps.
"""
def import_settings(%{map_id: map_id} = state, settings, user_id) do
  importing_key = "map_#{map_id}:importing"

  WandererApp.Cache.put(importing_key, true)

  try do
    state
    |> maybe_import_systems(settings, user_id, nil)
    |> maybe_import_connections(settings, user_id)
    |> maybe_import_hubs(settings, user_id)
  after
    # Always clear the flag, even when an import step raises: the cache entry
    # would otherwise stay set and keep broadcasts for this map disabled.
    WandererApp.Cache.take(importing_key)
  end
end
@doc """
Applies `subscription_settings` to the map held in `state` and returns the
state with the updated map.
"""
def update_subscription_settings(%{map: map} = state, subscription_settings) do
  updated_map = WandererApp.Map.update_subscription_settings!(map, subscription_settings)
  %{state | map: updated_map}
end
# Recurring tick: schedules the next :update_characters message, then asks
# CharactersImpl to update the characters for this state.
def handle_event(:update_characters, state) do
  _timer = Process.send_after(self(), :update_characters, @update_characters_timeout)
  _ = CharactersImpl.update_characters(state)
  state
end

# Recurring tick: schedules the next run, then updates the tracked characters
# of this map.
def handle_event(:update_tracked_characters, %{map_id: id} = state) do
  _timer =
    Process.send_after(self(), :update_tracked_characters, @update_tracked_characters_timeout)

  _ = CharactersImpl.update_tracked_characters(id)
  state
end

# Recurring tick: schedules the next run, then reconciles presence for this map.
def handle_event(:update_presence, %{map_id: id} = state) do
  _timer = Process.send_after(self(), :update_presence, @update_presence_timeout)
  _ = update_presence(id)
  state
end

# Recurring tick: schedules the next run, then persists a state snapshot.
# The match is assertive: a failed save raises in the calling process.
def handle_event(:backup_state, state) do
  _timer = Process.send_after(self(), :backup_state, @backup_state_timeout)
  {:ok, _saved} = save_map_state(state)
  state
end
# The set of ACLs attached to the map changed; AclsImpl returns the new state.
def handle_event({:map_acl_updated, added_acls, removed_acls}, state),
  do: AclsImpl.handle_map_acl_updated(state, added_acls, removed_acls)

# A single ACL was modified; AclsImpl handles it and the state is kept as is.
def handle_event({:acl_updated, %{acl_id: acl_id}}, %{map_id: id} = state) do
  _ = AclsImpl.handle_acl_updated(id, acl_id)
  state
end
# Recurring tick: schedules the next run and returns the state produced by the
# connections cleanup.
def handle_event(:cleanup_connections, state) do
  _timer = Process.send_after(self(), :cleanup_connections, @connections_cleanup_timeout)
  ConnectionsImpl.cleanup_connections(state)
end

# Recurring tick: schedules the next run and cleans up the map's characters;
# the state itself is returned unchanged.
def handle_event(:cleanup_characters, %{map_id: id, map: %{owner_id: owner_id}} = state) do
  _timer = Process.send_after(self(), :cleanup_characters, @characters_cleanup_timeout)
  _ = CharactersImpl.cleanup_characters(id, owner_id)
  state
end

# Recurring tick: schedules the next run and returns the state produced by the
# systems cleanup.
def handle_event(:cleanup_systems, state) do
  _timer = Process.send_after(self(), :cleanup_systems, @systems_cleanup_timeout)
  SystemsImpl.cleanup_systems(state)
end
# Re-reads the active subscription for this map and applies it to the map held
# in the state. The match is assertive: a failed lookup raises.
def handle_event(:subscription_settings_updated, %{map: map, map_id: id} = state) do
  {:ok, settings} = WandererApp.Map.SubscriptionManager.get_active_map_subscription(id)
  refreshed_map = WandererApp.Map.update_subscription_settings!(map, settings)

  %{state | map: refreshed_map}
end
# Map options changed: pushes them into the map and refreshes `map_opts`.
def handle_event({:options_updated, options}, %{map: map} = state) do
  # NOTE(review): the result of update_options!/2 is discarded here, whereas the
  # subscription handlers store the returned map in the state — confirm that
  # update_options!/2 is relied on only for its side effect.
  _ = WandererApp.Map.update_options!(map, options)

  refreshed_opts = map_options(options)
  %{state | map_opts: refreshed_opts}
end
# `{ref, result}` message (presumably the reply of a monitored task — confirm
# against the callers): drop the monitor and flush its :DOWN message.
# The guard matters: without it every unknown 2-tuple event would land here
# and Process.demonitor/2 would raise on a non-reference instead of the event
# being logged by the catch-all clause below.
def handle_event({ref, _result}, %{map_id: _map_id} = state) when is_reference(ref) do
  Process.demonitor(ref, [:flush])
  state
end

# Catch-all: unknown events are logged and otherwise ignored.
def handle_event(msg, state) do
  Logger.warning("Unhandled event: #{inspect(msg)}")
  state
end
@doc """
Broadcasts `%{event: event, payload: payload}` on the `map_id` topic, but only
when `can_broadcast?/1` allows it. Always returns `:ok`.
"""
def broadcast!(map_id, event, payload \\ nil) do
  message = %{event: event, payload: payload}

  can_broadcast?(map_id) && @pubsub_client.broadcast!(WandererApp.PubSub, map_id, message)

  :ok
end
# Broadcasting is allowed when the map is not being imported and has been
# started. The "started" flag is only read when no import is in progress.
defp can_broadcast?(map_id) do
  importing? = WandererApp.Cache.lookup!("map_#{map_id}:importing", false)

  not importing? and WandererApp.Cache.lookup!("map_#{map_id}:started", false)
end
@doc """
Picks the given `attributes` out of `update` and returns them as
`{:ok, map}`. Attributes missing from `update` are included with a `nil`
value.
"""
def get_update_map(update, attributes) do
  picked =
    for attribute <- attributes, into: %{} do
      {attribute, get_in(update, [Access.key(attribute)])}
    end

  {:ok, picked}
end
# Converts the string-keyed map options into the keyword list stored in the
# server state as `map_opts`. `layout` defaults to "left_to_right"; every
# other entry is a boolean flag that defaults to false.
defp map_options(options) do
  [
    layout: Map.get(options, "layout", "left_to_right"),
    store_custom_labels: boolean_option(options, "store_custom_labels"),
    show_linked_signature_id: boolean_option(options, "show_linked_signature_id"),
    show_linked_signature_id_temp_name:
      boolean_option(options, "show_linked_signature_id_temp_name"),
    show_temp_system_name: boolean_option(options, "show_temp_system_name"),
    restrict_offline_showing: boolean_option(options, "restrict_offline_showing")
  ]
end

# Reads a boolean flag. Only `true` and "true" enable it; anything else
# (including a missing key, nil, "false" and false) disables it. Parsing
# explicitly avoids turning arbitrary strings into atoms and avoids raising
# when the value is already a boolean.
defp boolean_option(options, key) do
  case Map.get(options, key) do
    enabled when enabled in [true, "true"] -> true
    _disabled -> false
  end
end
# Persists a snapshot of the per-system "last activity" and per-connection
# "mark EOL time" cache entries of the map. Returns the result of
# `WandererApp.Api.MapState.create/1`.
defp save_map_state(%{map_id: map_id} = _state) do
  systems_last_activity =
    map_id
    |> WandererApp.Map.list_systems!()
    |> cached_values_by_id(fn system_id ->
      "map_#{map_id}:system_#{system_id}:last_activity"
    end)

  connections_eol_time =
    map_id
    |> WandererApp.Map.list_connections!()
    |> cached_values_by_id(fn connection_id ->
      "map_#{map_id}:conn_#{connection_id}:mark_eol_time"
    end)

  WandererApp.Api.MapState.create(%{
    map_id: map_id,
    systems_last_activity: systems_last_activity,
    connections_eol_time: connections_eol_time
  })
end

# Builds an `id => cached value` map for `records`, looking each record up
# under the cache key produced by `cache_key_fun`. Records without a cached
# value are left out.
defp cached_values_by_id(records, cache_key_fun) do
  Enum.reduce(records, %{}, fn %{id: id}, collected ->
    case WandererApp.Cache.get(cache_key_fun.(id)) do
      nil -> collected
      cached -> Map.put_new(collected, id, cached)
    end
  end)
end
# Stops the process registered under the state's rtree name, if one is
# registered, and returns the state.
defp maybe_stop_rtree(%{rtree_name: rtree_name} = state) do
  rtree_name
  |> Process.whereis()
  |> stop_rtree()

  state
end

# Nothing is registered under the name: nothing to stop.
defp stop_rtree(nil), do: :ok

defp stop_rtree(pid) when is_pid(pid), do: GenServer.stop(pid, :normal)
# Restores the "last activity" and "EOL time" caches from the stored map
# state, when one can be loaded for this map. Returns the state unchanged
# either way.
defp init_map_cache(%__MODULE__{map_id: map_id} = state) do
  with {:ok, %{systems_last_activity: last_activity, connections_eol_time: eol_time}} <-
         WandererApp.Api.MapState.by_map_id(map_id) do
    SystemsImpl.init_last_activity_cache(map_id, last_activity)
    ConnectionsImpl.init_eol_cache(map_id, eol_time)
  end

  state
end
# Builds the in-memory map from the loaded record: applies options and
# subscription settings, then adds systems, connections and characters.
# Afterwards the map's character ids are stored in the cache under
# "map_<id>:invalidate_character_ids". Returns the state with `map` and
# `map_opts` filled in.
defp init_map(
       state,
       %{id: map_id, characters: characters} = initial_map,
       subscription_settings,
       systems,
       connections
     ) do
  {:ok, options} = WandererApp.MapRepo.options_to_form_data(initial_map)

  map =
    initial_map
    |> WandererApp.Map.new()
    |> WandererApp.Map.update_options!(options)
    |> WandererApp.Map.update_subscription_settings!(subscription_settings)
    |> WandererApp.Map.add_systems!(systems)
    |> WandererApp.Map.add_connections!(connections)
    |> WandererApp.Map.add_characters!(characters)

  # Read the ids back only after the characters were added above.
  current_map = WandererApp.Map.get_map!(map_id)
  character_ids = Map.get(current_map, :characters, [])

  WandererApp.Cache.insert("map_#{map_id}:invalidate_character_ids", character_ids)

  %{state | map: map, map_opts: map_options(options)}
end
@doc """
Imports the systems listed under `"systems"` in `settings`.

Every system is added at its rounded position and then has its name,
description, status, tag, temporary name, locked flag and labels applied.
Systems whose `"visible"` value is `false` are deleted again afterwards.

When `settings` has no `"systems"` key, `state` is returned unchanged.
"""
def maybe_import_systems(state, %{"systems" => systems} = _settings, user_id, character_id) do
  state =
    Enum.reduce(systems, state, fn %{
                                     "description" => description,
                                     "id" => id,
                                     "labels" => labels,
                                     "locked" => locked,
                                     "name" => name,
                                     "position" => %{"x" => x, "y" => y},
                                     "status" => status,
                                     "tag" => tag,
                                     "temporary_name" => temporary_name
                                   } = _system,
                                   acc ->
      # Ids arrive as strings; parse once instead of once per update call.
      solar_system_id = String.to_integer(id)

      acc
      |> add_system(
        %{
          solar_system_id: solar_system_id,
          coordinates: %{"x" => round(x), "y" => round(y)}
        },
        user_id,
        character_id
      )
      |> update_system_name(%{solar_system_id: solar_system_id, name: name})
      |> update_system_description(%{
        solar_system_id: solar_system_id,
        description: description
      })
      |> update_system_status(%{solar_system_id: solar_system_id, status: status})
      |> update_system_tag(%{solar_system_id: solar_system_id, tag: tag})
      |> update_system_temporary_name(%{
        solar_system_id: solar_system_id,
        temporary_name: temporary_name
      })
      |> update_system_locked(%{solar_system_id: solar_system_id, locked: locked})
      |> update_system_labels(%{solar_system_id: solar_system_id, labels: labels})
    end)

  # Only an explicit `false` marks a system as removed; a missing "visible"
  # key keeps the system instead of raising on `not nil`.
  removed_system_ids =
    systems
    |> Enum.filter(fn system -> system["visible"] == false end)
    |> Enum.map(fn system -> String.to_integer(system["id"]) end)

  delete_systems(state, removed_system_ids, user_id, character_id)
end

def maybe_import_systems(state, _settings, _user_id, _character_id), do: state
@doc """
Imports the connections listed under `"connections"` in `settings`.

Every connection is added between its source and target system and then has
its time status, mass status and ship size type applied.

When `settings` has no `"connections"` key, `state` is returned unchanged.
"""
def maybe_import_connections(state, %{"connections" => connections} = _settings, _user_id) do
  Enum.reduce(connections, state, fn %{
                                       "source" => source,
                                       "target" => target,
                                       "mass_status" => mass_status,
                                       "time_status" => time_status,
                                       "ship_size_type" => ship_size_type
                                     } = _connection,
                                     acc ->
    # Both ends identify the connection in every call below.
    endpoints = %{
      solar_system_source_id: String.to_integer(source),
      solar_system_target_id: String.to_integer(target)
    }

    acc
    |> add_connection(endpoints)
    |> update_connection_time_status(Map.put(endpoints, :time_status, time_status))
    |> update_connection_mass_status(Map.put(endpoints, :mass_status, mass_status))
    |> update_connection_ship_size_type(Map.put(endpoints, :ship_size_type, ship_size_type))
  end)
end

def maybe_import_connections(state, _settings, _user_id), do: state
@doc """
Imports the hubs listed under `"hubs"` in `settings`; each entry is a solar
system id given as a string.

When `settings` has no `"hubs"` key, `state` is returned unchanged.
"""
def maybe_import_hubs(state, %{"hubs" => hubs} = _settings, _user_id) do
  Enum.reduce(hubs, state, fn hub, acc ->
    add_hub(acc, %{solar_system_id: String.to_integer(hub)})
  end)
end

def maybe_import_hubs(state, _settings, _user_id), do: state
# Reconciles tracked characters with presence, but only when the map is
# started and the "presence_updated" flag was set (reading the flag also
# removes it). Always returns :ok.
defp update_presence(map_id) do
  refresh_needed =
    WandererApp.Cache.lookup!("map_#{map_id}:started", false) and
      WandererApp.Cache.get_and_remove!("map_#{map_id}:presence_updated", false)

  if refresh_needed == true do
    refresh_presence(map_id)
  else
    :ok
  end
end

# Tracks the characters that are present, untracks the map characters that
# are not, and broadcasts the EVE ids of the present characters.
defp refresh_presence(map_id) do
  {:ok, present_ids} = WandererApp.Cache.lookup("map_#{map_id}:presence_character_ids", [])

  absent_ids =
    map_id
    |> WandererApp.Map.get_map!()
    |> Map.get(:characters, [])
    |> Enum.reject(fn character_id -> character_id in present_ids end)

  CharactersImpl.track_characters(map_id, present_ids)
  CharactersImpl.untrack_characters(map_id, absent_ids)

  present_eve_ids = WandererApp.Character.get_character_eve_ids!(present_ids)
  broadcast!(map_id, :present_characters_updated, present_eve_ids)

  :ok
end
end