diff --git a/internal/api/backresthandler.go b/internal/api/backresthandler.go index 3a3f403..58688d0 100644 --- a/internal/api/backresthandler.go +++ b/internal/api/backresthandler.go @@ -188,8 +188,10 @@ func (s *BackrestHandler) ListSnapshotFiles(ctx context.Context, req *connect.Re // GetOperationEvents implements GET /v1/events/operations func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.Request[emptypb.Empty], resp *connect.ServerStream[v1.OperationEvent]) error { - errorChan := make(chan error) - defer close(errorChan) + + errChan := make(chan error, 1) + events := make(chan *v1.OperationEvent, 100) + callback := func(oldOp *v1.Operation, newOp *v1.Operation) { var event *v1.OperationEvent if oldOp == nil && newOp != nil { @@ -212,17 +214,27 @@ func (s *BackrestHandler) GetOperationEvents(ctx context.Context, req *connect.R return } - if err := resp.Send(event); err != nil { - errorChan <- fmt.Errorf("failed to send event: %w", err) + select { + case events <- event: + default: + errChan <- errors.New("event buffer overflow, closing stream for client retry and catchup") } } s.oplog.Subscribe(&callback) defer s.oplog.Unsubscribe(&callback) - select { - case <-ctx.Done(): - return nil - case err := <-errorChan: - return err + + for { + select { + case err := <-errChan: + return err + case <-ctx.Done(): + return nil + case event := <-events: + zap.S().Infof("sending event %v", event) + if err := resp.Send(event); err != nil { + return fmt.Errorf("failed to write event: %w", err) + } + } } } diff --git a/proto/v1/hostinfo.proto b/proto/v1/hostinfo.proto new file mode 100644 index 0000000..ce1b349 --- /dev/null +++ b/proto/v1/hostinfo.proto @@ -0,0 +1,3 @@ +syntax = "proto3"; + +package v1;