Database Package Documentation
Overview
The database package is a core component of PentAGI that provides a robust, type-safe interface for PostgreSQL operations. Built on top of sqlc, this package automatically generates Go code from SQL queries, ensuring compile-time safety and eliminating the need for manual ORM mapping.
PentAGI uses PostgreSQL with the pgvector extension to support vector embeddings for AI-powered semantic search and memory storage capabilities.
Architecture
Database Technology Stack
- Database Engine: PostgreSQL 15+ with pgvector extension
- Code Generation: sqlc for type-safe SQL-to-Go compilation
- ORM Support: GORM v1 for advanced operations and HTTP server handlers
- Schema Management: Database migrations located in backend/migrations/
- Vector Operations: pgvector extension for AI embeddings and semantic search
Entity Relationship Model
The database follows PentAGI's hierarchical data model for penetration testing workflows:
Flow (Top-level workflow)
├── Task (Major testing phases)
│ └── SubTask (Specific agent assignments)
│ └── Action (Individual operations)
│ ├── Artifact (Output files/data)
│ └── Memory (Knowledge/observations)
└── Assistant (AI assistants for flows)
└── AssistantLog (Assistant interaction logs)
Additional supporting entities include:
- Container: Docker containers for isolated execution
- User: System users with role-based access
- MsgChain: LLM conversation chains
- ToolCall: Function calls made by AI agents
- Various Logs: Comprehensive audit trail for all operations
SQL Query Organization
The database package is built on a comprehensive set of SQL queries organized by entity type in the backend/sqlc/models/ directory. Each file contains CRUD operations and specialized queries for its respective entity.
Query File Structure
| File | Entity | Purpose |
|---|---|---|
| `flows.sql` | Flow | Top-level workflow management and analytics |
| `tasks.sql` | Task | Task lifecycle and status tracking |
| `subtasks.sql` | SubTask | Agent assignment and execution |
| `assistants.sql` | Assistant | AI assistant management |
| `containers.sql` | Container | Docker environment tracking |
| `users.sql` | User | User management and authentication |
| `roles.sql` | Role | Role-based access control |
| `prompts.sql` | Prompt | User-defined prompt templates |
| `providers.sql` | Provider | LLM provider configurations |
| `msgchains.sql` | MsgChain | LLM conversation chains and usage stats |
| `toolcalls.sql` | ToolCall | AI function call tracking and analytics |
| `screenshots.sql` | Screenshot | Visual artifacts storage |
| `analytics.sql` | Analytics | Flow execution time and hierarchy analytics |
| **Logging Entities** | | |
| `agentlogs.sql` | AgentLog | Inter-agent communication |
| `assistantlogs.sql` | AssistantLog | Human-assistant interactions |
| `msglogs.sql` | MsgLog | General message logging |
| `searchlogs.sql` | SearchLog | External search operations |
| `termlogs.sql` | TermLog | Terminal command execution |
| `vecstorelogs.sql` | VecStoreLog | Vector database operations |
Query Naming Conventions
sqlc queries follow consistent naming patterns:
-- CRUD Operations
-- name: Create[Entity] :one
-- name: Get[Entity] :one
-- name: Get[Entities] :many
-- name: Update[Entity] :one
-- name: Delete[Entity] :exec/:one
-- Scoped Operations
-- name: GetUser[Entity] :one
-- name: GetUser[Entities] :many
-- name: GetFlow[Entity] :one
-- name: GetFlow[Entities] :many
-- Specialized Queries
-- name: Get[Entity][Condition] :many
-- name: Update[Entity][Field] :one
Security and Multi-tenancy Patterns
Most queries implement user-scoped access through JOIN operations:
-- Example: User-scoped flow access
-- name: GetUserFlow :one
SELECT f.*
FROM flows f
INNER JOIN users u ON f.user_id = u.id
WHERE f.id = $1 AND f.user_id = $2 AND f.deleted_at IS NULL;
-- Example: Flow-scoped task access
-- name: GetFlowTasks :many
SELECT t.*
FROM tasks t
INNER JOIN flows f ON t.flow_id = f.id
WHERE t.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY t.created_at ASC;
Soft Delete Implementation
Critical entities implement soft deletes to maintain audit trails:
-- Soft delete operation
-- name: DeleteFlow :one
UPDATE flows
SET deleted_at = CURRENT_TIMESTAMP
WHERE id = $1
RETURNING *;
-- All queries filter soft-deleted records
WHERE f.deleted_at IS NULL
Logging Query Patterns
Logging entities follow consistent patterns for audit trails:
-- name: CreateAgentLog :one
INSERT INTO agentlogs (
initiator, -- AI agent that initiated the action
executor, -- AI agent that executed the action
task, -- Description of the task
result, -- JSON result of the operation
flow_id, -- Associated flow
task_id, -- Associated task (nullable)
subtask_id -- Associated subtask (nullable)
) VALUES (
$1, $2, $3, $4, $5, $6, $7
) RETURNING *;
-- Hierarchical retrieval with security joins
-- name: GetFlowAgentLogs :many
SELECT al.*
FROM agentlogs al
INNER JOIN flows f ON al.flow_id = f.id
WHERE al.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY al.created_at ASC;
Complex Query Examples
Message Chain Management
-- Get conversation chains for a specific task
-- name: GetTaskPrimaryMsgChains :many
SELECT mc.*
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
WHERE (mc.task_id = $1 OR s.task_id = $1) AND mc.type = 'primary_agent'
ORDER BY mc.created_at DESC;
-- Update conversation usage tracking with duration
-- name: UpdateMsgChainUsage :one
UPDATE msgchains
SET
usage_in = usage_in + $1,
usage_out = usage_out + $2,
usage_cache_in = usage_cache_in + $3,
usage_cache_out = usage_cache_out + $4,
usage_cost_in = usage_cost_in + $5,
usage_cost_out = usage_cost_out + $6,
duration_seconds = duration_seconds + $7
WHERE id = $8
RETURNING *;
-- Get usage statistics for a specific flow
-- name: GetFlowUsageStats :one
SELECT
COALESCE(SUM(mc.usage_in), 0) AS total_usage_in,
COALESCE(SUM(mc.usage_out), 0) AS total_usage_out,
COALESCE(SUM(mc.usage_cache_in), 0) AS total_usage_cache_in,
COALESCE(SUM(mc.usage_cache_out), 0) AS total_usage_cache_out,
COALESCE(SUM(mc.usage_cost_in), 0.0) AS total_usage_cost_in,
COALESCE(SUM(mc.usage_cost_out), 0.0) AS total_usage_cost_out
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
LEFT JOIN tasks t ON s.task_id = t.id OR mc.task_id = t.id
INNER JOIN flows f ON (mc.flow_id = f.id OR t.flow_id = f.id)
WHERE (mc.flow_id = $1 OR t.flow_id = $1) AND f.deleted_at IS NULL;
Container Management with Constraints
-- Upsert container with conflict resolution
-- name: CreateContainer :one
INSERT INTO containers (
type, name, image, status, flow_id, local_id, local_dir
) VALUES (
$1, $2, $3, $4, $5, $6, $7
)
ON CONFLICT ON CONSTRAINT containers_local_id_unique
DO UPDATE SET
type = EXCLUDED.type,
name = EXCLUDED.name,
image = EXCLUDED.image,
status = EXCLUDED.status,
flow_id = EXCLUDED.flow_id,
local_dir = EXCLUDED.local_dir
RETURNING *;
Role-Based Access Control
-- Complex role aggregation
-- name: GetUser :one
SELECT
u.*,
r.name AS role_name,
(
SELECT ARRAY_AGG(p.name)
FROM privileges p
WHERE p.role_id = r.id
) AS privileges
FROM users u
INNER JOIN roles r ON u.role_id = r.id
WHERE u.id = $1;
Code Generation with sqlc
Configuration
The package uses sqlc for code generation with the following configuration (sqlc/sqlc.yml):
version: "2"
sql:
- engine: "postgresql"
queries: ["models/*.sql"]
schema: ["../migrations/sql/*.sql"]
gen:
go:
package: "database"
out: "../pkg/database"
sql_package: "database/sql"
emit_interface: true
emit_json_tags: true
database:
uri: ${DATABASE_URL}
Generation Command
Code generation is performed using Docker to ensure consistency:
docker run --rm -v "$(pwd):/src" --network pentagi-network \
-e DATABASE_URL='postgres://postgres:postgres@pgvector:5432/pentagidb?sslmode=disable' \
-w /src sqlc/sqlc:1.27.0 generate -f sqlc/sqlc.yml
This command:
- Mounts the current directory into the container
- Connects to the PentAGI database network
- Uses the PostgreSQL database URL for schema introspection
- Generates type-safe Go code from SQL queries
Core Components
1. Database Interface (db.go)
Provides the foundational database transaction interface:
type DBTX interface {
ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
PrepareContext(context.Context, string) (*sql.Stmt, error)
QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}
type Queries struct {
db DBTX
}
Key Features:
- Generic database transaction interface
- Support for both direct database connections and transactions
- Thread-safe query execution
- Context-aware operations for timeout handling
2. Database Utilities (database.go)
Contains utility functions and GORM integration:
// Null value converters
func StringToNullString(s string) sql.NullString
func NullStringToPtrString(s sql.NullString) *string
func Int64ToNullInt64(i *int64) sql.NullInt64
func NullInt64ToInt64(i sql.NullInt64) *int64
func TimeToNullTime(t time.Time) sql.NullTime
// GORM configuration
func NewGorm(dsn, dbType string) (*gorm.DB, error)
Key Features:
- Null value handling for optional database fields
- GORM integration with custom logging
- Connection pooling configuration
- OpenTelemetry observability integration
3. Query Interface (querier.go)
Auto-generated interface containing all database operations:
type Querier interface {
// Flow operations
CreateFlow(ctx context.Context, arg CreateFlowParams) (Flow, error)
GetFlows(ctx context.Context) ([]Flow, error)
GetUserFlow(ctx context.Context, arg GetUserFlowParams) (Flow, error)
UpdateFlowStatus(ctx context.Context, arg UpdateFlowStatusParams) (Flow, error)
DeleteFlow(ctx context.Context, id int64) (Flow, error)
// Task operations
CreateTask(ctx context.Context, arg CreateTaskParams) (Task, error)
GetFlowTasks(ctx context.Context, flowID int64) ([]Task, error)
UpdateTaskStatus(ctx context.Context, arg UpdateTaskStatusParams) (Task, error)
// ... 150+ additional methods
}
Features:
- Complete CRUD operations for all entities
- User-scoped queries for multi-tenancy
- Efficient joins with foreign key relationships
- Soft delete support for critical entities
4. Model Converters (converter/converter.go)
Converts database models to GraphQL schema types:
func ConvertFlows(flows []database.Flow, containers []database.Container) []*model.Flow
func ConvertFlow(flow database.Flow, containers []database.Container) *model.Flow
func ConvertTasks(tasks []database.Task, subtasks []database.Subtask) []*model.Task
func ConvertAssistants(assistants []database.Assistant) []*model.Assistant
Key Functions:
- Transform database types to GraphQL models
- Handle relationship mapping (flows → tasks → subtasks)
- Null value processing for optional fields
- Aggregation of related entities
Data Models
Core Workflow Entities
Flow
Top-level penetration testing workflow:
- `id`, `title`, `status` (active/completed/failed)
- `model`, `model_provider_name`, `model_provider_type` for AI configuration
- `language` for localization
- `tool_call_id_template` for customizing tool call ID format
- `functions` as JSON for AI behavior
- `trace_id` for observability
- `user_id` for multi-tenancy
- Soft delete with `deleted_at`
Note: Prompts are no longer stored in flows. They are managed separately through the prompts table and loaded dynamically based on PROMPT_TYPE.
Task
Major phases within a flow:
- `id`, `flow_id`, `title`, `status` (pending/running/done/failed)
- `input` for task parameters
- `result` JSON for task outputs
- Creation and update timestamps
SubTask
Specific assignments for AI agents:
- `id`, `task_id`, `title`, `description`
- `status` (created/waiting/running/finished/failed)
- `result` and `context` JSON fields
- Agent type classification
Supporting Entities
Container
Docker execution environments:
- `type` (primary/secondary), `name`, `image`
- `status` (starting/running/stopped)
- `local_id` for Docker integration
- `local_dir` for volume mapping
Assistant
AI assistants for interactive flows:
- `title`, `status`, `model`, `model_provider_name`, `model_provider_type`
- `language` for localization
- `tool_call_id_template` for customizing tool call ID format
- `functions` configuration as JSON
- `use_agents` flag for delegation behavior
- `msgchain_id` for conversation tracking
- Flow association and soft delete
Note: Prompts are managed separately through the prompts table, not stored in assistants.
Message Chains (MsgChain)
LLM conversation management and usage tracking:
- `type` (primary_agent/assistant/generator/refiner/reporter/etc.)
- `model`, `model_provider` for tracking
- Token usage tracking:
  - `usage_in`, `usage_out` - input/output tokens
  - `usage_cache_in`, `usage_cache_out` - cached tokens (for prompt caching)
  - `usage_cost_in`, `usage_cost_out` - cost tracking in currency units
- Duration tracking:
  - `duration_seconds` - pre-calculated execution duration (DOUBLE PRECISION, NOT NULL, DEFAULT 0.0)
  - Automatically incremented during updates using the delta from the backend
  - Provides fast analytics without real-time calculations
- `chain` JSON for conversation history
- Multi-level association (flow/task/subtask)
- Creation and update timestamps for temporal analysis
Provider
LLM provider configurations for multi-provider support:
- `type` - PROVIDER_TYPE enum (openai/anthropic/gemini/bedrock/deepseek/glm/kimi/qwen/ollama/custom)
- `name` - user-defined provider name
- `config` - JSON configuration for API keys and settings
- `user_id` - user ownership
- Soft delete with `deleted_at`
- Unique constraint on (name, user_id) for active providers
Prompt
Centralized prompt template management:
- `type` - PROMPT_TYPE enum (primary_agent/assistant/pentester/coder/etc.)
- `prompt` - template content
- `user_id` - user ownership
- Creation and update timestamps
Logging Entities
The package provides comprehensive logging for all system operations:
- AgentLog: Inter-agent communication and delegation
- AssistantLog: Human-assistant interactions
- MsgLog: General message logging (thoughts/browser/terminal/file/search/advice/ask/input/done)
- SearchLog: External search operations (google/tavily/traversaal/browser/duckduckgo/perplexity/sploitus/searxng)
- TermLog: Terminal command execution (stdin/stdout/stderr)
- ToolCall: AI function calling with duration tracking
  - `duration_seconds` - pre-calculated execution duration (DOUBLE PRECISION, NOT NULL, DEFAULT 0.0)
  - Automatically incremented during status updates using the delta from the backend
  - Only completed toolcalls (finished/failed) are counted in analytics
- VecStoreLog: Vector database operations
LLM Usage Analytics
The database package provides comprehensive analytics for tracking LLM usage, costs, and performance across all levels of the workflow hierarchy. This enables detailed monitoring of AI resource consumption and cost optimization.
Usage Tracking Fields
The msgchains table tracks six key metrics for each conversation:
| Field | Type | Description |
|---|---|---|
| `usage_in` | BIGINT | Input tokens consumed |
| `usage_out` | BIGINT | Output tokens generated |
| `usage_cache_in` | BIGINT | Cached input tokens (for prompt caching) |
| `usage_cache_out` | BIGINT | Cached output tokens |
| `usage_cost_in` | DOUBLE PRECISION | Input cost in currency units |
| `usage_cost_out` | DOUBLE PRECISION | Output cost in currency units |
Analytics Queries
1. Hierarchical Usage Statistics
Get aggregated usage for specific entities:
// Get total usage for a flow
stats, err := db.GetFlowUsageStats(ctx, flowID)
// Get total usage for a task
stats, err := db.GetTaskUsageStats(ctx, taskID)
// Get total usage for a subtask
stats, err := db.GetSubtaskUsageStats(ctx, subtaskID)
// Get usage for all flows (grouped by flow_id)
allStats, err := db.GetAllFlowsUsageStats(ctx)
Each query returns:
type UsageStats struct {
TotalUsageIn int64 // Total input tokens
TotalUsageOut int64 // Total output tokens
TotalUsageCacheIn int64 // Total cached input tokens
TotalUsageCacheOut int64 // Total cached output tokens
TotalUsageCostIn float64 // Total input cost
TotalUsageCostOut float64 // Total output cost
}
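Callers typically combine these counters client-side. A sketch with two hypothetical convenience helpers (not part of the generated package) over the struct above:

```go
package main

import "fmt"

// UsageStats mirrors the struct returned by the usage queries.
type UsageStats struct {
	TotalUsageIn       int64
	TotalUsageOut      int64
	TotalUsageCacheIn  int64
	TotalUsageCacheOut int64
	TotalUsageCostIn   float64
	TotalUsageCostOut  float64
}

// TotalCost is a hypothetical helper: combined input and output cost.
func TotalCost(s UsageStats) float64 {
	return s.TotalUsageCostIn + s.TotalUsageCostOut
}

// TotalTokens sums all token counters, cached and uncached.
func TotalTokens(s UsageStats) int64 {
	return s.TotalUsageIn + s.TotalUsageOut + s.TotalUsageCacheIn + s.TotalUsageCacheOut
}

func main() {
	s := UsageStats{TotalUsageIn: 1000, TotalUsageOut: 250, TotalUsageCostIn: 0.01, TotalUsageCostOut: 0.02}
	fmt.Printf("tokens=%d cost=$%.2f\n", TotalTokens(s), TotalCost(s))
}
```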
2. Provider and Model Analytics
Track usage by LLM provider or specific model:
-- Get usage statistics grouped by provider
-- name: GetUsageStatsByProvider :many
SELECT
mc.model_provider,
COALESCE(SUM(mc.usage_in), 0) AS total_usage_in,
COALESCE(SUM(mc.usage_out), 0) AS total_usage_out,
COALESCE(SUM(mc.usage_cache_in), 0) AS total_usage_cache_in,
COALESCE(SUM(mc.usage_cache_out), 0) AS total_usage_cache_out,
COALESCE(SUM(mc.usage_cost_in), 0.0) AS total_usage_cost_in,
COALESCE(SUM(mc.usage_cost_out), 0.0) AS total_usage_cost_out
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
LEFT JOIN tasks t ON s.task_id = t.id OR mc.task_id = t.id
INNER JOIN flows f ON (mc.flow_id = f.id OR t.flow_id = f.id)
WHERE f.deleted_at IS NULL
GROUP BY mc.model_provider
ORDER BY mc.model_provider;
-- Get usage statistics grouped by model
-- name: GetUsageStatsByModel :many
-- Similar structure, GROUP BY mc.model, mc.model_provider
Usage example:
// Analyze costs per provider
providerStats, err := db.GetUsageStatsByProvider(ctx)
for _, stat := range providerStats {
totalCost := stat.TotalUsageCostIn + stat.TotalUsageCostOut
fmt.Printf("Provider: %s, Total Cost: $%.2f\n",
stat.ModelProvider, totalCost)
}
// Compare model efficiency
modelStats, err := db.GetUsageStatsByModel(ctx)
3. Agent Type Analytics
Track usage by agent type (primary_agent, assistant, pentester, coder, etc.):
// Get usage by type across all flows
typeStats, err := db.GetUsageStatsByType(ctx)
// Get usage by type for a specific flow
flowTypeStats, err := db.GetUsageStatsByTypeForFlow(ctx, flowID)
This helps identify which agent types consume the most resources.
4. Temporal Analytics
Analyze usage trends over time:
// Last 7 days
weekStats, err := db.GetUsageStatsByDayLastWeek(ctx)
// Last 30 days
monthStats, err := db.GetUsageStatsByDayLastMonth(ctx)
// Last 90 days
quarterStats, err := db.GetUsageStatsByDayLast3Months(ctx)
Each query returns daily aggregates:
type DailyUsageStats struct {
Date time.Time
TotalUsageIn int64
TotalUsageOut int64
TotalUsageCacheIn int64
TotalUsageCacheOut int64
TotalUsageCostIn float64
TotalUsageCostOut float64
}
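Daily rows can be folded into period totals on the client for trend dashboards. A hypothetical helper sketch over the struct above (`SumPeriodCost` is illustrative, not a generated method):

```go
package main

import (
	"fmt"
	"time"
)

// DailyUsageStats mirrors the per-day aggregate returned by the temporal queries
// (token counters omitted here for brevity).
type DailyUsageStats struct {
	Date              time.Time
	TotalUsageCostIn  float64
	TotalUsageCostOut float64
}

// SumPeriodCost folds daily rows into a single period cost.
func SumPeriodCost(days []DailyUsageStats) float64 {
	var total float64
	for _, d := range days {
		total += d.TotalUsageCostIn + d.TotalUsageCostOut
	}
	return total
}

func main() {
	week := []DailyUsageStats{
		{TotalUsageCostIn: 0.5, TotalUsageCostOut: 1.5},
		{TotalUsageCostIn: 0.25, TotalUsageCostOut: 0.75},
	}
	fmt.Printf("week cost: $%.2f\n", SumPeriodCost(week))
}
```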
Usage Tracking Implementation
When making LLM API calls, update usage metrics with duration:
// After receiving LLM response
startTime := time.Now()
// ... make LLM API call ...
durationDelta := time.Since(startTime).Seconds()
_, err := db.UpdateMsgChainUsage(ctx, database.UpdateMsgChainUsageParams{
UsageIn: response.Usage.PromptTokens,
UsageOut: response.Usage.CompletionTokens,
UsageCacheIn: response.Usage.PromptCacheTokens,
UsageCacheOut: response.Usage.CompletionCacheTokens,
UsageCostIn: calculateCost(response.Usage.PromptTokens, inputRate),
UsageCostOut: calculateCost(response.Usage.CompletionTokens, outputRate),
DurationSeconds: durationDelta,
ID: msgChainID,
})
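The snippet above calls `calculateCost` without defining it. A minimal sketch, assuming rates are expressed in currency units per million tokens (a common LLM pricing convention; the actual rate units in PentAGI are not specified here):

```go
package main

import "fmt"

// calculateCost converts a token count into a cost, assuming the rate is
// given in currency units per million tokens.
func calculateCost(tokens int64, ratePerMillion float64) float64 {
	return float64(tokens) / 1_000_000 * ratePerMillion
}

func main() {
	// e.g. 12,500 prompt tokens at $3.00 per million tokens
	fmt.Printf("$%.5f\n", calculateCost(12500, 3.0))
}
```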
Performance Considerations
All analytics queries are optimized with appropriate indexes:
- Soft delete filtering: `flows_deleted_at_idx` - partial index for active flows only
- Time-based queries: `msgchains_created_at_idx` - for temporal filtering
- Provider analytics: `msgchains_model_provider_idx` - for grouping by provider
- Model analytics: `msgchains_model_provider_composite_idx` - composite index
- Type analytics: `msgchains_type_flow_id_idx` - for flow-scoped type queries
These indexes ensure fast query execution even with millions of message chain records.
Analytics-Specific Indexes
Additional indexes optimized for analytics queries:
Assistants Analytics:
- `assistants_deleted_at_idx` - Partial index for soft delete filtering (WHERE deleted_at IS NULL)
- `assistants_created_at_idx` - Temporal queries and sorting by creation date
- `assistants_flow_id_deleted_at_idx` - Flow-scoped queries with soft delete (GetFlowAssistants)
- `assistants_flow_id_created_at_idx` - Temporal analytics by flow (GetFlowsStatsByDay*)
Subtasks Analytics:
- `subtasks_task_id_status_idx` - Task-scoped queries with status filtering
- `subtasks_status_created_at_idx` - Execution time analytics (excludes created/waiting)
Toolcalls Analytics:
- `toolcalls_flow_id_status_idx` - Flow-scoped counting of completed toolcalls
- `toolcalls_name_status_idx` - Function-based analytics with status filtering
MsgChains Analytics:
- `msgchains_type_task_id_subtask_id_idx` - Hierarchical msgchain lookup by type
- `msgchains_type_created_at_idx` - Temporal analytics grouped by msgchain type
Tasks Analytics:
- `tasks_flow_id_status_idx` - Flow-scoped task queries with status filtering
Cost Optimization Strategies
Use analytics data to optimize LLM costs:
- Identify expensive flows: `GetAllFlowsUsageStats()` to find high-cost workflows
- Compare providers: `GetUsageStatsByProvider()` to choose cost-effective providers
- Optimize agent types: `GetUsageStatsByType()` to reduce token usage per agent
- Monitor trends: Temporal queries to detect unusual spikes in usage
- Cache effectiveness: Compare `usage_cache_in` vs `usage_in` to measure prompt caching benefits
Example cost analysis:
// Calculate cache savings
stats, _ := db.GetFlowUsageStats(ctx, flowID)
regularTokens := stats.TotalUsageIn + stats.TotalUsageOut
cachedTokens := stats.TotalUsageCacheIn + stats.TotalUsageCacheOut
cacheRatio := float64(cachedTokens) / float64(regularTokens+cachedTokens)
savings := stats.TotalUsageCostIn * (cacheRatio * 0.9) // Assuming 90% cache discount
fmt.Printf("Cache effectiveness: %.1f%%\n", cacheRatio*100)
fmt.Printf("Estimated savings: $%.2f\n", savings)
Flows and Structure Analytics
The database package provides comprehensive analytics for tracking flow structure, execution metrics, and assistant usage across the workflow hierarchy.
Flow Structure Queries
1. Flow-Level Statistics
Get structural metrics for specific flows:
// Get structure stats for a flow
stats, err := db.GetFlowStats(ctx, flowID)
// Returns: total_tasks_count, total_subtasks_count, total_assistants_count
// Get total stats for all user's flows
allStats, err := db.GetUserTotalFlowsStats(ctx, userID)
// Returns: total_flows_count, total_tasks_count, total_subtasks_count, total_assistants_count
Each query returns:
type FlowStats struct {
TotalTasksCount int64
TotalSubtasksCount int64
TotalAssistantsCount int64
}
type FlowsStats struct {
TotalFlowsCount int64
TotalTasksCount int64
TotalSubtasksCount int64
TotalAssistantsCount int64
}
2. Temporal Flow Statistics
Track flow creation and structure over time:
-- Get flows stats by day for the last week
-- name: GetFlowsStatsByDayLastWeek :many
SELECT
DATE(f.created_at) AS date,
COALESCE(COUNT(DISTINCT f.id), 0)::bigint AS total_flows_count,
COALESCE(COUNT(DISTINCT t.id), 0)::bigint AS total_tasks_count,
COALESCE(COUNT(DISTINCT s.id), 0)::bigint AS total_subtasks_count,
COALESCE(COUNT(DISTINCT a.id), 0)::bigint AS total_assistants_count
FROM flows f
LEFT JOIN tasks t ON f.id = t.flow_id
LEFT JOIN subtasks s ON t.id = s.task_id
LEFT JOIN assistants a ON f.id = a.flow_id AND a.deleted_at IS NULL
WHERE f.created_at >= NOW() - INTERVAL '7 days'
AND f.deleted_at IS NULL AND f.user_id = $1
GROUP BY DATE(f.created_at)
ORDER BY date DESC;
Usage example:
// Analyze flow trends
weekStats, err := db.GetFlowsStatsByDayLastWeek(ctx, userID)
for _, stat := range weekStats {
fmt.Printf("Date: %s, Flows: %d, Tasks: %d, Subtasks: %d, Assistants: %d\n",
stat.Date, stat.TotalFlowsCount, stat.TotalTasksCount,
stat.TotalSubtasksCount, stat.TotalAssistantsCount)
}
// Available for different periods
monthStats, err := db.GetFlowsStatsByDayLastMonth(ctx, userID)
quarterStats, err := db.GetFlowsStatsByDayLast3Months(ctx, userID)
Flow Execution Time Analytics
Track actual execution time and tool usage across the flow hierarchy using pre-calculated duration metrics.
Analytics Queries (analytics.sql)
-- name: GetFlowsForPeriodLastWeek :many
-- Get flow IDs created in the last week for analytics
SELECT id, title
FROM flows
WHERE created_at >= NOW() - INTERVAL '7 days'
AND deleted_at IS NULL AND user_id = $1
ORDER BY created_at DESC;
-- name: GetTasksForFlow :many
-- Get all tasks for a flow
SELECT id, title, created_at, updated_at
FROM tasks
WHERE flow_id = $1
ORDER BY id ASC;
-- name: GetSubtasksForTasks :many
-- Get all subtasks for multiple tasks
SELECT id, task_id, title, status, created_at, updated_at
FROM subtasks
WHERE task_id = ANY(@task_ids::BIGINT[])
ORDER BY id ASC;
-- name: GetMsgchainsForFlow :many
-- Get all msgchains for a flow (including task and subtask level)
SELECT id, type, flow_id, task_id, subtask_id, duration_seconds, created_at, updated_at
FROM msgchains
WHERE flow_id = $1
ORDER BY created_at ASC;
-- name: GetToolcallsForFlow :many
-- Get all toolcalls for a flow
SELECT tc.id, tc.status, tc.flow_id, tc.task_id, tc.subtask_id,
tc.duration_seconds, tc.created_at, tc.updated_at
FROM toolcalls tc
LEFT JOIN tasks t ON tc.task_id = t.id
LEFT JOIN subtasks s ON tc.subtask_id = s.id
INNER JOIN flows f ON tc.flow_id = f.id
WHERE tc.flow_id = $1 AND f.deleted_at IS NULL
AND (tc.task_id IS NULL OR t.id IS NOT NULL)
AND (tc.subtask_id IS NULL OR s.id IS NOT NULL)
ORDER BY tc.created_at ASC;
-- name: GetAssistantsCountForFlow :one
-- Get total count of assistants for a specific flow
SELECT COALESCE(COUNT(id), 0)::bigint AS total_assistants_count
FROM assistants
WHERE flow_id = $1 AND deleted_at IS NULL;
Usage example:
// Get execution statistics for flows in a period
flows, _ := db.GetFlowsForPeriodLastWeek(ctx, userID)
for _, flow := range flows {
// Get hierarchical data
tasks, _ := db.GetTasksForFlow(ctx, flow.ID)
// Collect task IDs
taskIDs := make([]int64, len(tasks))
for i, task := range tasks {
taskIDs[i] = task.ID
}
// Get all subtasks for these tasks
subtasks, _ := db.GetSubtasksForTasks(ctx, taskIDs)
// Get msgchains and toolcalls
msgchains, _ := db.GetMsgchainsForFlow(ctx, flow.ID)
toolcalls, _ := db.GetToolcallsForFlow(ctx, flow.ID)
// Get assistants count
assistantsCount, _ := db.GetAssistantsCountForFlow(ctx, flow.ID)
// Build execution stats using converter functions
stats := converter.BuildFlowExecutionStats(
flow.ID, flow.Title, tasks, subtasks, msgchains, toolcalls,
int(assistantsCount),
)
fmt.Printf("Flow: %s, Duration: %.2fs, Toolcalls: %d, Assistants: %d\n",
stats.FlowTitle, stats.TotalDurationSeconds,
stats.TotalToolcallsCount, stats.TotalAssistantsCount)
}
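A simplified sketch of the kind of aggregation `converter.BuildFlowExecutionStats` performs: summing msgchain durations plus the durations of completed toolcalls only (finished/failed), per the analytics rule described earlier. The record types and function here are illustrative, not the converter package's actual API:

```go
package main

import "fmt"

// Simplified records carrying only the fields the aggregation needs.
type MsgChain struct{ DurationSeconds float64 }
type ToolCall struct {
	Status          string
	DurationSeconds float64
}

// SumFlowDurations totals msgchain durations and the durations of
// completed toolcalls, returning the total and the completed-call count.
func SumFlowDurations(chains []MsgChain, calls []ToolCall) (total float64, completed int) {
	for _, mc := range chains {
		total += mc.DurationSeconds
	}
	for _, tc := range calls {
		if tc.Status == "finished" || tc.Status == "failed" {
			total += tc.DurationSeconds
			completed++
		}
	}
	return total, completed
}

func main() {
	chains := []MsgChain{{12.5}, {7.5}}
	calls := []ToolCall{{"finished", 3.0}, {"received", 99.0}, {"failed", 2.0}}
	total, n := SumFlowDurations(chains, calls)
	fmt.Printf("duration=%.1fs completed=%d\n", total, n)
}
```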
Assistant Usage Tracking
The database tracks assistant usage across flows:
// Get assistant count for a flow
count, err := db.GetAssistantsCountForFlow(ctx, flowID)
// Get all assistants for a flow
assistants, err := db.GetFlowAssistants(ctx, flowID)
// User-scoped assistant access
userAssistants, err := db.GetUserFlowAssistants(ctx, database.GetUserFlowAssistantsParams{
FlowID: flowID,
UserID: userID,
})
Assistant metrics help understand:
- Interactive flow usage: Flows with high assistant counts indicate heavy user interaction
- Delegation patterns: Assistants with the `use_agents` flag show delegation behavior
- Resource allocation: Track the assistant-to-flow ratio for capacity planning
Usage Patterns
Basic Query Operations
// Initialize queries
db := database.New(sqlConnection)
// Create a new flow
flow, err := db.CreateFlow(ctx, database.CreateFlowParams{
Title: "Security Assessment",
Status: "active",
Model: "gpt-4",
ModelProviderName: "my-openai",
ModelProviderType: "openai",
Language: "en",
ToolCallIDTemplate: "call_{r:24:x}",
Functions: []byte(`{"tools": ["nmap", "sqlmap"]}`),
UserID: userID,
})
// Retrieve user's flows
flows, err := db.GetUserFlows(ctx, userID)
// Update flow status
updatedFlow, err := db.UpdateFlowStatus(ctx, database.UpdateFlowStatusParams{
Status: "completed",
ID: flowID,
})
Transaction Support
tx, err := sqlDB.BeginTx(ctx, nil)
if err != nil {
return err
}
defer tx.Rollback()
queries := db.WithTx(tx)
// Perform multiple operations atomically
task, err := queries.CreateTask(ctx, taskParams)
if err != nil {
return err
}
subtask, err := queries.CreateSubtask(ctx, subtaskParams)
if err != nil {
return err
}
return tx.Commit()
User-Scoped Operations
Most queries include user-scoped variants for multi-tenancy:
// Admin access - all flows
allFlows, err := db.GetFlows(ctx)
// User access - only user's flows
userFlows, err := db.GetUserFlows(ctx, userID)
// User-scoped flow access with validation
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
ID: flowID,
UserID: userID,
})
Integration with PentAGI
GraphQL API Integration
The database package integrates with PentAGI's GraphQL API through the converter package:
// In GraphQL resolvers
func (r *queryResolver) Flows(ctx context.Context) ([]*model.Flow, error) {
userID := auth.GetUserID(ctx)
// Fetch from database
flows, err := r.DB.GetUserFlows(ctx, userID)
if err != nil {
return nil, err
}
containers, err := r.DB.GetUserContainers(ctx, userID)
if err != nil {
return nil, err
}
// Convert to GraphQL models
return converter.ConvertFlows(flows, containers), nil
}
AI Agent Integration
The package supports AI agent operations through specialized queries:
// Log agent interactions
agentLog, err := db.CreateAgentLog(ctx, database.CreateAgentLogParams{
Initiator: "pentester",
Executor: "researcher",
Task: "Analyze target application",
Result: resultJSON,
FlowID: flowID,
TaskID: sql.NullInt64{Int64: taskID, Valid: true},
})
// Track tool calls with duration updates
toolCall, err := db.CreateToolcall(ctx, database.CreateToolcallParams{
CallID: callID,
Status: "received",
Name: "nmap_scan",
Args: argsJSON,
FlowID: flowID,
TaskID: sql.NullInt64{Int64: taskID, Valid: true},
SubtaskID: sql.NullInt64{Int64: subtaskID, Valid: true},
})
// Update status with duration delta
startTime := time.Now()
// ... execute toolcall ...
durationDelta := time.Since(startTime).Seconds()
_, err = db.UpdateToolcallFinishedResult(ctx, database.UpdateToolcallFinishedResultParams{
Result: resultJSON,
DurationSeconds: durationDelta,
ID: toolCall.ID,
})
Vector Database Operations
For AI memory and semantic search:
// Log vector operations
vecLog, err := db.CreateVectorStoreLog(ctx, database.CreateVectorStoreLogParams{
Initiator: "memorist",
Executor: "vector_db",
Filter: "vulnerability_data",
Query: "SQL injection techniques",
Action: "search",
Result: resultsJSON,
FlowID: flowID,
})
Best Practices
Error Handling
Always handle database errors appropriately:
flow, err := db.GetUserFlow(ctx, params)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, fmt.Errorf("flow not found")
}
return nil, fmt.Errorf("database error: %w", err)
}
Context Usage
Use context for timeout and cancellation:
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
flows, err := db.GetFlows(ctx)
Null Value Handling
Use provided utilities for null values:
// Converting optional strings
description := database.StringToNullString(optionalDesc)
// Converting back to pointers
descPtr := database.NullStringToPtrString(task.Description)
Security Considerations
Multi-tenancy
All user-facing operations use user-scoped queries to prevent unauthorized access:
- `GetUserFlows()` instead of `GetFlows()`
- `GetUserFlowTasks()` instead of `GetFlowTasks()`
- User ID validation in all operations
Soft Deletes
Critical entities use soft deletes to maintain audit trails:
-- Flows and assistants are soft deleted
UPDATE flows SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1
-- Most queries automatically filter soft-deleted records
WHERE f.deleted_at IS NULL
SQL Injection Prevention
sqlc generates parameterized queries that prevent SQL injection:
-- Safe parameterized query
SELECT * FROM flows WHERE user_id = $1 AND id = $2
Performance Considerations
Query Optimization
The database package is designed with performance in mind:
Indexed Queries: All foreign key relationships and frequently queried fields are properly indexed:

```sql
-- Primary keys and foreign keys are automatically indexed
-- Common query patterns use indexes for filtering and grouping

-- Flow indexes
CREATE INDEX flows_status_idx ON flows(status);
CREATE INDEX flows_title_idx ON flows(title);
CREATE INDEX flows_language_idx ON flows(language);
CREATE INDEX flows_model_provider_name_idx ON flows(model_provider_name);
CREATE INDEX flows_model_provider_type_idx ON flows(model_provider_type);
CREATE INDEX flows_user_id_idx ON flows(user_id);
CREATE INDEX flows_trace_id_idx ON flows(trace_id);
CREATE INDEX flows_deleted_at_idx ON flows(deleted_at) WHERE deleted_at IS NULL;

-- Task indexes
CREATE INDEX tasks_status_idx ON tasks(status);
CREATE INDEX tasks_title_idx ON tasks(title);
CREATE INDEX tasks_flow_id_idx ON tasks(flow_id);

-- Subtask indexes
CREATE INDEX subtasks_status_idx ON subtasks(status);
CREATE INDEX subtasks_title_idx ON subtasks(title);
CREATE INDEX subtasks_task_id_idx ON subtasks(task_id);

-- MsgChain indexes for analytics and duration tracking
CREATE INDEX msgchains_type_idx ON msgchains(type);
CREATE INDEX msgchains_flow_id_idx ON msgchains(flow_id);
CREATE INDEX msgchains_task_id_idx ON msgchains(task_id);
CREATE INDEX msgchains_subtask_id_idx ON msgchains(subtask_id);
CREATE INDEX msgchains_created_at_idx ON msgchains(created_at);
CREATE INDEX msgchains_model_provider_idx ON msgchains(model_provider);
CREATE INDEX msgchains_model_idx ON msgchains(model);
CREATE INDEX msgchains_model_provider_composite_idx ON msgchains(model, model_provider);
CREATE INDEX msgchains_created_at_flow_id_idx ON msgchains(created_at, flow_id);
CREATE INDEX msgchains_type_flow_id_idx ON msgchains(type, flow_id);

-- Toolcalls indexes for analytics and duration tracking
CREATE INDEX toolcalls_flow_id_idx ON toolcalls(flow_id);
CREATE INDEX toolcalls_task_id_idx ON toolcalls(task_id);
CREATE INDEX toolcalls_subtask_id_idx ON toolcalls(subtask_id);
CREATE INDEX toolcalls_status_idx ON toolcalls(status);
CREATE INDEX toolcalls_name_idx ON toolcalls(name);
CREATE INDEX toolcalls_created_at_idx ON toolcalls(created_at);
CREATE INDEX toolcalls_call_id_idx ON toolcalls(call_id);

-- Assistants indexes for analytics
CREATE INDEX assistants_flow_id_idx ON assistants(flow_id);
CREATE INDEX assistants_deleted_at_idx ON assistants(deleted_at) WHERE deleted_at IS NULL;
CREATE INDEX assistants_created_at_idx ON assistants(created_at);
CREATE INDEX assistants_flow_id_deleted_at_idx ON assistants(flow_id, deleted_at) WHERE deleted_at IS NULL;
CREATE INDEX assistants_flow_id_created_at_idx ON assistants(flow_id, created_at) WHERE deleted_at IS NULL;

-- Additional analytics indexes
CREATE INDEX subtasks_task_id_status_idx ON subtasks(task_id, status);
CREATE INDEX subtasks_status_created_at_idx ON subtasks(status, created_at);
CREATE INDEX toolcalls_flow_id_status_idx ON toolcalls(flow_id, status);
CREATE INDEX toolcalls_name_status_idx ON toolcalls(name, status);
CREATE INDEX msgchains_type_task_id_subtask_id_idx ON msgchains(type, task_id, subtask_id);
CREATE INDEX msgchains_type_created_at_idx ON msgchains(type, created_at);
CREATE INDEX tasks_flow_id_status_idx ON tasks(flow_id, status);

-- Provider indexes
CREATE INDEX providers_user_id_idx ON providers(user_id);
CREATE INDEX providers_type_idx ON providers(type);
CREATE INDEX providers_name_user_id_idx ON providers(name, user_id);
CREATE UNIQUE INDEX providers_name_user_id_unique ON providers(name, user_id) WHERE deleted_at IS NULL;
```
Note: Some indexes on large text fields (tasks.input, tasks.result, subtasks.description, subtasks.result) have been removed to improve write performance. These fields should use full-text search when needed.
Efficient Joins: User-scoped queries use INNER JOINs so the PostgreSQL query planner can exploit the foreign key indexes:

```sql
-- Efficient user-scoped access with proper join order
SELECT t.*
FROM tasks t
INNER JOIN flows f ON t.flow_id = f.id -- fast foreign key join
WHERE f.user_id = $1 AND f.deleted_at IS NULL;
```
Batch Operations: Use transaction batching for bulk operations:

```go
tx, err := db.BeginTx(ctx, nil)
if err != nil {
	return err
}
defer tx.Rollback()

queries := database.New(tx)
for _, item := range items {
	if _, err := queries.CreateSubtask(ctx, item); err != nil {
		return err
	}
}
return tx.Commit()
```
Connection Pooling
The package provides optimized connection pooling through GORM:

```go
func NewGorm(dsn, dbType string) (*gorm.DB, error) {
	db, err := gorm.Open(dbType, dsn)
	if err != nil {
		return nil, err
	}

	// Optimized connection settings
	db.DB().SetMaxIdleConns(5)
	db.DB().SetMaxOpenConns(20)
	db.DB().SetConnMaxLifetime(time.Hour)

	return db, nil
}
```
Vector Operations
For pgvector operations, consider:
- Batch embedding inserts for better performance
- Appropriate vector dimensions (typically 512-1536)
- Index configuration for similarity searches
Debugging and Troubleshooting
Query Logging
Enable query logging for debugging:

```go
// GORM logger captures all SQL operations
db.SetLogger(&GormLogger{})
db.LogMode(true)
```

Log Output Example:

```
INFO[0000] SELECT * FROM flows WHERE user_id = '1' AND deleted_at IS NULL  component=pentagi-gorm duration=2.5ms rows_returned=3
```
Common Issues and Solutions
1. Foreign Key Constraint Violations
Error: `pq: insert or update on table "tasks" violates foreign key constraint`

Solution: Ensure parent entities exist before creating child entities:

```go
// Verify flow exists and user has access
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
	ID:     flowID,
	UserID: userID,
})
if err != nil {
	return fmt.Errorf("invalid flow: %w", err)
}

// Now safe to create task
task, err := db.CreateTask(ctx, taskParams)
```
2. Soft Delete Issues
Error: Records not appearing in queries after "deletion"

Solution: Check soft delete filters in custom queries:

```sql
-- Always include the soft delete filter
WHERE f.deleted_at IS NULL
```
3. Null Value Handling
Error: `sql: Scan error on column index 2: unsupported Scan`

Solution: Use the proper null value converters:

```go
// When creating
description := database.StringToNullString(optionalDesc)

// When reading
descPtr := database.NullStringToPtrString(row.Description)
```
Query Performance Analysis
Use PostgreSQL's EXPLAIN for performance analysis:

```sql
-- Analyze query performance
EXPLAIN ANALYZE
SELECT f.*, COUNT(t.id) AS task_count
FROM flows f
LEFT JOIN tasks t ON f.id = t.flow_id
WHERE f.user_id = $1 AND f.deleted_at IS NULL
GROUP BY f.id;
```
Extending the Database Package
Adding New Entities
- Create migration: Add schema in `backend/migrations/sql/`
- Create SQL queries: Add a `.sql` file in `backend/sqlc/models/`
- Regenerate code: Run the sqlc generation command
- Add converters: Update `converter/converter.go` for GraphQL integration
Example New Entity:

```sql
-- backend/sqlc/models/vulnerabilities.sql

-- name: CreateVulnerability :one
INSERT INTO vulnerabilities (
    title, severity, description, flow_id
) VALUES (
    $1, $2, $3, $4
) RETURNING *;

-- name: GetFlowVulnerabilities :many
SELECT v.*
FROM vulnerabilities v
INNER JOIN flows f ON v.flow_id = f.id
WHERE v.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY v.severity DESC, v.created_at DESC;
```
Custom Query Patterns
Follow the established patterns for consistency:

```sql
-- Pattern: user-scoped access
-- name: GetUser[Entity] :one/:many
SELECT [entity].*
FROM [entity] [alias]
INNER JOIN flows f ON [alias].flow_id = f.id
INNER JOIN users u ON f.user_id = u.id
WHERE [conditions] AND f.user_id = $user_id AND f.deleted_at IS NULL;

-- Pattern: hierarchical retrieval
-- name: Get[Parent][Children] :many
SELECT [child].*
FROM [child] [child_alias]
INNER JOIN [parent] [parent_alias] ON [child_alias].[parent_id] = [parent_alias].id
WHERE [parent_alias].id = $1 AND [filters];
```
Integration Testing
Test database operations against a real PostgreSQL instance:

```go
func TestCreateFlow(t *testing.T) {
	// Setup test database
	db := setupTestDB(t)
	defer cleanupTestDB(t, db)

	queries := database.New(db)
	ctx := context.Background()

	// Test operation
	flow, err := queries.CreateFlow(ctx, database.CreateFlowParams{
		Title:         "Test Flow",
		Status:        "active",
		ModelProvider: "openai",
		UserID:        1,
	})
	assert.NoError(t, err)
	assert.Equal(t, "Test Flow", flow.Title)
}
```
Security Guidelines
Input Validation
Always validate inputs before database operations:

```go
func validateFlowInput(params CreateFlowParams) error {
	if len(params.Title) > 255 {
		return fmt.Errorf("title too long")
	}
	if !isValidStatus(params.Status) {
		return fmt.Errorf("invalid status")
	}
	return nil
}
```
Access Control
Implement consistent access control patterns:

```go
// Always verify user ownership
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
	ID:     flowID,
	UserID: currentUserID,
})
if err != nil {
	return fmt.Errorf("access denied or flow not found")
}
```
Audit Logging
Use the logging entities for security audit trails:

```go
// Log sensitive operations
_, err = db.CreateAgentLog(ctx, database.CreateAgentLogParams{
	Initiator: "system",
	Executor:  "user_action",
	Task:      "flow_deletion",
	Result:    []byte(fmt.Sprintf(`{"flow_id": %d, "user_id": %d}`, flowID, userID)),
	FlowID:    flowID,
})
```
Conclusion
The database package provides a robust, secure, and performant foundation for PentAGI's data layer. By leveraging sqlc for code generation, implementing consistent security patterns, and maintaining comprehensive audit trails, it ensures reliable operation of the autonomous penetration testing system.
Key benefits:
- Type Safety: Compile-time verification of SQL queries
- Performance: Optimized queries with proper indexing
- Security: Multi-tenancy and soft delete support
- Observability: Comprehensive logging and tracing
- Maintainability: Consistent patterns and generated code
For developers working with this package, follow the established patterns for security, performance, and maintainability to ensure smooth integration with the broader PentAGI ecosystem.