mirror of
https://github.com/garethgeorge/backrest.git
synced 2026-05-05 04:20:31 +00:00
575 lines
22 KiB
Go
575 lines
22 KiB
Go
package syncapi
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"slices"
|
|
"time"
|
|
|
|
"connectrpc.com/connect"
|
|
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
|
"github.com/garethgeorge/backrest/gen/go/v1sync"
|
|
"github.com/garethgeorge/backrest/gen/go/v1sync/v1syncconnect"
|
|
"github.com/garethgeorge/backrest/internal/api/syncapi/permissions"
|
|
"github.com/garethgeorge/backrest/internal/env"
|
|
"github.com/garethgeorge/backrest/internal/oplog"
|
|
"go.uber.org/zap"
|
|
"google.golang.org/protobuf/proto"
|
|
)
|
|
|
|
// SyncProtocolVersion is the application-layer handshake version. Bumped to 3
|
|
// when the handshake was redesigned to carry a single signature that binds
|
|
// the long-term identity to the post-quantum transport transcript (and to
|
|
// drop the legacy v1.SignedMessage instance_id wrapping).
|
|
const SyncProtocolVersion = 3
|
|
|
|
type BackrestSyncHandler struct {
|
|
v1syncconnect.UnimplementedBackrestSyncServiceHandler
|
|
mgr *SyncManager
|
|
mapper *remoteOpIDMapper
|
|
}
|
|
|
|
var _ v1syncconnect.BackrestSyncServiceHandler = &BackrestSyncHandler{}
|
|
|
|
func NewBackrestSyncHandler(mgr *SyncManager) *BackrestSyncHandler {
|
|
mapper, err := newRemoteOpIDMapper(mgr.oplog, 4096)
|
|
if err != nil {
|
|
panic(fmt.Errorf("syncapi: constructing remote op ID mapper: %w", err))
|
|
}
|
|
return &BackrestSyncHandler{
|
|
mgr: mgr,
|
|
mapper: mapper,
|
|
}
|
|
}
|
|
|
|
func (h *BackrestSyncHandler) Sync(ctx context.Context, stream *connect.BidiStream[v1sync.SyncStreamItem, v1sync.SyncStreamItem]) error {
|
|
// TODO: this request can be very long lived, we must periodically refresh the config
|
|
// e.g. to disconnect a client if its access is revoked.
|
|
snapshot := h.mgr.getSyncConfigSnapshot()
|
|
if snapshot == nil {
|
|
return connect.NewError(connect.CodePermissionDenied, errors.New("sync server is not configured"))
|
|
}
|
|
|
|
sessionHandler := newSyncHandlerServer(h.mgr, snapshot, h.mapper)
|
|
cmdStream := newBidiSyncCommandStream()
|
|
|
|
go func() {
|
|
err := runSync(
|
|
ctx,
|
|
snapshot.config.Instance,
|
|
snapshot.identityKey,
|
|
cmdStream,
|
|
sessionHandler,
|
|
snapshot.config.GetMultihost().GetAuthorizedClients(),
|
|
"", // server never sends a pairing secret
|
|
h.handleUnknownPeerPairing(snapshot),
|
|
)
|
|
cmdStream.SendErrorAndTerminate(err)
|
|
}()
|
|
|
|
if err := cmdStream.ConnectStream(ctx, stream, false /* isInitiator: server side */); err != nil {
|
|
zap.S().Errorf("sync handler stream error: %v", err)
|
|
var syncErr *SyncError
|
|
if errors.As(err, &syncErr) {
|
|
switch syncErr.State {
|
|
case v1sync.ConnectionState_CONNECTION_STATE_ERROR_AUTH:
|
|
return connect.NewError(connect.CodePermissionDenied, syncErr.Message)
|
|
case v1sync.ConnectionState_CONNECTION_STATE_ERROR_PROTOCOL:
|
|
return connect.NewError(connect.CodeInvalidArgument, syncErr.Message)
|
|
default:
|
|
return connect.NewError(connect.CodeInternal, syncErr.Message)
|
|
}
|
|
}
|
|
|
|
if sessionHandler.peer != nil {
|
|
peerState := h.mgr.peerStateManager.GetPeerState(sessionHandler.peer.Keyid).Clone()
|
|
if peerState == nil {
|
|
peerState = newPeerState(sessionHandler.peer.InstanceId, sessionHandler.peer.Keyid)
|
|
}
|
|
if syncErr != nil {
|
|
peerState.ConnectionState = syncErr.State
|
|
peerState.ConnectionStateMessage = syncErr.Message.Error()
|
|
} else if errors.Is(err, context.Canceled) {
|
|
peerState.ConnectionState = v1sync.ConnectionState_CONNECTION_STATE_DISCONNECTED
|
|
peerState.ConnectionStateMessage = "lost connection"
|
|
} else {
|
|
peerState.ConnectionState = v1sync.ConnectionState_CONNECTION_STATE_DISCONNECTED
|
|
peerState.ConnectionStateMessage = fmt.Sprintf("disconnected: %v", err)
|
|
}
|
|
peerState.LastHeartbeat = time.Now()
|
|
h.mgr.peerStateManager.SetPeerState(sessionHandler.peer.Keyid, peerState)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// syncSessionHandlerServer is a syncSessionHandler implementation for servers.
|
|
type syncSessionHandlerServer struct {
|
|
unimplementedSyncSessionHandler
|
|
|
|
mgr *SyncManager
|
|
snapshot syncConfigSnapshot
|
|
|
|
peer *v1.Multihost_Peer // The authorized client peer this handler is associated with, set during OnConnectionEstablished.
|
|
permissions *permissions.PermissionSet
|
|
handle *connectedPeerHandle // The handle registered with the manager; used so unregister can CAS against it.
|
|
|
|
mapper *remoteOpIDMapper
|
|
|
|
l *zap.Logger
|
|
}
|
|
|
|
func newSyncHandlerServer(mgr *SyncManager, snapshot *syncConfigSnapshot, mapper *remoteOpIDMapper) *syncSessionHandlerServer {
|
|
return &syncSessionHandlerServer{
|
|
mgr: mgr,
|
|
snapshot: *snapshot,
|
|
mapper: mapper,
|
|
l: zap.L().Named("syncserver handler for unknown peer"),
|
|
}
|
|
}
|
|
|
|
var _ syncSessionHandler = (*syncSessionHandlerServer)(nil)
|
|
|
|
func (h *syncSessionHandlerServer) OnConnectionEstablished(ctx context.Context, stream *bidiSyncCommandStream, peer *v1.Multihost_Peer) error {
|
|
// Verify that the peer is in our authorized clients list
|
|
authorizedClientPeerIdx := slices.IndexFunc(h.snapshot.config.Multihost.GetAuthorizedClients(), func(p *v1.Multihost_Peer) bool {
|
|
return p.InstanceId == peer.InstanceId && p.Keyid == peer.Keyid
|
|
})
|
|
if authorizedClientPeerIdx == -1 {
|
|
h.l.Sugar().Warnf("rejected a connection from client instance ID %q because it is not authorized", peer.InstanceId)
|
|
return NewSyncErrorAuth(errors.New("client is not an authorized peer"))
|
|
}
|
|
|
|
h.peer = h.snapshot.config.Multihost.AuthorizedClients[authorizedClientPeerIdx]
|
|
h.l = zap.L().Named(fmt.Sprintf("syncserver handler for peer %q", h.peer.InstanceId))
|
|
|
|
var err error
|
|
h.permissions, err = permissions.NewPermissionSet(h.peer.GetPermissions())
|
|
if err != nil {
|
|
h.l.Sugar().Warnf("failed to create permission set for client %q: %v", peer.InstanceId, err)
|
|
return NewSyncErrorInternal(fmt.Errorf("failed to create permission set for client %q: %w", peer.InstanceId, err))
|
|
}
|
|
|
|
// Configure the state for the connected peer.
|
|
peerState := newPeerState(peer.InstanceId, h.peer.Keyid)
|
|
peerState.ConnectionStateMessage = "connected"
|
|
peerState.ConnectionState = v1sync.ConnectionState_CONNECTION_STATE_CONNECTED
|
|
peerState.LastHeartbeat = time.Now()
|
|
h.mgr.peerStateManager.SetPeerState(h.peer.Keyid, peerState)
|
|
|
|
h.l.Sugar().Infof("accepted a connection from client instance ID %q", h.peer.InstanceId)
|
|
|
|
// Register this peer's stream handle so the API layer can send messages to it.
|
|
// If a stream is already registered for this key ID (e.g. the previous session
|
|
// hasn't noticed it's dead yet), kick it so the newest connection wins.
|
|
h.handle = &connectedPeerHandle{
|
|
stream: stream,
|
|
peer: h.peer,
|
|
permissions: h.permissions,
|
|
}
|
|
if prev := h.mgr.registerConnectedPeer(h.peer.Keyid, h.handle); prev != nil {
|
|
h.l.Sugar().Infof("displacing existing connection for client %q (%s) with newer session", h.peer.InstanceId, h.peer.Keyid)
|
|
prev.stream.SendErrorAndTerminate(NewSyncErrorDisconnected(errors.New("displaced by newer connection from the same peer")))
|
|
}
|
|
|
|
// start a heartbeat thread
|
|
go sendHeartbeats(ctx, stream, env.MultihostHeartbeatInterval())
|
|
|
|
// subscribe to our own configuration for changes
|
|
go func() {
|
|
configWatchCh := h.mgr.configMgr.OnChange.Subscribe()
|
|
defer h.mgr.configMgr.OnChange.Unsubscribe(configWatchCh)
|
|
for {
|
|
select {
|
|
case <-configWatchCh:
|
|
newConfig, err := h.mgr.configMgr.Get()
|
|
if err != nil {
|
|
h.l.Sugar().Warnf("failed to get config on change: %v, disconnecting client", err)
|
|
stream.SendErrorAndTerminate(nil)
|
|
return
|
|
}
|
|
|
|
// Check if this peer is still authorized
|
|
peerIdx := slices.IndexFunc(newConfig.Multihost.GetAuthorizedClients(), func(p *v1.Multihost_Peer) bool {
|
|
return p.InstanceId == h.peer.InstanceId && p.Keyid == h.peer.Keyid
|
|
})
|
|
if peerIdx == -1 {
|
|
h.l.Sugar().Infof("disconnecting client %q: no longer authorized", h.peer.InstanceId)
|
|
stream.SendErrorAndTerminate(nil)
|
|
return
|
|
}
|
|
|
|
// Check if permissions changed by comparing the proto peer definition
|
|
updatedPeer := newConfig.Multihost.AuthorizedClients[peerIdx]
|
|
if !proto.Equal(h.peer, updatedPeer) {
|
|
h.l.Sugar().Infof("disconnecting client %q: peer configuration changed", h.peer.InstanceId)
|
|
stream.SendErrorAndTerminate(nil)
|
|
return
|
|
}
|
|
|
|
// Permissions unchanged — send updated config and shared repos to client
|
|
configRepos, configPlans, err := h.sendConfigToClient(stream, newConfig)
|
|
if err != nil {
|
|
h.l.Sugar().Warnf("failed to send updated config to client %q: %v", h.peer.InstanceId, err)
|
|
} else {
|
|
sharedRepos := h.sendSharedReposToClient(stream, newConfig)
|
|
h.l.Sugar().Debugf("config changed, sent update to client %q: %d repos, %d plans (config); %d shared repos pushed",
|
|
h.peer.InstanceId, configRepos, configPlans, sharedRepos)
|
|
}
|
|
case <-ctx.Done():
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
configRepos, configPlans, err := h.sendConfigToClient(stream, h.snapshot.config)
|
|
if err != nil {
|
|
return NewSyncErrorInternal(fmt.Errorf("sending initial config to client: %w", err))
|
|
}
|
|
|
|
// Push shared repos to the client
|
|
sharedRepoCount := h.sendSharedReposToClient(stream, h.snapshot.config)
|
|
|
|
h.l.Sugar().Infof("sent initial state to client %q: %d repos, %d plans (config); %d shared repos pushed",
|
|
h.peer.InstanceId, configRepos, configPlans, sharedRepoCount)
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) OnConnectionDisconnected() {
|
|
if h.peer != nil && h.handle != nil {
|
|
h.mgr.unregisterConnectedPeer(h.peer.Keyid, h.handle)
|
|
}
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleHeartbeat(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionHeartbeat) error {
|
|
peerState := h.mgr.peerStateManager.GetPeerState(h.peer.Keyid).Clone()
|
|
if peerState == nil {
|
|
return NewSyncErrorInternal(fmt.Errorf("peer state for %q not found", h.peer.Keyid))
|
|
}
|
|
peerState.LastHeartbeat = time.Now()
|
|
h.mgr.peerStateManager.SetPeerState(h.peer.Keyid, peerState)
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleReceiveOperations(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveOperations) error {
|
|
switch event := item.GetEvent().Event.(type) {
|
|
case *v1.OperationEvent_CreatedOperations:
|
|
h.l.Debug("received created operations", zap.Any("operations", event.CreatedOperations.GetOperations()))
|
|
for _, op := range event.CreatedOperations.GetOperations() {
|
|
if err := h.insertOrUpdate(op, false /* isUpdate */); err != nil {
|
|
return fmt.Errorf("action ReceiveOperations: operation event create %+v: %w", op, err)
|
|
}
|
|
}
|
|
case *v1.OperationEvent_UpdatedOperations:
|
|
h.l.Debug("received update operations", zap.Any("operations", event.UpdatedOperations.GetOperations()))
|
|
for _, op := range event.UpdatedOperations.GetOperations() {
|
|
if err := h.insertOrUpdate(op, true /* isUpdate */); err != nil {
|
|
return fmt.Errorf("action ReceiveOperations: operation event update %+v: %w", op, err)
|
|
}
|
|
}
|
|
case *v1.OperationEvent_DeletedOperations:
|
|
h.l.Debug("received delete operations", zap.Any("operations", event.DeletedOperations.GetValues()))
|
|
for _, id := range event.DeletedOperations.GetValues() {
|
|
if err := h.deleteByOriginalID(id); err != nil {
|
|
return fmt.Errorf("action ReceiveOperations: operation event delete %d: %w", id, err)
|
|
}
|
|
}
|
|
case *v1.OperationEvent_KeepAlive:
|
|
default:
|
|
return NewSyncErrorProtocol(errors.New("action ReceiveOperations: unknown event type"))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleReceiveConfig(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveConfig) error {
|
|
peerState := h.mgr.peerStateManager.GetPeerState(h.peer.Keyid).Clone()
|
|
if peerState == nil {
|
|
return NewSyncErrorInternal(fmt.Errorf("peer state for %q not found", h.peer.Keyid))
|
|
}
|
|
peerState.Config = item.GetConfig()
|
|
h.mgr.peerStateManager.SetPeerState(h.peer.Keyid, peerState)
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleReceiveResources(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveResources) error {
|
|
h.l.Debug("received resource list from client",
|
|
zap.Any("repos", item.GetRepos()),
|
|
zap.Any("plans", item.GetPlans()))
|
|
peerState := h.mgr.peerStateManager.GetPeerState(h.peer.Keyid).Clone()
|
|
if peerState == nil {
|
|
return NewSyncErrorInternal(fmt.Errorf("peer state for %q not found", h.peer.Keyid))
|
|
}
|
|
repos := item.GetRepos()
|
|
plans := item.GetPlans()
|
|
for _, repo := range repos {
|
|
peerState.KnownRepos[repo.Id] = repo
|
|
}
|
|
for _, plan := range plans {
|
|
peerState.KnownPlans[plan.Id] = plan
|
|
}
|
|
h.mgr.peerStateManager.SetPeerState(h.peer.Keyid, peerState)
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) insertOrUpdate(op *v1.Operation, isUpdate bool) error {
|
|
// Returns a localOpID and localFlowID or 0 if not found in which case a new ID will be assigned by the insert.
|
|
localOpID, localFlowID, err := h.mapper.TranslateOpIdAndFlowID(h.peer.Keyid, op.Id, op.FlowId)
|
|
if err != nil {
|
|
return fmt.Errorf("translating operation ID and flow ID: %w", err)
|
|
}
|
|
op.OriginalInstanceKeyid = h.peer.Keyid
|
|
op.OriginalId = op.Id
|
|
op.OriginalFlowId = op.FlowId
|
|
op.Id = localOpID
|
|
op.FlowId = localFlowID
|
|
// Use Set which handles both insert (Id==0) and update (Id!=0),
|
|
// preserving the operation's Modno from the client.
|
|
return h.mgr.oplog.Set(oplog.SetOptions{}, op)
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) deleteByOriginalID(originalID int64) error {
|
|
foundOp, err := h.mgr.oplog.FindOneMetadata(oplog.Query{}.
|
|
SetOriginalInstanceKeyid(h.peer.Keyid).
|
|
SetOriginalID(originalID))
|
|
if err != nil {
|
|
return fmt.Errorf("finding operation metadata: %w", err)
|
|
}
|
|
if foundOp.ID == 0 {
|
|
h.l.Sugar().Debugf("received delete for non-existent operation %v", originalID)
|
|
return nil
|
|
}
|
|
return h.mgr.oplog.Delete(foundOp.ID)
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) sendConfigToClient(stream *bidiSyncCommandStream, config *v1.Config) (int, int, error) {
|
|
remoteConfig := &v1sync.RemoteConfig{
|
|
Version: config.Version,
|
|
Modno: config.Modno,
|
|
}
|
|
resourceListMsg := &v1sync.SyncStreamItem_SyncActionReceiveResources{}
|
|
for _, repo := range config.Repos {
|
|
if h.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanViewConfiguration...) {
|
|
remoteConfig.Repos = append(remoteConfig.Repos, repo)
|
|
resourceListMsg.Repos = append(resourceListMsg.Repos, &v1sync.RepoMetadata{
|
|
Id: repo.Id,
|
|
Guid: repo.Guid,
|
|
})
|
|
}
|
|
}
|
|
for _, plan := range config.Plans {
|
|
if h.permissions.CheckPermissionForPlan(plan.Id, permissions.PermsCanViewConfiguration...) {
|
|
remoteConfig.Plans = append(remoteConfig.Plans, plan)
|
|
resourceListMsg.Plans = append(resourceListMsg.Plans, &v1sync.PlanMetadata{
|
|
Id: plan.Id,
|
|
})
|
|
}
|
|
}
|
|
|
|
// Send the config, this is the first meaningful packet the client will receive.
|
|
stream.Send(&v1sync.SyncStreamItem{
|
|
Action: &v1sync.SyncStreamItem_ReceiveConfig{
|
|
ReceiveConfig: &v1sync.SyncStreamItem_SyncActionReceiveConfig{
|
|
Config: remoteConfig,
|
|
},
|
|
},
|
|
})
|
|
|
|
// Send the updated list of resources that the client can access.
|
|
stream.Send(&v1sync.SyncStreamItem{
|
|
Action: &v1sync.SyncStreamItem_ReceiveResources{
|
|
ReceiveResources: resourceListMsg,
|
|
},
|
|
})
|
|
return len(remoteConfig.Repos), len(remoteConfig.Plans), nil
|
|
}
|
|
|
|
// sendSharedReposToClient sends repos marked as shared to the client via SetConfig.
|
|
// This pushes repo configurations to the client so they are added to the client's local config.
|
|
// Returns the number of shared repos sent.
|
|
//
|
|
// On the host, PERMISSION_RECEIVE_SHARED_REPOS is checked per-repo, so the host can
|
|
// scope which shared repos are pushed to a given client (a scopeless grant acts as
|
|
// a wildcard and pushes every shared repo). The client also enforces the same
|
|
// permission on its known_host entry before accepting the push, but does so
|
|
// scope-lessly — see syncclient.go HandleSetConfig.
|
|
func (h *syncSessionHandlerServer) sendSharedReposToClient(stream *bidiSyncCommandStream, config *v1.Config) int {
|
|
var sharedRepos []*v1.Repo
|
|
for _, repo := range config.Repos {
|
|
if !repo.GetShared() {
|
|
continue
|
|
}
|
|
if !h.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanReceiveSharedRepos...) {
|
|
continue
|
|
}
|
|
repoCopy := proto.Clone(repo).(*v1.Repo)
|
|
repoCopy.OriginInstanceId = config.Instance
|
|
sharedRepos = append(sharedRepos, repoCopy)
|
|
}
|
|
|
|
if len(sharedRepos) == 0 {
|
|
return 0
|
|
}
|
|
|
|
stream.Send(&v1sync.SyncStreamItem{
|
|
Action: &v1sync.SyncStreamItem_SetConfig{
|
|
SetConfig: &v1sync.SyncStreamItem_SyncActionSetConfig{
|
|
Repos: sharedRepos,
|
|
},
|
|
},
|
|
})
|
|
return len(sharedRepos)
|
|
}
|
|
|
|
// ValidatePairingSecret checks a pairing secret against a list of pairing tokens.
|
|
// Returns the matching token if valid, or an error explaining why validation failed.
|
|
// This is a pure function with no side effects, making it easy to test exhaustively.
|
|
func ValidatePairingSecret(secret string, tokens []*v1.Multihost_PairingToken, now time.Time) (*v1.Multihost_PairingToken, error) {
|
|
if secret == "" {
|
|
return nil, fmt.Errorf("empty pairing secret")
|
|
}
|
|
for _, token := range tokens {
|
|
if token.Secret != secret {
|
|
continue
|
|
}
|
|
if token.ExpiresAtUnix > 0 && now.Unix() > token.ExpiresAtUnix {
|
|
return nil, fmt.Errorf("pairing token %q has expired", token.Label)
|
|
}
|
|
if token.MaxUses > 0 && token.Uses >= token.MaxUses {
|
|
return nil, fmt.Errorf("pairing token %q has reached its maximum number of uses (%d)", token.Label, token.MaxUses)
|
|
}
|
|
return token, nil
|
|
}
|
|
return nil, fmt.Errorf("no matching pairing token found")
|
|
}
|
|
|
|
// handleUnknownPeerPairing returns an onUnknownPeerFunc that validates a pairing secret
|
|
// from the handshake, adds the client to authorized_clients in the config, and consumes the token.
|
|
// The peer is added to the config BEFORE runSync proceeds with its normal authorization check,
|
|
// ensuring that runSync's hard gate (peer must be in authorized_clients) is never bypassed.
|
|
//
|
|
// runSync has already verified the handshake signature against the transport
|
|
// transcript before invoking this callback, so signature validity (and
|
|
// therefore the client's possession of the private key) is established by
|
|
// the time we get here. The pairing token check below is the only remaining
|
|
// authorization step.
|
|
func (h *BackrestSyncHandler) handleUnknownPeerPairing(snapshot *syncConfigSnapshot) onUnknownPeerFunc {
|
|
return func(handshake *v1sync.SyncStreamItem) (*v1.Multihost_Peer, error) {
|
|
pairingSecret := handshake.GetHandshake().GetPairingSecret()
|
|
if pairingSecret == "" {
|
|
return nil, fmt.Errorf("unknown peer and no pairing secret provided")
|
|
}
|
|
|
|
peerKeyID := handshake.GetHandshake().GetPublicKey().GetKeyid()
|
|
peerInstanceID := handshake.GetHandshake().GetInstanceId()
|
|
|
|
// Atomically validate the pairing secret and add the client.
|
|
var newPeer *v1.Multihost_Peer
|
|
if err := h.mgr.configMgr.Transform(func(cfg *v1.Config) (*v1.Config, error) {
|
|
token, err := ValidatePairingSecret(pairingSecret, cfg.GetMultihost().GetPairingTokens(), time.Now())
|
|
if err != nil {
|
|
zap.S().Warnf("rejected pairing attempt from %q (%s): %v", peerInstanceID, peerKeyID, err)
|
|
return nil, err
|
|
}
|
|
|
|
newPeer = &v1.Multihost_Peer{
|
|
InstanceId: peerInstanceID,
|
|
Keyid: peerKeyID,
|
|
Permissions: token.Permissions,
|
|
}
|
|
cfg.Multihost.AuthorizedClients = append(cfg.Multihost.AuthorizedClients, newPeer)
|
|
|
|
// Consume the token: increment uses, remove if exhausted.
|
|
token.Uses++
|
|
if token.MaxUses > 0 && token.Uses >= token.MaxUses {
|
|
cfg.Multihost.PairingTokens = slices.DeleteFunc(cfg.Multihost.PairingTokens, func(t *v1.Multihost_PairingToken) bool {
|
|
return t.Secret == token.Secret
|
|
})
|
|
}
|
|
|
|
cfg.Modno++
|
|
return cfg, nil
|
|
}); err != nil {
|
|
return nil, fmt.Errorf("failed to save paired client: %w", err)
|
|
}
|
|
|
|
zap.S().Infof("successfully paired client %q (%s)", peerInstanceID, peerKeyID)
|
|
return newPeer, nil
|
|
}
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleOperationManifest(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionOperationManifest) error {
|
|
h.l.Sugar().Debugf("received operation manifest with %d operations", len(item.GetOpIds()))
|
|
// Build local state: original_id → {localID, modno}
|
|
type localOp struct {
|
|
localID int64
|
|
modno int64
|
|
}
|
|
localState := map[int64]localOp{}
|
|
if err := h.mgr.oplog.QueryMetadata(
|
|
oplog.Query{}.SetOriginalInstanceKeyid(h.peer.Keyid),
|
|
func(meta oplog.OpMetadata) error {
|
|
localState[meta.OriginalID] = localOp{localID: meta.ID, modno: meta.Modno}
|
|
return nil
|
|
},
|
|
); err != nil {
|
|
return fmt.Errorf("querying local operation metadata: %w", err)
|
|
}
|
|
h.l.Sugar().Debugf("local state has %d operations from this peer", len(localState))
|
|
|
|
// Build remote set from manifest
|
|
if len(item.GetOpIds()) != len(item.GetModnos()) {
|
|
return NewSyncErrorProtocol(fmt.Errorf("operation manifest has mismatched OpIds (%d) and Modnos (%d) lengths", len(item.GetOpIds()), len(item.GetModnos())))
|
|
}
|
|
remoteSet := make(map[int64]int64, len(item.GetOpIds()))
|
|
for i, id := range item.GetOpIds() {
|
|
remoteSet[id] = item.GetModnos()[i]
|
|
}
|
|
|
|
// Delete ops not in manifest
|
|
var toDelete []int64
|
|
for origID, local := range localState {
|
|
if _, exists := remoteSet[origID]; !exists {
|
|
toDelete = append(toDelete, local.localID)
|
|
}
|
|
}
|
|
if len(toDelete) > 0 {
|
|
h.l.Sugar().Debugf("deleting %d stale operations", len(toDelete))
|
|
if err := h.mgr.oplog.Delete(toDelete...); err != nil {
|
|
h.l.Sugar().Warnf("failed to delete stale operations: %v", err)
|
|
}
|
|
}
|
|
|
|
// Find ops we need (new or changed modno), preserving manifest order
|
|
opIDs := item.GetOpIds()
|
|
modnos := item.GetModnos()
|
|
var needIDs []int64
|
|
for i, id := range opIDs {
|
|
modno := modnos[i]
|
|
local, exists := localState[id]
|
|
if !exists || local.modno != modno {
|
|
needIDs = append(needIDs, id)
|
|
}
|
|
}
|
|
h.l.Sugar().Debugf("need %d operations (new or changed), local state comparison: remoteSet=%v localState=%v", len(needIDs), remoteSet, localState)
|
|
|
|
// Request the ops we need
|
|
if len(needIDs) > 0 {
|
|
stream.Send(&v1sync.SyncStreamItem{
|
|
Action: &v1sync.SyncStreamItem_RequestOperationData{
|
|
RequestOperationData: &v1sync.SyncStreamItem_SyncActionRequestOperationData{
|
|
OpIds: needIDs,
|
|
},
|
|
},
|
|
})
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (h *syncSessionHandlerServer) HandleRequestOperationData(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionRequestOperationData) error {
|
|
return NewSyncErrorProtocol(fmt.Errorf("server should not receive RequestOperationData"))
|
|
}
|