fix modno Set issues

This commit is contained in:
Gareth
2025-11-05 22:47:54 -08:00
parent ba180ea8c9
commit aa2bdfec5c
6 changed files with 653 additions and 6 deletions
+183
View File
@@ -544,6 +544,189 @@ func TestSyncMutations(t *testing.T) {
wg.Wait()
}
func TestMultistageRandomSync(t *testing.T) {
testutil.InstallZapLogger(t)
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()
peerHostAddr := testutil.AllocOpenBindAddr(t)
peerClientAddr := testutil.AllocOpenBindAddr(t)
peerHostConfig := &v1.Config{
Version: migrations.CurrentVersion,
Instance: defaultHostID,
Repos: []*v1.Repo{
{
Id: defaultRepoID,
Guid: defaultRepoGUID,
Uri: "test-uri",
},
},
Multihost: &v1.Multihost{
Identity: identity1,
AuthorizedClients: []*v1.Multihost_Peer{
{
Keyid: identity2.Keyid,
KeyidVerified: true,
InstanceId: defaultClientID,
},
},
},
}
peerClientConfig := &v1.Config{
Version: migrations.CurrentVersion,
Instance: defaultClientID,
Repos: []*v1.Repo{
{
Id: defaultRepoID,
Guid: defaultRepoGUID,
Uri: "backrest://" + defaultHostID,
},
},
Multihost: &v1.Multihost{
Identity: identity2,
KnownHosts: []*v1.Multihost_Peer{
{
Keyid: identity1.Keyid,
InstanceId: defaultHostID,
InstanceUrl: fmt.Sprintf("http://%s", peerHostAddr),
Permissions: []*v1.Multihost_Permission{
{
Type: v1.Multihost_Permission_PERMISSION_READ_OPERATIONS,
Scopes: []string{
"repo:" + defaultRepoID,
},
},
},
},
},
},
}
peerHost := newPeerUnderTest(t, peerHostConfig)
peerClient := newPeerUnderTest(t, peerClientConfig)
// Track all operations added to client to verify sync at the end
var clientOps []*v1.Operation
var mu sync.Mutex
// Start with the sync APIs running
var syncCtx context.Context
var cancelSyncCtx context.CancelFunc
var wg sync.WaitGroup
isConnected := false
startSyncAPIs := func() {
syncCtx, cancelSyncCtx = context.WithCancel(ctx)
wg.Add(2)
go func() {
defer wg.Done()
runSyncAPIWithCtx(syncCtx, peerHost, peerHostAddr)
}()
go func() {
defer wg.Done()
runSyncAPIWithCtx(syncCtx, peerClient, peerClientAddr)
}()
isConnected = true
}
stopSyncAPIs := func() {
cancelSyncCtx()
wg.Wait()
isConnected = false
}
// Start initially connected
startSyncAPIs()
tryConnect(t, ctx, peerClient, peerClientConfig.Multihost.KnownHosts[0])
// Give the sync some time to fully establish
time.Sleep(100 * time.Millisecond)
// Run 50 random steps
for step := 0; step < 50; step++ {
action := step % 3 // Cycle through actions deterministically for reproducibility
// In a real random test, you could use: action := rand.Intn(3)
switch action {
case 0: // Add an operation
op := testutil.OperationsWithDefaults(basicClientOperationTempl, []*v1.Operation{
{
DisplayMessage: fmt.Sprintf("multistage-op-%d", step),
},
})[0]
if err := peerClient.oplog.Add(op); err != nil {
t.Fatalf("step %d: failed to add operation: %v", step, err)
}
mu.Lock()
clientOps = append(clientOps, op)
mu.Unlock()
zap.S().Infof("step %d: added operation %q", step, op.DisplayMessage)
case 1: // Mutate a random existing operation
mu.Lock()
if len(clientOps) > 0 {
// Pick a random operation to mutate
idx := step % len(clientOps)
op := clientOps[idx]
op.DisplayMessage = fmt.Sprintf("mutated-at-step-%d", step)
if err := peerClient.oplog.Update(op); err != nil {
mu.Unlock()
t.Fatalf("step %d: failed to update operation: %v", step, err)
}
zap.S().Infof("step %d: mutated operation %d to %q", step, idx, op.DisplayMessage)
} else {
zap.S().Infof("step %d: skipped mutation (no operations exist)", step)
}
mu.Unlock()
case 2: // Randomly disconnect or reconnect
if isConnected {
zap.S().Infof("step %d: disconnecting client", step)
stopSyncAPIs()
// Wait a bit to ensure shutdown completes
time.Sleep(100 * time.Millisecond)
} else {
zap.S().Infof("step %d: reconnecting client", step)
startSyncAPIs()
tryConnect(t, ctx, peerClient, peerClientConfig.Multihost.KnownHosts[0])
}
}
// Small delay between steps to allow sync to process
time.Sleep(10 * time.Millisecond)
}
// Ensure we're connected at the end
if !isConnected {
zap.S().Info("final connection: reconnecting client")
startSyncAPIs()
tryConnect(t, ctx, peerClient, peerClientConfig.Multihost.KnownHosts[0])
}
// Give sync some time to complete
time.Sleep(500 * time.Millisecond)
// Assert that all operations are synced correctly
mu.Lock()
expectedOps := make([]*v1.Operation, len(clientOps))
copy(expectedOps, clientOps)
mu.Unlock()
zap.S().Infof("verifying sync of %d operations", len(expectedOps))
tryExpectOperationsSynced(t, ctx, peerHost, peerClient,
oplog.Query{}.SetInstanceID(defaultClientID).SetRepoGUID(defaultRepoGUID),
"host and client should be synced after multistage test")
// Clean up
cancelSyncCtx()
wg.Wait()
}
func getOperations(t *testing.T, oplog *oplog.OpLog, query oplog.Query) []*v1.Operation {
ops := []*v1.Operation{}
if err := oplog.Query(query, func(op *v1.Operation) error {
+2 -3
View File
@@ -270,13 +270,12 @@ func (h *syncSessionHandlerServer) insertOrUpdate(op *v1.Operation, isUpdate boo
if isUpdate {
h.l.Sugar().Warnf("received update for non-existent operation %+v, inserting instead", op)
}
op.Modno = 0
return h.mgr.oplog.Add(op)
return h.mgr.oplog.Set(oplog.OPERATION_ADDED, op)
} else {
if !isUpdate {
h.l.Sugar().Warnf("received insert for existing operation %+v, updating instead", op)
}
return h.mgr.oplog.Update(op)
return h.mgr.oplog.Set(oplog.OPERATION_UPDATED, op)
}
}
+23
View File
@@ -150,6 +150,29 @@ func (m *MemStore) Add(op ...*v1.Operation) error {
return nil
}
func (m *MemStore) Set(op ...*v1.Operation) error {
m.mu.Lock()
defer m.mu.Unlock()
for _, o := range op {
if o.Id == 0 {
m.nextID++
o.Id = m.nextID
}
if o.FlowId == 0 {
o.FlowId = o.Id
}
if err := protoutil.ValidateOperation(o); err != nil {
return err
}
}
for _, o := range op {
m.operations[o.Id] = o
}
return nil
}
func (m *MemStore) Get(opID int64) (*v1.Operation, error) {
m.mu.Lock()
defer m.mu.Unlock()
+12 -2
View File
@@ -133,6 +133,14 @@ func (o *OpLog) Get(opID int64) (*v1.Operation, error) {
return o.store.Get(opID)
}
func (o *OpLog) Set(eventType OperationEvent, ops ...*v1.Operation) error {
if err := o.store.Set(ops...); err != nil {
return err
}
o.notify(ops, eventType)
return nil
}
func (o *OpLog) Add(ops ...*v1.Operation) error {
for _, o := range ops {
if o.Id != 0 || o.Modno != 0 {
@@ -181,6 +189,8 @@ func (o *OpLog) GetHighestOpIDAndModno(q Query) (int64, int64, error) {
}
type OpStore interface {
// SetVersion sets the data version
SetVersion(version int64) error
// Query returns all operations that match the query.
Query(q Query, f func(*v1.Operation) error) error
// QueryMetadata is like Query, but only returns metadata about the operations.
@@ -192,6 +202,8 @@ type OpStore interface {
GetHighestOpIDAndModno(q Query) (int64, int64, error)
// Add adds the given operations to the store.
Add(op ...*v1.Operation) error
// Set sets the given operations in the store exactly as provided. It will assign Id and FlowId if not set, but otherwise stores the operation as-is.
Set(op ...*v1.Operation) error
// Update updates the given operations in the store.
Update(op ...*v1.Operation) error
// Delete removes the operations with the given IDs from the store, and returns the removed operations.
@@ -200,8 +212,6 @@ type OpStore interface {
Transform(q Query, f func(*v1.Operation) (*v1.Operation, error)) error
// Version returns the current data version
Version() (int64, error)
// SetVersion sets the data version
SetVersion(version int64) error
}
// OpMetadata is a struct that contains metadata about an operation without fetching the operation itself.
+49 -1
View File
@@ -27,7 +27,6 @@ import (
_ "github.com/ncruces/go-sqlite3/embed"
"github.com/ncruces/go-sqlite3/vfs"
"github.com/ncruces/go-sqlite3/vfs/memdb"
_ "github.com/ncruces/go-sqlite3/vfs/memdb"
)
var ErrLocked = errors.New("sqlite db is locked")
@@ -452,6 +451,55 @@ func (m *SqliteStore) Update(op ...*v1.Operation) error {
return tx.Commit()
}
func (m *SqliteStore) setInternal(tx *sql.Tx, op ...*v1.Operation) error {
for _, o := range op {
ogid, err := m.findOrCreateGroup(tx, o)
if err != nil {
return fmt.Errorf("find ogid: %v", err)
}
query := `INSERT OR REPLACE INTO operations
(id, ogid, original_id, original_flow_id, modno, flow_id, start_time_ms, status, snapshot_id, operation)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)`
bytes, err := proto.Marshal(o)
if err != nil {
return fmt.Errorf("marshal operation: %v", err)
}
_, err = tx.ExecContext(context.Background(), query, o.Id, ogid, o.OriginalId, o.OriginalFlowId, o.Modno, o.FlowId, o.UnixTimeStartMs, int64(o.Status), o.SnapshotId, bytes)
if err != nil {
return fmt.Errorf("set operation: %v", err)
}
}
return nil
}
func (m *SqliteStore) Set(op ...*v1.Operation) error {
tx, err := m.dbpool.BeginTx(context.Background(), &sql.TxOptions{Isolation: sql.LevelSerializable})
if err != nil {
return fmt.Errorf("set operation: begin tx: %v", err)
}
defer tx.Rollback()
for _, o := range op {
if o.Id == 0 {
o.Id = m.highestOpID.Add(1)
}
if o.FlowId == 0 {
o.FlowId = o.Id
}
if err := protoutil.ValidateOperation(o); err != nil {
return err
}
}
if err := m.setInternal(tx, op...); err != nil {
return err
}
return tx.Commit()
}
func (m *SqliteStore) updateInternal(tx *sql.Tx, op ...*v1.Operation) error {
for _, o := range op {
o.Modno = m.highestModno.Add(1)
@@ -1016,6 +1016,390 @@ func BenchmarkList(b *testing.B) {
}
}
func TestSet_NewOperation(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create a new operation without ID
op := &v1.Operation{
Modno: 1,
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
// Set should assign an ID and FlowID
if err := store.Set(op); err != nil {
t.Fatalf("error setting operation: %s", err)
}
if op.Id == 0 {
t.Error("expected Set to assign an ID")
}
if op.FlowId == 0 {
t.Error("expected Set to assign a FlowID")
}
if op.FlowId != op.Id {
t.Errorf("expected FlowID to equal ID, got FlowID=%d, ID=%d", op.FlowId, op.Id)
}
if op.Modno != 1 {
t.Errorf("expected Modno to remain 1, got %d", op.Modno)
}
// Verify the operation was stored
gotOp, err := store.Get(op.Id)
if err != nil {
t.Fatalf("error getting operation: %s", err)
}
if diff := cmp.Diff(op, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("unexpected diff in operation: %v", diff)
}
})
}
}
func TestSet_ExistingID(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create an operation with a specific ID
op := &v1.Operation{
Id: 100,
Modno: 50,
FlowId: 100,
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
// Set should use the provided ID and Modno
if err := store.Set(op); err != nil {
t.Fatalf("error setting operation: %s", err)
}
if op.Id != 100 {
t.Errorf("expected ID to be 100, got %d", op.Id)
}
if op.Modno != 50 {
t.Errorf("expected Modno to be 50, got %d", op.Modno)
}
// Verify the operation was stored with exact values
gotOp, err := store.Get(100)
if err != nil {
t.Fatalf("error getting operation: %s", err)
}
if diff := cmp.Diff(op, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("unexpected diff in operation: %v", diff)
}
})
}
}
func TestSet_ReplaceExisting(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Add an initial operation
op1 := &v1.Operation{
Id: 200,
Modno: 10,
FlowId: 200,
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_PENDING,
Op: &v1.Operation_OperationBackup{},
}
if err := store.Set(op1); err != nil {
t.Fatalf("error setting first operation: %s", err)
}
// Replace with a different operation with the same ID but different Modno
op2 := &v1.Operation{
Id: 200,
Modno: 20,
FlowId: 200,
UnixTimeStartMs: 5678,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
if err := store.Set(op2); err != nil {
t.Fatalf("error setting second operation: %s", err)
}
// Verify the operation was replaced
gotOp, err := store.Get(200)
if err != nil {
t.Fatalf("error getting operation: %s", err)
}
if gotOp.UnixTimeStartMs != 5678 {
t.Errorf("expected UnixTimeStartMs to be 5678, got %d", gotOp.UnixTimeStartMs)
}
if gotOp.Status != v1.OperationStatus_STATUS_SUCCESS {
t.Errorf("expected status to be SUCCESS, got %v", gotOp.Status)
}
if gotOp.Modno != 20 {
t.Errorf("expected Modno to be 20, got %d", gotOp.Modno)
}
if diff := cmp.Diff(op2, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("unexpected diff in operation: %v", diff)
}
})
}
}
func TestSet_PreservesExactModno(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create operations with specific Modno values
ops := []*v1.Operation{
{
Id: 1,
Modno: 100,
FlowId: 1,
UnixTimeStartMs: 1000,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
},
{
Id: 2,
Modno: 50, // Lower Modno than previous
FlowId: 2,
UnixTimeStartMs: 2000,
PlanId: "plan2",
RepoId: "repo2",
RepoGuid: "repo2",
InstanceId: "instance2",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
},
{
Id: 3,
Modno: 200,
FlowId: 3,
UnixTimeStartMs: 3000,
PlanId: "plan3",
RepoId: "repo3",
RepoGuid: "repo3",
InstanceId: "instance3",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
},
}
for _, op := range ops {
if err := store.Set(op); err != nil {
t.Fatalf("error setting operation: %s", err)
}
}
// Verify each operation has its exact Modno
for _, expectedOp := range ops {
gotOp, err := store.Get(expectedOp.Id)
if err != nil {
t.Fatalf("error getting operation %d: %s", expectedOp.Id, err)
}
if gotOp.Modno != expectedOp.Modno {
t.Errorf("operation %d: expected Modno %d, got %d", expectedOp.Id, expectedOp.Modno, gotOp.Modno)
}
if diff := cmp.Diff(expectedOp, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("operation %d: unexpected diff: %v", expectedOp.Id, diff)
}
}
})
}
}
func TestSet_MultipleOperations(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create multiple operations to set in one call
ops := make([]*v1.Operation, 10)
for i := 0; i < 10; i++ {
ops[i] = &v1.Operation{
Id: int64(i + 1),
Modno: int64(i*10 + 5),
FlowId: int64(i + 1),
UnixTimeStartMs: int64(1000 + i),
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
}
// Set all operations at once
if err := store.Set(ops...); err != nil {
t.Fatalf("error setting multiple operations: %s", err)
}
// Verify all operations were stored correctly
for i, expectedOp := range ops {
gotOp, err := store.Get(expectedOp.Id)
if err != nil {
t.Fatalf("error getting operation %d: %s", i, err)
}
if diff := cmp.Diff(expectedOp, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("operation %d: unexpected diff: %v", i, diff)
}
}
})
}
}
func TestSet_WithoutFlowID(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create an operation with ID but without FlowID
op := &v1.Operation{
Id: 300,
Modno: 10,
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
if err := store.Set(op); err != nil {
t.Fatalf("error setting operation: %s", err)
}
// FlowID should be set to ID
if op.FlowId != 300 {
t.Errorf("expected FlowID to be 300, got %d", op.FlowId)
}
// Verify the operation was stored correctly
gotOp, err := store.Get(300)
if err != nil {
t.Fatalf("error getting operation: %s", err)
}
if gotOp.FlowId != 300 {
t.Errorf("stored operation: expected FlowID to be 300, got %d", gotOp.FlowId)
}
})
}
}
func TestSet_DifferentFlowID(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Create an operation with a different FlowID
op := &v1.Operation{
Id: 400,
Modno: 10,
FlowId: 500, // Different from ID
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
}
if err := store.Set(op); err != nil {
t.Fatalf("error setting operation: %s", err)
}
// FlowID should remain 500
if op.FlowId != 500 {
t.Errorf("expected FlowID to be 500, got %d", op.FlowId)
}
// Verify the operation was stored correctly
gotOp, err := store.Get(400)
if err != nil {
t.Fatalf("error getting operation: %s", err)
}
if gotOp.FlowId != 500 {
t.Errorf("stored operation: expected FlowID to be 500, got %d", gotOp.FlowId)
}
if diff := cmp.Diff(op, gotOp, protocmp.Transform()); diff != "" {
t.Errorf("unexpected diff in operation: %v", diff)
}
})
}
}
func TestSet_Validation(t *testing.T) {
// t.Parallel()
for name, store := range StoresForTest(t) {
t.Run(name, func(t *testing.T) {
// Try to set an invalid operation (missing required fields)
op := &v1.Operation{
Id: 600,
Modno: 10,
// Missing required fields like PlanId, RepoId, etc.
}
err := store.Set(op)
if err == nil {
t.Error("expected Set to return validation error for invalid operation")
}
// Try to set an operation without Modno
op2 := &v1.Operation{
Id: 601,
FlowId: 601,
UnixTimeStartMs: 1234,
PlanId: "plan1",
RepoId: "repo1",
RepoGuid: "repo1",
InstanceId: "instance1",
Status: v1.OperationStatus_STATUS_SUCCESS,
Op: &v1.Operation_OperationBackup{},
// Missing Modno
}
err = store.Set(op2)
if err == nil {
t.Error("expected Set to return validation error for operation without Modno")
}
})
}
}
func BenchmarkGetLastItem(b *testing.B) {
for _, count := range []int{100, 1000, 10000} {
b.Run(fmt.Sprintf("%d", count), func(b *testing.B) {