package syncapi import ( "context" "crypto/tls" "errors" "fmt" "net" "net/http" "slices" "sync" "time" "github.com/garethgeorge/backrest/gen/go/types" v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/gen/go/v1sync" "github.com/garethgeorge/backrest/gen/go/v1sync/v1syncconnect" "github.com/garethgeorge/backrest/internal/api/syncapi/permissions" "github.com/garethgeorge/backrest/internal/env" "github.com/garethgeorge/backrest/internal/oplog" "go.uber.org/zap" "golang.org/x/net/http2" "google.golang.org/protobuf/proto" ) type SyncClient struct { mgr *SyncManager syncConfigSnapshot syncConfigSnapshot localInstanceID string peer *v1.Multihost_Peer oplog *oplog.OpLog client v1syncconnect.BackrestSyncServiceClient reconnectDelay time.Duration l *zap.Logger reconnectAttempts int } func newInsecureClient() *http.Client { return &http.Client{ Transport: &http2.Transport{ AllowHTTP: true, DialTLS: func(network, addr string, _ *tls.Config) (net.Conn, error) { return net.Dial(network, addr) }, IdleConnTimeout: 300 * time.Second, ReadIdleTimeout: 60 * time.Second, }, } } func NewSyncClient( mgr *SyncManager, snapshot syncConfigSnapshot, peer *v1.Multihost_Peer, oplog *oplog.OpLog, ) (*SyncClient, error) { if peer.GetInstanceUrl() == "" { return nil, errors.New("peer instance URL is required") } client := v1syncconnect.NewBackrestSyncServiceClient( newInsecureClient(), peer.GetInstanceUrl(), ) c := &SyncClient{ mgr: mgr, syncConfigSnapshot: snapshot, localInstanceID: snapshot.config.Instance, peer: peer, reconnectDelay: mgr.syncClientRetryDelay, client: client, oplog: oplog, l: zap.L().Named(fmt.Sprintf("syncclient for %q", peer.GetInstanceId())), } c.mgr.peerStateManager.SetPeerState(peer.Keyid, newPeerState(peer.InstanceId, peer.Keyid)) return c, nil } func (c *SyncClient) RunSync(ctx context.Context) { for { if ctx.Err() != nil { return } lastConnect := time.Now() syncSessionHandler := newSyncHandlerClient( c.l, c.mgr, c.syncConfigSnapshot, c.oplog, c.peer, ) cmdStream := newBidiSyncCommandStream() c.l.Sugar().Infof("connecting to peer %q (%s) at %s", c.peer.InstanceId, c.peer.Keyid, c.peer.GetInstanceUrl()) var wg sync.WaitGroup wg.Add(1) go func() { defer wg.Done() err := runSync( ctx, c.localInstanceID, c.syncConfigSnapshot.identityKey, cmdStream, syncSessionHandler, c.syncConfigSnapshot.config.GetMultihost().GetKnownHosts(), ) cmdStream.SendErrorAndTerminate(err) }() if err := cmdStream.ConnectStream(ctx, c.client.Sync(ctx)); err != nil { c.l.Sugar().Infof("lost stream connection to peer %q (%s): %v", c.peer.InstanceId, c.peer.Keyid, err) var syncErr *SyncError state := c.mgr.peerStateManager.GetPeerState(c.peer.Keyid).Clone() if state == nil { state = newPeerState(c.peer.InstanceId, c.peer.Keyid) } state.LastHeartbeat = time.Now() if errors.As(err, &syncErr) { state.ConnectionState = syncErr.State state.ConnectionStateMessage = syncErr.Message.Error() } else { state.ConnectionState = v1sync.ConnectionState_CONNECTION_STATE_ERROR_INTERNAL state.ConnectionStateMessage = err.Error() } c.mgr.peerStateManager.SetPeerState(c.peer.Keyid, state) } else { c.reconnectAttempts = 0 } wg.Wait() delay := c.reconnectDelay - time.Since(lastConnect) if c.reconnectAttempts > 0 { backoff := time.Duration(1< highOpid { newOps = append(newOps, op) } else { updatedOps = append(updatedOps, op) } } // send new and updated operations if len(newOps) > 0 { stream.Send(&v1sync.SyncStreamItem{ Action: &v1sync.SyncStreamItem_ReceiveOperations{ ReceiveOperations: &v1sync.SyncStreamItem_SyncActionReceiveOperations{ Event: &v1.OperationEvent{ Event: &v1.OperationEvent_CreatedOperations{ CreatedOperations: &v1.OperationList{Operations: batch}, }, }, }, }, }) } if len(updatedOps) > 0 { stream.Send(&v1sync.SyncStreamItem{ Action: &v1sync.SyncStreamItem_ReceiveOperations{ ReceiveOperations: &v1sync.SyncStreamItem_SyncActionReceiveOperations{ Event: &v1.OperationEvent{ Event: &v1.OperationEvent_UpdatedOperations{ UpdatedOperations: &v1.OperationList{Operations: updatedOps}, }, }, }, }, }) } batch = batch[:0] return nil } c.oplog.Query(oplog.Query{}.SetModnoGte(highModno), func(op *v1.Operation) error { if !c.canForwardOperation(op) { return nil // skip operations that the peer is not allowed to read } batch = append(batch, op) if len(batch) >= 256 { if err := send(); err != nil { return err } } return nil }) if err := send(); err != nil { return err } return nil } func (c *syncSessionHandlerClient) HandleReceiveOperations(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveOperations) error { return NewSyncErrorProtocol(errors.New("client should not receive ReceiveOperations messages, this is a host-only message")) } func (c *syncSessionHandlerClient) HandleReceiveResources(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveResources) error { c.l.Debug("received resource list from server", zap.Any("repos", item.GetRepos()), zap.Any("plans", item.GetPlans())) peerState := c.mgr.peerStateManager.GetPeerState(c.peer.Keyid).Clone() if peerState == nil { return NewSyncErrorInternal(fmt.Errorf("peer state for %q not found", c.peer.Keyid)) } repos := item.GetRepos() plans := item.GetPlans() for _, repo := range repos { peerState.KnownRepos[repo.Id] = repo } for _, plan := range plans { peerState.KnownPlans[plan.Id] = plan } c.mgr.peerStateManager.SetPeerState(c.peer.Keyid, peerState) return nil } // Note unused: there isn't a situation where the host would send its config for information, the host will only call 'SetConfig' to update the config. func (c *syncSessionHandlerClient) HandleReceiveConfig(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveConfig) error { c.l.Sugar().Debugf("received remote config update") peerState := c.mgr.peerStateManager.GetPeerState(c.peer.Keyid).Clone() if peerState == nil { return NewSyncErrorInternal(fmt.Errorf("peer state for %q not found", c.peer.Keyid)) } newRemoteConfig := item.Config if newRemoteConfig == nil { return NewSyncErrorProtocol(fmt.Errorf("received nil remote config")) } peerState.Config = newRemoteConfig c.mgr.peerStateManager.SetPeerState(c.peer.Keyid, peerState) return nil } func (c *syncSessionHandlerClient) HandleSetConfig(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionSetConfig) error { // Log the received config updates c.l.Sugar().Debugf("received SetConfig request from peer %q") // Fetch latest config from the config manager latestConfig, err := c.mgr.configMgr.Get() if err != nil { return fmt.Errorf("fetch latest config: %w", err) } latestConfig = proto.Clone(latestConfig).(*v1.Config) // Clone to avoid modifying the original config for _, plan := range item.GetPlans() { c.l.Sugar().Debugf("received plan update: %s", plan.Id) if !c.permissions.CheckPermissionForPlan(plan.Id, permissions.PermsCanWriteConfiguration...) { return NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to update plan %q", c.peer.InstanceId, plan.Id)) } // Update the plan in the local config idx := slices.IndexFunc(latestConfig.Plans, func(p *v1.Plan) bool { return p.Id == plan.Id }) if idx >= 0 { latestConfig.Plans[idx] = plan } else { latestConfig.Plans = append(latestConfig.Plans, plan) } } for _, repo := range item.GetRepos() { c.l.Sugar().Debugf("received repo update: %s", repo.Guid) if !c.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanWriteConfiguration...) { return NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to update repo %q", c.peer.InstanceId, repo.Id)) } // Update the repo in the local config idx := slices.IndexFunc(latestConfig.Repos, func(r *v1.Repo) bool { return r.Guid == repo.Guid }) if idx >= 0 { latestConfig.Repos[idx] = repo } else { latestConfig.Repos = append(latestConfig.Repos, repo) } } for _, plan := range item.GetPlansToDelete() { c.l.Sugar().Debugf("received plan deletion request: %s", plan) if !c.permissions.CheckPermissionForPlan(plan, permissions.PermsCanWriteConfiguration...) { return NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to delete plan %q", c.peer.InstanceId, plan)) } // Remove the plan from the local config idx := slices.IndexFunc(latestConfig.Plans, func(p *v1.Plan) bool { return p.Id == plan }) if idx >= 0 { latestConfig.Plans = append(latestConfig.Plans[:idx], latestConfig.Plans[idx+1:]...) } else { c.l.Sugar().Warnf("received plan deletion request for non-existent plan %q, ignoring", plan) } } for _, repoID := range item.GetReposToDelete() { c.l.Sugar().Debugf("received repo deletion request: %s", repoID) if !c.permissions.CheckPermissionForRepo(repoID, permissions.PermsCanWriteConfiguration...) { return NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to delete repo %q", c.peer.InstanceId, repoID)) } // Remove the repo from the local config idx := slices.IndexFunc(latestConfig.Repos, func(r *v1.Repo) bool { return r.Id == repoID }) if idx >= 0 { latestConfig.Repos = append(latestConfig.Repos[:idx], latestConfig.Repos[idx+1:]...) } else { c.l.Sugar().Warnf("received repo deletion request for non-existent repo %q, ignoring", repoID) } } // Update the local config with the new changes latestConfig.Modno++ if err := c.mgr.configMgr.Update(latestConfig); err != nil { return fmt.Errorf("set updated config: %w", err) } return nil } func (c *syncSessionHandlerClient) sendConfig(ctx context.Context, stream *bidiSyncCommandStream) error { localConfig := c.syncConfigSnapshot.config remoteConfig := &v1sync.RemoteConfig{ Version: localConfig.Version, Modno: localConfig.Modno, } for _, repo := range localConfig.Repos { if c.permissions.CheckPermissionForRepo(repo.Guid, permissions.PermsCanViewConfiguration...) { remoteConfig.Repos = append(remoteConfig.Repos, repo) } } for _, plan := range localConfig.Plans { if c.permissions.CheckPermissionForPlan(plan.Id, permissions.PermsCanViewConfiguration...) { remoteConfig.Plans = append(remoteConfig.Plans, plan) } } stream.Send(&v1sync.SyncStreamItem{ Action: &v1sync.SyncStreamItem_ReceiveConfig{ ReceiveConfig: &v1sync.SyncStreamItem_SyncActionReceiveConfig{ Config: remoteConfig, }, }, }) return nil } func (c *syncSessionHandlerClient) sendResourceList(ctx context.Context, stream *bidiSyncCommandStream) error { repoMetadatas := []*v1sync.RepoMetadata{} planMetadatas := []*v1sync.PlanMetadata{} for _, repo := range c.syncConfigSnapshot.config.Repos { if c.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanViewResources...) { repoMetadatas = append(repoMetadatas, &v1sync.RepoMetadata{ Id: repo.Id, Guid: repo.Guid, }) } } for _, plan := range c.syncConfigSnapshot.config.Plans { if c.permissions.CheckPermissionForPlan(plan.Id, permissions.PermsCanViewResources...) { planMetadatas = append(planMetadatas, &v1sync.PlanMetadata{ Id: plan.Id, }) } } stream.Send(&v1sync.SyncStreamItem{ Action: &v1sync.SyncStreamItem_ReceiveResources{ ReceiveResources: &v1sync.SyncStreamItem_SyncActionReceiveResources{ Repos: repoMetadatas, Plans: planMetadatas, }, }, }) return nil }