Files
2026-03-26 06:16:07 +03:00

44 KiB

Database Package Documentation

Overview

The database package is a core component of PentAGI that provides a robust, type-safe interface for interacting with PostgreSQL database operations. Built on top of sqlc, this package automatically generates Go code from SQL queries, ensuring compile-time safety and eliminating the need for manual ORM mapping.

PentAGI uses PostgreSQL with the pgvector extension to support vector embeddings for AI-powered semantic search and memory storage capabilities.

Architecture

Database Technology Stack

  • Database Engine: PostgreSQL 15+ with pgvector extension
  • Code Generation: sqlc for type-safe SQL-to-Go compilation
  • ORM Support: GORM v1 for advanced operations and HTTP server handlers
  • Schema Management: Database migrations located in backend/migrations/
  • Vector Operations: pgvector extension for AI embeddings and semantic search

Entity Relationship Model

The database follows PentAGI's hierarchical data model for penetration testing workflows:

Flow (Top-level workflow)
├── Task (Major testing phases)
│   └── SubTask (Specific agent assignments)
│       └── Action (Individual operations)
│           ├── Artifact (Output files/data)
│           └── Memory (Knowledge/observations)
└── Assistant (AI assistants for flows)
    └── AssistantLog (Assistant interaction logs)

Additional supporting entities include:

  • Container: Docker containers for isolated execution
  • User: System users with role-based access
  • MsgChain: LLM conversation chains
  • ToolCall: Function calls made by AI agents
  • Various Logs: Comprehensive audit trail for all operations

SQL Query Organization

The database package is built on a comprehensive set of SQL queries organized by entity type in the backend/sqlc/models/ directory. Each file contains CRUD operations and specialized queries for its respective entity.

Query File Structure

File Entity Purpose
flows.sql Flow Top-level workflow management and analytics
tasks.sql Task Task lifecycle and status tracking
subtasks.sql SubTask Agent assignment and execution
assistants.sql Assistant AI assistant management
containers.sql Container Docker environment tracking
users.sql User User management and authentication
roles.sql Role Role-based access control
prompts.sql Prompt User-defined prompt templates
providers.sql Provider LLM provider configurations
msgchains.sql MsgChain LLM conversation chains and usage stats
toolcalls.sql ToolCall AI function call tracking and analytics
screenshots.sql Screenshot Visual artifacts storage
analytics.sql Analytics Flow execution time and hierarchy analytics
Logging Entities
agentlogs.sql AgentLog Inter-agent communication
assistantlogs.sql AssistantLog Human-assistant interactions
msglogs.sql MsgLog General message logging
searchlogs.sql SearchLog External search operations
termlogs.sql TermLog Terminal command execution
vecstorelogs.sql VecStoreLog Vector database operations

Query Naming Conventions

sqlc queries follow consistent naming patterns:

-- CRUD Operations
-- name: Create[Entity] :one
-- name: Get[Entity] :one
-- name: Get[Entities] :many
-- name: Update[Entity] :one
-- name: Delete[Entity] :exec/:one

-- Scoped Operations
-- name: GetUser[Entity] :one
-- name: GetUser[Entities] :many
-- name: GetFlow[Entity] :one
-- name: GetFlow[Entities] :many

-- Specialized Queries
-- name: Get[Entity][Condition] :many
-- name: Update[Entity][Field] :one

Security and Multi-tenancy Patterns

Most queries implement user-scoped access through JOIN operations:

-- Example: User-scoped flow access
-- name: GetUserFlow :one
SELECT f.*
FROM flows f
INNER JOIN users u ON f.user_id = u.id
WHERE f.id = $1 AND f.user_id = $2 AND f.deleted_at IS NULL;

-- Example: Flow-scoped task access
-- name: GetFlowTasks :many
SELECT t.*
FROM tasks t
INNER JOIN flows f ON t.flow_id = f.id
WHERE t.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY t.created_at ASC;

Soft Delete Implementation

Critical entities implement soft deletes to maintain audit trails:

-- Soft delete operation
-- name: DeleteFlow :one
UPDATE flows
SET deleted_at = CURRENT_TIMESTAMP
WHERE id = $1
RETURNING *;

-- All queries filter soft-deleted records
WHERE f.deleted_at IS NULL

Logging Query Patterns

Logging entities follow consistent patterns for audit trails:

-- name: CreateAgentLog :one
INSERT INTO agentlogs (
  initiator,     -- AI agent that initiated the action
  executor,      -- AI agent that executed the action
  task,          -- Description of the task
  result,        -- JSON result of the operation
  flow_id,       -- Associated flow
  task_id,       -- Associated task (nullable)
  subtask_id     -- Associated subtask (nullable)
) VALUES (
  $1, $2, $3, $4, $5, $6, $7
) RETURNING *;

-- Hierarchical retrieval with security joins
-- name: GetFlowAgentLogs :many
SELECT al.*
FROM agentlogs al
INNER JOIN flows f ON al.flow_id = f.id
WHERE al.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY al.created_at ASC;

Complex Query Examples

Message Chain Management

-- Get conversation chains for a specific task
-- name: GetTaskPrimaryMsgChains :many
SELECT mc.*
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
WHERE (mc.task_id = $1 OR s.task_id = $1) AND mc.type = 'primary_agent'
ORDER BY mc.created_at DESC;

-- Update conversation usage tracking with duration
-- name: UpdateMsgChainUsage :one
UPDATE msgchains
SET 
  usage_in = usage_in + $1, 
  usage_out = usage_out + $2,
  usage_cache_in = usage_cache_in + $3,
  usage_cache_out = usage_cache_out + $4,
  usage_cost_in = usage_cost_in + $5,
  usage_cost_out = usage_cost_out + $6,
  duration_seconds = duration_seconds + $7
WHERE id = $8
RETURNING *;

// Get usage statistics for a specific flow
-- name: GetFlowUsageStats :one
SELECT
  COALESCE(SUM(mc.usage_in), 0) AS total_usage_in,
  COALESCE(SUM(mc.usage_out), 0) AS total_usage_out,
  COALESCE(SUM(mc.usage_cache_in), 0) AS total_usage_cache_in,
  COALESCE(SUM(mc.usage_cache_out), 0) AS total_usage_cache_out,
  COALESCE(SUM(mc.usage_cost_in), 0.0) AS total_usage_cost_in,
  COALESCE(SUM(mc.usage_cost_out), 0.0) AS total_usage_cost_out
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
LEFT JOIN tasks t ON s.task_id = t.id OR mc.task_id = t.id
INNER JOIN flows f ON (mc.flow_id = f.id OR t.flow_id = f.id)
WHERE (mc.flow_id = $1 OR t.flow_id = $1) AND f.deleted_at IS NULL;

Container Management with Constraints

-- Upsert container with conflict resolution
-- name: CreateContainer :one
INSERT INTO containers (
  type, name, image, status, flow_id, local_id, local_dir
) VALUES (
  $1, $2, $3, $4, $5, $6, $7
)
ON CONFLICT ON CONSTRAINT containers_local_id_unique
DO UPDATE SET
  type = EXCLUDED.type,
  name = EXCLUDED.name,
  image = EXCLUDED.image,
  status = EXCLUDED.status,
  flow_id = EXCLUDED.flow_id,
  local_dir = EXCLUDED.local_dir
RETURNING *;

Role-Based Access Control

-- Complex role aggregation
-- name: GetUser :one
SELECT
  u.*,
  r.name AS role_name,
  (
    SELECT ARRAY_AGG(p.name)
    FROM privileges p
    WHERE p.role_id = r.id
  ) AS privileges
FROM users u
INNER JOIN roles r ON u.role_id = r.id
WHERE u.id = $1;

Code Generation with sqlc

Configuration

The package uses sqlc for code generation with the following configuration (sqlc/sqlc.yml):

version: "2"
sql:
  - engine: "postgresql"
    queries: ["models/*.sql"]
    schema: ["../migrations/sql/*.sql"]
    gen:
      go:
        package: "database"
        out: "../pkg/database"
        sql_package: "database/sql"
        emit_interface: true
        emit_json_tags: true
    database:
      uri: ${DATABASE_URL}

Generation Command

Code generation is performed using Docker to ensure consistency:

docker run --rm -v "$(pwd):/src" --network pentagi-network \
  -e DATABASE_URL='postgres://postgres:postgres@pgvector:5432/pentagidb?sslmode=disable' \
  -w /src sqlc/sqlc:1.27.0 generate -f sqlc/sqlc.yml

This command:

  1. Mounts the current directory into the container
  2. Connects to the PentAGI database network
  3. Uses the PostgreSQL database URL for schema introspection
  4. Generates type-safe Go code from SQL queries

Core Components

1. Database Interface (db.go)

Provides the foundational database transaction interface:

type DBTX interface {
    ExecContext(context.Context, string, ...interface{}) (sql.Result, error)
    PrepareContext(context.Context, string) (*sql.Stmt, error)
    QueryContext(context.Context, string, ...interface{}) (*sql.Rows, error)
    QueryRowContext(context.Context, string, ...interface{}) *sql.Row
}

type Queries struct {
    db DBTX
}

Key Features:

  • Generic database transaction interface
  • Support for both direct database connections and transactions
  • Thread-safe query execution
  • Context-aware operations for timeout handling

2. Database Utilities (database.go)

Contains utility functions and GORM integration:

// Null value converters
func StringToNullString(s string) sql.NullString
func NullStringToPtrString(s sql.NullString) *string
func Int64ToNullInt64(i *int64) sql.NullInt64
func NullInt64ToInt64(i sql.NullInt64) *int64
func TimeToNullTime(t time.Time) sql.NullTime

// GORM configuration
func NewGorm(dsn, dbType string) (*gorm.DB, error)

Key Features:

  • Null value handling for optional database fields
  • GORM integration with custom logging
  • Connection pooling configuration
  • OpenTelemetry observability integration

3. Query Interface (querier.go)

Auto-generated interface containing all database operations:

type Querier interface {
    // Flow operations
    CreateFlow(ctx context.Context, arg CreateFlowParams) (Flow, error)
    GetFlows(ctx context.Context) ([]Flow, error)
    GetUserFlow(ctx context.Context, arg GetUserFlowParams) (Flow, error)
    UpdateFlowStatus(ctx context.Context, arg UpdateFlowStatusParams) (Flow, error)
    DeleteFlow(ctx context.Context, id int64) (Flow, error)

    // Task operations
    CreateTask(ctx context.Context, arg CreateTaskParams) (Task, error)
    GetFlowTasks(ctx context.Context, flowID int64) ([]Task, error)
    UpdateTaskStatus(ctx context.Context, arg UpdateTaskStatusParams) (Task, error)

    // ... 150+ additional methods
}

Features:

  • Complete CRUD operations for all entities
  • User-scoped queries for multi-tenancy
  • Efficient joins with foreign key relationships
  • Soft delete support for critical entities

4. Model Converters (converter/converter.go)

Converts database models to GraphQL schema types:

func ConvertFlows(flows []database.Flow, containers []database.Container) []*model.Flow
func ConvertFlow(flow database.Flow, containers []database.Container) *model.Flow
func ConvertTasks(tasks []database.Task, subtasks []database.Subtask) []*model.Task
func ConvertAssistants(assistants []database.Assistant) []*model.Assistant

Key Functions:

  • Transform database types to GraphQL models
  • Handle relationship mapping (flows → tasks → subtasks)
  • Null value processing for optional fields
  • Aggregation of related entities

Data Models

Core Workflow Entities

Flow

Top-level penetration testing workflow:

  • id, title, status (active/completed/failed)
  • model, model_provider_name, model_provider_type for AI configuration
  • language for localization
  • tool_call_id_template for customizing tool call ID format
  • functions as JSON for AI behavior
  • trace_id for observability
  • user_id for multi-tenancy
  • Soft delete with deleted_at

Note: Prompts are no longer stored in flows. They are managed separately through the prompts table and loaded dynamically based on PROMPT_TYPE.

Task

Major phases within a flow:

  • id, flow_id, title, status (pending/running/done/failed)
  • input for task parameters
  • result JSON for task outputs
  • Creation and update timestamps

SubTask

Specific assignments for AI agents:

  • id, task_id, title, description
  • status (created/waiting/running/finished/failed)
  • result and context JSON fields
  • Agent type classification

Supporting Entities

Container

Docker execution environments:

  • type (primary/secondary), name, image
  • status (starting/running/stopped)
  • local_id for Docker integration
  • local_dir for volume mapping

Assistant

AI assistants for interactive flows:

  • title, status, model, model_provider_name, model_provider_type
  • language for localization
  • tool_call_id_template for customizing tool call ID format
  • functions configuration as JSON
  • use_agents flag for delegation behavior
  • msgchain_id for conversation tracking
  • Flow association and soft delete

Note: Prompts are managed separately through the prompts table, not stored in assistants.

Message Chains (MsgChain)

LLM conversation management and usage tracking:

  • type (primary_agent/assistant/generator/refiner/reporter/etc.)
  • model, model_provider for tracking
  • Token usage tracking:
    • usage_in, usage_out - input/output tokens
    • usage_cache_in, usage_cache_out - cached tokens (for prompt caching)
    • usage_cost_in, usage_cost_out - cost tracking in currency units
  • Duration tracking:
    • duration_seconds - pre-calculated execution duration (DOUBLE PRECISION, NOT NULL, DEFAULT 0.0)
    • Automatically incremented during updates using delta from backend
    • Provides fast analytics without real-time calculations
  • chain JSON for conversation history
  • Multi-level association (flow/task/subtask)
  • Creation and update timestamps for temporal analysis

Provider

LLM provider configurations for multi-provider support:

  • type - PROVIDER_TYPE enum (openai/anthropic/gemini/bedrock/deepseek/glm/kimi/qwen/ollama/custom)
  • name - user-defined provider name
  • config - JSON configuration for API keys and settings
  • user_id - user ownership
  • Soft delete with deleted_at
  • Unique constraint on (name, user_id) for active providers

Prompt

Centralized prompt template management:

  • type - PROMPT_TYPE enum (primary_agent/assistant/pentester/coder/etc.)
  • prompt - template content
  • user_id - user ownership
  • Creation and update timestamps

Logging Entities

The package provides comprehensive logging for all system operations:

  • AgentLog: Inter-agent communication and delegation
  • AssistantLog: Human-assistant interactions
  • MsgLog: General message logging (thoughts/browser/terminal/file/search/advice/ask/input/done)
  • SearchLog: External search operations (google/tavily/traversaal/browser/duckduckgo/perplexity/sploitus/searxng)
  • TermLog: Terminal command execution (stdin/stdout/stderr)
  • ToolCall: AI function calling with duration tracking
    • duration_seconds - pre-calculated execution duration (DOUBLE PRECISION, NOT NULL, DEFAULT 0.0)
    • Automatically incremented during status updates using delta from backend
    • Only counts completed toolcalls (finished/failed) in analytics
  • VecStoreLog: Vector database operations

LLM Usage Analytics

The database package provides comprehensive analytics for tracking LLM usage, costs, and performance across all levels of the workflow hierarchy. This enables detailed monitoring of AI resource consumption and cost optimization.

Usage Tracking Fields

The msgchains table tracks six key metrics for each conversation:

Field Type Description
usage_in BIGINT Input tokens consumed
usage_out BIGINT Output tokens generated
usage_cache_in BIGINT Cached input tokens (for prompt caching)
usage_cache_out BIGINT Cached output tokens
usage_cost_in DOUBLE PRECISION Input cost in currency units
usage_cost_out DOUBLE PRECISION Output cost in currency units

Analytics Queries

1. Hierarchical Usage Statistics

Get aggregated usage for specific entities:

// Get total usage for a flow
stats, err := db.GetFlowUsageStats(ctx, flowID)

// Get total usage for a task
stats, err := db.GetTaskUsageStats(ctx, taskID)

// Get total usage for a subtask
stats, err := db.GetSubtaskUsageStats(ctx, subtaskID)

// Get usage for all flows (grouped by flow_id)
allStats, err := db.GetAllFlowsUsageStats(ctx)

Each query returns:

type UsageStats struct {
    TotalUsageIn      int64   // Total input tokens
    TotalUsageOut     int64   // Total output tokens
    TotalUsageCacheIn int64   // Total cached input tokens
    TotalUsageCacheOut int64  // Total cached output tokens
    TotalUsageCostIn  float64 // Total input cost
    TotalUsageCostOut float64 // Total output cost
}

2. Provider and Model Analytics

Track usage by LLM provider or specific model:

-- Get usage statistics grouped by provider
-- name: GetUsageStatsByProvider :many
SELECT
  mc.model_provider,
  COALESCE(SUM(mc.usage_in), 0) AS total_usage_in,
  COALESCE(SUM(mc.usage_out), 0) AS total_usage_out,
  COALESCE(SUM(mc.usage_cache_in), 0) AS total_usage_cache_in,
  COALESCE(SUM(mc.usage_cache_out), 0) AS total_usage_cache_out,
  COALESCE(SUM(mc.usage_cost_in), 0.0) AS total_usage_cost_in,
  COALESCE(SUM(mc.usage_cost_out), 0.0) AS total_usage_cost_out
FROM msgchains mc
LEFT JOIN subtasks s ON mc.subtask_id = s.id
LEFT JOIN tasks t ON s.task_id = t.id OR mc.task_id = t.id
INNER JOIN flows f ON (mc.flow_id = f.id OR t.flow_id = f.id)
WHERE f.deleted_at IS NULL
GROUP BY mc.model_provider
ORDER BY mc.model_provider;

-- Get usage statistics grouped by model
-- name: GetUsageStatsByModel :many
-- Similar structure, GROUP BY mc.model, mc.model_provider

Usage example:

// Analyze costs per provider
providerStats, err := db.GetUsageStatsByProvider(ctx)
for _, stat := range providerStats {
    totalCost := stat.TotalUsageCostIn + stat.TotalUsageCostOut
    fmt.Printf("Provider: %s, Total Cost: $%.2f\n", 
        stat.ModelProvider, totalCost)
}

// Compare model efficiency
modelStats, err := db.GetUsageStatsByModel(ctx)

3. Agent Type Analytics

Track usage by agent type (primary_agent, assistant, pentester, coder, etc.):

// Get usage by type across all flows
typeStats, err := db.GetUsageStatsByType(ctx)

// Get usage by type for a specific flow
flowTypeStats, err := db.GetUsageStatsByTypeForFlow(ctx, flowID)

This helps identify which agent types consume the most resources.

4. Temporal Analytics

Analyze usage trends over time:

// Last 7 days
weekStats, err := db.GetUsageStatsByDayLastWeek(ctx)

// Last 30 days
monthStats, err := db.GetUsageStatsByDayLastMonth(ctx)

// Last 90 days
quarterStats, err := db.GetUsageStatsByDayLast3Months(ctx)

Each query returns daily aggregates:

type DailyUsageStats struct {
    Date              time.Time
    TotalUsageIn      int64
    TotalUsageOut     int64
    TotalUsageCacheIn int64
    TotalUsageCacheOut int64
    TotalUsageCostIn  float64
    TotalUsageCostOut float64
}

Usage Tracking Implementation

When making LLM API calls, update usage metrics with duration:

// After receiving LLM response
startTime := time.Now()
// ... make LLM API call ...
durationDelta := time.Since(startTime).Seconds()

_, err := db.UpdateMsgChainUsage(ctx, database.UpdateMsgChainUsageParams{
    UsageIn:         response.Usage.PromptTokens,
    UsageOut:        response.Usage.CompletionTokens,
    UsageCacheIn:    response.Usage.PromptCacheTokens,
    UsageCacheOut:   response.Usage.CompletionCacheTokens,
    UsageCostIn:     calculateCost(response.Usage.PromptTokens, inputRate),
    UsageCostOut:    calculateCost(response.Usage.CompletionTokens, outputRate),
    DurationSeconds: durationDelta,
    ID:             msgChainID,
})

Performance Considerations

All analytics queries are optimized with appropriate indexes:

  • Soft delete filtering: flows_deleted_at_idx - partial index for active flows only
  • Time-based queries: msgchains_created_at_idx - for temporal filtering
  • Provider analytics: msgchains_model_provider_idx - for grouping by provider
  • Model analytics: msgchains_model_provider_composite_idx - composite index
  • Type analytics: msgchains_type_flow_id_idx - for flow-scoped type queries

These indexes ensure fast query execution even with millions of message chain records.

Analytics-Specific Indexes

Additional indexes optimized for analytics queries:

Assistants Analytics:

  • assistants_deleted_at_idx - Partial index for soft delete filtering (WHERE deleted_at IS NULL)
  • assistants_created_at_idx - Temporal queries and sorting by creation date
  • assistants_flow_id_deleted_at_idx - Flow-scoped queries with soft delete (GetFlowAssistants)
  • assistants_flow_id_created_at_idx - Temporal analytics by flow (GetFlowsStatsByDay*)

Subtasks Analytics:

  • subtasks_task_id_status_idx - Task-scoped queries with status filtering
  • subtasks_status_created_at_idx - Execution time analytics (excludes created/waiting)

Toolcalls Analytics:

  • toolcalls_flow_id_status_idx - Flow-scoped completed toolcalls counting
  • toolcalls_name_status_idx - Function-based analytics with status filtering

MsgChains Analytics:

  • msgchains_type_task_id_subtask_id_idx - Hierarchical msgchain lookup by type
  • msgchains_type_created_at_idx - Temporal analytics grouped by msgchain type

Tasks Analytics:

  • tasks_flow_id_status_idx - Flow-scoped task queries with status filtering

Cost Optimization Strategies

Use analytics data to optimize LLM costs:

  1. Identify expensive flows: GetAllFlowsUsageStats() to find high-cost workflows
  2. Compare providers: GetUsageStatsByProvider() to choose cost-effective providers
  3. Optimize agent types: GetUsageStatsByType() to reduce token usage per agent
  4. Monitor trends: Temporal queries to detect unusual spikes in usage
  5. Cache effectiveness: Compare usage_cache_in vs usage_in to measure prompt caching benefits

Example cost analysis:

// Calculate cache savings
stats, _ := db.GetFlowUsageStats(ctx, flowID)
regularTokens := stats.TotalUsageIn + stats.TotalUsageOut
cachedTokens := stats.TotalUsageCacheIn + stats.TotalUsageCacheOut
cacheRatio := float64(cachedTokens) / float64(regularTokens+cachedTokens)
savings := stats.TotalUsageCostIn * (cacheRatio * 0.9) // Assuming 90% cache discount

fmt.Printf("Cache effectiveness: %.1f%%\n", cacheRatio*100)
fmt.Printf("Estimated savings: $%.2f\n", savings)

Flows and Structure Analytics

The database package provides comprehensive analytics for tracking flow structure, execution metrics, and assistant usage across the workflow hierarchy.

Flow Structure Queries

1. Flow-Level Statistics

Get structural metrics for specific flows:

// Get structure stats for a flow
stats, err := db.GetFlowStats(ctx, flowID)
// Returns: total_tasks_count, total_subtasks_count, total_assistants_count

// Get total stats for all user's flows
allStats, err := db.GetUserTotalFlowsStats(ctx, userID)
// Returns: total_flows_count, total_tasks_count, total_subtasks_count, total_assistants_count

Each query returns:

type FlowStats struct {
    TotalTasksCount      int64
    TotalSubtasksCount   int64
    TotalAssistantsCount int64
}

type FlowsStats struct {
    TotalFlowsCount      int64
    TotalTasksCount      int64
    TotalSubtasksCount   int64
    TotalAssistantsCount int64
}

2. Temporal Flow Statistics

Track flow creation and structure over time:

-- Get flows stats by day for the last week
-- name: GetFlowsStatsByDayLastWeek :many
SELECT
  DATE(f.created_at) AS date,
  COALESCE(COUNT(DISTINCT f.id), 0)::bigint AS total_flows_count,
  COALESCE(COUNT(DISTINCT t.id), 0)::bigint AS total_tasks_count,
  COALESCE(COUNT(DISTINCT s.id), 0)::bigint AS total_subtasks_count,
  COALESCE(COUNT(DISTINCT a.id), 0)::bigint AS total_assistants_count
FROM flows f
LEFT JOIN tasks t ON f.id = t.flow_id
LEFT JOIN subtasks s ON t.id = s.task_id
LEFT JOIN assistants a ON f.id = a.flow_id AND a.deleted_at IS NULL
WHERE f.created_at >= NOW() - INTERVAL '7 days' 
  AND f.deleted_at IS NULL AND f.user_id = $1
GROUP BY DATE(f.created_at)
ORDER BY date DESC;

Usage example:

// Analyze flow trends
weekStats, err := db.GetFlowsStatsByDayLastWeek(ctx, userID)
for _, stat := range weekStats {
    fmt.Printf("Date: %s, Flows: %d, Tasks: %d, Subtasks: %d, Assistants: %d\n",
        stat.Date, stat.TotalFlowsCount, stat.TotalTasksCount, 
        stat.TotalSubtasksCount, stat.TotalAssistantsCount)
}

// Available for different periods
monthStats, err := db.GetFlowsStatsByDayLastMonth(ctx, userID)
quarterStats, err := db.GetFlowsStatsByDayLast3Months(ctx, userID)

Flow Execution Time Analytics

Track actual execution time and tool usage across the flow hierarchy using pre-calculated duration metrics.

Analytics Queries (analytics.sql)

-- name: GetFlowsForPeriodLastWeek :many
-- Get flow IDs created in the last week for analytics
SELECT id, title
FROM flows
WHERE created_at >= NOW() - INTERVAL '7 days' 
  AND deleted_at IS NULL AND user_id = $1
ORDER BY created_at DESC;

-- name: GetTasksForFlow :many
-- Get all tasks for a flow
SELECT id, title, created_at, updated_at
FROM tasks
WHERE flow_id = $1
ORDER BY id ASC;

-- name: GetSubtasksForTasks :many
-- Get all subtasks for multiple tasks
SELECT id, task_id, title, status, created_at, updated_at
FROM subtasks
WHERE task_id = ANY(@task_ids::BIGINT[])
ORDER BY id ASC;

-- name: GetMsgchainsForFlow :many
-- Get all msgchains for a flow (including task and subtask level)
SELECT id, type, flow_id, task_id, subtask_id, duration_seconds, created_at, updated_at
FROM msgchains
WHERE flow_id = $1
ORDER BY created_at ASC;

-- name: GetToolcallsForFlow :many
-- Get all toolcalls for a flow
SELECT tc.id, tc.status, tc.flow_id, tc.task_id, tc.subtask_id, 
       tc.duration_seconds, tc.created_at, tc.updated_at
FROM toolcalls tc
LEFT JOIN tasks t ON tc.task_id = t.id
LEFT JOIN subtasks s ON tc.subtask_id = s.id
INNER JOIN flows f ON tc.flow_id = f.id
WHERE tc.flow_id = $1 AND f.deleted_at IS NULL
  AND (tc.task_id IS NULL OR t.id IS NOT NULL)
  AND (tc.subtask_id IS NULL OR s.id IS NOT NULL)
ORDER BY tc.created_at ASC;

-- name: GetAssistantsCountForFlow :one
-- Get total count of assistants for a specific flow
SELECT COALESCE(COUNT(id), 0)::bigint AS total_assistants_count
FROM assistants
WHERE flow_id = $1 AND deleted_at IS NULL;

Usage example:

// Get execution statistics for flows in a period
flows, _ := db.GetFlowsForPeriodLastWeek(ctx, userID)

for _, flow := range flows {
    // Get hierarchical data
    tasks, _ := db.GetTasksForFlow(ctx, flow.ID)
    
    // Collect task IDs
    taskIDs := make([]int64, len(tasks))
    for i, task := range tasks {
        taskIDs[i] = task.ID
    }
    
    // Get all subtasks for these tasks
    subtasks, _ := db.GetSubtasksForTasks(ctx, taskIDs)
    
    // Get msgchains and toolcalls
    msgchains, _ := db.GetMsgchainsForFlow(ctx, flow.ID)
    toolcalls, _ := db.GetToolcallsForFlow(ctx, flow.ID)
    
    // Get assistants count
    assistantsCount, _ := db.GetAssistantsCountForFlow(ctx, flow.ID)
    
    // Build execution stats using converter functions
    stats := converter.BuildFlowExecutionStats(
        flow.ID, flow.Title, tasks, subtasks, msgchains, toolcalls, 
        int(assistantsCount),
    )
    
    fmt.Printf("Flow: %s, Duration: %.2fs, Toolcalls: %d, Assistants: %d\n",
        stats.FlowTitle, stats.TotalDurationSeconds, 
        stats.TotalToolcallsCount, stats.TotalAssistantsCount)
}

Assistant Usage Tracking

The database tracks assistant usage across flows:

// Get assistant count for a flow
count, err := db.GetAssistantsCountForFlow(ctx, flowID)

// Get all assistants for a flow
assistants, err := db.GetFlowAssistants(ctx, flowID)

// User-scoped assistant access
userAssistants, err := db.GetUserFlowAssistants(ctx, database.GetUserFlowAssistantsParams{
    FlowID: flowID,
    UserID: userID,
})

Assistant metrics help understand:

  • Interactive flow usage: Flows with high assistant counts indicate heavy user interaction
  • Delegation patterns: Assistants with use_agents flag show delegation behavior
  • Resource allocation: Track assistant-to-flow ratio for capacity planning

Usage Patterns

Basic Query Operations

// Initialize queries
db := database.New(sqlConnection)

// Create a new flow
flow, err := db.CreateFlow(ctx, database.CreateFlowParams{
    Title:              "Security Assessment",
    Status:             "active",
    Model:              "gpt-4",
    ModelProviderName:  "my-openai",
    ModelProviderType:  "openai",
    Language:           "en",
    ToolCallIDTemplate: "call_{r:24:x}",
    Functions:          []byte(`{"tools": ["nmap", "sqlmap"]}`),
    UserID:             userID,
})

// Retrieve user's flows
flows, err := db.GetUserFlows(ctx, userID)

// Update flow status
updatedFlow, err := db.UpdateFlowStatus(ctx, database.UpdateFlowStatusParams{
    Status: "completed",
    ID:     flowID,
})

Transaction Support

tx, err := sqlDB.BeginTx(ctx, nil)
if err != nil {
    return err
}
defer tx.Rollback()

queries := db.WithTx(tx)

// Perform multiple operations atomically
task, err := queries.CreateTask(ctx, taskParams)
if err != nil {
    return err
}

subtask, err := queries.CreateSubtask(ctx, subtaskParams)
if err != nil {
    return err
}

return tx.Commit()

User-Scoped Operations

Most queries include user-scoped variants for multi-tenancy:

// Admin access - all flows
allFlows, err := db.GetFlows(ctx)

// User access - only user's flows
userFlows, err := db.GetUserFlows(ctx, userID)

// User-scoped flow access with validation
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
    ID:     flowID,
    UserID: userID,
})

Integration with PentAGI

GraphQL API Integration

The database package integrates with PentAGI's GraphQL API through the converter package:

// In GraphQL resolvers
func (r *queryResolver) Flows(ctx context.Context) ([]*model.Flow, error) {
    userID := auth.GetUserID(ctx)

    // Fetch from database
    flows, err := r.DB.GetUserFlows(ctx, userID)
    if err != nil {
        return nil, err
    }

    containers, err := r.DB.GetUserContainers(ctx, userID)
    if err != nil {
        return nil, err
    }

    // Convert to GraphQL models
    return converter.ConvertFlows(flows, containers), nil
}

AI Agent Integration

The package supports AI agent operations through specialized queries:

// Log agent interactions
agentLog, err := db.CreateAgentLog(ctx, database.CreateAgentLogParams{
    Initiator: "pentester",
    Executor:  "researcher",
    Task:      "Analyze target application",
    Result:    resultJSON,
    FlowID:    flowID,
    TaskID:    sql.NullInt64{Int64: taskID, Valid: true},
})

// Track tool calls with duration updates
toolCall, err := db.CreateToolcall(ctx, database.CreateToolcallParams{
    CallID:    callID,
    Status:    "received",
    Name:      "nmap_scan",
    Args:      argsJSON,
    FlowID:    flowID,
    TaskID:    sql.NullInt64{Int64: taskID, Valid: true},
    SubtaskID: sql.NullInt64{Int64: subtaskID, Valid: true},
})

// Update status with duration delta
startTime := time.Now()
// ... execute toolcall ...
durationDelta := time.Since(startTime).Seconds()

_, err = db.UpdateToolcallFinishedResult(ctx, database.UpdateToolcallFinishedResultParams{
    Result:          resultJSON,
    DurationSeconds: durationDelta,
    ID:              toolCall.ID,
})

Vector Database Operations

For AI memory and semantic search:

// Log vector operations
vecLog, err := db.CreateVectorStoreLog(ctx, database.CreateVectorStoreLogParams{
    Initiator: "memorist",
    Executor:  "vector_db",
    Filter:    "vulnerability_data",
    Query:     "SQL injection techniques",
    Action:    "search",
    Result:    resultsJSON,
    FlowID:    flowID,
})

Best Practices

Error Handling

Always handle database errors appropriately:

flow, err := db.GetUserFlow(ctx, params)
if err != nil {
    if errors.Is(err, sql.ErrNoRows) {
        return nil, fmt.Errorf("flow not found")
    }
    return nil, fmt.Errorf("database error: %w", err)
}

Context Usage

Use context for timeout and cancellation:

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

flows, err := db.GetFlows(ctx)

Null Value Handling

Use provided utilities for null values:

// Converting optional strings
description := database.StringToNullString(optionalDesc)

// Converting back to pointers
descPtr := database.NullStringToPtrString(task.Description)

Security Considerations

Multi-tenancy

All user-facing operations use user-scoped queries to prevent unauthorized access:

  • GetUserFlows() instead of GetFlows()
  • GetUserFlowTasks() instead of GetFlowTasks()
  • User ID validation in all operations

Soft Deletes

Critical entities use soft deletes to maintain audit trails:

-- Flows and assistants are soft deleted
UPDATE flows SET deleted_at = CURRENT_TIMESTAMP WHERE id = $1

-- Most queries automatically filter soft-deleted records
WHERE f.deleted_at IS NULL

SQL Injection Prevention

sqlc generates parameterized queries that prevent SQL injection:

-- Safe parameterized query
SELECT * FROM flows WHERE user_id = $1 AND id = $2

Performance Considerations

Query Optimization

The database package is designed with performance in mind:

Indexed Queries: All foreign key relationships and frequently queried fields are properly indexed:

-- Primary keys and foreign keys are automatically indexed
-- Common query patterns use indexes for filtering and grouping

-- Flow indexes
CREATE INDEX flows_status_idx ON flows(status);
CREATE INDEX flows_title_idx ON flows(title);
CREATE INDEX flows_language_idx ON flows(language);
CREATE INDEX flows_model_provider_name_idx ON flows(model_provider_name);
CREATE INDEX flows_model_provider_type_idx ON flows(model_provider_type);
CREATE INDEX flows_user_id_idx ON flows(user_id);
CREATE INDEX flows_trace_id_idx ON flows(trace_id);
CREATE INDEX flows_deleted_at_idx ON flows(deleted_at) WHERE deleted_at IS NULL;

-- Task indexes  
CREATE INDEX tasks_status_idx ON tasks(status);
CREATE INDEX tasks_title_idx ON tasks(title);
CREATE INDEX tasks_flow_id_idx ON tasks(flow_id);

-- Subtask indexes
CREATE INDEX subtasks_status_idx ON subtasks(status);
CREATE INDEX subtasks_title_idx ON subtasks(title);
CREATE INDEX subtasks_task_id_idx ON subtasks(task_id);

-- MsgChain indexes for analytics and duration tracking
CREATE INDEX msgchains_type_idx ON msgchains(type);
CREATE INDEX msgchains_flow_id_idx ON msgchains(flow_id);
CREATE INDEX msgchains_task_id_idx ON msgchains(task_id);
CREATE INDEX msgchains_subtask_id_idx ON msgchains(subtask_id);
CREATE INDEX msgchains_created_at_idx ON msgchains(created_at);
CREATE INDEX msgchains_model_provider_idx ON msgchains(model_provider);
CREATE INDEX msgchains_model_idx ON msgchains(model);
CREATE INDEX msgchains_model_provider_composite_idx ON msgchains(model, model_provider);
CREATE INDEX msgchains_created_at_flow_id_idx ON msgchains(created_at, flow_id);
CREATE INDEX msgchains_type_flow_id_idx ON msgchains(type, flow_id);

-- Toolcalls indexes for analytics and duration tracking
CREATE INDEX toolcalls_flow_id_idx ON toolcalls(flow_id);
CREATE INDEX toolcalls_task_id_idx ON toolcalls(task_id);
CREATE INDEX toolcalls_subtask_id_idx ON toolcalls(subtask_id);
CREATE INDEX toolcalls_status_idx ON toolcalls(status);
CREATE INDEX toolcalls_name_idx ON toolcalls(name);
CREATE INDEX toolcalls_created_at_idx ON toolcalls(created_at);
CREATE INDEX toolcalls_call_id_idx ON toolcalls(call_id);

-- Assistants indexes for analytics
CREATE INDEX assistants_flow_id_idx ON assistants(flow_id);
CREATE INDEX assistants_deleted_at_idx ON assistants(deleted_at) WHERE deleted_at IS NULL;
CREATE INDEX assistants_created_at_idx ON assistants(created_at);
CREATE INDEX assistants_flow_id_deleted_at_idx ON assistants(flow_id, deleted_at) WHERE deleted_at IS NULL;
CREATE INDEX assistants_flow_id_created_at_idx ON assistants(flow_id, created_at) WHERE deleted_at IS NULL;

-- Additional analytics indexes
CREATE INDEX subtasks_task_id_status_idx ON subtasks(task_id, status);
CREATE INDEX subtasks_status_created_at_idx ON subtasks(status, created_at);
CREATE INDEX toolcalls_flow_id_status_idx ON toolcalls(flow_id, status);
CREATE INDEX toolcalls_name_status_idx ON toolcalls(name, status);
CREATE INDEX msgchains_type_task_id_subtask_id_idx ON msgchains(type, task_id, subtask_id);
CREATE INDEX msgchains_type_created_at_idx ON msgchains(type, created_at);
CREATE INDEX tasks_flow_id_status_idx ON tasks(flow_id, status);

-- Provider indexes
CREATE INDEX providers_user_id_idx ON providers(user_id);
CREATE INDEX providers_type_idx ON providers(type);
CREATE INDEX providers_name_user_id_idx ON providers(name, user_id);
CREATE UNIQUE INDEX providers_name_user_id_unique ON providers(name, user_id) WHERE deleted_at IS NULL;

Note: Some indexes on large text fields (tasks.input, tasks.result, subtasks.description, subtasks.result) have been removed to improve write performance. These fields should use full-text search when needed.

Efficient Joins: User-scoped queries use INNER JOINs to leverage PostgreSQL query planner:

-- Efficient user-scoped access with proper join order
SELECT t.*
FROM tasks t
INNER JOIN flows f ON t.flow_id = f.id  -- Fast foreign key join
WHERE f.user_id = $1 AND f.deleted_at IS NULL;

Batch Operations: Use transaction batching for bulk operations:

tx, err := db.BeginTx(ctx, nil)
defer tx.Rollback()

queries := database.New(tx)
for _, item := range items {
    if _, err := queries.CreateSubtask(ctx, item); err != nil {
        return err
    }
}
return tx.Commit()

Connection Pooling

The package provides optimized connection pooling through GORM:

func NewGorm(dsn, dbType string) (*gorm.DB, error) {
    db, err := gorm.Open(dbType, dsn)
    if err != nil {
        return nil, err
    }

    // Optimized connection settings
    db.DB().SetMaxIdleConns(5)
    db.DB().SetMaxOpenConns(20)
    db.DB().SetConnMaxLifetime(time.Hour)

    return db, nil
}

Vector Operations

For pgvector operations, consider:

  • Batch embedding inserts for better performance
  • Appropriate vector dimensions (typically 512-1536)
  • Index configuration for similarity searches

Debugging and Troubleshooting

Query Logging

Enable query logging for debugging:

// GORM logger captures all SQL operations
db.SetLogger(&GormLogger{})
db.LogMode(true)

Log Output Example:

INFO[0000] SELECT * FROM flows WHERE user_id = '1' AND deleted_at IS NULL  component=pentagi-gorm duration=2.5ms rows_returned=3

Common Issues and Solutions

1. Foreign Key Constraint Violations

Error: pq: insert or update on table "tasks" violates foreign key constraint

Solution: Ensure parent entities exist before creating child entities:

// Verify flow exists and user has access
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
    ID:     flowID,
    UserID: userID,
})
if err != nil {
    return fmt.Errorf("invalid flow: %w", err)
}

// Now safe to create task
task, err := db.CreateTask(ctx, taskParams)

2. Soft Delete Issues

Error: Records not appearing in queries after "deletion"

Solution: Check soft delete filters in custom queries:

-- Always include soft delete filter
WHERE f.deleted_at IS NULL

3. Null Value Handling

Error: sql: Scan error on column index 2: unsupported Scan

Solution: Use proper null value converters:

// When creating
description := database.StringToNullString(optionalDesc)

// When reading
descPtr := database.NullStringToPtrString(row.Description)

Query Performance Analysis

Use PostgreSQL's EXPLAIN for performance analysis:

-- Analyze query performance
EXPLAIN ANALYZE SELECT f.*, COUNT(t.id) as task_count
FROM flows f
LEFT JOIN tasks t ON f.id = t.flow_id
WHERE f.user_id = $1 AND f.deleted_at IS NULL
GROUP BY f.id;

Extending the Database Package

Adding New Entities

  1. Create migration: Add schema in backend/migrations/sql/
  2. Create SQL queries: Add .sql file in backend/sqlc/models/
  3. Regenerate code: Run sqlc generation command
  4. Add converters: Update converter/converter.go for GraphQL integration

Example New Entity:

-- backend/sqlc/models/vulnerabilities.sql

-- name: CreateVulnerability :one
INSERT INTO vulnerabilities (
  title, severity, description, flow_id
) VALUES (
  $1, $2, $3, $4
) RETURNING *;

-- name: GetFlowVulnerabilities :many
SELECT v.*
FROM vulnerabilities v
INNER JOIN flows f ON v.flow_id = f.id
WHERE v.flow_id = $1 AND f.deleted_at IS NULL
ORDER BY v.severity DESC, v.created_at DESC;

Custom Query Patterns

Follow established patterns for consistency:

-- Pattern: User-scoped access
-- name: GetUser[Entity] :one/:many
SELECT [entity].*
FROM [entity] [alias]
INNER JOIN flows f ON [alias].flow_id = f.id
INNER JOIN users u ON f.user_id = u.id
WHERE [conditions] AND f.user_id = $user_id AND f.deleted_at IS NULL;

-- Pattern: Hierarchical retrieval
-- name: Get[Parent][Children] :many
SELECT [child].*
FROM [child] [child_alias]
INNER JOIN [parent] [parent_alias] ON [child_alias].[parent_id] = [parent_alias].id
WHERE [parent_alias].id = $1 AND [filters];

Integration Testing

Test database operations with real PostgreSQL:

func TestCreateFlow(t *testing.T) {
    // Setup test database
    db := setupTestDB(t)
    defer cleanupTestDB(t, db)

    queries := database.New(db)

    // Test operation
    flow, err := queries.CreateFlow(ctx, database.CreateFlowParams{
        Title:         "Test Flow",
        Status:        "active",
        ModelProvider: "openai",
        UserID:        1,
    })

    assert.NoError(t, err)
    assert.Equal(t, "Test Flow", flow.Title)
}

Security Guidelines

Input Validation

Always validate inputs before database operations:

func validateFlowInput(params CreateFlowParams) error {
    if len(params.Title) > 255 {
        return fmt.Errorf("title too long")
    }
    if !isValidStatus(params.Status) {
        return fmt.Errorf("invalid status")
    }
    return nil
}

Access Control

Implement consistent access control patterns:

// Always verify user ownership
flow, err := db.GetUserFlow(ctx, database.GetUserFlowParams{
    ID:     flowID,
    UserID: currentUserID,
})
if err != nil {
    return fmt.Errorf("access denied or flow not found")
}

Audit Logging

Use logging entities for security audit trails:

// Log sensitive operations
_, err = db.CreateAgentLog(ctx, database.CreateAgentLogParams{
    Initiator: "system",
    Executor:  "user_action",
    Task:      "flow_deletion",
    Result:    []byte(fmt.Sprintf(`{"flow_id": %d, "user_id": %d}`, flowID, userID)),
    FlowID:    flowID,
})

Conclusion

The database package provides a robust, secure, and performant foundation for PentAGI's data layer. By leveraging sqlc for code generation, implementing consistent security patterns, and maintaining comprehensive audit trails, it ensures reliable operation of the autonomous penetration testing system.

Key benefits:

  • Type Safety: Compile-time verification of SQL queries
  • Performance: Optimized queries with proper indexing
  • Security: Multi-tenancy and soft delete support
  • Observability: Comprehensive logging and tracing
  • Maintainability: Consistent patterns and generated code

For developers working with this package, follow the established patterns for security, performance, and maintainability to ensure smooth integration with the broader PentAGI ecosystem.

This documentation provides a comprehensive overview of the database package's architecture, functionality, and integration within the PentAGI system.