Files
beszel/agent/docker.go
henrygd 7cf123a99e fix: limit frame and total sizes when reading docker logs (#1322)
- Add per-frame size limit (1MB) to prevent single massive log entries
- Add total log size limit (5MB) for network transfer and browser rendering
- Gracefully truncate logs that exceed limits instead of consuming unbounded memory
2025-10-26 11:02:18 -04:00

722 lines
23 KiB
Go

package agent
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"errors"
"fmt"
"io"
"log/slog"
"net"
"net/http"
"net/url"
"os"
"strings"
"sync"
"time"
"github.com/henrygd/beszel/agent/deltatracker"
"github.com/henrygd/beszel/internal/entities/container"
"github.com/blang/semver"
)
const (
// Docker API timeout in milliseconds
dockerTimeoutMs = 2100
// Maximum realistic network speed (5 GB/s) to detect bad deltas
maxNetworkSpeedBps uint64 = 5e9
// Maximum conceivable memory usage of a container (100TB) to detect bad memory stats
maxMemoryUsage uint64 = 100 * 1024 * 1024 * 1024 * 1024
// Number of log lines to request when fetching container logs
dockerLogsTail = 200
// Maximum size of a single log frame (1MB) to prevent memory exhaustion
// A single log line larger than 1MB is likely an error or misconfiguration
maxLogFrameSize = 1024 * 1024
// Maximum total log content size (5MB) to prevent memory exhaustion
// This provides a reasonable limit for network transfer and browser rendering
maxTotalLogSize = 5 * 1024 * 1024
)
type dockerManager struct {
client *http.Client // Client to query Docker API
wg sync.WaitGroup // WaitGroup to wait for all goroutines to finish
sem chan struct{} // Semaphore to limit concurrent container requests
containerStatsMutex sync.RWMutex // Mutex to prevent concurrent access to containerStatsMap
apiContainerList []*container.ApiInfo // List of containers from Docker API
containerStatsMap map[string]*container.Stats // Keeps track of container stats
validIds map[string]struct{} // Map of valid container ids, used to prune invalid containers from containerStatsMap
goodDockerVersion bool // Whether docker version is at least 25.0.0 (one-shot works correctly)
isWindows bool // Whether the Docker Engine API is running on Windows
buf *bytes.Buffer // Buffer to store and read response bodies
decoder *json.Decoder // Reusable JSON decoder that reads from buf
apiStats *container.ApiStats // Reusable API stats object
// Cache-time-aware tracking for CPU stats (similar to cpu.go)
// Maps cache time intervals to container-specific CPU usage tracking
lastCpuContainer map[uint16]map[string]uint64 // cacheTimeMs -> containerId -> last cpu container usage
lastCpuSystem map[uint16]map[string]uint64 // cacheTimeMs -> containerId -> last cpu system usage
lastCpuReadTime map[uint16]map[string]time.Time // cacheTimeMs -> containerId -> last read time (Windows)
// Network delta trackers - one per cache time to avoid interference
// cacheTimeMs -> DeltaTracker for network bytes sent/received
networkSentTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
networkRecvTrackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
}
// userAgentRoundTripper is a custom http.RoundTripper that adds a User-Agent header to all requests
type userAgentRoundTripper struct {
rt http.RoundTripper
userAgent string
}
// RoundTrip implements the http.RoundTripper interface
func (u *userAgentRoundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
req.Header.Set("User-Agent", u.userAgent)
return u.rt.RoundTrip(req)
}
// Add goroutine to the queue
func (d *dockerManager) queue() {
d.wg.Add(1)
if d.goodDockerVersion {
d.sem <- struct{}{}
}
}
// Remove goroutine from the queue
func (d *dockerManager) dequeue() {
d.wg.Done()
if d.goodDockerVersion {
<-d.sem
}
}
// Returns stats for all running containers with cache-time-aware delta tracking
func (dm *dockerManager) getDockerStats(cacheTimeMs uint16) ([]*container.Stats, error) {
resp, err := dm.client.Get("http://localhost/containers/json")
if err != nil {
return nil, err
}
dm.apiContainerList = dm.apiContainerList[:0]
if err := dm.decode(resp, &dm.apiContainerList); err != nil {
return nil, err
}
dm.isWindows = strings.Contains(resp.Header.Get("Server"), "windows")
containersLength := len(dm.apiContainerList)
// store valid ids to clean up old container ids from map
if dm.validIds == nil {
dm.validIds = make(map[string]struct{}, containersLength)
} else {
clear(dm.validIds)
}
var failedContainers []*container.ApiInfo
for _, ctr := range dm.apiContainerList {
ctr.IdShort = ctr.Id[:12]
dm.validIds[ctr.IdShort] = struct{}{}
// check if container is less than 1 minute old (possible restart)
// note: can't use Created field because it's not updated on restart
if strings.Contains(ctr.Status, "second") {
// if so, remove old container data
dm.deleteContainerStatsSync(ctr.IdShort)
}
dm.queue()
go func(ctr *container.ApiInfo) {
defer dm.dequeue()
err := dm.updateContainerStats(ctr, cacheTimeMs)
// if error, delete from map and add to failed list to retry
if err != nil {
dm.containerStatsMutex.Lock()
delete(dm.containerStatsMap, ctr.IdShort)
failedContainers = append(failedContainers, ctr)
dm.containerStatsMutex.Unlock()
}
}(ctr)
}
dm.wg.Wait()
// retry failed containers separately so we can run them in parallel (docker 24 bug)
if len(failedContainers) > 0 {
slog.Debug("Retrying failed containers", "count", len(failedContainers))
for i := range failedContainers {
ctr := failedContainers[i]
dm.queue()
go func(ctr *container.ApiInfo) {
defer dm.dequeue()
if err2 := dm.updateContainerStats(ctr, cacheTimeMs); err2 != nil {
slog.Error("Error getting container stats", "err", err2)
}
}(ctr)
}
dm.wg.Wait()
}
// populate final stats and remove old / invalid container stats
stats := make([]*container.Stats, 0, containersLength)
for id, v := range dm.containerStatsMap {
if _, exists := dm.validIds[id]; !exists {
delete(dm.containerStatsMap, id)
} else {
stats = append(stats, v)
}
}
// prepare network trackers for next interval for this cache time
dm.cycleNetworkDeltasForCacheTime(cacheTimeMs)
return stats, nil
}
// initializeCpuTracking initializes CPU tracking maps for a specific cache time interval
func (dm *dockerManager) initializeCpuTracking(cacheTimeMs uint16) {
// Initialize cache time maps if they don't exist
if dm.lastCpuContainer[cacheTimeMs] == nil {
dm.lastCpuContainer[cacheTimeMs] = make(map[string]uint64)
}
if dm.lastCpuSystem[cacheTimeMs] == nil {
dm.lastCpuSystem[cacheTimeMs] = make(map[string]uint64)
}
// Ensure the outer map exists before indexing
if dm.lastCpuReadTime == nil {
dm.lastCpuReadTime = make(map[uint16]map[string]time.Time)
}
if dm.lastCpuReadTime[cacheTimeMs] == nil {
dm.lastCpuReadTime[cacheTimeMs] = make(map[string]time.Time)
}
}
// getCpuPreviousValues returns previous CPU values for a container and cache time interval
func (dm *dockerManager) getCpuPreviousValues(cacheTimeMs uint16, containerId string) (uint64, uint64) {
return dm.lastCpuContainer[cacheTimeMs][containerId], dm.lastCpuSystem[cacheTimeMs][containerId]
}
// setCpuCurrentValues stores current CPU values for a container and cache time interval
func (dm *dockerManager) setCpuCurrentValues(cacheTimeMs uint16, containerId string, cpuContainer, cpuSystem uint64) {
dm.lastCpuContainer[cacheTimeMs][containerId] = cpuContainer
dm.lastCpuSystem[cacheTimeMs][containerId] = cpuSystem
}
// calculateMemoryUsage calculates memory usage from Docker API stats
func calculateMemoryUsage(apiStats *container.ApiStats, isWindows bool) (uint64, error) {
if isWindows {
return apiStats.MemoryStats.PrivateWorkingSet, nil
}
memCache := apiStats.MemoryStats.Stats.InactiveFile
if memCache == 0 {
memCache = apiStats.MemoryStats.Stats.Cache
}
usedDelta := apiStats.MemoryStats.Usage - memCache
if usedDelta <= 0 || usedDelta > maxMemoryUsage {
return 0, fmt.Errorf("bad memory stats")
}
return usedDelta, nil
}
// getNetworkTracker returns the DeltaTracker for a specific cache time, creating it if needed
func (dm *dockerManager) getNetworkTracker(cacheTimeMs uint16, isSent bool) *deltatracker.DeltaTracker[string, uint64] {
var trackers map[uint16]*deltatracker.DeltaTracker[string, uint64]
if isSent {
trackers = dm.networkSentTrackers
} else {
trackers = dm.networkRecvTrackers
}
if trackers[cacheTimeMs] == nil {
trackers[cacheTimeMs] = deltatracker.NewDeltaTracker[string, uint64]()
}
return trackers[cacheTimeMs]
}
// cycleNetworkDeltasForCacheTime cycles the network delta trackers for a specific cache time
func (dm *dockerManager) cycleNetworkDeltasForCacheTime(cacheTimeMs uint16) {
if dm.networkSentTrackers[cacheTimeMs] != nil {
dm.networkSentTrackers[cacheTimeMs].Cycle()
}
if dm.networkRecvTrackers[cacheTimeMs] != nil {
dm.networkRecvTrackers[cacheTimeMs].Cycle()
}
}
// calculateNetworkStats calculates network sent/receive deltas using DeltaTracker
func (dm *dockerManager) calculateNetworkStats(ctr *container.ApiInfo, apiStats *container.ApiStats, stats *container.Stats, initialized bool, name string, cacheTimeMs uint16) (uint64, uint64) {
var total_sent, total_recv uint64
for _, v := range apiStats.Networks {
total_sent += v.TxBytes
total_recv += v.RxBytes
}
// Get the DeltaTracker for this specific cache time
sentTracker := dm.getNetworkTracker(cacheTimeMs, true)
recvTracker := dm.getNetworkTracker(cacheTimeMs, false)
// Set current values in the cache-time-specific DeltaTracker
sentTracker.Set(ctr.IdShort, total_sent)
recvTracker.Set(ctr.IdShort, total_recv)
// Get deltas (bytes since last measurement)
sent_delta_raw := sentTracker.Delta(ctr.IdShort)
recv_delta_raw := recvTracker.Delta(ctr.IdShort)
// Calculate bytes per second independently for Tx and Rx if we have previous data
var sent_delta, recv_delta uint64
if initialized {
millisecondsElapsed := uint64(time.Since(stats.PrevReadTime).Milliseconds())
if millisecondsElapsed > 0 {
if sent_delta_raw > 0 {
sent_delta = sent_delta_raw * 1000 / millisecondsElapsed
if sent_delta > maxNetworkSpeedBps {
slog.Warn("Bad network delta", "container", name)
sent_delta = 0
}
}
if recv_delta_raw > 0 {
recv_delta = recv_delta_raw * 1000 / millisecondsElapsed
if recv_delta > maxNetworkSpeedBps {
slog.Warn("Bad network delta", "container", name)
recv_delta = 0
}
}
}
}
return sent_delta, recv_delta
}
// validateCpuPercentage checks if CPU percentage is within valid range
func validateCpuPercentage(cpuPct float64, containerName string) error {
if cpuPct > 100 {
return fmt.Errorf("%s cpu pct greater than 100: %+v", containerName, cpuPct)
}
return nil
}
// updateContainerStatsValues updates the final stats values
func updateContainerStatsValues(stats *container.Stats, cpuPct float64, usedMemory uint64, sent_delta, recv_delta uint64, readTime time.Time) {
stats.Cpu = twoDecimals(cpuPct)
stats.Mem = bytesToMegabytes(float64(usedMemory))
stats.NetworkSent = bytesToMegabytes(float64(sent_delta))
stats.NetworkRecv = bytesToMegabytes(float64(recv_delta))
stats.PrevReadTime = readTime
}
func parseDockerStatus(status string) (string, container.DockerHealth) {
trimmed := strings.TrimSpace(status)
if trimmed == "" {
return "", container.DockerHealthNone
}
// Remove "About " from status
trimmed = strings.Replace(trimmed, "About ", "", 1)
openIdx := strings.LastIndex(trimmed, "(")
if openIdx == -1 || !strings.HasSuffix(trimmed, ")") {
return trimmed, container.DockerHealthNone
}
statusText := strings.TrimSpace(trimmed[:openIdx])
if statusText == "" {
statusText = trimmed
}
healthText := strings.ToLower(strings.TrimSpace(strings.TrimSuffix(trimmed[openIdx+1:], ")")))
// Some Docker statuses include a "health:" prefix inside the parentheses.
// Strip it so it maps correctly to the known health states.
if colonIdx := strings.IndexRune(healthText, ':'); colonIdx != -1 {
prefix := strings.TrimSpace(healthText[:colonIdx])
if prefix == "health" || prefix == "health status" {
healthText = strings.TrimSpace(healthText[colonIdx+1:])
}
}
if health, ok := container.DockerHealthStrings[healthText]; ok {
return statusText, health
}
return trimmed, container.DockerHealthNone
}
// Updates stats for individual container with cache-time-aware delta tracking
func (dm *dockerManager) updateContainerStats(ctr *container.ApiInfo, cacheTimeMs uint16) error {
name := ctr.Names[0][1:]
resp, err := dm.client.Get(fmt.Sprintf("http://localhost/containers/%s/stats?stream=0&one-shot=1", ctr.IdShort))
if err != nil {
return err
}
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
// add empty values if they doesn't exist in map
stats, initialized := dm.containerStatsMap[ctr.IdShort]
if !initialized {
stats = &container.Stats{Name: name, Id: ctr.IdShort, Image: ctr.Image}
dm.containerStatsMap[ctr.IdShort] = stats
}
stats.Id = ctr.IdShort
statusText, health := parseDockerStatus(ctr.Status)
stats.Status = statusText
stats.Health = health
// reset current stats
stats.Cpu = 0
stats.Mem = 0
stats.NetworkSent = 0
stats.NetworkRecv = 0
res := dm.apiStats
res.Networks = nil
if err := dm.decode(resp, res); err != nil {
return err
}
// Initialize CPU tracking for this cache time interval
dm.initializeCpuTracking(cacheTimeMs)
// Get previous CPU values
prevCpuContainer, prevCpuSystem := dm.getCpuPreviousValues(cacheTimeMs, ctr.IdShort)
// Calculate CPU percentage based on platform
var cpuPct float64
if dm.isWindows {
prevRead := dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort]
cpuPct = res.CalculateCpuPercentWindows(prevCpuContainer, prevRead)
} else {
cpuPct = res.CalculateCpuPercentLinux(prevCpuContainer, prevCpuSystem)
}
// Calculate memory usage
usedMemory, err := calculateMemoryUsage(res, dm.isWindows)
if err != nil {
return fmt.Errorf("%s - %w - see https://github.com/henrygd/beszel/issues/144", name, err)
}
// Store current CPU stats for next calculation
currentCpuContainer := res.CPUStats.CPUUsage.TotalUsage
currentCpuSystem := res.CPUStats.SystemUsage
dm.setCpuCurrentValues(cacheTimeMs, ctr.IdShort, currentCpuContainer, currentCpuSystem)
// Validate CPU percentage
if err := validateCpuPercentage(cpuPct, name); err != nil {
return err
}
// Calculate network stats using DeltaTracker
sent_delta, recv_delta := dm.calculateNetworkStats(ctr, res, stats, initialized, name, cacheTimeMs)
// Store current network values for legacy compatibility
var total_sent, total_recv uint64
for _, v := range res.Networks {
total_sent += v.TxBytes
total_recv += v.RxBytes
}
stats.PrevNet.Sent, stats.PrevNet.Recv = total_sent, total_recv
// Update final stats values
updateContainerStatsValues(stats, cpuPct, usedMemory, sent_delta, recv_delta, res.Read)
// store per-cache-time read time for Windows CPU percent calc
dm.lastCpuReadTime[cacheTimeMs][ctr.IdShort] = res.Read
return nil
}
// Delete container stats from map using mutex
func (dm *dockerManager) deleteContainerStatsSync(id string) {
dm.containerStatsMutex.Lock()
defer dm.containerStatsMutex.Unlock()
delete(dm.containerStatsMap, id)
for ct := range dm.lastCpuContainer {
delete(dm.lastCpuContainer[ct], id)
}
for ct := range dm.lastCpuSystem {
delete(dm.lastCpuSystem[ct], id)
}
for ct := range dm.lastCpuReadTime {
delete(dm.lastCpuReadTime[ct], id)
}
}
// Creates a new http client for Docker or Podman API
func newDockerManager(a *Agent) *dockerManager {
dockerHost, exists := GetEnv("DOCKER_HOST")
if exists {
// return nil if set to empty string
if dockerHost == "" {
return nil
}
} else {
dockerHost = getDockerHost()
}
parsedURL, err := url.Parse(dockerHost)
if err != nil {
os.Exit(1)
}
transport := &http.Transport{
DisableCompression: true,
MaxConnsPerHost: 0,
}
switch parsedURL.Scheme {
case "unix":
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "unix", parsedURL.Path)
}
case "tcp", "http", "https":
transport.DialContext = func(ctx context.Context, proto, addr string) (net.Conn, error) {
return (&net.Dialer{}).DialContext(ctx, "tcp", parsedURL.Host)
}
default:
slog.Error("Invalid DOCKER_HOST", "scheme", parsedURL.Scheme)
os.Exit(1)
}
// configurable timeout
timeout := time.Millisecond * time.Duration(dockerTimeoutMs)
if t, set := GetEnv("DOCKER_TIMEOUT"); set {
timeout, err = time.ParseDuration(t)
if err != nil {
slog.Error(err.Error())
os.Exit(1)
}
slog.Info("DOCKER_TIMEOUT", "timeout", timeout)
}
// Custom user-agent to avoid docker bug: https://github.com/docker/for-mac/issues/7575
userAgentTransport := &userAgentRoundTripper{
rt: transport,
userAgent: "Docker-Client/",
}
manager := &dockerManager{
client: &http.Client{
Timeout: timeout,
Transport: userAgentTransport,
},
containerStatsMap: make(map[string]*container.Stats),
sem: make(chan struct{}, 5),
apiContainerList: []*container.ApiInfo{},
apiStats: &container.ApiStats{},
// Initialize cache-time-aware tracking structures
lastCpuContainer: make(map[uint16]map[string]uint64),
lastCpuSystem: make(map[uint16]map[string]uint64),
lastCpuReadTime: make(map[uint16]map[string]time.Time),
networkSentTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
networkRecvTrackers: make(map[uint16]*deltatracker.DeltaTracker[string, uint64]),
}
// If using podman, return client
if strings.Contains(dockerHost, "podman") {
a.systemInfo.Podman = true
manager.goodDockerVersion = true
return manager
}
// this can take up to 5 seconds with retry, so run in goroutine
go manager.checkDockerVersion()
// give version check a chance to complete before returning
time.Sleep(50 * time.Millisecond)
return manager
}
// checkDockerVersion checks Docker version and sets goodDockerVersion if at least 25.0.0.
// Versions before 25.0.0 have a bug with one-shot which requires all requests to be made in one batch.
func (dm *dockerManager) checkDockerVersion() {
var err error
var resp *http.Response
var versionInfo struct {
Version string `json:"Version"`
}
const versionMaxTries = 2
for i := 1; i <= versionMaxTries; i++ {
resp, err = dm.client.Get("http://localhost/version")
if err == nil {
break
}
if resp != nil {
resp.Body.Close()
}
if i < versionMaxTries {
slog.Debug("Failed to get Docker version; retrying", "attempt", i, "error", err)
time.Sleep(5 * time.Second)
}
}
if err != nil {
return
}
if err := dm.decode(resp, &versionInfo); err != nil {
return
}
// if version > 24, one-shot works correctly and we can limit concurrent operations
if dockerVersion, err := semver.Parse(versionInfo.Version); err == nil && dockerVersion.Major > 24 {
dm.goodDockerVersion = true
} else {
slog.Info(fmt.Sprintf("Docker %s is outdated. Upgrade if possible. See https://github.com/henrygd/beszel/issues/58", versionInfo.Version))
}
}
// Decodes Docker API JSON response using a reusable buffer and decoder. Not thread safe.
func (dm *dockerManager) decode(resp *http.Response, d any) error {
if dm.buf == nil {
// initialize buffer with 256kb starting size
dm.buf = bytes.NewBuffer(make([]byte, 0, 1024*256))
dm.decoder = json.NewDecoder(dm.buf)
}
defer resp.Body.Close()
defer dm.buf.Reset()
_, err := dm.buf.ReadFrom(resp.Body)
if err != nil {
return err
}
return dm.decoder.Decode(d)
}
// Test docker / podman sockets and return if one exists
func getDockerHost() string {
scheme := "unix://"
socks := []string{"/var/run/docker.sock", fmt.Sprintf("/run/user/%v/podman/podman.sock", os.Getuid())}
for _, sock := range socks {
if _, err := os.Stat(sock); err == nil {
return scheme + sock
}
}
return scheme + socks[0]
}
// getContainerInfo fetches the inspection data for a container
func (dm *dockerManager) getContainerInfo(ctx context.Context, containerID string) ([]byte, error) {
endpoint := fmt.Sprintf("http://localhost/containers/%s/json", containerID)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return nil, err
}
resp, err := dm.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return nil, fmt.Errorf("container info request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
// Remove sensitive environment variables from Config.Env
var containerInfo map[string]any
if err := json.NewDecoder(resp.Body).Decode(&containerInfo); err != nil {
return nil, err
}
if config, ok := containerInfo["Config"].(map[string]any); ok {
delete(config, "Env")
}
return json.Marshal(containerInfo)
}
// getLogs fetches the logs for a container
func (dm *dockerManager) getLogs(ctx context.Context, containerID string) (string, error) {
endpoint := fmt.Sprintf("http://localhost/containers/%s/logs?stdout=1&stderr=1&tail=%d", containerID, dockerLogsTail)
req, err := http.NewRequestWithContext(ctx, http.MethodGet, endpoint, nil)
if err != nil {
return "", err
}
resp, err := dm.client.Do(req)
if err != nil {
return "", err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
body, _ := io.ReadAll(io.LimitReader(resp.Body, 1024))
return "", fmt.Errorf("logs request failed: %s: %s", resp.Status, strings.TrimSpace(string(body)))
}
var builder strings.Builder
if err := decodeDockerLogStream(resp.Body, &builder); err != nil {
return "", err
}
return builder.String(), nil
}
func decodeDockerLogStream(reader io.Reader, builder *strings.Builder) error {
const headerSize = 8
var header [headerSize]byte
buf := make([]byte, 0, dockerLogsTail*200)
totalBytesRead := 0
for {
if _, err := io.ReadFull(reader, header[:]); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
return nil
}
return err
}
frameLen := binary.BigEndian.Uint32(header[4:])
if frameLen == 0 {
continue
}
// Prevent memory exhaustion from excessively large frames
if frameLen > maxLogFrameSize {
return fmt.Errorf("log frame size (%d) exceeds maximum (%d)", frameLen, maxLogFrameSize)
}
// Check if reading this frame would exceed total log size limit
if totalBytesRead+int(frameLen) > maxTotalLogSize {
// Read and discard remaining data to avoid blocking
_, _ = io.Copy(io.Discard, io.LimitReader(reader, int64(frameLen)))
slog.Debug("Truncating logs: limit reached", "read", totalBytesRead, "limit", maxTotalLogSize)
return nil
}
buf = allocateBuffer(buf, int(frameLen))
if _, err := io.ReadFull(reader, buf[:frameLen]); err != nil {
if errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF) {
if len(buf) > 0 {
builder.Write(buf[:min(int(frameLen), len(buf))])
}
return nil
}
return err
}
builder.Write(buf[:frameLen])
totalBytesRead += int(frameLen)
}
}
func allocateBuffer(current []byte, needed int) []byte {
if cap(current) >= needed {
return current[:needed]
}
return make([]byte, needed)
}
func min(a, b int) int {
if a < b {
return a
}
return b
}