fix: improve memory pressure from getlogs

This commit is contained in:
garethgeorge
2024-11-13 18:18:21 -08:00
parent b7abbf9603
commit 592e4cf9dd
3 changed files with 38 additions and 33 deletions

View File

@@ -504,54 +504,66 @@ func (s *BackrestHandler) GetLogs(ctx context.Context, req *connect.Request[v1.L
r.Close()
}()
var bufferMu sync.Mutex
var buffer bytes.Buffer
var errChan = make(chan error, 1)
var sendChan = make(chan []byte, 1)
var buffer bytes.Buffer
var bufferMu sync.Mutex
go func() {
data := make([]byte, 4*1024)
for {
n, err := r.Read(data)
if n == 0 {
close(errChan)
break
} else if err != nil && err != io.EOF {
errChan <- fmt.Errorf("failed to read log data: %w", err)
close(errChan)
return
break
}
bufferMu.Lock()
buffer.Write(data[:n])
if buffer.Len() > 128*1024 {
sendChan <- bytes.Clone(buffer.Bytes())
buffer.Reset()
}
bufferMu.Unlock()
}
if buffer.Len() > 0 {
bufferMu.Lock()
sendChan <- bytes.Clone(buffer.Bytes())
buffer.Reset()
bufferMu.Unlock()
}
close(sendChan)
}()
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
flush := func() error {
bufferMu.Lock()
if buffer.Len() > 0 {
if err := resp.Send(&types.BytesValue{Value: buffer.Bytes()}); err != nil {
bufferMu.Unlock()
return fmt.Errorf("failed to send log data: %w", err)
}
buffer.Reset()
}
bufferMu.Unlock()
return nil
}
for {
select {
case <-ctx.Done():
return flush()
case err := <-errChan:
_ = flush()
return err
case <-ticker.C:
if err := flush(); err != nil {
return nil
case data, ok := <-sendChan:
if !ok {
return nil
}
if err := resp.Send(&types.BytesValue{Value: data}); err != nil {
return err
}
case err := <-errChan:
return err
case <-ticker.C:
bufferMu.Lock()
if buffer.Len() > 0 {
if err := resp.Send(&types.BytesValue{Value: bytes.Clone(buffer.Bytes())}); err != nil {
bufferMu.Unlock()
return err
}
buffer.Reset()
}
bufferMu.Unlock()
}
}