mirror of
https://github.com/garethgeorge/backrest.git
synced 2025-12-14 17:45:36 +00:00
112 lines
3.1 KiB
Go
112 lines
3.1 KiB
Go
package syncapi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
|
)
|
|
|
|
type syncCommandStreamTrait interface {
|
|
Send(item *v1.SyncStreamItem) error
|
|
Receive() (*v1.SyncStreamItem, error)
|
|
}
|
|
|
|
var _ syncCommandStreamTrait = (*connect.BidiStream[v1.SyncStreamItem, v1.SyncStreamItem])(nil) // Ensure that connect.BidiStream implements syncCommandStreamTrait
|
|
var _ syncCommandStreamTrait = (*connect.BidiStreamForClient[v1.SyncStreamItem, v1.SyncStreamItem])(nil) // Ensure that connect.BidiStreamForClient implements syncCommandStreamTrait
|
|
|
|
type bidiSyncCommandStream struct {
|
|
sendChan chan *v1.SyncStreamItem
|
|
recvChan chan *v1.SyncStreamItem
|
|
terminateWithErrChan chan error
|
|
}
|
|
|
|
func newBidiSyncCommandStream() *bidiSyncCommandStream {
|
|
return &bidiSyncCommandStream{
|
|
sendChan: make(chan *v1.SyncStreamItem, 64), // Buffered channel to allow sending items without blocking
|
|
recvChan: make(chan *v1.SyncStreamItem, 1),
|
|
terminateWithErrChan: make(chan error, 1),
|
|
}
|
|
}
|
|
|
|
func (s *bidiSyncCommandStream) Send(item *v1.SyncStreamItem) {
|
|
select {
|
|
case s.sendChan <- item:
|
|
default:
|
|
// Try again with a timeout, if it fails, send an error to terminate the stream
|
|
select {
|
|
case s.sendChan <- item:
|
|
case <-time.After(100 * time.Millisecond):
|
|
s.SendErrorAndTerminate(NewSyncErrorDisconnected(errors.New("send channel is full, cannot send item")))
|
|
}
|
|
}
|
|
}
|
|
|
|
// SendErrorAndTerminate sends an error to the termination channel.
|
|
// If the error is nil, it terminates only.
|
|
func (s *bidiSyncCommandStream) SendErrorAndTerminate(err error) {
|
|
if err == nil {
|
|
return
|
|
}
|
|
select {
|
|
case s.terminateWithErrChan <- err:
|
|
default:
|
|
// If the channel is full, we can't send the error, so we just ignore it.
|
|
// This is a best-effort termination.
|
|
}
|
|
}
|
|
|
|
func (s *bidiSyncCommandStream) ReadChannel() chan *v1.SyncStreamItem {
|
|
return s.recvChan
|
|
}
|
|
|
|
func (s *bidiSyncCommandStream) ReceiveWithinDuration(d time.Duration) *v1.SyncStreamItem {
|
|
select {
|
|
case item := <-s.recvChan:
|
|
return item
|
|
case <-time.After(d):
|
|
return nil // Return nil if no item is received within the duration
|
|
}
|
|
}
|
|
|
|
func (s *bidiSyncCommandStream) ConnectStream(ctx context.Context, stream syncCommandStreamTrait) error {
|
|
ctx, cancel := context.WithCancel(ctx)
|
|
defer cancel()
|
|
go func() {
|
|
for ctx.Err() == nil {
|
|
if val, err := stream.Receive(); err != nil {
|
|
s.SendErrorAndTerminate(NewSyncErrorDisconnected(fmt.Errorf("receiving item: %w", err)))
|
|
break
|
|
} else {
|
|
s.recvChan <- val
|
|
}
|
|
}
|
|
close(s.recvChan)
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case item := <-s.sendChan:
|
|
if item == nil {
|
|
continue
|
|
}
|
|
if err := stream.Send(item); err != nil {
|
|
if errors.Is(err, io.EOF) {
|
|
err = fmt.Errorf("connection failed or dropped: %w", err)
|
|
}
|
|
s.SendErrorAndTerminate(err)
|
|
return err
|
|
}
|
|
case err := <-s.terminateWithErrChan:
|
|
return err // Terminate the stream with the error or nil if no error was sent
|
|
case <-ctx.Done():
|
|
// Context is done, we should stop processing.
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|