From 5217855efceeb615c3633706d5033bf059a6445e Mon Sep 17 00:00:00 2001 From: Gareth George Date: Thu, 23 Apr 2026 05:31:00 +0000 Subject: [PATCH] remove sync api locks --- internal/api/syncapi/lockmanager.go | 69 -------- internal/api/syncapi/permissions/groups.go | 4 + .../api/syncapi/permissions/permissions.go | 16 +- internal/api/syncapi/syncclient.go | 122 ++----------- internal/api/syncapi/synccommon.go | 36 ---- internal/api/syncapi/syncmanager.go | 32 +--- internal/api/syncapi/syncserver.go | 54 ------ internal/hook/types/synclock.go | 160 ------------------ internal/orchestrator/orchestrator.go | 15 -- internal/orchestrator/tasks/hookvars.go | 4 - internal/protoutil/conditions.go | 1 - proto/v1/config.proto | 10 -- proto/v1sync/syncservice.proto | 26 --- webui/messages/en.json | 2 - webui/src/components/common/HooksFormList.tsx | 41 ----- 15 files changed, 24 insertions(+), 568 deletions(-) delete mode 100644 internal/api/syncapi/lockmanager.go delete mode 100644 internal/hook/types/synclock.go diff --git a/internal/api/syncapi/lockmanager.go b/internal/api/syncapi/lockmanager.go deleted file mode 100644 index a12d92bd..00000000 --- a/internal/api/syncapi/lockmanager.go +++ /dev/null @@ -1,69 +0,0 @@ -package syncapi - -import ( - "sync" - "time" -) - -const lockExpiry = 30 * time.Second - -type lockEntry struct { - holderID string - expiresAt time.Time -} - -// LockManager provides an in-memory best-effort lock store for coordinating -// repo access between sync peers. Locks expire after 30 seconds if not refreshed. -type LockManager struct { - mu sync.Mutex - locks map[string]*lockEntry -} - -func NewLockManager() *LockManager { - return &LockManager{ - locks: make(map[string]*lockEntry), - } -} - -// Acquire attempts to acquire a lock for the given key. Returns true if the -// lock was acquired (key was free, expired, or already held by the same holder). -func (lm *LockManager) Acquire(key, holderID string) bool { - lm.mu.Lock() - defer lm.mu.Unlock() - - entry, exists := lm.locks[key] - if !exists || time.Now().After(entry.expiresAt) || entry.holderID == holderID { - lm.locks[key] = &lockEntry{ - holderID: holderID, - expiresAt: time.Now().Add(lockExpiry), - } - return true - } - return false -} - -// Release releases a lock if the holder matches. -func (lm *LockManager) Release(key, holderID string) bool { - lm.mu.Lock() - defer lm.mu.Unlock() - - entry, exists := lm.locks[key] - if exists && entry.holderID == holderID { - delete(lm.locks, key) - return true - } - return false -} - -// Refresh extends the expiry of a lock if the holder matches. -func (lm *LockManager) Refresh(key, holderID string) bool { - lm.mu.Lock() - defer lm.mu.Unlock() - - entry, exists := lm.locks[key] - if exists && entry.holderID == holderID { - entry.expiresAt = time.Now().Add(lockExpiry) - return true - } - return false -} diff --git a/internal/api/syncapi/permissions/groups.go b/internal/api/syncapi/permissions/groups.go index 4f245dd2..5f708e2a 100644 --- a/internal/api/syncapi/permissions/groups.go +++ b/internal/api/syncapi/permissions/groups.go @@ -23,4 +23,8 @@ var ( PermsCanViewOperations = []v1.Multihost_Permission_Type{ v1.Multihost_Permission_PERMISSION_READ_OPERATIONS, } + + PermsCanReceiveSharedRepos = []v1.Multihost_Permission_Type{ + v1.Multihost_Permission_PERMISSION_RECEIVE_SHARED_REPOS, + } ) diff --git a/internal/api/syncapi/permissions/permissions.go b/internal/api/syncapi/permissions/permissions.go index 132a804e..e58990c2 100644 --- a/internal/api/syncapi/permissions/permissions.go +++ b/internal/api/syncapi/permissions/permissions.go @@ -129,14 +129,16 @@ func NewPermissionSet(perms []*v1.Multihost_Permission) (*PermissionSet, error) return permSet, nil } -// HasPermissionType checks if a permission type is granted, regardless of scopes. +// HasPermissionType checks if any of the given permission types are granted, regardless of scopes. // Use this for scope-less permissions like PERMISSION_RECEIVE_SHARED_REPOS. -func (p *PermissionSet) HasPermissionType(permType v1.Multihost_Permission_Type) bool { - if _, ok := p.scopelessPerms[permType]; ok { - return true - } - if _, ok := p.perms[permType]; ok { - return true +func (p *PermissionSet) HasPermissionType(permTypes ...v1.Multihost_Permission_Type) bool { + for _, permType := range permTypes { + if _, ok := p.scopelessPerms[permType]; ok { + return true + } + if _, ok := p.perms[permType]; ok { + return true + } } return false } diff --git a/internal/api/syncapi/syncclient.go b/internal/api/syncapi/syncclient.go index b383f897..e76dbd71 100644 --- a/internal/api/syncapi/syncclient.go +++ b/internal/api/syncapi/syncclient.go @@ -35,11 +35,6 @@ type SyncClient struct { l *zap.Logger reconnectAttempts int - - // Lock protocol support - streamMu sync.Mutex - activeStream *bidiSyncCommandStream // set while connected, nil otherwise - lockResponseCh chan *v1sync.SyncStreamItem_SyncActionAcquireLockResponse } func newInsecureClient() *http.Client { @@ -78,7 +73,6 @@ func NewSyncClient( client: client, oplog: oplog, l: zap.L().Named(fmt.Sprintf("syncclient for %q", peer.GetInstanceId())), - lockResponseCh: make(chan *v1sync.SyncStreamItem_SyncActionAcquireLockResponse, 1), } c.mgr.peerStateManager.SetPeerState(peer.Keyid, newPeerState(peer.InstanceId, peer.Keyid)) return c, nil @@ -98,15 +92,10 @@ func (c *SyncClient) RunSync(ctx context.Context) { c.syncConfigSnapshot, c.oplog, c.peer, - c, ) cmdStream := newBidiSyncCommandStream() - c.streamMu.Lock() - c.activeStream = cmdStream - c.streamMu.Unlock() - c.l.Sugar().Infof("connecting to peer %q (%s) at %s", c.peer.InstanceId, c.peer.Keyid, c.peer.GetInstanceUrl()) var wg sync.WaitGroup @@ -127,9 +116,6 @@ func (c *SyncClient) RunSync(ctx context.Context) { }() connectErr := cmdStream.ConnectStream(ctx, c.client.Sync(ctx)) - c.streamMu.Lock() - c.activeStream = nil - c.streamMu.Unlock() if connectErr != nil { c.l.Sugar().Infof("lost stream connection to peer %q (%s): %v", c.peer.InstanceId, c.peer.Keyid, connectErr) var syncErr *SyncError @@ -175,83 +161,6 @@ func (c *SyncClient) RunSync(ctx context.Context) { } } -// AcquireLock sends a lock acquire request and waits for the response. -// Returns true if the lock was acquired, false otherwise. -// Returns an error if the client is not connected or the context expires. -func (c *SyncClient) AcquireLock(ctx context.Context, lockKey string) (bool, error) { - c.streamMu.Lock() - stream := c.activeStream - c.streamMu.Unlock() - - if stream == nil { - return false, fmt.Errorf("not connected to peer %q", c.peer.GetInstanceId()) - } - - // Drain any stale responses - select { - case <-c.lockResponseCh: - default: - } - - stream.Send(&v1sync.SyncStreamItem{ - Action: &v1sync.SyncStreamItem_AcquireLock{ - AcquireLock: &v1sync.SyncStreamItem_SyncActionAcquireLock{ - LockKey: lockKey, - HolderId: c.localInstanceID, - }, - }, - }) - - select { - case resp := <-c.lockResponseCh: - return resp.GetAcquired(), nil - case <-ctx.Done(): - return false, ctx.Err() - case <-time.After(10 * time.Second): - return false, fmt.Errorf("timeout waiting for lock response from peer %q", c.peer.GetInstanceId()) - } -} - -// ReleaseLock sends a lock release request to the peer. -func (c *SyncClient) ReleaseLock(lockKey string) { - c.streamMu.Lock() - stream := c.activeStream - c.streamMu.Unlock() - - if stream == nil { - return - } - - stream.Send(&v1sync.SyncStreamItem{ - Action: &v1sync.SyncStreamItem_ReleaseLock{ - ReleaseLock: &v1sync.SyncStreamItem_SyncActionReleaseLock{ - LockKey: lockKey, - HolderId: c.localInstanceID, - }, - }, - }) -} - -// RefreshLock sends a lock refresh request to the peer. -func (c *SyncClient) RefreshLock(lockKey string) { - c.streamMu.Lock() - stream := c.activeStream - c.streamMu.Unlock() - - if stream == nil { - return - } - - stream.Send(&v1sync.SyncStreamItem{ - Action: &v1sync.SyncStreamItem_RefreshLock{ - RefreshLock: &v1sync.SyncStreamItem_SyncActionRefreshLock{ - LockKey: lockKey, - HolderId: c.localInstanceID, - }, - }, - }) -} - // syncSessionHandlerClient is a syncSessionHandler implementation for clients. type syncSessionHandlerClient struct { unimplementedSyncSessionHandler @@ -268,8 +177,6 @@ type syncSessionHandlerClient struct { canForwardReposSet map[string]struct{} canForwardPlansSet map[string]struct{} - syncClient *SyncClient // back-reference for forwarding lock responses - oplogSubscription *oplog.Subscription // set while subscribed; unsubscribed in OnConnectionDisconnected. } @@ -279,7 +186,6 @@ func newSyncHandlerClient( snapshot syncConfigSnapshot, oplog *oplog.OpLog, peer *v1.Multihost_Peer, // The peer this handler is associated with, must be set before calling OnConnectionEstablished. - syncClient *SyncClient, ) *syncSessionHandlerClient { return &syncSessionHandlerClient{ l: l, @@ -291,8 +197,6 @@ func newSyncHandlerClient( canForwardReposSet: make(map[string]struct{}), canForwardPlansSet: make(map[string]struct{}), - - syncClient: syncClient, } } @@ -625,14 +529,19 @@ func (c *syncSessionHandlerClient) HandleSetConfig(ctx context.Context, stream * var reposNew, reposUpdated, reposUnchanged, reposSkipped int for _, repo := range item.GetRepos() { - isFromOriginPeer := repo.GetOriginInstanceId() != "" && repo.GetOriginInstanceId() == c.peer.InstanceId - if !isFromOriginPeer && !c.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanWriteConfiguration...) { - return nil, NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to update repo %q", c.peer.InstanceId, repo.Id)) - } - idx := slices.IndexFunc(cfg.Repos, func(r *v1.Repo) bool { return r.Guid == repo.Guid }) + + // Permission check: accept if we have RECEIVE_SHARED_REPOS and the repo + // is either new or already owned by this peer; otherwise require scoped write perms. + isNewOrOwnedByPeer := idx < 0 || cfg.Repos[idx].GetOriginInstanceId() == c.peer.InstanceId + allowed := (isNewOrOwnedByPeer && c.permissions.HasPermissionType(permissions.PermsCanReceiveSharedRepos...)) || + c.permissions.CheckPermissionForRepo(repo.Id, permissions.PermsCanWriteConfiguration...) + if !allowed { + return nil, NewSyncErrorAuth(fmt.Errorf("peer %q is not allowed to update repo %q", c.peer.InstanceId, repo.Id)) + } + if idx >= 0 { if proto.Equal(cfg.Repos[idx], repo) { c.l.Sugar().Debugf("received repo %s (unchanged)", repo.Id) @@ -774,14 +683,3 @@ func (c *syncSessionHandlerClient) sendResourceList(ctx context.Context, stream return len(repoMetadatas), len(planMetadatas), nil } - -func (c *syncSessionHandlerClient) HandleAcquireLockResponse(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLockResponse) error { - if c.syncClient != nil { - select { - case c.syncClient.lockResponseCh <- item: - default: - c.l.Warn("lock response channel full, dropping response", zap.String("key", item.GetLockKey())) - } - } - return nil -} diff --git a/internal/api/syncapi/synccommon.go b/internal/api/syncapi/synccommon.go index 040d0925..30974f83 100644 --- a/internal/api/syncapi/synccommon.go +++ b/internal/api/syncapi/synccommon.go @@ -124,22 +124,6 @@ func runSync( if err := handler.HandleReceiveLogData(ctx, commandStream, item.GetReceiveLogData()); err != nil { return fmt.Errorf("handling receive log data: %w", err) } - case *v1sync.SyncStreamItem_AcquireLock: - if err := handler.HandleAcquireLock(ctx, commandStream, item.GetAcquireLock()); err != nil { - return fmt.Errorf("handling acquire lock: %w", err) - } - case *v1sync.SyncStreamItem_AcquireLockResponse: - if err := handler.HandleAcquireLockResponse(ctx, commandStream, item.GetAcquireLockResponse()); err != nil { - return fmt.Errorf("handling acquire lock response: %w", err) - } - case *v1sync.SyncStreamItem_ReleaseLock: - if err := handler.HandleReleaseLock(ctx, commandStream, item.GetReleaseLock()); err != nil { - return fmt.Errorf("handling release lock: %w", err) - } - case *v1sync.SyncStreamItem_RefreshLock: - if err := handler.HandleRefreshLock(ctx, commandStream, item.GetRefreshLock()); err != nil { - return fmt.Errorf("handling refresh lock: %w", err) - } case *v1sync.SyncStreamItem_Throttle: if err := handler.HandleThrottle(ctx, commandStream, item.GetThrottle()); err != nil { return fmt.Errorf("handling throttle: %w", err) @@ -259,10 +243,6 @@ type syncSessionHandler interface { HandleRequestLog(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionRequestLog) error HandleReceiveLogData(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReceiveLogData) error HandleThrottle(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionThrottle) error - HandleAcquireLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLock) error - HandleAcquireLockResponse(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLockResponse) error - HandleReleaseLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReleaseLock) error - HandleRefreshLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionRefreshLock) error } type unimplementedSyncSessionHandler struct{} @@ -319,22 +299,6 @@ func (h *unimplementedSyncSessionHandler) HandleThrottle(ctx context.Context, st return NewSyncErrorProtocol(fmt.Errorf("HandleThrottle not implemented")) } -func (h *unimplementedSyncSessionHandler) HandleAcquireLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLock) error { - return nil // default: ignore lock requests -} - -func (h *unimplementedSyncSessionHandler) HandleAcquireLockResponse(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLockResponse) error { - return nil // default: ignore lock responses -} - -func (h *unimplementedSyncSessionHandler) HandleReleaseLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReleaseLock) error { - return nil // default: ignore lock releases -} - -func (h *unimplementedSyncSessionHandler) HandleRefreshLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionRefreshLock) error { - return nil // default: ignore lock refreshes -} - type remoteOpIdCacheKey struct { OriginalInstanceKeyid unique.Handle[string] ID int64 diff --git a/internal/api/syncapi/syncmanager.go b/internal/api/syncapi/syncmanager.go index a72aed5a..47b594d1 100644 --- a/internal/api/syncapi/syncmanager.go +++ b/internal/api/syncapi/syncmanager.go @@ -14,7 +14,6 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/cryptoutil" "github.com/garethgeorge/backrest/internal/oplog" - hooktypes "github.com/garethgeorge/backrest/internal/hook/types" "github.com/garethgeorge/backrest/internal/orchestrator" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -56,8 +55,6 @@ type SyncManager struct { connectedPeers map[string]*connectedPeerHandle peerStateManager PeerStateManager - - lockManager *LockManager } func NewSyncManager(configMgr *config.ConfigManager, oplog *oplog.OpLog, orchestrator *orchestrator.Orchestrator, peerStateManager PeerStateManager) *SyncManager { @@ -85,7 +82,7 @@ func NewSyncManager(configMgr *config.ConfigManager, oplog *oplog.OpLog, orchest } else { zap.S().Errorf("syncmanager failed to get initial config: %v", err) } - mgr := &SyncManager{ + return &SyncManager{ configMgr: configMgr, orchestrator: orchestrator, oplog: oplog, @@ -95,10 +92,7 @@ func NewSyncManager(configMgr *config.ConfigManager, oplog *oplog.OpLog, orchest connectedPeers: make(map[string]*connectedPeerHandle), peerStateManager: peerStateManager, - lockManager: NewLockManager(), } - hooktypes.SetSyncLockClientProvider(mgr) - return mgr } // GetSyncClients returns a copy of the sync clients map. This makes the map safe to read from concurrently. @@ -108,30 +102,6 @@ func (m *SyncManager) GetSyncClients() map[string]*SyncClient { return maps.Clone(m.syncClients) } -// GetSyncClient returns the sync client for the given instance ID, or nil if not found. -// The map is keyed by Keyid internally (unique and stable), but callers typically only know -// the user-facing InstanceId, so this scans the small set of active clients. -func (m *SyncManager) GetSyncClient(instanceID string) *SyncClient { - m.mu.Lock() - defer m.mu.Unlock() - for _, client := range m.syncClients { - if client.peer.GetInstanceId() == instanceID { - return client - } - } - return nil -} - -// GetSyncLockClient returns a SyncLockClient for the given instance ID, satisfying the -// types.SyncLockClientProvider interface for the synclock hook handler. -func (m *SyncManager) GetSyncLockClient(instanceID string) hooktypes.SyncLockClient { - client := m.GetSyncClient(instanceID) - if client == nil { - return nil - } - return client -} - // Note: top level function will be called holding the lock, must kick off goroutines and then return. func (m *SyncManager) RunSync(ctx context.Context) { var syncWg sync.WaitGroup diff --git a/internal/api/syncapi/syncserver.go b/internal/api/syncapi/syncserver.go index 2c44e109..c4f6e795 100644 --- a/internal/api/syncapi/syncserver.go +++ b/internal/api/syncapi/syncserver.go @@ -233,32 +233,6 @@ func (h *syncSessionHandlerServer) OnConnectionDisconnected() { } } -func (h *syncSessionHandlerServer) HandleAcquireLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionAcquireLock) error { - acquired := h.mgr.lockManager.Acquire(item.GetLockKey(), item.GetHolderId()) - h.l.Debug("lock acquire request", zap.String("key", item.GetLockKey()), zap.String("holder", item.GetHolderId()), zap.Bool("acquired", acquired)) - stream.Send(&v1sync.SyncStreamItem{ - Action: &v1sync.SyncStreamItem_AcquireLockResponse{ - AcquireLockResponse: &v1sync.SyncStreamItem_SyncActionAcquireLockResponse{ - Acquired: acquired, - LockKey: item.GetLockKey(), - }, - }, - }) - return nil -} - -func (h *syncSessionHandlerServer) HandleReleaseLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionReleaseLock) error { - released := h.mgr.lockManager.Release(item.GetLockKey(), item.GetHolderId()) - h.l.Debug("lock release request", zap.String("key", item.GetLockKey()), zap.String("holder", item.GetHolderId()), zap.Bool("released", released)) - return nil -} - -func (h *syncSessionHandlerServer) HandleRefreshLock(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionRefreshLock) error { - refreshed := h.mgr.lockManager.Refresh(item.GetLockKey(), item.GetHolderId()) - h.l.Debug("lock refresh request", zap.String("key", item.GetLockKey()), zap.String("holder", item.GetHolderId()), zap.Bool("refreshed", refreshed)) - return nil -} - func (h *syncSessionHandlerServer) HandleHeartbeat(ctx context.Context, stream *bidiSyncCommandStream, item *v1sync.SyncStreamItem_SyncActionHeartbeat) error { peerState := h.mgr.peerStateManager.GetPeerState(h.peer.Keyid).Clone() if peerState == nil { @@ -405,39 +379,11 @@ func (h *syncSessionHandlerServer) sendConfigToClient(stream *bidiSyncCommandStr // This pushes repo configurations to the client so they are added to the client's local config. // Returns the number of shared repos sent. func (h *syncSessionHandlerServer) sendSharedReposToClient(stream *bidiSyncCommandStream, config *v1.Config) int { - if !h.permissions.HasPermissionType(v1.Multihost_Permission_PERMISSION_RECEIVE_SHARED_REPOS) { - return 0 - } - var sharedRepos []*v1.Repo for _, repo := range config.Repos { if repo.GetShared() { repoCopy := proto.Clone(repo).(*v1.Repo) repoCopy.OriginInstanceId = config.Instance - // Inject lock hooks so the client acquires a lock on this server - // before running operations on the shared repo. - repoCopy.Hooks = append(repoCopy.Hooks, - &v1.Hook{ - Conditions: []v1.Hook_Condition{v1.Hook_CONDITION_ANY_START}, - OnError: v1.Hook_ON_ERROR_IGNORE, - Action: &v1.Hook_ActionSyncLock{ - ActionSyncLock: &v1.Hook_SyncLock{ - TargetInstanceId: config.Instance, - LockKey: repo.GetId(), - }, - }, - }, - &v1.Hook{ - Conditions: []v1.Hook_Condition{v1.Hook_CONDITION_ANY_END}, - OnError: v1.Hook_ON_ERROR_IGNORE, - Action: &v1.Hook_ActionSyncLock{ - ActionSyncLock: &v1.Hook_SyncLock{ - TargetInstanceId: config.Instance, - LockKey: repo.GetId(), - }, - }, - }, - ) sharedRepos = append(sharedRepos, repoCopy) } } diff --git a/internal/hook/types/synclock.go b/internal/hook/types/synclock.go deleted file mode 100644 index 4b704c20..00000000 --- a/internal/hook/types/synclock.go +++ /dev/null @@ -1,160 +0,0 @@ -package types - -import ( - "context" - "fmt" - "reflect" - "sync" - "time" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" - "github.com/garethgeorge/backrest/internal/orchestrator/tasks" - "go.uber.org/zap" -) - -// SyncLockClient is an interface for acquiring and releasing locks on remote peers. -// This is implemented by syncapi.SyncClient. -type SyncLockClient interface { - AcquireLock(ctx context.Context, lockKey string) (bool, error) - ReleaseLock(lockKey string) - RefreshLock(lockKey string) -} - -// SyncLockClientProvider provides SyncLockClients by target instance ID. -type SyncLockClientProvider interface { - GetSyncLockClient(instanceID string) SyncLockClient -} - -var ( - syncLockProviderMu sync.Mutex - syncLockProvider SyncLockClientProvider -) - -// SetSyncLockClientProvider registers the provider used by the synclock hook handler. -func SetSyncLockClientProvider(provider SyncLockClientProvider) { - syncLockProviderMu.Lock() - defer syncLockProviderMu.Unlock() - syncLockProvider = provider -} - -func getSyncLockClientProvider() SyncLockClientProvider { - syncLockProviderMu.Lock() - defer syncLockProviderMu.Unlock() - return syncLockProvider -} - -const ( - lockRefreshInterval = 10 * time.Second - lockMaxRetryDelay = 60 * time.Second - lockInitialRetryDelay = 1 * time.Second - lockMaxRetries = 7 // 1s, 2s, 4s, 8s, 16s, 32s, 60s -) - -type syncLockHandler struct{} - -func (syncLockHandler) Name() string { - return "synclock" -} - -func (syncLockHandler) ActionType() reflect.Type { - return reflect.TypeOf(&v1.Hook_ActionSyncLock{}) -} - -func (h syncLockHandler) Execute(ctx context.Context, hook *v1.Hook, vars interface{}, runner tasks.TaskRunner, event v1.Hook_Condition) error { - lockConfig := hook.GetActionSyncLock() - if lockConfig == nil { - return fmt.Errorf("synclock hook missing action config") - } - - provider := getSyncLockClientProvider() - if provider == nil { - zap.L().Warn("synclock: no provider registered, skipping lock operation") - return nil - } - - client := provider.GetSyncLockClient(lockConfig.GetTargetInstanceId()) - if client == nil { - zap.L().Warn("synclock: no client for target instance, skipping lock operation", - zap.String("targetInstance", lockConfig.GetTargetInstanceId())) - return nil - } - - switch event { - case v1.Hook_CONDITION_ANY_START: - return h.acquireLock(ctx, client, lockConfig) - case v1.Hook_CONDITION_ANY_END: - h.releaseLock(client, lockConfig) - return nil - default: - return nil - } -} - -func (h syncLockHandler) acquireLock(ctx context.Context, client SyncLockClient, config *v1.Hook_SyncLock) error { - lockKey := config.GetLockKey() - delay := lockInitialRetryDelay - - for attempt := 0; attempt <= lockMaxRetries; attempt++ { - acquired, err := client.AcquireLock(ctx, lockKey) - if err != nil { - zap.L().Warn("synclock: error acquiring lock, proceeding without lock (best-effort)", - zap.String("lockKey", lockKey), zap.Error(err)) - return nil // best-effort: proceed without lock - } - if acquired { - zap.L().Info("synclock: acquired lock", zap.String("lockKey", lockKey)) - // Start refresh goroutine - go h.refreshLoop(ctx, client, lockKey) - return nil - } - - if attempt == lockMaxRetries { - break - } - - zap.L().Info("synclock: lock not acquired, retrying", - zap.String("lockKey", lockKey), - zap.Int("attempt", attempt+1), - zap.Duration("delay", delay)) - - select { - case <-time.After(delay): - case <-ctx.Done(): - zap.L().Warn("synclock: context cancelled while waiting for lock, proceeding without lock", - zap.String("lockKey", lockKey)) - return nil - } - - delay *= 2 - if delay > lockMaxRetryDelay { - delay = lockMaxRetryDelay - } - } - - zap.L().Warn("synclock: could not acquire lock after retries, proceeding without lock (best-effort)", - zap.String("lockKey", lockKey)) - return nil // best-effort: default open -} - -func (h syncLockHandler) releaseLock(client SyncLockClient, config *v1.Hook_SyncLock) { - client.ReleaseLock(config.GetLockKey()) - zap.L().Info("synclock: released lock", zap.String("lockKey", config.GetLockKey())) -} - -func (h syncLockHandler) refreshLoop(ctx context.Context, client SyncLockClient, lockKey string) { - ticker := time.NewTicker(lockRefreshInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - client.RefreshLock(lockKey) - case <-ctx.Done(): - return - } - } -} - -func init() { - DefaultRegistry().RegisterHandler(&syncLockHandler{}) -} diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index c1764909..5836e9e1 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -652,23 +652,8 @@ func (o *Orchestrator) cleanupTaskContext(ctx context.Context, op *v1.Operation, func (o *Orchestrator) executeTask(ctx context.Context, st tasks.ScheduledTask) error { start := time.Now() runner := newTaskRunnerImpl(o, st.Task, st.Op) - - // Execute ANY_START hooks (best-effort, errors are logged but don't block) - if err := runner.ExecuteHooks(ctx, []v1.Hook_Condition{v1.Hook_CONDITION_ANY_START}, tasks.HookVars{}); err != nil { - runner.Logger(ctx).Warn("ANY_START hook error (best-effort, continuing)", zap.Error(err)) - } - err := st.Task.Run(ctx, st, runner) - // Execute ANY_END hooks (best-effort) - endVars := tasks.HookVars{} - if err != nil { - endVars.Error = err.Error() - } - if endErr := runner.ExecuteHooks(ctx, []v1.Hook_Condition{v1.Hook_CONDITION_ANY_END}, endVars); endErr != nil { - runner.Logger(ctx).Warn("ANY_END hook error (best-effort, continuing)", zap.Error(endErr)) - } - // Record metrics based on task result if err != nil { runner.Logger(ctx).Error("task failed", zap.Error(err), zap.Duration("duration", time.Since(start))) diff --git a/internal/orchestrator/tasks/hookvars.go b/internal/orchestrator/tasks/hookvars.go index e55ec05f..781809cc 100644 --- a/internal/orchestrator/tasks/hookvars.go +++ b/internal/orchestrator/tasks/hookvars.go @@ -61,10 +61,6 @@ func (v HookVars) EventName(cond v1.Hook_Condition) string { return "forget error" case v1.Hook_CONDITION_FORGET_SUCCESS: return "forget success" - case v1.Hook_CONDITION_ANY_START: - return "any start" - case v1.Hook_CONDITION_ANY_END: - return "any end" default: return "unknown" } diff --git a/internal/protoutil/conditions.go b/internal/protoutil/conditions.go index 171aa9b1..8c6aa6e3 100644 --- a/internal/protoutil/conditions.go +++ b/internal/protoutil/conditions.go @@ -9,7 +9,6 @@ var startConditionsMap = map[v1.Hook_Condition]bool{ v1.Hook_CONDITION_PRUNE_START: true, v1.Hook_CONDITION_SNAPSHOT_START: true, v1.Hook_CONDITION_FORGET_START: true, - v1.Hook_CONDITION_ANY_START: true, } var errorConditionsMap = map[v1.Hook_Condition]bool{ diff --git a/proto/v1/config.proto b/proto/v1/config.proto index 69c391fc..b68ad937 100644 --- a/proto/v1/config.proto +++ b/proto/v1/config.proto @@ -202,10 +202,6 @@ message Hook { CONDITION_FORGET_START = 300; // forget started. CONDITION_FORGET_ERROR = 301; // forget failed. CONDITION_FORGET_SUCCESS = 302; // forget succeeded. - - // any operation conditions - CONDITION_ANY_START = 400; // before any operation starts. - CONDITION_ANY_END = 401; // after any operation ends (success or fail). } enum OnError { @@ -229,7 +225,6 @@ message Hook { Shoutrrr action_shoutrrr = 105 [json_name="actionShoutrrr"]; Healthchecks action_healthchecks = 106 [json_name="actionHealthchecks"]; Telegram action_telegram = 107 [json_name="actionTelegram"]; - SyncLock action_sync_lock = 109 [json_name="actionSyncLock"]; } message Command { @@ -280,11 +275,6 @@ message Hook { string chat_id = 2 [json_name="chatId"]; string template = 3 [json_name="template"]; // template for the message text. } - - message SyncLock { - string target_instance_id = 1 [json_name="targetInstanceId"]; // the instance ID of the peer to acquire the lock on. - string lock_key = 2 [json_name="lockKey"]; // the lock key, typically the repo ID. - } } message Auth { diff --git a/proto/v1sync/syncservice.proto b/proto/v1sync/syncservice.proto index 6886c801..00c73437 100644 --- a/proto/v1sync/syncservice.proto +++ b/proto/v1sync/syncservice.proto @@ -134,11 +134,6 @@ message SyncStreamItem { SyncActionRequestLog request_log = 30; SyncActionReceiveLogData receive_log_data = 31; - SyncActionAcquireLock acquire_lock = 40; - SyncActionAcquireLockResponse acquire_lock_response = 41; - SyncActionReleaseLock release_lock = 42; - SyncActionRefreshLock refresh_lock = 43; - SyncActionThrottle throttle = 1000; } @@ -222,25 +217,4 @@ message SyncStreamItem { // See https://pkg.go.dev/crypto/ecdh#PrivateKey.ECDH . string ecdsa_pub = 2 [json_name="ecdsaPub"]; // base64 encoded public key } - - // Lock protocol messages for coordinating repo access between peers. - message SyncActionAcquireLock { - string lock_key = 1; // the lock key, typically a repo ID. - string holder_id = 2; // the instance ID of the lock requester. - } - - message SyncActionAcquireLockResponse { - bool acquired = 1; // whether the lock was acquired. - string lock_key = 2; // the lock key that was requested. - } - - message SyncActionReleaseLock { - string lock_key = 1; // the lock key to release. - string holder_id = 2; // the instance ID of the lock holder. - } - - message SyncActionRefreshLock { - string lock_key = 1; // the lock key to refresh. - string holder_id = 2; // the instance ID of the lock holder. - } } diff --git a/webui/messages/en.json b/webui/messages/en.json index 0c0a8818..b59e1ef5 100644 --- a/webui/messages/en.json +++ b/webui/messages/en.json @@ -185,8 +185,6 @@ "repo_hooks_command_runs_condition_forget_success": "Triggered when a forget operation completes successfully", "repo_hooks_command_runs_condition_forget_error": "Triggered when a forget operation fails", "repo_hooks_command_runs_condition_any_error": "Triggered when any operation fails", - "repo_hooks_command_runs_condition_any_start": "Triggered before any operation starts", - "repo_hooks_command_runs_condition_any_end": "Triggered after any operation ends (success or fail)", "repo_hooks_command_runs_condition_unknown": "Triggered when unknown", "settings_modal_title": "Settings", "app_breadcrumb_repo": "Repo", diff --git a/webui/src/components/common/HooksFormList.tsx b/webui/src/components/common/HooksFormList.tsx index 302677a6..e44e9f12 100644 --- a/webui/src/components/common/HooksFormList.tsx +++ b/webui/src/components/common/HooksFormList.tsx @@ -42,7 +42,6 @@ export interface HookFields { actionShoutrrr?: any; actionHealthchecks?: any; actionTelegram?: any; - actionSyncLock?: any; } export const hooksListTooltipText = ( @@ -84,8 +83,6 @@ const hookConditionDescriptions: Record = { CONDITION_FORGET_SUCCESS: m.repo_hooks_command_runs_condition_forget_success(), CONDITION_FORGET_ERROR: m.repo_hooks_command_runs_condition_forget_error(), CONDITION_ANY_ERROR: m.repo_hooks_command_runs_condition_any_error(), - CONDITION_ANY_START: m.repo_hooks_command_runs_condition_any_start(), - CONDITION_ANY_END: m.repo_hooks_command_runs_condition_any_end(), CONDITION_UNKNOWN: m.repo_hooks_command_runs_condition_unknown(), }; @@ -557,44 +554,6 @@ const hookTypes: { ); }, }, - { - name: "Sync Lock", - template: { - actionSyncLock: { - targetInstanceId: "", - lockKey: "", - }, - conditions: ["CONDITION_ANY_START", "CONDITION_ANY_END"], - }, - oneofKey: "actionSyncLock", - component: ({ hook, onChange }) => { - const updateSyncLock = (field: string, val: string) => { - onChange({ - ...hook, - actionSyncLock: { ...hook.actionSyncLock, [field]: val }, - }); - }; - return ( - - - updateSyncLock("targetInstanceId", e.target.value) - } - size="sm" - /> - updateSyncLock("lockKey", e.target.value)} - size="sm" - /> - - - ); - }, - }, ]; const findHookTypeName = (field: HookFields): string => {