mirror of
https://github.com/wanderer-industries/wanderer
synced 2025-11-13 12:46:14 +00:00
Compare commits
4 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
63ca473113 | ||
|
|
0332d36a8e | ||
|
|
7df8284124 | ||
|
|
21ca630abd |
11
CHANGELOG.md
11
CHANGELOG.md
@@ -2,17 +2,6 @@
|
|||||||
|
|
||||||
<!-- changelog -->
|
<!-- changelog -->
|
||||||
|
|
||||||
## [v1.84.2](https://github.com/wanderer-industries/wanderer/compare/v1.84.1...v1.84.2) (2025-11-10)
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
### Bug Fixes:
|
|
||||||
|
|
||||||
* api: fixed api for get/update map systems
|
|
||||||
|
|
||||||
* add index for map/systems api
|
|
||||||
|
|
||||||
## [v1.84.1](https://github.com/wanderer-industries/wanderer/compare/v1.84.0...v1.84.1) (2025-11-01)
|
## [v1.84.1](https://github.com/wanderer-industries/wanderer/compare/v1.84.0...v1.84.1) (2025-11-01)
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -373,36 +373,36 @@ defmodule WandererApp.Map.Server.ConnectionsImpl do
|
|||||||
solar_system_target: solar_system_target
|
solar_system_target: solar_system_target
|
||||||
} = updated_connection
|
} = updated_connection
|
||||||
) do
|
) do
|
||||||
source_system =
|
with source_system when not is_nil(source_system) <-
|
||||||
WandererApp.Map.find_system_by_location(
|
WandererApp.Map.find_system_by_location(
|
||||||
|
map_id,
|
||||||
|
%{solar_system_id: solar_system_source}
|
||||||
|
),
|
||||||
|
target_system when not is_nil(source_system) <-
|
||||||
|
WandererApp.Map.find_system_by_location(
|
||||||
|
map_id,
|
||||||
|
%{solar_system_id: solar_system_target}
|
||||||
|
),
|
||||||
|
source_linked_signatures <-
|
||||||
|
find_linked_signatures(source_system, target_system),
|
||||||
|
target_linked_signatures <- find_linked_signatures(target_system, source_system) do
|
||||||
|
update_signatures_time_status(
|
||||||
map_id,
|
map_id,
|
||||||
%{solar_system_id: solar_system_source}
|
source_system.solar_system_id,
|
||||||
|
source_linked_signatures,
|
||||||
|
time_status
|
||||||
)
|
)
|
||||||
|
|
||||||
target_system =
|
update_signatures_time_status(
|
||||||
WandererApp.Map.find_system_by_location(
|
|
||||||
map_id,
|
map_id,
|
||||||
%{solar_system_id: solar_system_target}
|
target_system.solar_system_id,
|
||||||
|
target_linked_signatures,
|
||||||
|
time_status
|
||||||
)
|
)
|
||||||
|
else
|
||||||
source_linked_signatures =
|
error ->
|
||||||
find_linked_signatures(source_system, target_system)
|
Logger.error("Failed to update_linked_signature_time_status: #{inspect(error)}")
|
||||||
|
end
|
||||||
target_linked_signatures = find_linked_signatures(target_system, source_system)
|
|
||||||
|
|
||||||
update_signatures_time_status(
|
|
||||||
map_id,
|
|
||||||
source_system.solar_system_id,
|
|
||||||
source_linked_signatures,
|
|
||||||
time_status
|
|
||||||
)
|
|
||||||
|
|
||||||
update_signatures_time_status(
|
|
||||||
map_id,
|
|
||||||
target_system.solar_system_id,
|
|
||||||
target_linked_signatures,
|
|
||||||
time_status
|
|
||||||
)
|
|
||||||
end
|
end
|
||||||
|
|
||||||
defp find_linked_signatures(
|
defp find_linked_signatures(
|
||||||
|
|||||||
@@ -501,13 +501,16 @@ defmodule WandererApp.SecurityAudit do
|
|||||||
# Ensure event_type is properly formatted
|
# Ensure event_type is properly formatted
|
||||||
event_type = normalize_event_type(audit_entry.event_type)
|
event_type = normalize_event_type(audit_entry.event_type)
|
||||||
|
|
||||||
|
# Generate unique entity_id to avoid constraint violations
|
||||||
|
entity_id = generate_entity_id(audit_entry.session_id)
|
||||||
|
|
||||||
attrs = %{
|
attrs = %{
|
||||||
user_id: audit_entry.user_id,
|
entity_id: entity_id,
|
||||||
character_id: nil,
|
|
||||||
entity_id: hash_identifier(audit_entry.session_id),
|
|
||||||
entity_type: :security_event,
|
entity_type: :security_event,
|
||||||
event_type: event_type,
|
event_type: event_type,
|
||||||
event_data: encode_event_data(audit_entry)
|
event_data: encode_event_data(audit_entry),
|
||||||
|
user_id: audit_entry.user_id,
|
||||||
|
character_id: nil
|
||||||
}
|
}
|
||||||
|
|
||||||
case UserActivity.new(attrs) do
|
case UserActivity.new(attrs) do
|
||||||
@@ -619,8 +622,13 @@ defmodule WandererApp.SecurityAudit do
|
|||||||
defp convert_datetime(%NaiveDateTime{} = dt), do: NaiveDateTime.to_iso8601(dt)
|
defp convert_datetime(%NaiveDateTime{} = dt), do: NaiveDateTime.to_iso8601(dt)
|
||||||
defp convert_datetime(value), do: value
|
defp convert_datetime(value), do: value
|
||||||
|
|
||||||
defp generate_entity_id do
|
defp generate_entity_id(session_id \\ nil) do
|
||||||
"audit_#{DateTime.utc_now() |> DateTime.to_unix(:microsecond)}_#{System.unique_integer([:positive])}"
|
if session_id do
|
||||||
|
# Include high-resolution timestamp and unique component for guaranteed uniqueness
|
||||||
|
"#{hash_identifier(session_id)}_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
|
||||||
|
else
|
||||||
|
"audit_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
|
||||||
|
end
|
||||||
end
|
end
|
||||||
|
|
||||||
defp async_enabled? do
|
defp async_enabled? do
|
||||||
|
|||||||
@@ -88,20 +88,21 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
|
|||||||
def handle_cast({:log_event, audit_entry}, state) do
|
def handle_cast({:log_event, audit_entry}, state) do
|
||||||
# Add to buffer
|
# Add to buffer
|
||||||
buffer = [audit_entry | state.buffer]
|
buffer = [audit_entry | state.buffer]
|
||||||
|
buf_len = length(buffer)
|
||||||
|
|
||||||
# Update stats
|
# Update stats
|
||||||
stats = Map.update!(state.stats, :events_processed, &(&1 + 1))
|
stats = Map.update!(state.stats, :events_processed, &(&1 + 1))
|
||||||
|
|
||||||
# Check if we need to flush
|
# Check if we need to flush
|
||||||
cond do
|
cond do
|
||||||
length(buffer) >= state.batch_size ->
|
buf_len >= state.batch_size ->
|
||||||
# Flush immediately if batch size reached
|
# Flush immediately if batch size reached
|
||||||
{:noreply, do_flush(%{state | buffer: buffer, stats: stats})}
|
{:noreply, do_flush(%{state | buffer: buffer, stats: stats})}
|
||||||
|
|
||||||
length(buffer) >= @max_buffer_size ->
|
buf_len >= @max_buffer_size ->
|
||||||
# Force flush if max buffer size reached
|
# Force flush if max buffer size reached
|
||||||
Logger.warning("Security audit buffer overflow, forcing flush",
|
Logger.warning("Security audit buffer overflow, forcing flush",
|
||||||
buffer_size: length(buffer),
|
buffer_size: buf_len,
|
||||||
max_size: @max_buffer_size
|
max_size: @max_buffer_size
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -186,23 +187,66 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
|
|||||||
# Clear buffer
|
# Clear buffer
|
||||||
%{state | buffer: [], stats: stats}
|
%{state | buffer: [], stats: stats}
|
||||||
|
|
||||||
{:error, reason} ->
|
{:partial, success_count, failed_events} ->
|
||||||
Logger.error("Failed to flush security audit events",
|
failed_count = length(failed_events)
|
||||||
reason: inspect(reason),
|
|
||||||
event_count: length(events)
|
Logger.warning(
|
||||||
|
"Partial flush: stored #{success_count}, failed #{failed_count} audit events",
|
||||||
|
success_count: success_count,
|
||||||
|
failed_count: failed_count,
|
||||||
|
buffer_size: length(state.buffer)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Emit telemetry for monitoring
|
||||||
|
:telemetry.execute(
|
||||||
|
[:wanderer_app, :security_audit, :async_flush_partial],
|
||||||
|
%{success_count: success_count, failed_count: failed_count},
|
||||||
|
%{}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Update stats - count partial flush as both success and error
|
||||||
|
stats =
|
||||||
|
state.stats
|
||||||
|
|> Map.update!(:batches_flushed, &(&1 + 1))
|
||||||
|
|> Map.update!(:errors, &(&1 + 1))
|
||||||
|
|> Map.put(:last_flush, DateTime.utc_now())
|
||||||
|
|
||||||
|
# Extract just the events from failed_events tuples
|
||||||
|
failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
|
||||||
|
|
||||||
|
remaining_buffer = Enum.reject(state.buffer, fn ev -> ev in failed_only end)
|
||||||
|
|
||||||
|
# Re-buffer failed events at the front, preserving newest-first ordering
|
||||||
|
# Reverse failed_only since flush reversed the buffer to oldest-first
|
||||||
|
new_buffer = Enum.reverse(failed_only) ++ remaining_buffer
|
||||||
|
buffer = handle_buffer_overflow(new_buffer, @max_buffer_size)
|
||||||
|
|
||||||
|
%{state | buffer: buffer, stats: stats}
|
||||||
|
|
||||||
|
{:error, failed_events} ->
|
||||||
|
failed_count = length(failed_events)
|
||||||
|
|
||||||
|
Logger.error("Failed to flush all #{failed_count} security audit events",
|
||||||
|
failed_count: failed_count,
|
||||||
|
buffer_size: length(state.buffer)
|
||||||
|
)
|
||||||
|
|
||||||
|
# Emit telemetry for monitoring
|
||||||
|
:telemetry.execute(
|
||||||
|
[:wanderer_app, :security_audit, :async_flush_failure],
|
||||||
|
%{count: 1, event_count: failed_count},
|
||||||
|
%{}
|
||||||
)
|
)
|
||||||
|
|
||||||
# Update error stats
|
# Update error stats
|
||||||
stats = Map.update!(state.stats, :errors, &(&1 + 1))
|
stats = Map.update!(state.stats, :errors, &(&1 + 1))
|
||||||
|
|
||||||
# Implement backoff - keep events in buffer but don't grow indefinitely
|
# Extract just the events from failed_events tuples
|
||||||
buffer =
|
failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
|
||||||
if length(state.buffer) > @max_buffer_size do
|
|
||||||
Logger.warning("Dropping oldest audit events due to repeated flush failures")
|
# Since ALL events failed, the new buffer should only contain the failed events
|
||||||
Enum.take(state.buffer, @max_buffer_size)
|
# Reverse to maintain newest-first ordering (flush reversed to oldest-first)
|
||||||
else
|
buffer = handle_buffer_overflow(Enum.reverse(failed_only), @max_buffer_size)
|
||||||
state.buffer
|
|
||||||
end
|
|
||||||
|
|
||||||
%{state | buffer: buffer, stats: stats}
|
%{state | buffer: buffer, stats: stats}
|
||||||
end
|
end
|
||||||
@@ -213,34 +257,100 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
|
|||||||
events
|
events
|
||||||
# Ash bulk operations work better with smaller chunks
|
# Ash bulk operations work better with smaller chunks
|
||||||
|> Enum.chunk_every(50)
|
|> Enum.chunk_every(50)
|
||||||
|> Enum.reduce_while({:ok, 0}, fn chunk, {:ok, count} ->
|
|> Enum.reduce({0, []}, fn chunk, {total_success, all_failed} ->
|
||||||
case store_event_chunk(chunk) do
|
case store_event_chunk(chunk) do
|
||||||
{:ok, chunk_count} ->
|
{:ok, chunk_count} ->
|
||||||
{:cont, {:ok, count + chunk_count}}
|
{total_success + chunk_count, all_failed}
|
||||||
|
|
||||||
{:error, _} = error ->
|
{:partial, chunk_count, failed_events} ->
|
||||||
{:halt, error}
|
{total_success + chunk_count, all_failed ++ failed_events}
|
||||||
|
|
||||||
|
{:error, failed_events} ->
|
||||||
|
{total_success, all_failed ++ failed_events}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|> then(fn {success_count, failed_events_list} ->
|
||||||
|
# Derive the final return shape based on results
|
||||||
|
cond do
|
||||||
|
failed_events_list == [] ->
|
||||||
|
{:ok, success_count}
|
||||||
|
|
||||||
|
success_count == 0 ->
|
||||||
|
{:error, failed_events_list}
|
||||||
|
|
||||||
|
true ->
|
||||||
|
{:partial, success_count, failed_events_list}
|
||||||
end
|
end
|
||||||
end)
|
end)
|
||||||
end
|
end
|
||||||
|
|
||||||
|
defp handle_buffer_overflow(buffer, max_size) when length(buffer) > max_size do
|
||||||
|
dropped = length(buffer) - max_size
|
||||||
|
|
||||||
|
Logger.warning(
|
||||||
|
"Dropping #{dropped} oldest audit events due to buffer overflow",
|
||||||
|
buffer_size: length(buffer),
|
||||||
|
max_size: max_size
|
||||||
|
)
|
||||||
|
|
||||||
|
# Emit telemetry for dropped events
|
||||||
|
:telemetry.execute(
|
||||||
|
[:wanderer_app, :security_audit, :events_dropped],
|
||||||
|
%{count: dropped},
|
||||||
|
%{}
|
||||||
|
)
|
||||||
|
|
||||||
|
# Keep the newest events (take from the front since buffer is newest-first)
|
||||||
|
Enum.take(buffer, max_size)
|
||||||
|
end
|
||||||
|
|
||||||
|
defp handle_buffer_overflow(buffer, _max_size), do: buffer
|
||||||
|
|
||||||
defp store_event_chunk(events) do
|
defp store_event_chunk(events) do
|
||||||
# Transform events to Ash attributes
|
# Process each event and partition results
|
||||||
records =
|
{successes, failures} =
|
||||||
Enum.map(events, fn event ->
|
events
|
||||||
SecurityAudit.do_store_audit_entry(event)
|
|> Enum.map(fn event ->
|
||||||
|
case SecurityAudit.do_store_audit_entry(event) do
|
||||||
|
:ok ->
|
||||||
|
{:ok, event}
|
||||||
|
|
||||||
|
{:error, reason} ->
|
||||||
|
Logger.error("Failed to store individual audit event",
|
||||||
|
error: inspect(reason),
|
||||||
|
event_type: Map.get(event, :event_type),
|
||||||
|
user_id: Map.get(event, :user_id)
|
||||||
|
)
|
||||||
|
|
||||||
|
{:error, {event, reason}}
|
||||||
|
end
|
||||||
|
end)
|
||||||
|
|> Enum.split_with(fn
|
||||||
|
{:ok, _} -> true
|
||||||
|
{:error, _} -> false
|
||||||
end)
|
end)
|
||||||
|
|
||||||
# Count successful stores
|
successful_count = length(successes)
|
||||||
successful =
|
failed_count = length(failures)
|
||||||
Enum.count(records, fn
|
|
||||||
:ok -> true
|
|
||||||
_ -> false
|
|
||||||
end)
|
|
||||||
|
|
||||||
{:ok, successful}
|
# Extract failed events with reasons
|
||||||
rescue
|
failed_events = Enum.map(failures, fn {:error, event_reason} -> event_reason end)
|
||||||
error ->
|
|
||||||
{:error, error}
|
# Log if some events failed (telemetry will be emitted at flush level)
|
||||||
|
if failed_count > 0 do
|
||||||
|
Logger.debug("Chunk processing: #{failed_count} of #{length(events)} events failed")
|
||||||
|
end
|
||||||
|
|
||||||
|
# Return richer result shape
|
||||||
|
cond do
|
||||||
|
successful_count == 0 ->
|
||||||
|
{:error, failed_events}
|
||||||
|
|
||||||
|
failed_count > 0 ->
|
||||||
|
{:partial, successful_count, failed_events}
|
||||||
|
|
||||||
|
true ->
|
||||||
|
{:ok, successful_count}
|
||||||
|
end
|
||||||
end
|
end
|
||||||
end
|
end
|
||||||
|
|||||||
Reference in New Issue
Block a user