Controller Package Documentation
Table of Contents
- Overview and Role in the System
- Core Concepts and Main Interfaces
- Entity Lifecycle and State Management
- Log Management and Event Publication
- Integration with Providers, Tools, and Subscriptions
- Internal Structure and Concurrency Model
- Extensibility, Error Handling, and Best Practices
Overview and Role in the System
The controller package is a central part of the backend architecture, responsible for orchestrating the lifecycle and logic of flows, assistants, tasks, subtasks, and various types of logs in the system. It acts as a high-level service layer, mediating between the database, providers, tools, and the event subscription system.
Key Responsibilities
- Managing the creation, loading, execution, and termination of flows and their associated entities (assistants, tasks, subtasks).
- Providing thread-safe controllers and workers for each logical entity (flow, assistant, task, subtask, logs, screenshots).
- Integrating with the database layer for persistent storage and retrieval of all entities.
- Interfacing with provider abstractions for LLMs, tools, and execution environments.
- Publishing events to the subscription system for real-time updates.
- Ensuring correct state transitions and error handling for all managed entities.
- Supporting both autonomous pentesting flows and interactive assistant conversations.
Architectural Integration
The controller package interacts with the following key packages:
- database: For all persistent storage and retrieval operations.
- providers and tools: For LLM, tool execution, and agent chain management.
- graph/subscriptions: For publishing real-time events about entity changes.
- observability/langfuse: For tracing and logging of operations.
- config, docker, and templates: For configuration, container management, and prompt templating.
High-Level Architecture
The following diagram reflects the actual architecture of the controller package, showing all main controllers, their relationships, and integration points with other system components.
flowchart TD
subgraph Controller
FC[FlowController]
FW[FlowWorker]
AW[AssistantWorker]
TC[TaskController]
TW[TaskWorker]
STC[SubtaskController]
STW[SubtaskWorker]
subgraph LogControllers[Log Controllers]
ALC[AgentLogController]
ASLC[AssistantLogController]
MLC[MsgLogController]
SLC[SearchLogController]
TLC[TermLogController]
VSLC[VectorStoreLogController]
SC[ScreenshotController]
end
end
DB[(Database)]
LLMProviders[[ProviderController]]
ToolsExecutor[[FlowToolsExecutor]]
GraphQLSubscriptions((Subscriptions))
Obs>Observability]
Docker[[Docker]]
FC -- manages --> FW
FW -- manages --> AW
FW -- manages --> TC
TC -- manages --> TW
TW -- manages --> STC
STC -- manages --> STW
FC -- uses --> LogControllers
FW -- uses --> LogControllers
AW -- uses --> LogControllers
FC -- uses --> DB
FC -- uses --> LLMProviders
FC -- uses --> ToolsExecutor
FC -- uses --> GraphQLSubscriptions
FC -- uses --> Obs
FC -- uses --> Docker
Role in the System
The controller package is the main orchestrator for all user and system-initiated operations related to flows and their sub-entities. It ensures that all business logic, state transitions, and side effects (such as event publication and logging) are handled consistently and safely. It supports two main operational modes:
- Autonomous Pentesting Mode: Complete flows with automated task generation and execution
- Interactive Assistant Mode: Conversational AI assistants that can optionally use agents and tools
Core Concepts and Main Interfaces
The controller package is built around a set of core concepts and interfaces that encapsulate the logic for managing flows, assistants, tasks, subtasks, and various types of logs. Each logical entity is represented by a controller (managing multiple entities) and a worker (managing a single entity instance).
Main Interfaces and Their Hierarchy
Flow Management
// flows.go
type FlowController interface {
CreateFlow(
ctx context.Context,
userID int64,
input string,
prvtype provider.ProviderType,
functions *tools.Functions,
) (FlowWorker, error)
CreateAssistant(
ctx context.Context,
userID int64,
flowID int64,
input string,
useAgents bool,
prvtype provider.ProviderType,
functions *tools.Functions,
) (AssistantWorker, error)
LoadFlows(ctx context.Context) error
ListFlows(ctx context.Context) []FlowWorker
GetFlow(ctx context.Context, flowID int64) (FlowWorker, error)
StopFlow(ctx context.Context, flowID int64) error
FinishFlow(ctx context.Context, flowID int64) error
}
// flow.go
type FlowWorker interface {
GetFlowID() int64
GetUserID() int64
GetTitle() string
GetContext() *FlowContext
GetStatus(ctx context.Context) (database.FlowStatus, error)
SetStatus(ctx context.Context, status database.FlowStatus) error
AddAssistant(ctx context.Context, aw AssistantWorker) error
GetAssistant(ctx context.Context, assistantID int64) (AssistantWorker, error)
DeleteAssistant(ctx context.Context, assistantID int64) error
ListAssistants(ctx context.Context) []AssistantWorker
ListTasks(ctx context.Context) []TaskWorker
PutInput(ctx context.Context, input string) error
Finish(ctx context.Context) error
Stop(ctx context.Context) error
}
Assistant Management
// assistant.go
type AssistantWorker interface {
GetAssistantID() int64
GetUserID() int64
GetFlowID() int64
GetTitle() string
GetStatus(ctx context.Context) (database.AssistantStatus, error)
SetStatus(ctx context.Context, status database.AssistantStatus) error
PutInput(ctx context.Context, input string, useAgents bool) error
Finish(ctx context.Context) error
Stop(ctx context.Context) error
}
Task and Subtask Management
// tasks.go
type TaskController interface {
CreateTask(ctx context.Context, input string, updater FlowUpdater) (TaskWorker, error)
LoadTasks(ctx context.Context, flowID int64, updater FlowUpdater) error
ListTasks(ctx context.Context) []TaskWorker
GetTask(ctx context.Context, taskID int64) (TaskWorker, error)
}
// task.go
type TaskWorker interface {
GetTaskID() int64
GetFlowID() int64
GetUserID() int64
GetTitle() string
IsCompleted() bool
IsWaiting() bool
GetStatus(ctx context.Context) (database.TaskStatus, error)
SetStatus(ctx context.Context, status database.TaskStatus) error
GetResult(ctx context.Context) (string, error)
SetResult(ctx context.Context, result string) error
PutInput(ctx context.Context, input string) error
Run(ctx context.Context) error
Finish(ctx context.Context) error
}
// subtasks.go
type SubtaskController interface {
LoadSubtasks(ctx context.Context, taskID int64, updater TaskUpdater) error
GenerateSubtasks(ctx context.Context) error
RefineSubtasks(ctx context.Context) error
PopSubtask(ctx context.Context, updater TaskUpdater) (SubtaskWorker, error)
ListSubtasks(ctx context.Context) []SubtaskWorker
GetSubtask(ctx context.Context, subtaskID int64) (SubtaskWorker, error)
}
// subtask.go
type SubtaskWorker interface {
GetMsgChainID() int64
GetSubtaskID() int64
GetTaskID() int64
GetFlowID() int64
GetUserID() int64
GetTitle() string
GetDescription() string
IsCompleted() bool
IsWaiting() bool
GetStatus(ctx context.Context) (database.SubtaskStatus, error)
SetStatus(ctx context.Context, status database.SubtaskStatus) error
GetResult(ctx context.Context) (string, error)
SetResult(ctx context.Context, result string) error
PutInput(ctx context.Context, input string) error
Run(ctx context.Context) error
Finish(ctx context.Context) error
}
Log Management
The system includes seven different types of logs, each with its own controller and worker interfaces:
Agent Logs
// alogs.go
type AgentLogController interface {
NewFlowAgentLog(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowAgentLogWorker, error)
ListFlowsAgentLog(ctx context.Context) ([]FlowAgentLogWorker, error)
GetFlowAgentLog(ctx context.Context, flowID int64) (FlowAgentLogWorker, error)
}
// alog.go
type FlowAgentLogWorker interface {
PutLog(
ctx context.Context,
initiator database.MsgchainType,
executor database.MsgchainType,
task string,
result string,
taskID *int64,
subtaskID *int64,
) (int64, error)
GetLog(ctx context.Context, msgID int64) (database.Agentlog, error)
}
Assistant Logs
// aslogs.go
type AssistantLogController interface {
NewFlowAssistantLog(
ctx context.Context, flowID int64, assistantID int64, pub subscriptions.FlowPublisher,
) (FlowAssistantLogWorker, error)
ListFlowsAssistantLog(ctx context.Context, flowID int64) ([]FlowAssistantLogWorker, error)
GetFlowAssistantLog(ctx context.Context, flowID int64, assistantID int64) (FlowAssistantLogWorker, error)
}
// aslog.go
type FlowAssistantLogWorker interface {
PutMsg(
ctx context.Context,
msgType database.MsglogType,
taskID, subtaskID *int64,
streamID int64,
thinking, msg string,
) (int64, error)
PutFlowAssistantMsg(
ctx context.Context,
msgType database.MsglogType,
thinking, msg string,
) (int64, error)
PutFlowAssistantMsgResult(
ctx context.Context,
msgType database.MsglogType,
thinking, msg, result string,
resultFormat database.MsglogResultFormat,
) (int64, error)
StreamFlowAssistantMsg(
ctx context.Context,
chunk *providers.StreamMessageChunk,
) error
UpdateMsgResult(
ctx context.Context,
msgID, streamID int64,
result string,
resultFormat database.MsglogResultFormat,
) error
}
Message Logs
// msglogs.go
type MsgLogController interface {
NewFlowMsgLog(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowMsgLogWorker, error)
ListFlowsMsgLog(ctx context.Context) ([]FlowMsgLogWorker, error)
GetFlowMsgLog(ctx context.Context, flowID int64) (FlowMsgLogWorker, error)
}
// msglog.go
type FlowMsgLogWorker interface {
PutMsg(
ctx context.Context,
msgType database.MsglogType,
taskID, subtaskID *int64,
streamID int64,
thinking, msg string,
) (int64, error)
PutFlowMsg(
ctx context.Context,
msgType database.MsglogType,
thinking, msg string,
) (int64, error)
PutFlowMsgResult(
ctx context.Context,
msgType database.MsglogType,
thinking, msg, result string,
resultFormat database.MsglogResultFormat,
) (int64, error)
PutTaskMsg(
ctx context.Context,
msgType database.MsglogType,
taskID int64,
thinking, msg string,
) (int64, error)
PutTaskMsgResult(
ctx context.Context,
msgType database.MsglogType,
taskID int64,
thinking, msg, result string,
resultFormat database.MsglogResultFormat,
) (int64, error)
PutSubtaskMsg(
ctx context.Context,
msgType database.MsglogType,
taskID, subtaskID int64,
thinking, msg string,
) (int64, error)
PutSubtaskMsgResult(
ctx context.Context,
msgType database.MsglogType,
taskID, subtaskID int64,
thinking, msg, result string,
resultFormat database.MsglogResultFormat,
) (int64, error)
UpdateMsgResult(
ctx context.Context,
msgID, streamID int64,
result string,
resultFormat database.MsglogResultFormat,
) error
}
Search Logs
// slogs.go
type SearchLogController interface {
NewFlowSearchLog(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowSearchLogWorker, error)
ListFlowsSearchLog(ctx context.Context) ([]FlowSearchLogWorker, error)
GetFlowSearchLog(ctx context.Context, flowID int64) (FlowSearchLogWorker, error)
}
// slog.go
type FlowSearchLogWorker interface {
PutLog(
ctx context.Context,
initiator database.MsgchainType,
executor database.MsgchainType,
engine database.SearchengineType,
query string,
result string,
taskID *int64,
subtaskID *int64,
) (int64, error)
GetLog(ctx context.Context, msgID int64) (database.Searchlog, error)
}
Terminal Logs
// termlogs.go
type TermLogController interface {
NewFlowTermLog(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowTermLogWorker, error)
ListFlowsTermLog(ctx context.Context) ([]FlowTermLogWorker, error)
GetFlowTermLog(ctx context.Context, flowID int64) (FlowTermLogWorker, error)
GetFlowContainers(ctx context.Context, flowID int64) ([]database.Container, error)
}
// termlog.go
type FlowTermLogWorker interface {
PutMsg(ctx context.Context, msgType database.TermlogType, msg string, containerID int64) (int64, error)
GetMsg(ctx context.Context, msgID int64) (database.Termlog, error)
GetContainers(ctx context.Context) ([]database.Container, error)
}
Vector Store Logs
// vslogs.go
type VectorStoreLogController interface {
NewFlowVectorStoreLog(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowVectorStoreLogWorker, error)
ListFlowsVectorStoreLog(ctx context.Context) ([]FlowVectorStoreLogWorker, error)
GetFlowVectorStoreLog(ctx context.Context, flowID int64) (FlowVectorStoreLogWorker, error)
}
// vslog.go
type FlowVectorStoreLogWorker interface {
PutLog(
ctx context.Context,
initiator database.MsgchainType,
executor database.MsgchainType,
filter json.RawMessage,
query string,
action database.VecstoreActionType,
result string,
taskID *int64,
subtaskID *int64,
) (int64, error)
GetLog(ctx context.Context, msgID int64) (database.Vecstorelog, error)
}
Screenshots
// screenshots.go
type ScreenshotController interface {
NewFlowScreenshot(ctx context.Context, flowID int64, pub subscriptions.FlowPublisher) (FlowScreenshotWorker, error)
ListFlowsScreenshot(ctx context.Context) ([]FlowScreenshotWorker, error)
GetFlowScreenshot(ctx context.Context, flowID int64) (FlowScreenshotWorker, error)
}
// screenshot.go
type FlowScreenshotWorker interface {
PutScreenshot(ctx context.Context, name, url string) (int64, error)
GetScreenshot(ctx context.Context, screenshotID int64) (database.Screenshot, error)
}
Supporting Types and Constants
// context.go
type FlowContext struct {
DB database.Querier
UserID int64
FlowID int64
FlowTitle string
Executor tools.FlowToolsExecutor
Provider providers.FlowProvider
Publisher subscriptions.FlowPublisher
TermLog FlowTermLogWorker
MsgLog FlowMsgLogWorker
Screenshot FlowScreenshotWorker
}
type TaskContext struct {
TaskID int64
TaskTitle string
TaskInput string
FlowContext
}
type SubtaskContext struct {
MsgChainID int64
SubtaskID int64
SubtaskTitle string
SubtaskDescription string
TaskContext
}
// Updater interfaces for status propagation
type FlowUpdater interface {
SetStatus(ctx context.Context, status database.FlowStatus) error
}
type TaskUpdater interface {
SetStatus(ctx context.Context, status database.TaskStatus) error
}
Interface Hierarchy Diagram
classDiagram
class FlowController {
+CreateFlow()
+CreateAssistant()
+LoadFlows()
+ListFlows()
+GetFlow()
+StopFlow()
+FinishFlow()
}
class FlowWorker {
+GetFlowID()
+GetUserID()
+GetTitle()
+GetContext()
+GetStatus()
+SetStatus()
+AddAssistant()
+GetAssistant()
+DeleteAssistant()
+ListAssistants()
+ListTasks()
+PutInput()
+Finish()
+Stop()
}
class AssistantWorker {
+GetAssistantID()
+GetUserID()
+GetFlowID()
+GetTitle()
+GetStatus()
+SetStatus()
+PutInput()
+Finish()
+Stop()
}
class TaskController {
+CreateTask()
+LoadTasks()
+ListTasks()
+GetTask()
}
class TaskWorker {
+GetTaskID()
+IsCompleted()
+IsWaiting()
+GetStatus()
+SetStatus()
+GetResult()
+SetResult()
+PutInput()
+Run()
+Finish()
}
class SubtaskController {
+LoadSubtasks()
+GenerateSubtasks()
+RefineSubtasks()
+PopSubtask()
+ListSubtasks()
+GetSubtask()
}
class SubtaskWorker {
+GetMsgChainID()
+GetSubtaskID()
+IsCompleted()
+IsWaiting()
+GetStatus()
+SetStatus()
+PutInput()
+Run()
+Finish()
}
class LogControllers {
<<interface>>
+NewFlowLog()
+GetFlowLog()
+ListFlowsLog()
}
class LogWorkers {
<<interface>>
+PutLog()
+GetLog()
}
FlowController --> FlowWorker : manages
FlowWorker --> AssistantWorker : manages
FlowWorker --> TaskController : contains
TaskController --> TaskWorker : manages
TaskWorker --> SubtaskController : contains
SubtaskController --> SubtaskWorker : manages
FlowController --> LogControllers : uses
LogControllers --> LogWorkers : creates
FlowWorker --> LogWorkers : uses
AssistantWorker --> LogWorkers : uses
Entity Lifecycle and State Management
The controller package implements a strict lifecycle and state management system for all major entities: flows, assistants, tasks, and subtasks. Each entity has a well-defined set of states, and transitions are managed through controller and worker methods, with all changes persisted to the database and broadcast via the subscription system.
Flow Lifecycle
States
- Created (database.FlowStatusCreated)
- Running (database.FlowStatusRunning)
- Waiting (database.FlowStatusWaiting)
- Finished (database.FlowStatusFinished)
- Failed (database.FlowStatusFailed)
State Transitions
- Flows are created in the Created state.
- When a task starts running, the flow transitions to Running.
- If a task is waiting for input, the flow moves to Waiting.
- When a task finishes or fails, the flow also returns to Waiting, ready for new user input (see the task SetStatus example below).
- A flow becomes Finished when it is explicitly finished via Finish() / FinishFlow().
- On a flow-level error, the flow becomes Failed.
State Management
- Transitions are managed via SetStatus (FlowWorker), with updates persisted and events published.
- Flow state is influenced by task state (e.g., if a task is waiting, the flow is waiting).
- Flows can be stopped (graceful termination) or finished (explicit completion via Finish()).
State Diagram
stateDiagram-v2
[*] --> Created: CreateFlow()
Created --> Running: PutInput() / Start Task
Running --> Waiting: Task waiting for input, finished, or failed
Running --> Finished: Finish()
Running --> Failed: Error
Waiting --> Running: PutInput() / Resume Task
Waiting --> Finished: Finish()
Waiting --> Failed: Error
Finished --> [*]
Failed --> [*]
Assistant Lifecycle
States
- Created (database.AssistantStatusCreated)
- Running (database.AssistantStatusRunning)
- Waiting (database.AssistantStatusWaiting)
- Finished (database.AssistantStatusFinished)
- Failed (database.AssistantStatusFailed)
State Transitions
- Assistants are created in the Created state.
- When processing input, they transition to Running.
- If waiting for user input, they move to Waiting.
- On completion or user termination, they become Finished.
- On error, they become Failed.
State Management
- Transitions are managed via SetStatus (AssistantWorker).
- Assistants support streaming responses with real-time updates.
- Multiple assistants can exist within a single flow.
- Assistant execution can be stopped or finished independently.
stateDiagram-v2
[*] --> Created: CreateAssistant()
Created --> Running: PutInput()
Running --> Waiting: Waiting for user input
Running --> Finished: Conversation ended
Running --> Failed: Error in processing
Waiting --> Running: PutInput()
Waiting --> Finished: Finish()
Finished --> [*]
Failed --> [*]
Task Lifecycle
States
- Created (database.TaskStatusCreated)
- Running (database.TaskStatusRunning)
- Waiting (database.TaskStatusWaiting)
- Finished (database.TaskStatusFinished)
- Failed (database.TaskStatusFailed)
State Transitions
- Tasks are created in the Created state when a flow receives input.
- They transition to Running when execution begins.
- If subtasks require input, tasks move to Waiting.
- On successful completion, tasks become Finished.
- On error, tasks become Failed.
State Management
- Transitions are managed via SetStatus (TaskWorker), with updates affecting the parent flow status.
- Task state is influenced by subtask state (e.g., if subtasks are waiting, the task is waiting).
- Tasks can be finished early or allowed to complete naturally.
stateDiagram-v2
[*] --> Created: CreateTask()
Created --> Running: Run()
Running --> Waiting: Subtask waiting for input
Running --> Finished: All subtasks completed
Running --> Failed: Subtask failed / Error
Waiting --> Running: PutInput() / Resume subtask
Waiting --> Finished: Finish()
Finished --> [*]
Failed --> [*]
Subtask Lifecycle
States
- Created (database.SubtaskStatusCreated)
- Running (database.SubtaskStatusRunning)
- Waiting (database.SubtaskStatusWaiting)
- Finished (database.SubtaskStatusFinished)
- Failed (database.SubtaskStatusFailed)
State Transitions
- Subtasks are created in the Created state when generated by task planning.
- They transition to Running when execution begins.
- If they require additional input, subtasks move to Waiting.
- On successful completion, subtasks become Finished.
- On error, subtasks become Failed.
State Management
- Transitions are managed via SetStatus (SubtaskWorker), with updates affecting the parent task status.
- Subtasks are executed sequentially, with refinement between executions.
- Each subtask operates with its own message chain for AI provider communication.
stateDiagram-v2
[*] --> Created: GenerateSubtasks()
Created --> Running: PopSubtask() / Run()
Running --> Waiting: Provider waiting for input
Running --> Finished: Provider completed successfully
Running --> Failed: Provider failed / Error
Waiting --> Running: PutInput()
Waiting --> Finished: Finish()
Finished --> [*]
Failed --> [*]
Error Handling and Event Publication
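- State transitions are persisted to the database before they are published or propagated to the parent entity; if the database write fails, the error is returned and no event is published (see the task example below).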
- Failed states are terminal and require manual intervention or restart.
- All state changes are published via the subscription system for real-time updates.
- Errors are logged with full context and propagated up the hierarchy.
Example: State Transition (Task)
func (tw *taskWorker) SetStatus(ctx context.Context, status database.TaskStatus) error {
task, err := tw.taskCtx.DB.UpdateTaskStatus(ctx, database.UpdateTaskStatusParams{
Status: status,
ID: tw.taskCtx.TaskID,
})
if err != nil {
return fmt.Errorf("failed to set task %d status: %w", tw.taskCtx.TaskID, err)
}
subtasks, err := tw.taskCtx.DB.GetTaskSubtasks(ctx, tw.taskCtx.TaskID)
if err != nil {
return fmt.Errorf("failed to get task %d subtasks: %w", tw.taskCtx.TaskID, err)
}
tw.taskCtx.Publisher.TaskUpdated(ctx, task, subtasks)
tw.mx.Lock()
defer tw.mx.Unlock()
switch status {
case database.TaskStatusRunning:
tw.completed = false
tw.waiting = false
err = tw.updater.SetStatus(ctx, database.FlowStatusRunning)
case database.TaskStatusWaiting:
tw.completed = false
tw.waiting = true
err = tw.updater.SetStatus(ctx, database.FlowStatusWaiting)
case database.TaskStatusFinished, database.TaskStatusFailed:
tw.completed = true
tw.waiting = false
// the task is done (successfully or not): the flow waits for new user input
err = tw.updater.SetStatus(ctx, database.FlowStatusWaiting)
}
return err
}
Log Management and Event Publication
The controller package manages seven distinct types of logs, each serving specific purposes in the penetration testing workflow. All logs are handled through a consistent controller/worker pattern and support real-time event publication.
Log Types and Their Roles
- Message Logs (MsgLogController / FlowMsgLogWorker)
  - Records all AI agent communications and reasoning
  - Supports thinking/reasoning capture for transparency
  - Handles different message types (input, output, report, etc.)
  - Supports result formatting (plain text, markdown, JSON)
- Assistant Logs (AssistantLogController / FlowAssistantLogWorker)
  - Records conversations with interactive AI assistants
  - Supports real-time streaming for live chat experiences
  - Manages multiple assistants per flow
  - Includes streaming message chunks for progressive updates
- Agent Logs (AgentLogController / FlowAgentLogWorker)
  - Records interactions between different AI agents
  - Tracks task delegation and agent communication
  - Identifies initiator and executor agents
  - Links to specific tasks and subtasks
- Search Logs (SearchLogController / FlowSearchLogWorker)
  - Records web searches and OSINT operations
  - Tracks different search engines (Google, DuckDuckGo, etc.)
  - Stores search queries and results
  - Essential for reconnaissance phases
- Terminal Logs (TermLogController / FlowTermLogWorker)
  - Records all command-line interactions
  - Tracks input/output from pentesting tools
  - Associates with specific Docker containers
  - Critical for audit trail and debugging
- Vector Store Logs (VectorStoreLogController / FlowVectorStoreLogWorker)
  - Records vector database operations
  - Tracks similarity searches and embeddings
  - Supports query filtering and metadata
  - Used for knowledge management and context retrieval
- Screenshots (ScreenshotController / FlowScreenshotWorker)
  - Captures visual evidence during testing
  - Stores screenshot URLs and metadata
  - Links to specific flow contexts
  - Essential for reporting and documentation
Log Lifecycle and Operations
All log types follow a consistent pattern:
- Creation: Log workers are created per flow by their respective controllers.
- Logging: Messages/events are recorded via PutLog() or similar methods (PutMsg(), PutScreenshot()).
- Retrieval: Historical entries can be retrieved via GetLog() or similar getters (GetMsg(), GetScreenshot()).
- Event Publication: Every log operation triggers real-time events.
- Cleanup: Log workers are cleaned up when flows are finished.
Special Features
- Message Truncation: Long messages are truncated to prevent database bloat
- Thread Safety: All log operations are protected by mutexes
- Streaming Support: Assistant logs support real-time streaming updates
- Context Linking: Most logs can be linked to specific tasks and subtasks
Event Publication
Every log operation publishes corresponding events:
// Example events published by log workers
pub.MessageLogAdded(ctx, msgLog)
pub.AgentLogAdded(ctx, agentLog)
pub.AssistantLogAdded(ctx, assistantLog)
pub.AssistantLogUpdated(ctx, assistantLog, isStreaming)
pub.SearchLogAdded(ctx, searchLog)
pub.TerminalLogAdded(ctx, termLog)
pub.VectorStoreLogAdded(ctx, vectorLog)
pub.ScreenshotAdded(ctx, screenshot)
Log Worker Creation and Management
Log controllers maintain thread-safe maps of workers:
// Example from MsgLogController
func (mlc *msgLogController) NewFlowMsgLog(
ctx context.Context,
flowID int64,
pub subscriptions.FlowPublisher,
) (FlowMsgLogWorker, error) {
mlc.mx.Lock()
defer mlc.mx.Unlock()
flw := NewFlowMsgLogWorker(mlc.db, flowID, pub)
mlc.flows[flowID] = flw
return flw, nil
}
Integration with Providers, Tools, and Subscriptions
The controller package is deeply integrated with external providers (LLM, tools), the tools execution layer, and the event subscription system. This integration is essential for orchestrating complex flows, executing tasks and subtasks, and providing real-time updates to clients.
Providers Integration
- The package uses the providers.ProviderController interface to create and manage provider instances for each flow and assistant.
- Providers are responsible for LLM operations, agent chain management, and tool execution.
- Each flow is associated with a FlowProvider, and assistants have their own AssistantProvider.
- Providers are injected into contexts and are accessible to all workers.
- Support for multiple provider types (OpenAI, Anthropic, etc.) through abstract interfaces.
Tools Integration
- The tools.FlowToolsExecutor is created for each flow and is responsible for executing tool calls within the flow context.
- The executor is configured with the provider's image, embedder, and all log providers.
- Tools are invoked as part of agent chain execution and are tightly coupled with the flow's lifecycle.
- Tools can access all logging capabilities for audit trails and debugging.
Subscriptions and Event Publication
- The subscriptions.FlowPublisher is created for each flow and publishes all significant events.
- The publisher is injected into all log workers and used to notify subscribers in real time.
- Events include entity creation, updates, log additions, and state changes.
- Event publication is decoupled from core logic: workers call the publisher only after the corresponding database operation has succeeded.
Dependency Injection and Context Propagation
All dependencies are injected through constructor parameters and context objects:
type flowWorkerCtx struct {
db database.Querier
cfg *config.Config
docker docker.DockerClient
provs providers.ProviderController
subs subscriptions.SubscriptionsController
flowProviderControllers
}
type flowProviderControllers struct {
mlc MsgLogController
aslc AssistantLogController
alc AgentLogController
slc SearchLogController
tlc TermLogController
vslc VectorStoreLogController
sc ScreenshotController
}
Integration Flow Diagrams
flowchart LR
subgraph Controllers
FC[FlowController]
FW[FlowWorker]
AW[AssistantWorker]
LogCtrl[Log Controllers]
end
subgraph External
DB[(Database)]
Providers[AI Providers]
Tools[Tools Executor]
Subs[Subscriptions]
Docker[Docker]
end
FC --> FW
FW --> AW
FC --> LogCtrl
FW --> LogCtrl
AW --> LogCtrl
Controllers --> DB
Controllers --> Providers
Controllers --> Tools
Controllers --> Subs
Controllers --> Docker
Internal Structure and Concurrency Model
The controller package is designed for safe concurrent operation in a multi-user, multi-flow environment. All controllers and workers use mutexes to ensure thread safety for all mutable state.
Mutex Usage and Thread Safety
- Each controller contains a *sync.Mutex or *sync.RWMutex to guard access to internal maps and state.
- All public methods that mutate or read shared state acquire the mutex for the duration of the operation.
- Workers also use mutexes to protect their internal state, especially for status flags and log operations.
- This design prevents data races on in-memory state and serializes concurrent calls to the same controller or worker.
Example: Controller Mutex Usage
type flowController struct {
db database.Querier
mx *sync.Mutex
flows map[int64]FlowWorker
// ... other dependencies ...
}
func (fc *flowController) CreateFlow(...) (FlowWorker, error) {
fc.mx.Lock()
defer fc.mx.Unlock()
// ... mutate fc.flows ...
}
Example: Worker Mutex Usage
type flowMsgLogWorker struct {
db database.Querier
mx *sync.Mutex
flowID int64
pub subscriptions.FlowPublisher
}
func (mlw *flowMsgLogWorker) PutMsg(...) (int64, error) {
mlw.mx.Lock()
defer mlw.mx.Unlock()
// ... perform log operation ...
}
State Storage and Management
- Controllers maintain maps of active workers for fast lookup and management.
- All access to these maps is guarded by mutexes.
- Workers encapsulate all state and dependencies for a single entity.
- Context objects (FlowContext, TaskContext, SubtaskContext) pass dependencies down the hierarchy.
Worker Goroutines and Channels
- FlowWorker and AssistantWorker use goroutines and channels to process input asynchronously.
- Dedicated goroutines run worker loops, processing input and managing execution.
- Synchronization is achieved using mutexes, channels, and wait groups for clean shutdown.
- Background processing enables responsive user interactions while maintaining system stability.
Assistant Streaming Example
// Assistant log worker supports streaming for real-time chat
func (aslw *flowAssistantLogWorker) workerMsgUpdater(
msgID, streamID int64,
ch chan *providers.StreamMessageChunk,
) {
// Processes streaming chunks in background goroutine
for chunk := range ch {
// Update database and publish events in real-time
processChunk(chunk)
}
}
Extensibility, Error Handling, and Best Practices
The controller package is designed for extensibility, robust error handling, and safe integration into larger systems.
Extensibility
- All major entities are abstracted via interfaces, making it easy to add new types or extend existing ones.
- New log types can be added by following the established controller/worker pattern.
- The pattern is consistent across all log types: Controller manages multiple workers, Worker handles single entity.
- Dependency injection via context and constructor parameters allows for easy testing and mocking.
- The use of context objects enables flexible propagation of dependencies and state.
Adding New Log Types
To add a new log type, follow this pattern:
- Create LogController and LogWorker interfaces.
- Implement the controller with a thread-safe worker map.
- Implement the worker with mutex protection and event publication.
- Add the controller to the flowProviderControllers struct.
- Integrate it into the flow worker creation process.
Error Handling
- Fallible public methods return errors, wrapped with context using fmt.Errorf.
- Errors are logged using logrus and propagated up the call stack.
- State transitions on error are explicit: entities are set to Failed states.
- Defensive checks are used throughout (nil checks, state verification, etc.).
- The wrapErrorEndSpan utility provides consistent error handling with observability.
Error Handling Example
func wrapErrorEndSpan(ctx context.Context, span langfuse.Span, msg string, err error) error {
logrus.WithContext(ctx).WithError(err).Error(msg)
err = fmt.Errorf("%s: %w", msg, err)
span.End(
langfuse.WithEndSpanStatus(err.Error()),
langfuse.WithSpanLevel(langfuse.ObservationLevelError),
)
return err
}
Best Practices
- Always use contexts: All operations accept and propagate context for cancellation and tracing.
- Mutex discipline: Always use defer for mutex unlocking to prevent deadlocks.
- Error wrapping: Provide meaningful error messages with full context.
- Event publication: Publish events after successful database operations.
- State consistency: Ensure database and in-memory state remain synchronized.
- Resource cleanup: Implement proper cleanup in Finish() methods.
- Observability: Use tracing and logging for all significant operations.
The controller package provides a robust foundation for managing complex AI-driven penetration testing workflows while maintaining reliability, observability, and extensibility.