From 7c8ded2fcc4b597e21c24f451e02cc14ba9a015c Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Mon, 1 Jul 2024 21:05:50 -0700 Subject: [PATCH] fix: make instance ID required field --- internal/oplog/migrations.go | 1 + internal/oplog/oplog_test.go | 86 +++++++------------ internal/orchestrator/tasks/task.go | 5 +- .../orchestrator/tasks/taskindexsnapshots.go | 6 +- internal/protoutil/validation.go | 3 +- 5 files changed, 42 insertions(+), 59 deletions(-) diff --git a/internal/oplog/migrations.go b/internal/oplog/migrations.go index 895d8b7..ae2d05e 100644 --- a/internal/oplog/migrations.go +++ b/internal/oplog/migrations.go @@ -15,6 +15,7 @@ var migrations = []func(*OpLog, *bbolt.Tx) error{ migration001FlowID, migration002InstanceID, migration003ResetLastValidated, + migration002InstanceID, // re-run migration002InstanceID to fix improperly set instance IDs } var CurrentVersion = int64(len(migrations)) diff --git a/internal/oplog/oplog_test.go b/internal/oplog/oplog_test.go index 2315b12..0e890c2 100644 --- a/internal/oplog/oplog_test.go +++ b/internal/oplog/oplog_test.go @@ -50,6 +50,7 @@ func TestAddOperation(t *testing.T) { UnixTimeStartMs: 1234, RepoId: "testrepo", PlanId: "testplan", + InstanceId: "testinstance", Op: &v1.Operation_OperationBackup{}, }, wantErr: false, @@ -60,6 +61,7 @@ func TestAddOperation(t *testing.T) { UnixTimeStartMs: 1234, RepoId: "testrepo", PlanId: "testplan", + InstanceId: "testinstance", Op: &v1.Operation_OperationIndexSnapshot{ OperationIndexSnapshot: &v1.OperationIndexSnapshot{ Snapshot: &v1.ResticSnapshot{ @@ -76,6 +78,7 @@ func TestAddOperation(t *testing.T) { Id: 1, RepoId: "testrepo", PlanId: "testplan", + InstanceId: "testinstance", UnixTimeStartMs: 1234, Op: &v1.Operation_OperationBackup{}, }, @@ -99,6 +102,15 @@ func TestAddOperation(t *testing.T) { }, wantErr: true, }, + { + name: "operation with instance only", + op: &v1.Operation{ + UnixTimeStartMs: 1234, + InstanceId: "testinstance", + Op: &v1.Operation_OperationBackup{}, + }, + wantErr: true, + }, } for _, tc := range tests { @@ -129,6 +141,7 @@ func TestListOperation(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "plan1", RepoId: "repo1", + InstanceId: "instance1", DisplayMessage: "op1", Op: &v1.Operation_OperationBackup{}, }, @@ -136,6 +149,7 @@ func TestListOperation(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "plan1", RepoId: "repo2", + InstanceId: "instance2", DisplayMessage: "op2", Op: &v1.Operation_OperationBackup{}, }, @@ -143,7 +157,9 @@ func TestListOperation(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "plan2", RepoId: "repo2", + InstanceId: "instance3", DisplayMessage: "op3", + FlowId: 943, Op: &v1.Operation_OperationBackup{}, }, } @@ -156,35 +172,36 @@ func TestListOperation(t *testing.T) { tests := []struct { name string - byPlan bool - byRepo bool - id string + query Query expected []string }{ { name: "list plan1", - byPlan: true, - id: "plan1", + query: Query{PlanId: "plan1"}, expected: []string{"op1", "op2"}, }, { name: "list plan2", - byPlan: true, - id: "plan2", + query: Query{PlanId: "plan2"}, expected: []string{"op3"}, }, { name: "list repo1", - byRepo: true, - id: "repo1", + query: Query{RepoId: "repo1"}, expected: []string{"op1"}, }, { name: "list repo2", - byRepo: true, - id: "repo2", + query: Query{RepoId: "repo2"}, expected: []string{"op2", "op3"}, }, + { + name: "list flow 943", + query: Query{FlowId: 943}, + expected: []string{ + "op3", + }, + }, } for _, tc := range tests { @@ -197,13 +214,7 @@ func TestListOperation(t *testing.T) { ops = append(ops, op) return nil } - if tc.byPlan { - err = log.ForEach(Query{PlanId: tc.id}, indexutil.CollectAll(), collect) - } else if tc.byRepo { - err = log.ForEach(Query{RepoId: tc.id}, indexutil.CollectAll(), collect) - } else { - t.Fatalf("must specify byPlan or byRepo") - } + err = log.ForEach(tc.query, indexutil.CollectAll(), collect) if err != nil { t.Fatalf("error listing operations: %s", err) } @@ -215,42 +226,6 @@ func TestListOperation(t *testing.T) { } } -func TestListByFlowId(t *testing.T) { - t.Parallel() - - log, err := NewOpLog(t.TempDir() + "/test.boltdb") - if err != nil { - t.Fatalf("error creating oplog: %s", err) - } - t.Cleanup(func() { log.Close() }) - - op := &v1.Operation{ - UnixTimeStartMs: 1234, - PlanId: "plan1", - RepoId: "repo1", - FlowId: 1, - Op: &v1.Operation_OperationBackup{}, - } - - if err := log.Add(op); err != nil { - t.Fatalf("error adding operation: %s", err) - } - - var ops []*v1.Operation - if err := log.ForEach(Query{FlowId: 1}, indexutil.CollectAll(), func(op *v1.Operation) error { - ops = append(ops, op) - return nil - }); err != nil { - t.Fatalf("error listing operations: %s", err) - } - if len(ops) != 1 { - t.Fatalf("want 1 operation, got %d", len(ops)) - } - if ops[0].Id != op.Id { - t.Errorf("want operation ID %d, got %d", op.Id, ops[0].Id) - } -} - func TestBigIO(t *testing.T) { t.Parallel() @@ -267,6 +242,7 @@ func TestBigIO(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "plan1", RepoId: "repo1", + InstanceId: "instance1", Op: &v1.Operation_OperationBackup{}, }); err != nil { t.Fatalf("error adding operation: %s", err) @@ -289,6 +265,7 @@ func TestIndexSnapshot(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "plan1", RepoId: "repo1", + InstanceId: "instance1", SnapshotId: snapshotId, Op: &v1.Operation_OperationIndexSnapshot{}, } @@ -324,6 +301,7 @@ func TestUpdateOperation(t *testing.T) { UnixTimeStartMs: 1234, PlanId: "oldplan", RepoId: "oldrepo", + InstanceId: "instance1", SnapshotId: snapshotId, } if err := log.Add(op); err != nil { diff --git a/internal/orchestrator/tasks/task.go b/internal/orchestrator/tasks/task.go index 011c3af..c3b904c 100644 --- a/internal/orchestrator/tasks/task.go +++ b/internal/orchestrator/tasks/task.go @@ -15,8 +15,9 @@ import ( var NeverScheduledTask = ScheduledTask{} const ( - PlanForUnassociatedOperations = "_unassociated_" - PlanForSystemTasks = "_system_" // plan for system tasks e.g. garbage collection, prune, stats, etc. + PlanForUnassociatedOperations = "_unassociated_" + InstanceIDForUnassociatedOperations = "_unassociated_" + PlanForSystemTasks = "_system_" // plan for system tasks e.g. garbage collection, prune, stats, etc. TaskPriorityStats = 0 TaskPriorityDefault = 1 << 1 // default priority diff --git a/internal/orchestrator/tasks/taskindexsnapshots.go b/internal/orchestrator/tasks/taskindexsnapshots.go index 51890d6..40a80d5 100644 --- a/internal/orchestrator/tasks/taskindexsnapshots.go +++ b/internal/orchestrator/tasks/taskindexsnapshots.go @@ -191,7 +191,11 @@ func planForSnapshot(snapshot *v1.ResticSnapshot) string { } func instanceIDForSnapshot(snapshot *v1.ResticSnapshot) string { - return repo.InstanceIDFromTags(snapshot.Tags) + id := repo.InstanceIDFromTags(snapshot.Tags) + if id != "" { + return id + } + return InstanceIDForUnassociatedOperations } // tryMigrate checks if the snapshots use the latest backrest tag set and migrates them if necessary. diff --git a/internal/protoutil/validation.go b/internal/protoutil/validation.go index 77ab4b3..2dc639c 100644 --- a/internal/protoutil/validation.go +++ b/internal/protoutil/validation.go @@ -6,7 +6,6 @@ import ( v1 "github.com/garethgeorge/backrest/gen/go/v1" "github.com/garethgeorge/backrest/pkg/restic" - "go.uber.org/zap" ) // ValidateOperation verifies critical properties of the operation proto. @@ -24,7 +23,7 @@ func ValidateOperation(op *v1.Operation) error { return errors.New("operation.plan_id is required") } if op.InstanceId == "" { - zap.L().Warn("operation.instance_id should typically be set") + return errors.New("operation.instance_id is required") } if op.SnapshotId != "" { if err := restic.ValidateSnapshotId(op.SnapshotId); err != nil {