mirror of
https://github.com/garethgeorge/backrest.git
synced 2025-12-16 02:25:37 +00:00
feat: improve support for instance ID tag
This commit is contained in:
@@ -1,16 +0,0 @@
|
||||
package stringutil
|
||||
|
||||
import "regexp"
|
||||
|
||||
var (
|
||||
sanitizeIDRegex = regexp.MustCompile(`[^a-zA-Z0-9_\-\.]+`) // matches invalid characters in an ID
|
||||
idRegex = regexp.MustCompile(`[a-zA-Z0-9_\-\.]*`) // matches a valid ID (including empty string)
|
||||
)
|
||||
|
||||
func SanitizeID(id string) string {
|
||||
return sanitizeIDRegex.ReplaceAllString(id, "_")
|
||||
}
|
||||
|
||||
func ValidateID(id string) bool {
|
||||
return idRegex.MatchString(id)
|
||||
}
|
||||
@@ -7,7 +7,7 @@ import (
|
||||
"strings"
|
||||
|
||||
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
||||
"github.com/garethgeorge/backrest/internal/config/stringutil"
|
||||
"github.com/garethgeorge/backrest/internal/config/validationutil"
|
||||
"github.com/gitploy-io/cronexpr"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
)
|
||||
@@ -15,9 +15,8 @@ import (
|
||||
func ValidateConfig(c *v1.Config) error {
|
||||
var err error
|
||||
|
||||
c.Instance, err = validateID(c.Instance)
|
||||
if err != nil {
|
||||
err = multierror.Append(err, fmt.Errorf("instance ID: %w", err))
|
||||
if e := validationutil.ValidateID(c.Instance, validationutil.IDMaxLen); e != nil {
|
||||
err = multierror.Append(err, fmt.Errorf("instance ID %q invalid: %w", c.Instance, e))
|
||||
}
|
||||
|
||||
repos := make(map[string]*v1.Repo)
|
||||
@@ -63,9 +62,8 @@ func ValidateConfig(c *v1.Config) error {
|
||||
|
||||
func validateRepo(repo *v1.Repo) error {
|
||||
var err error
|
||||
|
||||
if repo.Id == "" || !stringutil.ValidateID(repo.Id) {
|
||||
err = multierror.Append(err, fmt.Errorf("id %q contains invalid characters (or empty)", repo.Id))
|
||||
if e := validationutil.ValidateID(repo.Id, 0); e != nil {
|
||||
err = multierror.Append(err, fmt.Errorf("id %q invalid: %w", repo.Id, e))
|
||||
}
|
||||
|
||||
if repo.Uri == "" {
|
||||
@@ -85,12 +83,8 @@ func validateRepo(repo *v1.Repo) error {
|
||||
|
||||
func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {
|
||||
var err error
|
||||
if plan.Paths == nil || len(plan.Paths) == 0 {
|
||||
err = multierror.Append(err, fmt.Errorf("path is required"))
|
||||
}
|
||||
|
||||
if plan.Id == "" || !stringutil.ValidateID(plan.Id) {
|
||||
err = multierror.Append(err, fmt.Errorf("id %q contains invalid characters (or empty)", plan.Id))
|
||||
if e := validationutil.ValidateID(plan.Id, 0); e != nil {
|
||||
err = multierror.Append(err, fmt.Errorf("id %q invalid: %w", plan.Id, e))
|
||||
}
|
||||
|
||||
for idx, p := range plan.Paths {
|
||||
@@ -121,10 +115,3 @@ func validatePlan(plan *v1.Plan, repos map[string]*v1.Repo) error {
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
func validateID(id string) (string, error) {
|
||||
if len(id) > 32 {
|
||||
return "", fmt.Errorf("id %q is too long", id)
|
||||
}
|
||||
return stringutil.SanitizeID(id), nil
|
||||
}
|
||||
|
||||
33
internal/config/validationutil/validationutil.go
Normal file
33
internal/config/validationutil/validationutil.go
Normal file
@@ -0,0 +1,33 @@
|
||||
package validationutil
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"regexp"
|
||||
)
|
||||
|
||||
var (
|
||||
IDMaxLen = 50 // maximum length of an ID
|
||||
sanitizeIDRegex = regexp.MustCompile(`[^a-zA-Z0-9_\-\.]+`) // matches invalid characters in an ID
|
||||
idRegex = regexp.MustCompile(`[a-zA-Z0-9_\-\.]*`) // matches a valid ID (including empty string)
|
||||
)
|
||||
|
||||
func SanitizeID(id string) string {
|
||||
return sanitizeIDRegex.ReplaceAllString(id, "_")
|
||||
}
|
||||
|
||||
// ValidateID checks if an ID is valid.
|
||||
// It returns an error if the ID contains invalid characters, is empty, or is too long.
|
||||
// The maxLen parameter is the maximum length of the ID. If maxLen is 0, the ID length is not checked.
|
||||
func ValidateID(id string, maxLen int) error {
|
||||
if !idRegex.MatchString(id) {
|
||||
return errors.New("contains invalid characters")
|
||||
}
|
||||
if len(id) == 0 {
|
||||
return errors.New("empty")
|
||||
}
|
||||
if maxLen > 0 && len(id) > maxLen {
|
||||
return fmt.Errorf("too long (> %d chars)", maxLen)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
@@ -1,6 +1,8 @@
|
||||
package stringutil
|
||||
package validationutil
|
||||
|
||||
import "testing"
|
||||
import (
|
||||
"testing"
|
||||
)
|
||||
|
||||
func TestSanitizeID(t *testing.T) {
|
||||
tcs := []struct {
|
||||
@@ -31,6 +31,10 @@ type RepoOrchestrator struct {
|
||||
|
||||
// NewRepoOrchestrator accepts a config and a repo that is configured with the properties of that config object.
|
||||
func NewRepoOrchestrator(config *v1.Config, repoConfig *v1.Repo, resticPath string) (*RepoOrchestrator, error) {
|
||||
if config.Instance == "" {
|
||||
return nil, errors.New("instance is a required field in the backrest config")
|
||||
}
|
||||
|
||||
var opts []restic.GenericOption
|
||||
opts = append(opts, restic.WithEnviron())
|
||||
opts = append(opts, restic.WithEnv("RESTIC_PROGRESS_FPS=0.5"))
|
||||
@@ -93,7 +97,7 @@ func (r *RepoOrchestrator) SnapshotsForPlan(ctx context.Context, plan *v1.Plan)
|
||||
ctx, flush := forwardResticLogs(ctx)
|
||||
defer flush()
|
||||
|
||||
snapshots, err := r.repo.Snapshots(ctx, restic.WithFlags("--tag", tagForPlan(plan), "--tag", r.config.Instance))
|
||||
snapshots, err := r.repo.Snapshots(ctx, restic.WithFlags("--tag", TagForPlan(plan.Id), "--tag", TagForInstance(r.config.Instance)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("get snapshots for plan %q: %w", plan.Id, err)
|
||||
}
|
||||
@@ -119,19 +123,17 @@ func (r *RepoOrchestrator) Backup(ctx context.Context, plan *v1.Plan, progressCa
|
||||
return nil, fmt.Errorf("failed to get snapshots for plan: %w", err)
|
||||
}
|
||||
|
||||
r.l.Debug("got snapshots for plan", zap.String("repo", r.repoConfig.Id), zap.Int("count", len(snapshots)), zap.String("plan", plan.Id), zap.String("tag", tagForPlan(plan)))
|
||||
r.l.Debug("got snapshots for plan", zap.String("repo", r.repoConfig.Id), zap.Int("count", len(snapshots)), zap.String("plan", plan.Id), zap.String("tag", TagForPlan(plan.Id)))
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
var opts []restic.GenericOption
|
||||
opts = append(opts, restic.WithFlags("--exclude-caches"))
|
||||
opts = append(opts, restic.WithFlags("--tag", tagForPlan(plan)))
|
||||
if r.config.Instance != "" {
|
||||
opts = append(opts, restic.WithFlags("--host", r.config.Instance))
|
||||
opts = append(opts, restic.WithFlags("--tag", tagForInstance(r.config.Instance)))
|
||||
} else {
|
||||
return nil, errors.New("host is a required field in the backrest config")
|
||||
}
|
||||
opts = append(opts, restic.WithFlags(
|
||||
"--exclude-caches",
|
||||
"--tag", TagForPlan(plan.Id),
|
||||
"--tag", TagForInstance(r.config.Instance),
|
||||
"--host", r.config.Instance),
|
||||
)
|
||||
|
||||
for _, exclude := range plan.Excludes {
|
||||
opts = append(opts, restic.WithFlags("--exclude", exclude))
|
||||
@@ -180,7 +182,7 @@ func (r *RepoOrchestrator) ListSnapshotFiles(ctx context.Context, snapshotId str
|
||||
return lsEnts, nil
|
||||
}
|
||||
|
||||
func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan) ([]*v1.ResticSnapshot, error) {
|
||||
func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan, tags []string) ([]*v1.ResticSnapshot, error) {
|
||||
r.mu.Lock()
|
||||
defer r.mu.Unlock()
|
||||
ctx, flush := forwardResticLogs(ctx)
|
||||
@@ -191,9 +193,13 @@ func (r *RepoOrchestrator) Forget(ctx context.Context, plan *v1.Plan) ([]*v1.Res
|
||||
return nil, fmt.Errorf("plan %q has no retention policy", plan.Id)
|
||||
}
|
||||
|
||||
if r.config.Instance == "" {
|
||||
return nil, errors.New("instance is a required field in the backrest config")
|
||||
}
|
||||
|
||||
result, err := r.repo.Forget(
|
||||
ctx, protoutil.RetentionPolicyFromProto(plan.Retention),
|
||||
restic.WithFlags("--tag", tagForPlan(plan)+","+tagForInstance(r.config.Instance)),
|
||||
restic.WithFlags("--tag", strings.Join(tags, ",")),
|
||||
restic.WithFlags("--group-by", ""),
|
||||
)
|
||||
if err != nil {
|
||||
@@ -339,14 +345,6 @@ func (r *RepoOrchestrator) Config() *v1.Repo {
|
||||
return r.repoConfig
|
||||
}
|
||||
|
||||
func tagForPlan(plan *v1.Plan) string {
|
||||
return fmt.Sprintf("plan:%s", plan.Id)
|
||||
}
|
||||
|
||||
func tagForInstance(host string) string {
|
||||
return fmt.Sprintf("created-by:%s", host)
|
||||
}
|
||||
|
||||
func sortSnapshotsByTime(snapshots []*restic.Snapshot) {
|
||||
sort.SliceStable(snapshots, func(i, j int) bool {
|
||||
return snapshots[i].UnixTimeMs() < snapshots[j].UnixTimeMs()
|
||||
|
||||
36
internal/orchestrator/repo/tags.go
Normal file
36
internal/orchestrator/repo/tags.go
Normal file
@@ -0,0 +1,36 @@
|
||||
package repo
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
)
|
||||
|
||||
// TagForPlan returns a tag for the plan.
|
||||
func TagForPlan(planId string) string {
|
||||
return fmt.Sprintf("plan:%s", planId)
|
||||
}
|
||||
|
||||
// TagForInstance returns a tag for the instance.
|
||||
func TagForInstance(instanceId string) string {
|
||||
return fmt.Sprintf("created-by:%s", instanceId)
|
||||
}
|
||||
|
||||
// InstanceIDFromTags returns the instance ID from the tags, or an empty string if not found.
|
||||
func InstanceIDFromTags(tags []string) string {
|
||||
for _, tag := range tags {
|
||||
if strings.HasPrefix(tag, "created-by:") {
|
||||
return tag[len("created-by:"):]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// PlanFromTags returns the plan ID from the tags, or an empty string if not found.
|
||||
func PlanFromTags(tags []string) string {
|
||||
for _, tag := range tags {
|
||||
if strings.HasPrefix(tag, "plan:") {
|
||||
return tag[len("plan:"):]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
}
|
||||
@@ -7,7 +7,9 @@ import (
|
||||
|
||||
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
||||
"github.com/garethgeorge/backrest/internal/hook"
|
||||
"github.com/garethgeorge/backrest/internal/oplog"
|
||||
"github.com/garethgeorge/backrest/internal/oplog/indexutil"
|
||||
"github.com/garethgeorge/backrest/internal/orchestrator/repo"
|
||||
"github.com/hashicorp/go-multierror"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
@@ -49,13 +51,14 @@ func NewOneoffForgetTask(repoID, planID string, flowID int64, at time.Time) Task
|
||||
|
||||
func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner) error {
|
||||
t := st.Task
|
||||
oplog := taskRunner.OpLog()
|
||||
|
||||
repo, err := taskRunner.GetRepoOrchestrator(t.RepoID())
|
||||
r, err := taskRunner.GetRepoOrchestrator(t.RepoID())
|
||||
if err != nil {
|
||||
return fmt.Errorf("get repo %q: %w", t.RepoID(), err)
|
||||
}
|
||||
|
||||
err = repo.UnlockIfAutoEnabled(ctx)
|
||||
err = r.UnlockIfAutoEnabled(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("auto unlock repo %q: %w", t.RepoID(), err)
|
||||
}
|
||||
@@ -65,7 +68,19 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
|
||||
return fmt.Errorf("get plan %q: %w", t.PlanID(), err)
|
||||
}
|
||||
|
||||
forgot, err := repo.Forget(ctx, plan)
|
||||
tags := []string{repo.TagForPlan(t.PlanID())}
|
||||
if compat, err := useLegacyCompatMode(oplog, t.PlanID()); err != nil {
|
||||
return fmt.Errorf("check legacy compat mode: %w", err)
|
||||
} else if !compat {
|
||||
tags = append(tags, repo.TagForInstance(taskRunner.Config().Instance))
|
||||
} else {
|
||||
zap.L().Warn("forgetting snapshots without instance ID, using legacy behavior (e.g. --tags not including instance ID)")
|
||||
zap.S().Warnf("to avoid this warning, tag all snapshots with the instance ID e.g. by running: \r\n"+
|
||||
"restic tag --set '%s' --set '%s' --tag '%s'", repo.TagForPlan(t.PlanID()), repo.TagForInstance(taskRunner.Config().Instance), repo.TagForPlan(t.PlanID()))
|
||||
}
|
||||
|
||||
// check if any other instance IDs exist in the repo (unassociated don't count)
|
||||
forgot, err := r.Forget(ctx, plan, tags)
|
||||
if err != nil {
|
||||
return fmt.Errorf("forget: %w", err)
|
||||
}
|
||||
@@ -108,3 +123,28 @@ func forgetHelper(ctx context.Context, st ScheduledTask, taskRunner TaskRunner)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// useLegacyCompatMode checks if there are any snapshots that were created without a `created-by` tag still exist in the repo.
|
||||
// The property is overridden if mixed `created-by` tag values are found.
|
||||
func useLegacyCompatMode(oplog *oplog.OpLog, planID string) (bool, error) {
|
||||
instanceIDs := make(map[string]struct{})
|
||||
if err := oplog.ForEachByPlan(planID, indexutil.CollectAll(), func(op *v1.Operation) error {
|
||||
if snapshotOp, ok := op.Op.(*v1.Operation_OperationIndexSnapshot); ok {
|
||||
tags := snapshotOp.OperationIndexSnapshot.GetSnapshot().GetTags()
|
||||
instanceIDs[repo.InstanceIDFromTags(tags)] = struct{}{}
|
||||
}
|
||||
return nil
|
||||
}); err != nil {
|
||||
return false, err
|
||||
}
|
||||
if _, ok := instanceIDs[""]; !ok {
|
||||
return false, nil
|
||||
}
|
||||
delete(instanceIDs, "")
|
||||
if len(instanceIDs) > 1 {
|
||||
zap.L().Warn("found mixed instance IDs in indexed snapshots, forcing forget to use new behavior (e.g. --tags including instance ID) despite the presence of legacy (e.g. untagged) snapshots.")
|
||||
return false, nil
|
||||
}
|
||||
zap.L().Warn("found legacy snapshots without instance ID, forget will use legacy behavior e.g. --tags not including instance ID")
|
||||
return true, nil
|
||||
}
|
||||
|
||||
@@ -15,7 +15,6 @@ import (
|
||||
"github.com/garethgeorge/backrest/internal/protoutil"
|
||||
"github.com/garethgeorge/backrest/pkg/restic"
|
||||
"go.uber.org/zap"
|
||||
"golang.org/x/exp/maps"
|
||||
)
|
||||
|
||||
func NewOneoffIndexSnapshotsTask(repoID string, at time.Time) Task {
|
||||
@@ -71,20 +70,20 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
|
||||
}
|
||||
|
||||
// check if any migrations are required
|
||||
if migrated, err := tryMigrate(ctx, repo, config, snapshots); err != nil {
|
||||
return fmt.Errorf("migrate snapshots for repo %q: %w", t.RepoID(), err)
|
||||
} else if migrated {
|
||||
// Delete snapshot operations
|
||||
if err := oplog.Delete(maps.Values(currentIds)...); err != nil {
|
||||
return fmt.Errorf("delete prior indexed operations: %w", err)
|
||||
}
|
||||
// if migrated, err := tryMigrate(ctx, repo, config, snapshots); err != nil {
|
||||
// return fmt.Errorf("migrate snapshots for repo %q: %w", t.RepoID(), err)
|
||||
// } else if migrated {
|
||||
// // Delete snapshot operations
|
||||
// if err := oplog.Delete(maps.Values(currentIds)...); err != nil {
|
||||
// return fmt.Errorf("delete prior indexed operations: %w", err)
|
||||
// }
|
||||
|
||||
snapshots, err = repo.Snapshots(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("get snapshots for repo %q: %w", t.RepoID(), err)
|
||||
}
|
||||
currentIds = nil
|
||||
}
|
||||
// snapshots, err = repo.Snapshots(ctx)
|
||||
// if err != nil {
|
||||
// return fmt.Errorf("get snapshots for repo %q: %w", t.RepoID(), err)
|
||||
// }
|
||||
// currentIds = nil
|
||||
// }
|
||||
|
||||
foundIds := make(map[string]struct{})
|
||||
|
||||
@@ -103,7 +102,7 @@ func indexSnapshotsHelper(ctx context.Context, st ScheduledTask, taskRunner Task
|
||||
return fmt.Errorf("get flow ID for snapshot %q: %w", snapshot.Id, err)
|
||||
}
|
||||
planId := planForSnapshot(snapshotProto)
|
||||
instanceID := hostForSnapshot(snapshotProto)
|
||||
instanceID := instanceIDForSnapshot(snapshotProto)
|
||||
indexOps = append(indexOps, &v1.Operation{
|
||||
RepoId: t.RepoID(),
|
||||
PlanId: planId,
|
||||
@@ -185,21 +184,15 @@ func indexCurrentSnapshotIdsForRepo(log *oplog.OpLog, repoId string) (map[string
|
||||
}
|
||||
|
||||
func planForSnapshot(snapshot *v1.ResticSnapshot) string {
|
||||
for _, tag := range snapshot.Tags {
|
||||
if strings.HasPrefix(tag, "plan:") {
|
||||
return tag[len("plan:"):]
|
||||
}
|
||||
p := repo.PlanFromTags(snapshot.Tags)
|
||||
if p != "" {
|
||||
return p
|
||||
}
|
||||
return PlanForUnassociatedOperations
|
||||
}
|
||||
|
||||
func hostForSnapshot(snapshot *v1.ResticSnapshot) string {
|
||||
for _, tag := range snapshot.Tags {
|
||||
if strings.HasPrefix(tag, "created-by:") {
|
||||
return tag[len("created-by:"):]
|
||||
}
|
||||
}
|
||||
return ""
|
||||
func instanceIDForSnapshot(snapshot *v1.ResticSnapshot) string {
|
||||
return repo.InstanceIDFromTags(snapshot.Tags)
|
||||
}
|
||||
|
||||
// tryMigrate checks if the snapshots use the latest backrest tag set and migrates them if necessary.
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
|
||||
v1 "github.com/garethgeorge/backrest/gen/go/v1"
|
||||
"github.com/garethgeorge/backrest/pkg/restic"
|
||||
"go.uber.org/zap"
|
||||
)
|
||||
|
||||
// ValidateOperation verifies critical properties of the operation proto.
|
||||
@@ -23,7 +24,7 @@ func ValidateOperation(op *v1.Operation) error {
|
||||
return errors.New("operation.plan_id is required")
|
||||
}
|
||||
if op.InstanceId == "" {
|
||||
return errors.New("operation.instance_id is required")
|
||||
zap.L().Warn("operation.instance_id should typically be set")
|
||||
}
|
||||
if op.SnapshotId != "" {
|
||||
if err := restic.ValidateSnapshotId(op.SnapshotId); err != nil {
|
||||
|
||||
Reference in New Issue
Block a user