feat: implement 'on error retry' policy (#428)

This commit is contained in:
Gareth
2024-08-26 19:21:18 -07:00
committed by GitHub
parent 8c1cf791bb
commit 038bc87070
16 changed files with 654 additions and 111 deletions

View File

@@ -21,7 +21,6 @@ import (
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/oplog/bboltstore"
"github.com/garethgeorge/backrest/internal/orchestrator"
"github.com/garethgeorge/backrest/internal/orchestrator/tasks"
"github.com/garethgeorge/backrest/internal/resticinstaller"
"github.com/garethgeorge/backrest/internal/rotatinglog"
"golang.org/x/sync/errgroup"
@@ -102,6 +101,7 @@ func TestBackup(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
@@ -143,7 +143,7 @@ func TestBackup(t *testing.T) {
// Wait for the index snapshot operation to appear in the oplog.
var snapshotOp *v1.Operation
if err := retry(t, 10, 2*time.Second, func() error {
if err := retry(t, 10, 1*time.Second, func() error {
operations := getOperations(t, sut.oplog)
if index := slices.IndexFunc(operations, func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationIndexSnapshot)
@@ -162,7 +162,7 @@ func TestBackup(t *testing.T) {
}
// Wait for a forget operation to appear in the oplog.
if err := retry(t, 10, 2*time.Second, func() error {
if err := retry(t, 10, 1*time.Second, func() error {
operations := getOperations(t, sut.oplog)
if index := slices.IndexFunc(operations, func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationForget)
@@ -181,6 +181,8 @@ func TestBackup(t *testing.T) {
}
func TestMultipleBackup(t *testing.T) {
t.Parallel()
sut := createSystemUnderTest(t, &config.MemoryStore{
Config: &v1.Config{
Modno: 1234,
@@ -190,6 +192,7 @@ func TestMultipleBackup(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
@@ -226,7 +229,7 @@ func TestMultipleBackup(t *testing.T) {
}
// Wait for a forget that removed 1 snapshot to appear in the oplog
if err := retry(t, 10, 2*time.Second, func() error {
if err := retry(t, 10, 1*time.Second, func() error {
operations := getOperations(t, sut.oplog)
if index := slices.IndexFunc(operations, func(op *v1.Operation) bool {
forget, ok := op.GetOp().(*v1.Operation_OperationForget)
@@ -261,6 +264,7 @@ func TestHookExecution(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
@@ -312,7 +316,7 @@ func TestHookExecution(t *testing.T) {
}
// Wait for two hook operations to appear in the oplog
if err := retry(t, 10, 2*time.Second, func() error {
if err := retry(t, 10, 1*time.Second, func() error {
hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationRunHook)
return !ok
@@ -336,7 +340,7 @@ func TestHookExecution(t *testing.T) {
}
}
func TestHookCancellation(t *testing.T) {
func TestHookOnErrorHandling(t *testing.T) {
t.Parallel()
if runtime.GOOS == "windows" {
@@ -352,11 +356,12 @@ func TestHookCancellation(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
{
Id: "test",
Id: "test-cancel",
Repo: "local",
Paths: []string{
t.TempDir(),
@@ -378,6 +383,75 @@ func TestHookCancellation(t *testing.T) {
},
},
},
{
Id: "test-error",
Repo: "local",
Paths: []string{
t.TempDir(),
},
Schedule: &v1.Schedule{
Schedule: &v1.Schedule_Disabled{Disabled: true},
},
Hooks: []*v1.Hook{
{
Conditions: []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_START,
},
Action: &v1.Hook_ActionCommand{
ActionCommand: &v1.Hook_Command{
Command: "exit 123",
},
},
OnError: v1.Hook_ON_ERROR_FATAL,
},
},
},
{
Id: "test-ignore",
Repo: "local",
Paths: []string{
t.TempDir(),
},
Schedule: &v1.Schedule{
Schedule: &v1.Schedule_Disabled{Disabled: true},
},
Hooks: []*v1.Hook{
{
Conditions: []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_START,
},
Action: &v1.Hook_ActionCommand{
ActionCommand: &v1.Hook_Command{
Command: "exit 123",
},
},
OnError: v1.Hook_ON_ERROR_IGNORE,
},
},
},
{
Id: "test-retry",
Repo: "local",
Paths: []string{
t.TempDir(),
},
Schedule: &v1.Schedule{
Schedule: &v1.Schedule_Disabled{Disabled: true},
},
Hooks: []*v1.Hook{
{
Conditions: []v1.Hook_Condition{
v1.Hook_CONDITION_SNAPSHOT_START,
},
Action: &v1.Hook_ActionCommand{
ActionCommand: &v1.Hook_Command{
Command: "exit 123",
},
},
OnError: v1.Hook_ON_ERROR_RETRY_10MINUTES,
},
},
},
},
},
})
@@ -388,43 +462,93 @@ func TestHookCancellation(t *testing.T) {
sut.orch.Run(ctx)
}()
_, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: "test"}))
if !errors.Is(err, tasks.ErrTaskCancelled) {
t.Fatalf("Backup() error = %v, want errors.Is(err, tasks.ErrTaskCancelled)", err)
tests := []struct {
name string
plan string
wantHookStatus v1.OperationStatus
wantBackupStatus v1.OperationStatus
wantBackupError bool
noWaitForBackup bool
}{
{
name: "cancel",
plan: "test-cancel",
wantHookStatus: v1.OperationStatus_STATUS_ERROR,
wantBackupStatus: v1.OperationStatus_STATUS_USER_CANCELLED,
wantBackupError: true,
},
{
name: "error",
plan: "test-error",
wantHookStatus: v1.OperationStatus_STATUS_ERROR,
wantBackupStatus: v1.OperationStatus_STATUS_ERROR,
wantBackupError: true,
},
{
name: "ignore",
plan: "test-ignore",
wantHookStatus: v1.OperationStatus_STATUS_ERROR,
wantBackupStatus: v1.OperationStatus_STATUS_SUCCESS,
wantBackupError: false,
},
{
name: "retry",
plan: "test-retry",
wantHookStatus: v1.OperationStatus_STATUS_ERROR,
wantBackupStatus: v1.OperationStatus_STATUS_PENDING,
wantBackupError: false,
noWaitForBackup: true,
},
}
// Wait for a hook operation to appear in the oplog
if err := retry(t, 10, 2*time.Second, func() error {
hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationRunHook)
return !ok
})
if len(hookOps) != 1 {
return fmt.Errorf("expected 1 hook operations, got %d", len(hookOps))
}
if hookOps[0].Status != v1.OperationStatus_STATUS_ERROR {
return fmt.Errorf("expected hook operation error status, got %v", hookOps[0].Status)
}
return nil
}); err != nil {
t.Fatalf("Couldn't find hooks in oplog: %v", err)
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
sut.opstore.ResetForTest(t)
// assert that the backup operation is in the log and is cancelled
if err := retry(t, 10, 2*time.Second, func() error {
backupOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
return !ok
var errgroup errgroup.Group
errgroup.Go(func() error {
_, err := sut.handler.Backup(context.Background(), connect.NewRequest(&types.StringValue{Value: tc.plan}))
if (err != nil) != tc.wantBackupError {
return fmt.Errorf("Backup() error = %v, wantErr %v", err, tc.wantBackupError)
}
return nil
})
if !tc.noWaitForBackup {
if err := errgroup.Wait(); err != nil {
t.Fatalf(err.Error())
}
}
// Wait for hook operation to be attempted in the oplog
if err := retry(t, 10, 1*time.Second, func() error {
hookOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationRunHook)
return !ok
})
if len(hookOps) != 1 {
return fmt.Errorf("expected 1 hook operations, got %d", len(hookOps))
}
if hookOps[0].Status != tc.wantHookStatus {
return fmt.Errorf("expected hook operation error status, got %v", hookOps[0].Status)
}
return nil
}); err != nil {
t.Fatalf("Couldn't find hook operation in oplog: %v", err)
}
backupOps := slices.DeleteFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
return !ok
})
if len(backupOps) != 1 {
t.Errorf("expected 1 backup operation, got %d", len(backupOps))
}
if backupOps[0].Status != tc.wantBackupStatus {
t.Errorf("expected backup operation cancelled status, got %v", backupOps[0].Status)
}
})
if len(backupOps) != 1 {
return fmt.Errorf("expected 1 backup operation, got %d", len(backupOps))
}
if backupOps[0].Status != v1.OperationStatus_STATUS_USER_CANCELLED {
return fmt.Errorf("expected backup operation cancelled status, got %v", backupOps[0].Status)
}
return nil
}); err != nil {
t.Fatalf("Couldn't find hooks in oplog: %v", err)
}
}
@@ -440,6 +564,7 @@ func TestCancelBackup(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
@@ -478,7 +603,7 @@ func TestCancelBackup(t *testing.T) {
// Find the backup operation ID in the oplog
var backupOpId int64
if err := retry(t, 100, 50*time.Millisecond, func() error {
if err := retry(t, 100, 10*time.Millisecond, func() error {
operations := getOperations(t, sut.oplog)
for _, op := range operations {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
@@ -498,6 +623,7 @@ func TestCancelBackup(t *testing.T) {
}
return nil
})
if err := errgroup.Wait(); err != nil {
t.Fatalf(err.Error())
}
@@ -505,9 +631,9 @@ func TestCancelBackup(t *testing.T) {
// Assert that the backup operation was cancelled
if slices.IndexFunc(getOperations(t, sut.oplog), func(op *v1.Operation) bool {
_, ok := op.GetOp().(*v1.Operation_OperationBackup)
return op.Status == v1.OperationStatus_STATUS_USER_CANCELLED && ok
return op.Status == v1.OperationStatus_STATUS_ERROR && ok
}) == -1 {
t.Fatalf("Expected a cancelled backup operation in the log")
t.Fatalf("Expected a failed backup operation in the log")
}
}
@@ -528,6 +654,7 @@ func TestRestore(t *testing.T) {
Id: "local",
Uri: t.TempDir(),
Password: "test",
Flags: []string{"--no-cache"},
},
},
Plans: []*v1.Plan{
@@ -637,6 +764,7 @@ func TestRestore(t *testing.T) {
type systemUnderTest struct {
handler *BackrestHandler
oplog *oplog.OpLog
opstore *bboltstore.BboltStore
orch *orchestrator.Orchestrator
logStore *rotatinglog.RotatingLog
config *v1.Config
@@ -684,6 +812,7 @@ func createSystemUnderTest(t *testing.T, config config.ConfigStore) systemUnderT
return systemUnderTest{
handler: h,
oplog: oplog,
opstore: opstore,
orch: orch,
logStore: logStore,
config: cfg,