diff --git a/internal/orchestrator/tasks/taskcollectgarbage.go b/internal/orchestrator/tasks/taskcollectgarbage.go
index 4c2bd981..3ebd468c 100644
--- a/internal/orchestrator/tasks/taskcollectgarbage.go
+++ b/internal/orchestrator/tasks/taskcollectgarbage.go
@@ -3,6 +3,7 @@ package tasks
 import (
 	"context"
 	"fmt"
+	"reflect"
 	"time"
 
 	v1 "github.com/garethgeorge/backrest/gen/go/v1"
@@ -11,21 +12,51 @@ import (
 	"go.uber.org/zap"
 )
 
+// gcSettingsForType holds the retention policy applied to one group of
+// operations (see groupByKey).
+type gcSettingsForType struct {
+	maxAge  time.Duration // operations older than this are removed once keepMin have been retained
+	keepMin int           // the first keepMin operations visited per group are never removed by age or count
+	keepMax int           // operations visited after the first keepMax per group are removed regardless of age
+}
+
+// groupByKey identifies the group of operations that share one retention budget.
+type groupByKey struct {
+	Repo string
+	Plan string
+	Type reflect.Type
+}
+
 const (
-	gcStartupDelay = 60 * time.Second
+	// NOTE(review): lowered from 60s to 1s; confirm this is intentional and not a debugging leftover.
+	gcStartupDelay = 1 * time.Second
 	gcInterval     = 24 * time.Hour
 )
 
-// gcAgeForOperation returns the age at which an operation is eligible for garbage collection.
-func gcAgeForOperation(op *v1.Operation) time.Duration {
-	switch op.Op.(type) {
-	// stats, check, and prune operations are kept for a year
-	case *v1.Operation_OperationStats, *v1.Operation_OperationCheck, *v1.Operation_OperationPrune:
-		return 365 * 24 * time.Hour
-	// all other operations are kept for 30 days
-	default:
-		return 30 * 24 * time.Hour
-	}
+// gcSettings maps the concrete type of an operation's Op field to its retention policy.
+var gcSettings = map[reflect.Type]gcSettingsForType{
+	reflect.TypeOf(&v1.Operation_OperationStats{}): {
+		maxAge:  365 * 24 * time.Hour,
+		keepMin: 1,
+		keepMax: 100,
+	},
+	reflect.TypeOf(&v1.Operation_OperationCheck{}): {
+		maxAge:  365 * 24 * time.Hour,
+		keepMin: 1,
+		keepMax: 12,
+	},
+	reflect.TypeOf(&v1.Operation_OperationPrune{}): {
+		maxAge:  365 * 24 * time.Hour,
+		keepMin: 1,
+		keepMax: 12,
+	},
+}
+
+// defaultGcSettings is the retention policy for operation types not listed in gcSettings.
+var defaultGcSettings = gcSettingsForType{
+	maxAge:  30 * 24 * time.Hour,
+	keepMin: 1,
+	keepMax: 100,
 }
 
 type CollectGarbageTask struct {
@@ -85,23 +116,67 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error {
 		return fmt.Errorf("identifying forgotten snapshots: %w", err)
 	}
 
+	// keep track of IDs that are still valid and of the IDs that are being forgotten
 	validIDs := make(map[int64]struct{})
 	forgetIDs := []int64{}
 	curTime := curTimeMillis()
-	if err := log.Query(oplog.SelectAll, func(op *v1.Operation) error {
+
+	var deletedByMaxAge, deletedByMaxCount, deletedByForgottenSnapshot int
+	deletedByType := make(map[string]int)
+	stats := make(map[groupByKey]gcSettingsForType) // remaining retention budget per group
+
+	if err := log.Query(oplog.Query{Reversed: true}, func(op *v1.Operation) error {
 		validIDs[op.Id] = struct{}{}
 
 		forgot, ok := snapshotForgottenForFlow[op.FlowId]
-		if !ok {
-			// no snapshot associated with this flow; check if it's old enough to be gc'd
-			maxAgeForOperation := gcAgeForOperation(op)
-			if curTime-op.UnixTimeStartMs > maxAgeForOperation.Milliseconds() {
+		if ok {
+			if forgot {
+				// snapshot is forgotten; this operation is eligible for gc
 				forgetIDs = append(forgetIDs, op.Id)
+				deletedByForgottenSnapshot++
+				// %T yields the same name as reflect.TypeOf(op.Op).String() but does not panic when op.Op is nil
+				deletedByType[fmt.Sprintf("%T", op.Op)]++
 			}
-		} else if forgot {
-			// snapshot is forgotten; this operation is eligible for gc
-			forgetIDs = append(forgetIDs, op.Id)
+			return nil
 		}
+
+		// operations without an associated snapshot are retained per (repo, plan, operation type) group
+		key := groupByKey{
+			Repo: op.RepoId,
+			Plan: op.PlanId,
+			Type: reflect.TypeOf(op.Op),
+		}
+
+		st, ok := stats[key]
+		if !ok {
+			// first operation seen for this group; start from the policy for its type
+			if settings, found := gcSettings[key.Type]; found {
+				st = settings
+			} else {
+				st = defaultGcSettings
+			}
+		}
+
+		st.keepMax--    // decrement the max retention, when this < 0 operation must be gc'd
+		st.keepMin--    // decrement the min retention, when this < 0 we can start gc'ing
+		stats[key] = st // update the stats
+
+		if st.keepMin >= 0 {
+			// can't delete if within min retention period
+			return nil
+		}
+		if st.keepMax < 0 {
+			// max retention reached; this operation must be gc'd.
+			forgetIDs = append(forgetIDs, op.Id)
+			deletedByMaxCount++
+			deletedByType[fmt.Sprintf("%T", op.Op)]++
+		} else if curTime-op.UnixTimeStartMs > st.maxAge.Milliseconds() {
+			// operation is old enough to be gc'd
+			forgetIDs = append(forgetIDs, op.Id)
+			deletedByMaxAge++
+			deletedByType[fmt.Sprintf("%T", op.Op)]++
+		}
+
 		return nil
 	}); err != nil {
 		return fmt.Errorf("identifying gc eligible operations: %w", err)
@@ -109,14 +184,17 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error {
 
 	if err := log.Delete(forgetIDs...); err != nil {
 		return fmt.Errorf("removing gc eligible operations: %w", err)
-	} else if len(forgetIDs) > 0 {
-		for _, id := range forgetIDs {
-			delete(validIDs, id)
-		}
+	}
+	for _, id := range forgetIDs { // update validIDs with respect to the just deleted operations
+		delete(validIDs, id)
 	}
 
 	zap.L().Info("collecting garbage operations",
-		zap.Any("operations_removed", len(forgetIDs)))
+		zap.Int("operations_removed", len(forgetIDs)),
+		zap.Int("removed_by_age", deletedByMaxAge),
+		zap.Int("removed_by_limit", deletedByMaxCount),
+		zap.Int("removed_by_snapshot_forgotten", deletedByForgottenSnapshot),
+		zap.Any("removed_by_type", deletedByType))
 
 	// cleaning up logstore
 	toDelete := []string{}