fix: handle backpressure correctly in event stream

This commit is contained in:
garethgeorge
2024-04-11 20:23:49 -07:00
parent 834b74f0f3
commit 4e2bf1f76c
2 changed files with 24 additions and 9 deletions

View File

@@ -188,8 +188,10 @@ func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Re
// GetOperationEvents implements GET /v1/events/operations // GetOperationEvents implements GET /v1/events/operations
func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error { func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error {
errorChan := make(chan error)
defer close(errorChan) errChan := make(chan error, 1)
events := make(chan *v1.OperationEvent, 100)
callback := func(oldOp *v1.Operation, newOp *v1.Operation) { callback := func(oldOp *v1.Operation, newOp *v1.Operation) {
var event *v1.OperationEvent var event *v1.OperationEvent
if oldOp == nil && newOp != nil { if oldOp == nil && newOp != nil {
@@ -212,17 +214,27 @@ func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.R
return return
} }
if err := resp.Send(event); err != nil { select {
errorChan <- fmt.Errorf("failed to send event: %w", err) case events <- event:
default:
errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup")
} }
} }
s.oplog.Subscribe(&callback) s.oplog.Subscribe(&callback)
defer s.oplog.Unsubscribe(&callback) defer s.oplog.Unsubscribe(&callback)
select {
case <-ctx.Done(): for {
return nil select {
case err := <-errorChan: case err := <-errChan:
return err return err
case <-ctx.Done():
return nil
case event := <-events:
zap.S().Infof("sending event %v", event)
if err := resp.Send(event); err != nil {
return fmt.Errorf("failed to write event: %w", err)
}
}
} }
} }

3
proto/v1/hostinfo.proto Normal file
View File

@@ -0,0 +1,3 @@
syntax = "proto3";
package v1;