Compare commits

..

4 Commits

Author SHA1 Message Date
Dmitry Popov
63ca473113 Merge pull request #502 from guarzo/guarzo/asyncfix
fix: resolve issue with async event processing
2025-11-12 15:10:08 +04:00
Dmitry Popov
0332d36a8e fix(core): fixed linked signature time status update
2025-11-11 10:51:43 +01:00
guarzo
7df8284124 fix: clean up id generation 2025-08-30 02:05:28 +00:00
guarzo
21ca630abd fix: resolve issue with async event processing 2025-08-30 02:05:28 +00:00
5 changed files with 183 additions and 76 deletions

View File

@@ -2,17 +2,6 @@
 <!-- changelog -->
-## [v1.84.2](https://github.com/wanderer-industries/wanderer/compare/v1.84.1...v1.84.2) (2025-11-10)
-
-### Bug Fixes:
-
-* api: fixed api for get/update map systems
-* add index for map/systems api
-
 ## [v1.84.1](https://github.com/wanderer-industries/wanderer/compare/v1.84.0...v1.84.1) (2025-11-01)

View File

@@ -373,36 +373,36 @@ defmodule WandererApp.Map.Server.ConnectionsImpl do
         solar_system_target: solar_system_target
       } = updated_connection
     ) do
-    source_system =
-      WandererApp.Map.find_system_by_location(
-        map_id,
-        %{solar_system_id: solar_system_source}
-      )
-
-    target_system =
-      WandererApp.Map.find_system_by_location(
-        map_id,
-        %{solar_system_id: solar_system_target}
-      )
-
-    source_linked_signatures =
-      find_linked_signatures(source_system, target_system)
-
-    target_linked_signatures = find_linked_signatures(target_system, source_system)
-
-    update_signatures_time_status(
-      map_id,
-      source_system.solar_system_id,
-      source_linked_signatures,
-      time_status
-    )
-
-    update_signatures_time_status(
-      map_id,
-      target_system.solar_system_id,
-      target_linked_signatures,
-      time_status
-    )
+    with source_system when not is_nil(source_system) <-
+           WandererApp.Map.find_system_by_location(
+             map_id,
+             %{solar_system_id: solar_system_source}
+           ),
+         target_system when not is_nil(target_system) <-
+           WandererApp.Map.find_system_by_location(
+             map_id,
+             %{solar_system_id: solar_system_target}
+           ),
+         source_linked_signatures <-
+           find_linked_signatures(source_system, target_system),
+         target_linked_signatures <- find_linked_signatures(target_system, source_system) do
+      update_signatures_time_status(
+        map_id,
+        source_system.solar_system_id,
+        source_linked_signatures,
+        time_status
+      )
+
+      update_signatures_time_status(
+        map_id,
+        target_system.solar_system_id,
+        target_linked_signatures,
+        time_status
+      )
+    else
+      error ->
+        Logger.error("Failed to update_linked_signature_time_status: #{inspect(error)}")
+    end
   end
 
   defp find_linked_signatures(
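
The rewrite above chains both lookups in a `with`: if either find_system_by_location call returns nil, control falls through to the else clause and the failure is logged, instead of crashing on a nil system further down. A minimal self-contained sketch of the same pattern, assuming an illustrative in-memory lookup in place of WandererApp.Map.find_system_by_location/2:

defmodule WithChainSketch do
  require Logger

  # Illustrative stand-in for the real map lookup; returns nil for unknown ids
  @systems %{30000142 => %{solar_system_id: 30000142, name: "Jita"}}
  def find_system(id), do: Map.get(@systems, id)

  def link(source_id, target_id) do
    with source when not is_nil(source) <- find_system(source_id),
         target when not is_nil(target) <- find_system(target_id) do
      {:ok, {source.solar_system_id, target.solar_system_id}}
    else
      # A nil from either lookup lands here instead of crashing downstream
      error ->
        Logger.error("link failed: #{inspect(error)}")
        {:error, :not_found}
    end
  end
end

WithChainSketch.link(30000142, 99999999) logs and returns {:error, :not_found}, because the second lookup yields nil, fails the guard, and skips the do block entirely.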

View File

@@ -501,13 +501,16 @@ defmodule WandererApp.SecurityAudit do
     # Ensure event_type is properly formatted
     event_type = normalize_event_type(audit_entry.event_type)
 
+    # Generate unique entity_id to avoid constraint violations
+    entity_id = generate_entity_id(audit_entry.session_id)
+
     attrs = %{
-      user_id: audit_entry.user_id,
-      character_id: nil,
-      entity_id: hash_identifier(audit_entry.session_id),
+      entity_id: entity_id,
       entity_type: :security_event,
       event_type: event_type,
-      event_data: encode_event_data(audit_entry)
+      event_data: encode_event_data(audit_entry),
+      user_id: audit_entry.user_id,
+      character_id: nil
     }
 
     case UserActivity.new(attrs) do
@@ -619,8 +622,13 @@ defmodule WandererApp.SecurityAudit do
   defp convert_datetime(%NaiveDateTime{} = dt), do: NaiveDateTime.to_iso8601(dt)
   defp convert_datetime(value), do: value
 
-  defp generate_entity_id do
-    "audit_#{DateTime.utc_now() |> DateTime.to_unix(:microsecond)}_#{System.unique_integer([:positive])}"
+  defp generate_entity_id(session_id \\ nil) do
+    if session_id do
+      # Include high-resolution timestamp and unique component for guaranteed uniqueness
+      "#{hash_identifier(session_id)}_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
+    else
+      "audit_#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
+    end
   end
 
   defp async_enabled? do
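
Context for the change above: the old entity_id was a bare hash of the session id, so two audit rows from the same session could collide on a uniqueness constraint. The new version appends a microsecond timestamp and System.unique_integer/1, which never repeats within a running VM. A hedged sketch of the same scheme; hash_identifier/1 is stubbed with SHA-256 here because the app's real hashing is not shown in this diff:

defmodule EntityIdSketch do
  # Assumed stand-in for the app's hash_identifier/1; the real scheme may differ
  def hash_identifier(value) do
    :crypto.hash(:sha256, to_string(value))
    |> Base.encode16(case: :lower)
    |> binary_part(0, 16)
  end

  def generate_entity_id(session_id \\ nil) do
    # A collision now requires two calls in the same microsecond that also draw
    # the same unique integer, which System.unique_integer/1 rules out per node
    unique = "#{:os.system_time(:microsecond)}_#{System.unique_integer([:positive])}"
    if session_id, do: "#{hash_identifier(session_id)}_#{unique}", else: "audit_#{unique}"
  end
end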

View File

@@ -88,20 +88,21 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
   def handle_cast({:log_event, audit_entry}, state) do
     # Add to buffer
     buffer = [audit_entry | state.buffer]
+    buf_len = length(buffer)
 
     # Update stats
     stats = Map.update!(state.stats, :events_processed, &(&1 + 1))
 
     # Check if we need to flush
     cond do
-      length(buffer) >= state.batch_size ->
+      buf_len >= state.batch_size ->
         # Flush immediately if batch size reached
         {:noreply, do_flush(%{state | buffer: buffer, stats: stats})}
 
-      length(buffer) >= @max_buffer_size ->
+      buf_len >= @max_buffer_size ->
         # Force flush if max buffer size reached
         Logger.warning("Security audit buffer overflow, forcing flush",
-          buffer_size: length(buffer),
+          buffer_size: buf_len,
          max_size: @max_buffer_size
        )
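
Caching buf_len is not cosmetic: length/1 is O(n) on Elixir lists and handle_cast runs on every logged event, so the old code walked the whole buffer up to three times per cast. A toy GenServer showing the same count-once, flush-on-threshold shape (the module name, batch size, and IO-based flush are illustrative only):

defmodule BufferSketch do
  use GenServer

  @batch_size 100

  def start_link(_opts), do: GenServer.start_link(__MODULE__, %{buffer: []}, name: __MODULE__)
  def log(event), do: GenServer.cast(__MODULE__, {:log_event, event})

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_cast({:log_event, event}, state) do
    buffer = [event | state.buffer]
    # length/1 walks the whole list; compute it once and reuse it
    buf_len = length(buffer)

    if buf_len >= @batch_size do
      {:noreply, flush(%{state | buffer: buffer})}
    else
      {:noreply, %{state | buffer: buffer}}
    end
  end

  defp flush(state) do
    # Stand-in for the real batched write
    IO.puts("flushing #{length(state.buffer)} events")
    %{state | buffer: []}
  end
end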
@@ -186,23 +187,66 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
         # Clear buffer
         %{state | buffer: [], stats: stats}
 
-      {:error, reason} ->
-        Logger.error("Failed to flush security audit events",
-          reason: inspect(reason),
-          event_count: length(events)
+      {:partial, success_count, failed_events} ->
+        failed_count = length(failed_events)
+
+        Logger.warning(
+          "Partial flush: stored #{success_count}, failed #{failed_count} audit events",
+          success_count: success_count,
+          failed_count: failed_count,
+          buffer_size: length(state.buffer)
         )
 
+        # Emit telemetry for monitoring
+        :telemetry.execute(
+          [:wanderer_app, :security_audit, :async_flush_partial],
+          %{success_count: success_count, failed_count: failed_count},
+          %{}
+        )
+
+        # Update stats - count partial flush as both success and error
+        stats =
+          state.stats
+          |> Map.update!(:batches_flushed, &(&1 + 1))
+          |> Map.update!(:errors, &(&1 + 1))
+          |> Map.put(:last_flush, DateTime.utc_now())
+
+        # Extract just the events from failed_events tuples
+        failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
+        remaining_buffer = Enum.reject(state.buffer, fn ev -> ev in failed_only end)
+
+        # Re-buffer failed events at the front, preserving newest-first ordering
+        # Reverse failed_only since flush reversed the buffer to oldest-first
+        new_buffer = Enum.reverse(failed_only) ++ remaining_buffer
+        buffer = handle_buffer_overflow(new_buffer, @max_buffer_size)
+
+        %{state | buffer: buffer, stats: stats}
+
+      {:error, failed_events} ->
+        failed_count = length(failed_events)
+
+        Logger.error("Failed to flush all #{failed_count} security audit events",
+          failed_count: failed_count,
+          buffer_size: length(state.buffer)
+        )
 
         # Emit telemetry for monitoring
         :telemetry.execute(
           [:wanderer_app, :security_audit, :async_flush_failure],
-          %{count: 1, event_count: length(events)},
+          %{count: 1, event_count: failed_count},
           %{}
         )
 
         # Update error stats
         stats = Map.update!(state.stats, :errors, &(&1 + 1))
 
-        # Implement backoff - keep events in buffer but don't grow indefinitely
-        buffer =
-          if length(state.buffer) > @max_buffer_size do
-            Logger.warning("Dropping oldest audit events due to repeated flush failures")
-            Enum.take(state.buffer, @max_buffer_size)
-          else
-            state.buffer
-          end
+        # Extract just the events from failed_events tuples
+        failed_only = Enum.map(failed_events, fn {event, _reason} -> event end)
+
+        # Since ALL events failed, the new buffer should only contain the failed events
+        # Reverse to maintain newest-first ordering (flush reversed to oldest-first)
+        buffer = handle_buffer_overflow(Enum.reverse(failed_only), @max_buffer_size)
 
        %{state | buffer: buffer, stats: stats}
    end
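
Both failure branches above rebuild the buffer from failed_only, and both reverse it first: the in-memory buffer is kept newest-first, while the flush path works on Enum.reverse(buffer), so failures come back oldest-first and must be flipped before re-buffering. A small pure illustration of that bookkeeping (the event atoms are placeholders):

# Buffer is newest-first: :e3 arrived last
buffer = [:e3, :e2, :e1]

# The flush walks it oldest-first and reports these as failed
failed_only = [:e1, :e3]

# Partial failure: drop failed events from the buffer, then re-queue them in front
remaining = Enum.reject(buffer, &(&1 in failed_only))
Enum.reverse(failed_only) ++ remaining
#=> [:e3, :e1, :e2]

# Total failure: the new buffer is the failed events alone, flipped back to newest-first
Enum.reverse(failed_only)
#=> [:e3, :e1]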
@@ -213,34 +257,100 @@ defmodule WandererApp.SecurityAudit.AsyncProcessor do
     events
     # Ash bulk operations work better with smaller chunks
     |> Enum.chunk_every(50)
-    |> Enum.reduce_while({:ok, 0}, fn chunk, {:ok, count} ->
+    |> Enum.reduce({0, []}, fn chunk, {total_success, all_failed} ->
       case store_event_chunk(chunk) do
         {:ok, chunk_count} ->
-          {:cont, {:ok, count + chunk_count}}
+          {total_success + chunk_count, all_failed}
 
-        {:error, _} = error ->
-          {:halt, error}
+        {:partial, chunk_count, failed_events} ->
+          {total_success + chunk_count, all_failed ++ failed_events}
+
+        {:error, failed_events} ->
+          {total_success, all_failed ++ failed_events}
       end
     end)
+    |> then(fn {success_count, failed_events_list} ->
+      # Derive the final return shape based on results
+      cond do
+        failed_events_list == [] ->
+          {:ok, success_count}
+
+        success_count == 0 ->
+          {:error, failed_events_list}
+
+        true ->
+          {:partial, success_count, failed_events_list}
+      end
+    end)
   end
 
+  defp handle_buffer_overflow(buffer, max_size) when length(buffer) > max_size do
+    dropped = length(buffer) - max_size
+
+    Logger.warning(
+      "Dropping #{dropped} oldest audit events due to buffer overflow",
+      buffer_size: length(buffer),
+      max_size: max_size
+    )
+
+    # Emit telemetry for dropped events
+    :telemetry.execute(
+      [:wanderer_app, :security_audit, :events_dropped],
+      %{count: dropped},
+      %{}
+    )
+
+    # Keep the newest events (take from the front since buffer is newest-first)
+    Enum.take(buffer, max_size)
+  end
+
+  defp handle_buffer_overflow(buffer, _max_size), do: buffer
+
   defp store_event_chunk(events) do
-    # Transform events to Ash attributes
-    records =
-      Enum.map(events, fn event ->
-        SecurityAudit.do_store_audit_entry(event)
+    # Process each event and partition results
+    {successes, failures} =
+      events
+      |> Enum.map(fn event ->
+        case SecurityAudit.do_store_audit_entry(event) do
+          :ok ->
+            {:ok, event}
+
+          {:error, reason} ->
+            Logger.error("Failed to store individual audit event",
+              error: inspect(reason),
+              event_type: Map.get(event, :event_type),
+              user_id: Map.get(event, :user_id)
+            )
+
+            {:error, {event, reason}}
+        end
       end)
+      |> Enum.split_with(fn
+        {:ok, _} -> true
+        {:error, _} -> false
+      end)
 
-    # Count successful stores
-    successful =
-      Enum.count(records, fn
-        :ok -> true
-        _ -> false
-      end)
+    successful_count = length(successes)
+    failed_count = length(failures)
 
-    {:ok, successful}
-  rescue
-    error ->
-      {:error, error}
+    # Extract failed events with reasons
+    failed_events = Enum.map(failures, fn {:error, event_reason} -> event_reason end)
+
+    # Log if some events failed (telemetry will be emitted at flush level)
+    if failed_count > 0 do
+      Logger.debug("Chunk processing: #{failed_count} of #{length(events)} events failed")
+    end
+
+    # Return richer result shape
+    cond do
+      successful_count == 0 ->
+        {:error, failed_events}
+
+      failed_count > 0 ->
+        {:partial, successful_count, failed_events}
+
+      true ->
+        {:ok, successful_count}
+    end
   end
 end
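
The net effect of this hunk: flush_events no longer halts on the first bad chunk, store_event_chunk no longer discards a whole chunk for one bad row, and the :ok / :partial / :error shape is derived exactly once from {total_success, all_failed}. A compact runnable sketch of that accumulation, with a fake store function standing in for SecurityAudit.do_store_audit_entry/1:

# Fake store: even ids "persist", odd ids "fail"
store = fn id -> if rem(id, 2) == 0, do: :ok, else: {:error, :constraint} end

{success_count, failed} =
  1..10
  |> Enum.chunk_every(3)
  |> Enum.reduce({0, []}, fn chunk, {ok_total, bad_total} ->
    {good, bad} =
      chunk
      |> Enum.map(fn id -> {id, store.(id)} end)
      |> Enum.split_with(fn {_id, result} -> result == :ok end)

    {ok_total + length(good), bad_total ++ Enum.map(bad, fn {id, {:error, r}} -> {id, r} end)}
  end)

cond do
  failed == [] -> {:ok, success_count}
  success_count == 0 -> {:error, failed}
  true -> {:partial, success_count, failed}
end
#=> {:partial, 5, [{1, :constraint}, {3, :constraint}, {5, :constraint}, {7, :constraint}, {9, :constraint}]}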

View File

@@ -3,7 +3,7 @@ defmodule WandererApp.MixProject do
   @source_url "https://github.com/wanderer-industries/wanderer"
 
-  @version "1.84.2"
+  @version "1.84.1"
 
   def project do
     [