mirror of
https://github.com/vxcontrol/pentagi.git
synced 2026-05-03 21:40:32 +00:00
feat: enhance Docker client with container file operations and API integration
- Added methods for non-recursive directory listing and file stat operations in the Docker client.
- Implemented a new API endpoint to retrieve files from a running container's directory.
- Updated documentation to reflect new file operations and API changes.
- Introduced data structures for container file metadata and integrated them into the flow file service.
- Enhanced flow file management capabilities with improved synchronization between local and container file systems.
This commit is contained in:
+53
-3
@@ -23,7 +23,7 @@ The Docker client package (`backend/pkg/docker`) provides a secure and isolated
|
||||
- **AI Agent Integration**: Specifically designed to support AI agent workflows and terminal operations
|
||||
- **Container Lifecycle Management**: Comprehensive container creation, execution, and cleanup
|
||||
- **Port Management**: Automatic port allocation for flow-specific containers
|
||||
- **File Operations**: Safe file transfer between host and containers
|
||||
- **File Operations**: Safe file transfer, path metadata lookup, and non-recursive directory listing between host and containers
|
||||
- **Network Isolation**: Configurable network policies for security
|
||||
- **Resource Management**: Memory and CPU limits for controlled execution
|
||||
- **Volume Management**: Persistent and temporary storage solutions
|
||||
@@ -58,6 +58,7 @@ const BaseContainerPortsNumber = 28000 // Starting port number fo
|
||||
const defaultImage = "debian:latest" // Fallback image if custom image fails
|
||||
const containerPortsNumber = 2 // Number of ports allocated per container
|
||||
const limitContainerPortsNumber = 2000 // Maximum port range for allocation
|
||||
const containerListWorkers = 20 // Parallel stat workers for directory listing
|
||||
```
|
||||
|
||||
### Port Allocation Strategy
|
||||
@@ -214,6 +215,8 @@ type DockerClient interface {
|
||||
ContainerExecInspect(ctx context.Context, execID string) (container.ExecInspect, error)
|
||||
|
||||
// File operations
|
||||
ContainerStatPath(ctx context.Context, containerID string, path string) (container.PathStat, error)
|
||||
ListContainerDir(ctx context.Context, containerID string, dirPath string) ([]container.PathStat, error)
|
||||
CopyToContainer(ctx context.Context, containerID string, dstPath string, content io.Reader, options container.CopyToContainerOptions) error
|
||||
CopyFromContainer(ctx context.Context, containerID string, srcPath string) (io.ReadCloser, container.PathStat, error)
|
||||
|
||||
@@ -259,7 +262,7 @@ The `RunContainer` method handles the complete container creation workflow:
|
||||
3. **Container Configuration**:
|
||||
- Sets hostname based on container name hash
|
||||
- Configures working directory to `/work`
|
||||
- Sets up restart policy (`unless-stopped`)
|
||||
- Sets up restart policy (`on-failure`, maximum 5 retries)
|
||||
- Configures logging (JSON driver with rotation)
|
||||
|
||||
4. **Storage Setup**:
|
||||
@@ -294,7 +297,8 @@ containerConfig := &container.Config{
|
||||
hostConfig := &container.HostConfig{
|
||||
CapAdd: []string{"NET_RAW"}, // Required capabilities for network tools
|
||||
RestartPolicy: container.RestartPolicy{
|
||||
Name: "unless-stopped", // Auto-restart unless explicitly stopped
|
||||
Name: "on-failure", // Restart failed containers only
|
||||
MaximumRetryCount: 5,
|
||||
},
|
||||
Binds: []string{
|
||||
"/host/data/flow-123:/work", // Work directory mount
|
||||
@@ -408,6 +412,33 @@ The terminal tool uses the Docker client for:
|
||||
- **File Operations**: Reading and writing files safely
|
||||
- **Result Capture**: Collecting command output and artifacts
|
||||
|
||||
### Flow File Integration
|
||||
|
||||
Flow files are managed by the REST API in `pkg/server/services/flow_files.go` and use Docker client file APIs for synchronization with the running primary container.
|
||||
|
||||
PentAGI keeps two different storage areas for flow files:
|
||||
|
||||
- **Local cache**: `{DATA_DIR}/flow-{id}-data/uploads` and `{DATA_DIR}/flow-{id}-data/container`
|
||||
- **Container workspace**: `/work` inside the primary container
|
||||
|
||||
This separation is intentional. It supports both single-node deployments and remote worker-node deployments where the backend host filesystem is not the same filesystem used by Docker workers.
|
||||
|
||||
The current behavior is:
|
||||
|
||||
- User uploads are saved to the local cache under `uploads/`.
|
||||
- If the primary container is running, uploaded files are pushed best-effort to `/work/uploads`.
|
||||
- When the primary container starts or is reused, cached uploads are synchronized into `/work/uploads`; the cache is the source of truth.
|
||||
- Files pulled from the container are stored under `container/` using their normalized full container path, for example:
|
||||
- `/etc/nginx/nginx.conf` -> `container/etc/nginx/nginx.conf`
|
||||
- `/work/test.md` -> `container/work/test.md`
|
||||
- Deleting cached upload files is allowed even when the container is not running. The next container start will resynchronize `/work/uploads` from cache.
|
||||
|
||||
The flow files API also exposes a non-recursive live container directory listing endpoint. It uses `ContainerStatPath` to determine whether the requested path is a file or directory:
|
||||
|
||||
- If the path is a file, it returns that file metadata directly.
|
||||
- If the path is a directory, it calls `ListContainerDir`.
|
||||
- If the path is omitted, it defaults to `/work`.
|
||||
|
||||
### Provider Integration
|
||||
|
||||
The provider system uses Docker client for environment preparation:
|
||||
@@ -516,8 +547,27 @@ defer reader.Close()
|
||||
|
||||
// Extract content from tar
|
||||
content := extractFromTar(reader)
|
||||
|
||||
// Stat a file or directory in the container
|
||||
stat, err := dockerClient.ContainerStatPath(ctx, containerID, "/work/results.txt")
|
||||
|
||||
// List direct entries in a container directory
|
||||
entries, err := dockerClient.ListContainerDir(ctx, containerID, "/work")
|
||||
```
|
||||
|
||||
### Container Directory Listing
|
||||
|
||||
`ListContainerDir` performs a non-recursive directory listing inside a running container:
|
||||
|
||||
1. Uses `ContainerStatPath` to verify that `dirPath` exists and is a directory.
|
||||
2. Executes `ls -1 -- <dirPath>` inside the container to get direct entry names.
|
||||
3. Calls `ContainerStatPath` for every entry to return Docker `container.PathStat` metadata.
|
||||
4. Runs entry stat calls through `pkg/queue` with `containerListWorkers = 20` workers to reduce latency for large directories.
|
||||
|
||||
The method returns `[]container.PathStat`. The caller is responsible for joining the returned entry name with the requested base path when it needs full paths.
|
||||
|
||||
If `dirPath` is empty, it defaults to `WorkFolderPathInContainer` (`/work`).
|
||||
|
||||
### Cleanup and Resource Management
|
||||
|
||||
```go
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"hash/crc32"
|
||||
"io"
|
||||
"os"
|
||||
"path"
|
||||
"path/filepath"
|
||||
"slices"
|
||||
"strings"
|
||||
@@ -13,6 +14,7 @@ import (
|
||||
|
||||
"pentagi/pkg/config"
|
||||
"pentagi/pkg/database"
|
||||
"pentagi/pkg/queue"
|
||||
|
||||
"github.com/docker/docker/api/types"
|
||||
"github.com/docker/docker/api/types/container"
|
||||
@@ -37,8 +39,20 @@ const (
|
||||
containerLocalCwdTemplate = "flow-%d"
|
||||
containerPortsNumber = 2
|
||||
limitContainerPortsNumber = 2000
|
||||
containerListWorkers = 20
|
||||
)
|
||||
|
||||
// containerPathStatRequest is one unit of work for the directory-listing
// stat queue: a single directory entry identified by its base name and its
// full path inside the container.
type containerPathStatRequest struct {
	name string // entry base name as reported by `ls`
	path string // full container path (listing dir joined with name)
}

// containerPathStatResult carries the stat outcome for one directory entry
// back from a queue worker.
type containerPathStatResult struct {
	name string             // entry base name, echoed from the request
	stat container.PathStat // Docker path metadata; meaningful only when err is nil
	err  error              // stat error for this entry, if any
}
|
||||
|
||||
type dockerClient struct {
|
||||
db database.Querier
|
||||
logger *logrus.Logger
|
||||
@@ -61,6 +75,8 @@ type DockerClient interface {
|
||||
ContainerExecCreate(ctx context.Context, container string, config container.ExecOptions) (container.ExecCreateResponse, error)
|
||||
ContainerExecAttach(ctx context.Context, execID string, config container.ExecAttachOptions) (types.HijackedResponse, error)
|
||||
ContainerExecInspect(ctx context.Context, execID string) (container.ExecInspect, error)
|
||||
ContainerStatPath(ctx context.Context, containerID string, path string) (container.PathStat, error)
|
||||
ListContainerDir(ctx context.Context, containerID string, dirPath string) ([]container.PathStat, error)
|
||||
CopyToContainer(ctx context.Context, containerID string, dstPath string, content io.Reader, options container.CopyToContainerOptions) error
|
||||
CopyFromContainer(ctx context.Context, containerID string, srcPath string) (io.ReadCloser, container.PathStat, error)
|
||||
Cleanup(ctx context.Context) error
|
||||
@@ -557,6 +573,107 @@ func (dc *dockerClient) ContainerExecInspect(
|
||||
return dc.client.ContainerExecInspect(ctx, execID)
|
||||
}
|
||||
|
||||
func (dc *dockerClient) ContainerStatPath(
|
||||
ctx context.Context,
|
||||
containerID string,
|
||||
path string,
|
||||
) (container.PathStat, error) {
|
||||
return dc.client.ContainerStatPath(ctx, containerID, path)
|
||||
}
|
||||
|
||||
func (dc *dockerClient) ListContainerDir(
|
||||
ctx context.Context,
|
||||
containerID string,
|
||||
dirPath string,
|
||||
) ([]container.PathStat, error) {
|
||||
if strings.TrimSpace(dirPath) == "" {
|
||||
dirPath = WorkFolderPathInContainer
|
||||
}
|
||||
|
||||
dirStat, err := dc.ContainerStatPath(ctx, containerID, dirPath)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to stat container path '%s': %w", dirPath, err)
|
||||
}
|
||||
if !dirStat.Mode.IsDir() {
|
||||
return nil, fmt.Errorf("container path '%s' is not a directory", dirPath)
|
||||
}
|
||||
|
||||
createResp, err := dc.ContainerExecCreate(ctx, containerID, container.ExecOptions{
|
||||
Cmd: []string{"ls", "-1", "--", dirPath},
|
||||
AttachStdout: true,
|
||||
AttachStderr: true,
|
||||
Tty: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create ls exec for '%s': %w", dirPath, err)
|
||||
}
|
||||
|
||||
resp, err := dc.ContainerExecAttach(ctx, createResp.ID, container.ExecAttachOptions{
|
||||
Tty: true,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to attach ls exec for '%s': %w", dirPath, err)
|
||||
}
|
||||
output, readErr := io.ReadAll(resp.Reader)
|
||||
resp.Close()
|
||||
if readErr != nil {
|
||||
return nil, fmt.Errorf("failed to read ls output for '%s': %w", dirPath, readErr)
|
||||
}
|
||||
|
||||
inspect, err := dc.ContainerExecInspect(ctx, createResp.ID)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to inspect ls exec for '%s': %w", dirPath, err)
|
||||
}
|
||||
if inspect.ExitCode != 0 {
|
||||
return nil, fmt.Errorf("ls command failed for '%s' with exit code %d: %s", dirPath, inspect.ExitCode, string(output))
|
||||
}
|
||||
|
||||
names := make([]string, 0)
|
||||
for _, line := range strings.Split(string(output), "\n") {
|
||||
name := strings.TrimSpace(line)
|
||||
if name == "" {
|
||||
continue
|
||||
}
|
||||
names = append(names, name)
|
||||
}
|
||||
|
||||
input := make(chan containerPathStatRequest, len(names))
|
||||
outputStats := make(chan containerPathStatResult)
|
||||
for _, name := range names {
|
||||
input <- containerPathStatRequest{
|
||||
name: name,
|
||||
path: path.Join(dirPath, name),
|
||||
}
|
||||
}
|
||||
close(input)
|
||||
|
||||
statQueue := queue.NewQueue(input, outputStats, containerListWorkers, func(req containerPathStatRequest) (containerPathStatResult, error) {
|
||||
stat, err := dc.ContainerStatPath(ctx, containerID, req.path)
|
||||
|
||||
return containerPathStatResult{
|
||||
name: req.name,
|
||||
stat: stat,
|
||||
err: err,
|
||||
}, nil
|
||||
})
|
||||
if err := statQueue.Start(); err != nil {
|
||||
return nil, fmt.Errorf("failed to start container stat queue: %w", err)
|
||||
}
|
||||
|
||||
stats := make([]container.PathStat, 0, len(names))
|
||||
for range names {
|
||||
result := <-outputStats
|
||||
if result.err != nil {
|
||||
_ = statQueue.Stop()
|
||||
return nil, fmt.Errorf("failed to stat container entry '%s': %w", result.name, result.err)
|
||||
}
|
||||
stats = append(stats, result.stat)
|
||||
}
|
||||
_ = statQueue.Stop()
|
||||
|
||||
return stats, nil
|
||||
}
|
||||
|
||||
func (dc *dockerClient) CopyToContainer(
|
||||
ctx context.Context,
|
||||
containerID string,
|
||||
|
||||
@@ -2090,6 +2090,82 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"/flows/{flowID}/files/container": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"BearerAuth": []
|
||||
}
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"FlowFiles"
|
||||
],
|
||||
"summary": "Retrieve flow container directory files list",
|
||||
"parameters": [
|
||||
{
|
||||
"minimum": 0,
|
||||
"type": "integer",
|
||||
"description": "flow id",
|
||||
"name": "flowID",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "absolute path inside the running container; defaults to /work",
|
||||
"name": "path",
|
||||
"in": "query"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "container files list received successful",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SuccessResponse"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"$ref": "#/definitions/services.containerFiles"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "invalid flow files request data or container not running",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "getting flow container files not permitted",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "flow not found",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "internal error on getting flow container files",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/flows/{flowID}/files/download": {
|
||||
"get": {
|
||||
"security": [
|
||||
@@ -7826,6 +7902,46 @@ const docTemplate = `{
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containerFile": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"isDir": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"modifiedAt": {
|
||||
"type": "string"
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"size": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containerFiles": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"files": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/services.containerFile"
|
||||
}
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"total": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containers": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -2082,6 +2082,82 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"/flows/{flowID}/files/container": {
|
||||
"get": {
|
||||
"security": [
|
||||
{
|
||||
"BearerAuth": []
|
||||
}
|
||||
],
|
||||
"produces": [
|
||||
"application/json"
|
||||
],
|
||||
"tags": [
|
||||
"FlowFiles"
|
||||
],
|
||||
"summary": "Retrieve flow container directory files list",
|
||||
"parameters": [
|
||||
{
|
||||
"minimum": 0,
|
||||
"type": "integer",
|
||||
"description": "flow id",
|
||||
"name": "flowID",
|
||||
"in": "path",
|
||||
"required": true
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"description": "absolute path inside the running container; defaults to /work",
|
||||
"name": "path",
|
||||
"in": "query"
|
||||
}
|
||||
],
|
||||
"responses": {
|
||||
"200": {
|
||||
"description": "container files list received successful",
|
||||
"schema": {
|
||||
"allOf": [
|
||||
{
|
||||
"$ref": "#/definitions/SuccessResponse"
|
||||
},
|
||||
{
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"data": {
|
||||
"$ref": "#/definitions/services.containerFiles"
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"400": {
|
||||
"description": "invalid flow files request data or container not running",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"403": {
|
||||
"description": "getting flow container files not permitted",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"404": {
|
||||
"description": "flow not found",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
},
|
||||
"500": {
|
||||
"description": "internal error on getting flow container files",
|
||||
"schema": {
|
||||
"$ref": "#/definitions/ErrorResponse"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"/flows/{flowID}/files/download": {
|
||||
"get": {
|
||||
"security": [
|
||||
@@ -7818,6 +7894,46 @@
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containerFile": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"id": {
|
||||
"type": "string"
|
||||
},
|
||||
"isDir": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"modifiedAt": {
|
||||
"type": "string"
|
||||
},
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"size": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containerFiles": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"files": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "#/definitions/services.containerFile"
|
||||
}
|
||||
},
|
||||
"path": {
|
||||
"type": "string"
|
||||
},
|
||||
"total": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"services.containers": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
|
||||
@@ -1351,6 +1351,32 @@ definitions:
|
||||
total:
|
||||
type: integer
|
||||
type: object
|
||||
services.containerFile:
|
||||
properties:
|
||||
id:
|
||||
type: string
|
||||
isDir:
|
||||
type: boolean
|
||||
modifiedAt:
|
||||
type: string
|
||||
name:
|
||||
type: string
|
||||
path:
|
||||
type: string
|
||||
size:
|
||||
type: integer
|
||||
type: object
|
||||
services.containerFiles:
|
||||
properties:
|
||||
files:
|
||||
items:
|
||||
$ref: '#/definitions/services.containerFile'
|
||||
type: array
|
||||
path:
|
||||
type: string
|
||||
total:
|
||||
type: integer
|
||||
type: object
|
||||
services.containers:
|
||||
properties:
|
||||
containers:
|
||||
@@ -2972,6 +2998,52 @@ paths:
|
||||
summary: Upload files to flow workspace
|
||||
tags:
|
||||
- FlowFiles
|
||||
/flows/{flowID}/files/container:
|
||||
get:
|
||||
parameters:
|
||||
- description: flow id
|
||||
in: path
|
||||
minimum: 0
|
||||
name: flowID
|
||||
required: true
|
||||
type: integer
|
||||
- description: absolute path inside the running container; defaults to /work
|
||||
in: query
|
||||
name: path
|
||||
type: string
|
||||
produces:
|
||||
- application/json
|
||||
responses:
|
||||
"200":
|
||||
description: container files list received successful
|
||||
schema:
|
||||
allOf:
|
||||
- $ref: '#/definitions/SuccessResponse'
|
||||
- properties:
|
||||
data:
|
||||
$ref: '#/definitions/services.containerFiles'
|
||||
type: object
|
||||
"400":
|
||||
description: invalid flow files request data or container not running
|
||||
schema:
|
||||
$ref: '#/definitions/ErrorResponse'
|
||||
"403":
|
||||
description: getting flow container files not permitted
|
||||
schema:
|
||||
$ref: '#/definitions/ErrorResponse'
|
||||
"404":
|
||||
description: flow not found
|
||||
schema:
|
||||
$ref: '#/definitions/ErrorResponse'
|
||||
"500":
|
||||
description: internal error on getting flow container files
|
||||
schema:
|
||||
$ref: '#/definitions/ErrorResponse'
|
||||
security:
|
||||
- BearerAuth: []
|
||||
summary: Retrieve flow container directory files list
|
||||
tags:
|
||||
- FlowFiles
|
||||
/flows/{flowID}/files/download:
|
||||
get:
|
||||
parameters:
|
||||
|
||||
@@ -375,6 +375,7 @@ func setFlowFilesGroup(parent *gin.RouterGroup, svc *services.FlowFileService) {
|
||||
flowFilesGroup := parent.Group("/flows/:flowID/files")
|
||||
{
|
||||
flowFilesGroup.GET("/", svc.GetFlowFiles)
|
||||
flowFilesGroup.GET("/container", svc.GetFlowContainerFiles)
|
||||
flowFilesGroup.POST("/", svc.UploadFlowFiles)
|
||||
flowFilesGroup.DELETE("/", svc.DeleteFlowFile)
|
||||
flowFilesGroup.GET("/download", svc.DownloadFlowFile)
|
||||
|
||||
@@ -49,6 +49,21 @@ type flowFiles struct {
|
||||
Total uint64 `json:"total"`
|
||||
}
|
||||
|
||||
// containerFile describes one file or directory entry inside the running
// flow container, as returned by the container files listing endpoint.
type containerFile struct {
	ID         string    `json:"id"`         // identifier derived from the full container path (flowfiles.ID)
	Name       string    `json:"name"`       // base name of the entry
	Path       string    `json:"path"`       // full path inside the container
	Size       int64     `json:"size"`       // size in bytes as reported by Docker
	IsDir      bool      `json:"isDir"`      // true when the entry is a directory
	ModifiedAt time.Time `json:"modifiedAt"` // last modification time reported by Docker
}

// containerFiles is the listing payload for a container directory request:
// the resolved path plus its direct entries and their count.
type containerFiles struct {
	Path  string          `json:"path"`  // container path the listing was produced for
	Files []containerFile `json:"files"` // direct entries of Path (non-recursive)
	Total uint64          `json:"total"` // number of entries in Files
}
||||
|
||||
type pullFlowFilesRequest struct {
|
||||
// Path is an arbitrary path inside the container, e.g. "/etc/nginx/conf" or "/work/uploads/report.txt".
|
||||
Path string `json:"path"`
|
||||
@@ -651,6 +666,98 @@ func (s *FlowFileService) PullFlowFiles(c *gin.Context) {
|
||||
})
|
||||
}
|
||||
|
||||
// GetFlowContainerFiles is a function to return non-recursive container directory files list
// @Summary Retrieve flow container directory files list
// @Tags FlowFiles
// @Produce json
// @Security BearerAuth
// @Param flowID path int true "flow id" minimum(0)
// @Param path query string false "absolute path inside the running container; defaults to /work"
// @Success 200 {object} response.successResp{data=containerFiles} "container files list received successful"
// @Failure 400 {object} response.errorResp "invalid flow files request data or container not running"
// @Failure 403 {object} response.errorResp "getting flow container files not permitted"
// @Failure 404 {object} response.errorResp "flow not found"
// @Failure 500 {object} response.errorResp "internal error on getting flow container files"
// @Router /flows/{flowID}/files/container [get]
func (s *FlowFileService) GetFlowContainerFiles(c *gin.Context) {
	flowID, err := parseFlowIDParam(c)
	if err != nil {
		logger.FromContext(c).WithError(err).Error("error parsing flow id")
		response.Error(c, response.ErrFlowFilesInvalidRequest, err)
		return
	}

	// Read-only access check: the flow must exist and be visible to the caller.
	if _, err := s.getFlow(c, flowID, false); err != nil {
		s.handleFlowLookupError(c, flowID, err)
		return
	}

	// A nil docker client means this server cannot serve live container data.
	if s.dockerClient == nil {
		err = errors.New("docker client not configured on this server")
		logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("docker client unavailable for container files list")
		response.Error(c, response.ErrInternal, err)
		return
	}

	// Default to the container work folder when no path query param is given.
	containerPath := strings.TrimSpace(c.Query("path"))
	if containerPath == "" {
		containerPath = docker.WorkFolderPathInContainer
	}

	// A live listing needs the flow's primary container to be up.
	containerName := primaryContainerName(flowID)
	running, err := s.dockerClient.IsContainerRunning(c.Request.Context(), containerName)
	if err != nil {
		logger.FromContext(c).WithError(err).WithFields(map[string]any{
			"flow_id":        flowID,
			"container_name": containerName,
		}).Error("error checking container status for files list")
		response.Error(c, response.ErrInternal, err)
		return
	}
	if !running {
		err = fmt.Errorf("container '%s' is not running; start the flow before listing container files", containerName)
		logger.FromContext(c).WithError(err).WithField("flow_id", flowID).Error("container not running for files list")
		response.Error(c, response.ErrFlowFilesContainerNotRunning, err)
		return
	}

	// Stat first: if the requested path is a regular file, return that file's
	// metadata directly instead of attempting a directory listing.
	pathStat, err := s.dockerClient.ContainerStatPath(c.Request.Context(), containerName, containerPath)
	if err != nil {
		logger.FromContext(c).WithError(err).WithFields(map[string]any{
			"flow_id":        flowID,
			"container_path": containerPath,
		}).Error("error stating container path")
		response.Error(c, response.ErrInternal, err)
		return
	}
	if !pathStat.Mode.IsDir() {
		// Single-file response: the base path for ID/Path derivation is the
		// file's parent directory.
		file := convertContainerFile(path.Dir(containerPath), pathStat)
		response.Success(c, http.StatusOK, containerFiles{
			Path:  containerPath,
			Files: []containerFile{file},
			Total: 1,
		})
		return
	}

	// Directory: fetch the non-recursive entry list from the container.
	stats, err := s.dockerClient.ListContainerDir(c.Request.Context(), containerName, containerPath)
	if err != nil {
		logger.FromContext(c).WithError(err).WithFields(map[string]any{
			"flow_id":        flowID,
			"container_path": containerPath,
		}).Error("error listing container directory")
		response.Error(c, response.ErrInternal, err)
		return
	}

	files := convertContainerFiles(containerPath, stats)
	response.Success(c, http.StatusOK, containerFiles{
		Path:  containerPath,
		Files: files,
		Total: uint64(len(files)),
	})
}
|
||||
|
||||
func (s *FlowFileService) getFlow(c *gin.Context, flowID uint64, writeAccess bool) (models.Flow, error) {
|
||||
var flow models.Flow
|
||||
|
||||
@@ -909,6 +1016,7 @@ func convertFlowFile(file flowfiles.File) flowFile {
|
||||
|
||||
func convertModelFlowFile(file flowFile) *model.FlowFile {
|
||||
return &model.FlowFile{
|
||||
ID: file.ID,
|
||||
Name: file.Name,
|
||||
Path: file.Path,
|
||||
Size: int(file.Size),
|
||||
@@ -917,6 +1025,32 @@ func convertModelFlowFile(file flowFile) *model.FlowFile {
|
||||
}
|
||||
}
|
||||
|
||||
func convertContainerFiles(basePath string, stats []container.PathStat) []containerFile {
|
||||
files := make([]containerFile, 0, len(stats))
|
||||
for _, stat := range stats {
|
||||
files = append(files, convertContainerFile(basePath, stat))
|
||||
}
|
||||
|
||||
sort.Slice(files, func(i, j int) bool {
|
||||
return files[i].Name < files[j].Name
|
||||
})
|
||||
|
||||
return files
|
||||
}
|
||||
|
||||
func convertContainerFile(basePath string, stat container.PathStat) containerFile {
|
||||
filePath := path.Join(basePath, stat.Name)
|
||||
|
||||
return containerFile{
|
||||
ID: flowfiles.ID(filePath),
|
||||
Name: stat.Name,
|
||||
Path: filePath,
|
||||
Size: stat.Size,
|
||||
IsDir: stat.Mode.IsDir(),
|
||||
ModifiedAt: stat.Mtime,
|
||||
}
|
||||
}
|
||||
|
||||
func (s *FlowFileService) publishFlowFilesAdded(ctx context.Context, flow models.Flow, files []flowFile) {
|
||||
if s.ss == nil {
|
||||
return
|
||||
|
||||
@@ -13,6 +13,7 @@ import (
|
||||
|
||||
"pentagi/pkg/flowfiles"
|
||||
|
||||
"github.com/docker/docker/api/types/container"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
@@ -226,6 +227,57 @@ func TestFlowFileService_ListFlowFiles_EmptyDirs(t *testing.T) {
|
||||
assert.Empty(t, files)
|
||||
}
|
||||
|
||||
func TestConvertModelFlowFile_PreservesID(t *testing.T) {
|
||||
file := flowFile{
|
||||
ID: "flow-file-id",
|
||||
Name: "report.md",
|
||||
Path: "uploads/report.md",
|
||||
Size: 42,
|
||||
IsDir: false,
|
||||
ModifiedAt: time.Now(),
|
||||
}
|
||||
|
||||
modelFile := convertModelFlowFile(file)
|
||||
|
||||
require.NotNil(t, modelFile)
|
||||
assert.Equal(t, file.ID, modelFile.ID)
|
||||
assert.Equal(t, file.Name, modelFile.Name)
|
||||
assert.Equal(t, file.Path, modelFile.Path)
|
||||
assert.Equal(t, int(file.Size), modelFile.Size)
|
||||
assert.Equal(t, file.IsDir, modelFile.IsDir)
|
||||
assert.Equal(t, file.ModifiedAt, modelFile.ModifiedAt)
|
||||
}
|
||||
|
||||
func TestConvertContainerFiles(t *testing.T) {
|
||||
mtime := time.Now()
|
||||
files := convertContainerFiles("/work", []container.PathStat{
|
||||
{
|
||||
Name: "zeta.txt",
|
||||
Size: 10,
|
||||
Mode: 0644,
|
||||
Mtime: mtime,
|
||||
},
|
||||
{
|
||||
Name: "alpha",
|
||||
Mode: os.ModeDir | 0755,
|
||||
Mtime: mtime.Add(time.Second),
|
||||
},
|
||||
})
|
||||
|
||||
require.Len(t, files, 2)
|
||||
assert.Equal(t, "alpha", files[0].Name)
|
||||
assert.Equal(t, "/work/alpha", files[0].Path)
|
||||
assert.Equal(t, flowfiles.ID("/work/alpha"), files[0].ID)
|
||||
assert.True(t, files[0].IsDir)
|
||||
assert.Equal(t, int64(0), files[0].Size)
|
||||
|
||||
assert.Equal(t, "zeta.txt", files[1].Name)
|
||||
assert.Equal(t, "/work/zeta.txt", files[1].Path)
|
||||
assert.Equal(t, flowfiles.ID("/work/zeta.txt"), files[1].ID)
|
||||
assert.False(t, files[1].IsDir)
|
||||
assert.Equal(t, int64(10), files[1].Size)
|
||||
}
|
||||
|
||||
// ── localEntryExists ─────────────────────────────────────────────────────────
|
||||
|
||||
func TestLocalEntryExists(t *testing.T) {
|
||||
|
||||
@@ -93,6 +93,12 @@ func (m *contextAwareMockDockerClient) ContainerExecAttach(ctx context.Context,
|
||||
Reader: bufio.NewReader(pr),
|
||||
}, nil
|
||||
}
|
||||
// ContainerStatPath is a test stub that always succeeds with zero-value
// path metadata.
func (m *contextAwareMockDockerClient) ContainerStatPath(_ context.Context, _ string, _ string) (container.PathStat, error) {
	return container.PathStat{}, nil
}

// ListContainerDir is a test stub that always returns an empty listing.
func (m *contextAwareMockDockerClient) ListContainerDir(_ context.Context, _ string, _ string) ([]container.PathStat, error) {
	return nil, nil
}

// ContainerExecInspect is a test stub that returns the canned inspect
// response configured on the mock.
func (m *contextAwareMockDockerClient) ContainerExecInspect(_ context.Context, _ string) (container.ExecInspect, error) {
	return m.inspectResp, nil
}
|
||||
|
||||
Reference in New Issue
Block a user