fix: garbage collection with more sensible limits grouped by plan/repo (#555)
Build Snapshot Release / build (push) Has been cancelled
Release Please / release-please (push) Has been cancelled
Test / test-nix (push) Has been cancelled
Test / test-win (push) Has been cancelled

This commit is contained in:
Gareth
2024-11-13 21:17:03 -08:00
committed by GitHub
parent 78c01a1a35
commit 492beb2935
@@ -3,6 +3,7 @@ package tasks
import (
"context"
"fmt"
"reflect"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
@@ -11,21 +12,45 @@ import (
"go.uber.org/zap"
)
type gcSettingsForType struct {
maxAge time.Duration
keepMin int
keepMax int
}
type groupByKey struct {
Repo string
Plan string
Type reflect.Type
}
const (
gcStartupDelay = 60 * time.Second
gcStartupDelay = 1 * time.Second
gcInterval = 24 * time.Hour
)
// gcAgeForOperation returns the age at which an operation is eligible for garbage collection.
func gcAgeForOperation(op *v1.Operation) time.Duration {
switch op.Op.(type) {
// stats, check, and prune operations are kept for a year
case *v1.Operation_OperationStats, *v1.Operation_OperationCheck, *v1.Operation_OperationPrune:
return 365 * 24 * time.Hour
// all other operations are kept for 30 days
default:
return 30 * 24 * time.Hour
}
var gcSettings = map[reflect.Type]gcSettingsForType{
reflect.TypeOf(&v1.Operation_OperationStats{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 100,
},
reflect.TypeOf(&v1.Operation_OperationCheck{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 12,
},
reflect.TypeOf(&v1.Operation_OperationPrune{}): {
maxAge: 365 * 24 * time.Hour,
keepMin: 1,
keepMax: 12,
},
}
var defaultGcSettings = gcSettingsForType{
maxAge: 30 * 24 * time.Hour,
keepMin: 1,
keepMax: 100,
}
type CollectGarbageTask struct {
@@ -85,23 +110,65 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error {
return fmt.Errorf("identifying forgotten snapshots: %w", err)
}
// keep track of IDs that are still valid and of the IDs that are being forgotten
validIDs := make(map[int64]struct{})
forgetIDs := []int64{}
curTime := curTimeMillis()
if err := log.Query(oplog.SelectAll, func(op *v1.Operation) error {
var deletedByMaxAge, deletedByMaxCount, deletedByForgottenSnapshot int
deletedByType := make(map[string]int)
stats := make(map[groupByKey]gcSettingsForType)
if err := log.Query(oplog.Query{Reversed: true}, func(op *v1.Operation) error {
validIDs[op.Id] = struct{}{}
forgot, ok := snapshotForgottenForFlow[op.FlowId]
if !ok {
// no snapshot associated with this flow; check if it's old enough to be gc'd
maxAgeForOperation := gcAgeForOperation(op)
if curTime-op.UnixTimeStartMs > maxAgeForOperation.Milliseconds() {
if ok {
if forgot {
// snapshot is forgotten; this operation is eligible for gc
forgetIDs = append(forgetIDs, op.Id)
deletedByForgottenSnapshot++
deletedByType[reflect.TypeOf(op.Op).String()]++
}
} else if forgot {
// snapshot is forgotten; this operation is eligible for gc
forgetIDs = append(forgetIDs, op.Id)
return nil
}
key := groupByKey{
Repo: op.RepoId,
Plan: op.PlanId,
Type: reflect.TypeOf(op.Op),
}
st, ok := stats[key]
if !ok {
gcSettings, ok := gcSettings[reflect.TypeOf(op.Op)]
if !ok {
st = defaultGcSettings
} else {
st = gcSettings
}
}
st.keepMax-- // decrement the max retention, when this < 0 operation must be gc'd
st.keepMin-- // decrement the min retention, when this < 0 we can start gc'ing
stats[key] = st // update the stats
if st.keepMin >= 0 {
// can't delete if within min retention period
return nil
}
if st.keepMax < 0 {
// max retention reached; this operation must be gc'd.
forgetIDs = append(forgetIDs, op.Id)
deletedByMaxCount++
deletedByType[key.Type.String()]++
} else if curTime-op.UnixTimeStartMs > st.maxAge.Milliseconds() {
// operation is old enough to be gc'd
forgetIDs = append(forgetIDs, op.Id)
deletedByMaxAge++
deletedByType[key.Type.String()]++
}
return nil
}); err != nil {
return fmt.Errorf("identifying gc eligible operations: %w", err)
@@ -109,14 +176,13 @@ func (t *CollectGarbageTask) gcOperations(log *oplog.OpLog) error {
if err := log.Delete(forgetIDs...); err != nil {
return fmt.Errorf("removing gc eligible operations: %w", err)
} else if len(forgetIDs) > 0 {
for _, id := range forgetIDs {
delete(validIDs, id)
}
}
for _, id := range forgetIDs { // update validIDs with respect to the just deleted operations
delete(validIDs, id)
}
zap.L().Info("collecting garbage operations",
zap.Any("operations_removed", len(forgetIDs)))
zap.Int("operations_removed", len(forgetIDs)), zap.Int("removed_by_age", deletedByMaxAge), zap.Int("removed_by_limit", deletedByMaxCount), zap.Int("removed_by_snapshot_forgotten", deletedByForgottenSnapshot), zap.Any("removed_by_type", deletedByType))
// cleaning up logstore
toDelete := []string{}