Files
backrest/internal/orchestrator/orchestrator.go
2023-11-22 17:59:33 -08:00

263 lines
6.6 KiB
Go

package orchestrator
import (
"context"
"errors"
"fmt"
"sync"
"time"
v1 "github.com/garethgeorge/resticui/gen/go/v1"
"github.com/garethgeorge/resticui/internal/config"
"github.com/garethgeorge/resticui/internal/database/oplog"
"github.com/garethgeorge/resticui/pkg/restic"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
)
// Sentinel errors returned by orchestrator and repo-pool lookups.
// Callers compare against these with errors.Is.
var (
	ErrRepoNotFound             = errors.New("repo not found")
	ErrRepoInitializationFailed = errors.New("repo initialization failed")
	ErrPlanNotFound             = errors.New("plan not found")
)
// Orchestrator is responsible for managing repos and backups.
type Orchestrator struct {
	mu            sync.Mutex      // guards config; held briefly by accessors and by Run()'s snapshot reads
	config        *v1.Config      // current configuration; replaced wholesale by ApplyConfig
	oplog         *oplog.OpLog    // operation log shared with tasks
	repoPool      *resticRepoPool // caches RepoOrchestrator instances per repo id
	configUpdates chan *v1.Config // configUpdates chan makes config changes available to Run()
	externTasks   chan Task       // externTasks is a channel that externally added tasks can be added to, they will be consumed by Run()
}
// NewOrchestrator constructs an Orchestrator backed by the given config store
// and operation log. It fails if the initial config cannot be loaded.
func NewOrchestrator(configProvider config.ConfigStore, oplog *oplog.OpLog) (*Orchestrator, error) {
	cfg, err := configProvider.Get()
	if err != nil {
		return nil, fmt.Errorf("failed to get config: %w", err)
	}
	orch := &Orchestrator{
		config: cfg,
		oplog:  oplog,
		// The pool gets its own in-memory store; ApplyConfig keeps it in sync.
		repoPool: newResticRepoPool(&config.MemoryStore{Config: cfg}),
		// Small buffer so EnqueueTask rarely blocks the caller.
		externTasks: make(chan Task, 2),
	}
	return orch, nil
}
// ApplyConfig installs a new configuration, propagates it to the repo pool,
// and notifies the running orchestration loop (if any) so it restarts with
// the new config.
func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
	o.mu.Lock()
	defer o.mu.Unlock()

	o.config = cfg
	zap.L().Debug("Applying config to orchestrator", zap.Any("config", cfg))

	// Keep the repo pool's view of the config in sync.
	if err := o.repoPool.configProvider.Update(cfg); err != nil {
		return fmt.Errorf("failed to update repo pool config: %w", err)
	}

	// NOTE(review): this send is on an unbuffered channel while o.mu is held —
	// assumes Run()'s select is always able to drain configUpdates; verify no
	// path exists where the loop blocks acquiring o.mu first.
	if ch := o.configUpdates; ch != nil {
		// A non-nil channel means the orchestration loop is running.
		ch <- cfg
	}
	return nil
}
// GetRepo returns the RepoOrchestrator for the given repo id, delegating to
// the repo pool (which caches instances).
func (o *Orchestrator) GetRepo(repoId string) (*RepoOrchestrator, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	repo, err := o.repoPool.GetRepo(repoId)
	if err != nil {
		return nil, fmt.Errorf("failed to get repo %q: %w", repoId, err)
	}
	return repo, nil
}
// GetPlan looks up a plan by id in the current config. Returns
// ErrPlanNotFound when no plan with that id exists.
func (o *Orchestrator) GetPlan(planId string) (*v1.Plan, error) {
	o.mu.Lock()
	defer o.mu.Unlock()
	// Ranging over a nil slice is a no-op, so no explicit nil check is needed.
	for _, plan := range o.config.Plans {
		if plan.Id == planId {
			return plan, nil
		}
	}
	return nil, ErrPlanNotFound
}
// Run is the main orchestration loop. Cancel the context to stop the loop.
// Each pass snapshots the current config and runs runVersion with it; a
// config change restarts the pass, context cancellation exits the loop.
func (o *Orchestrator) Run(mainCtx context.Context) error {
	zap.L().Info("starting orchestrator loop")

	o.mu.Lock()
	o.configUpdates = make(chan *v1.Config) // non-nil signals ApplyConfig that Run is live
	o.mu.Unlock()

	for {
		o.mu.Lock()
		cfg := o.config
		o.mu.Unlock()

		if !o.runVersion(mainCtx, cfg) {
			zap.L().Info("exiting orchestrator loop, context cancelled.")
			return nil
		}
		zap.L().Info("restarting orchestrator loop")
	}
}
// runVersion is a helper for Run() that runs the orchestration loop with a
// single version of the config. It returns true if the loop should restart
// with a new config and false if mainCtx was cancelled and the orchestrator
// should shut down.
func (o *Orchestrator) runVersion(mainCtx context.Context, config *v1.Config) bool {
	// lock serializes task execution: at most one task body runs at a time.
	var lock sync.Mutex
	// ctx gates *scheduling* and is cancelled on config change or shutdown;
	// running tasks use mainCtx so a config change does not interrupt them.
	ctx, cancel := context.WithCancel(mainCtx)
	var wg sync.WaitGroup

	var execTask func(t Task)
	execTask = func(t Task) {
		curTime := time.Now()
		runAt := t.Next(curTime)
		if runAt == nil {
			zap.L().Debug("task has no next run, not scheduling.", zap.String("task", t.Name()))
			return
		}
		timer := time.NewTimer(runAt.Sub(curTime))
		zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", runAt.Format(time.RFC3339)))
		wg.Add(1)
		go func() {
			defer wg.Done()
			select {
			case <-ctx.Done():
				if !timer.Stop() {
					<-timer.C // timer already fired; drain the channel.
				}
				zap.L().Debug("cancelled scheduled (but not running) task, orchestrator context is cancelled.", zap.String("task", t.Name()))
				return
			case <-timer.C:
				lock.Lock()
				defer lock.Unlock()
				zap.L().Info("running task", zap.String("task", t.Name()))
				// Task execution runs with mainCtx meaning config changes do not interrupt it, but cancelling the orchestration loop will.
				if err := t.Run(mainCtx); err != nil {
					zap.L().Error("task failed", zap.String("task", t.Name()), zap.Error(err))
				} else {
					zap.L().Debug("task finished", zap.String("task", t.Name()))
				}
				if ctx.Err() != nil {
					zap.L().Debug("not attempting to reschedule task, orchestrator context is cancelled.", zap.String("task", t.Name()))
					return
				}
				// Reschedule the task for its next run.
				execTask(t)
			}
		}()
	}

	// Schedule all backup tasks.
	for _, plan := range config.Plans {
		t, err := NewScheduledBackupTask(o, plan)
		if err != nil {
			zap.L().Error("failed to create backup task for plan", zap.String("plan", plan.Id), zap.Error(err))
			// BUGFIX: previously fell through and called execTask(t) with a
			// nil task, which would panic on t.Next(); skip this plan instead.
			continue
		}
		execTask(t)
	}

	// Wait for an external task, a config change, or cancellation; on shutdown
	// or restart, stop scheduling and wait for in-flight tasks to finish.
	for {
		select {
		case t := <-o.externTasks:
			execTask(t)
		case <-mainCtx.Done():
			zap.L().Info("orchestrator context cancelled, shutting down orchestrator")
			cancel()
			wg.Wait()
			return false
		case <-o.configUpdates:
			zap.L().Info("orchestrator received config change, waiting for in-progress operations then restarting")
			cancel()
			wg.Wait()
			return true
		}
	}
}
// EnqueueTask hands a task to the orchestration loop for scheduling.
// The externTasks channel is buffered (capacity 2); this call blocks once the
// buffer is full until Run()'s select drains it.
func (o *Orchestrator) EnqueueTask(t Task) {
	o.externTasks <- t
}
// resticRepoPool caches restic repos.
type resticRepoPool struct {
	mu             sync.Mutex                   // guards repos
	repos          map[string]*RepoOrchestrator // cache of live repo orchestrators keyed by repo id
	configProvider config.ConfigStore           // source of truth for repo configuration
}
// newResticRepoPool creates an empty repo pool backed by the given config store.
func newResticRepoPool(configProvider config.ConfigStore) *resticRepoPool {
	pool := &resticRepoPool{
		repos:          make(map[string]*RepoOrchestrator),
		configProvider: configProvider,
	}
	return pool
}
// GetRepo returns the RepoOrchestrator for repoId. A cached instance is
// reused when its stored config is proto-equal to the current config;
// otherwise the stale entry is dropped and a fresh one is constructed.
// Returns ErrRepoNotFound when no repo with the given id is configured.
func (rp *resticRepoPool) GetRepo(repoId string) (repo *RepoOrchestrator, err error) {
	cfg, err := rp.configProvider.Get()
	if err != nil {
		return nil, fmt.Errorf("failed to get config: %w", err)
	}

	rp.mu.Lock()
	defer rp.mu.Unlock()

	if cfg.Repos == nil {
		return nil, ErrRepoNotFound
	}

	// Find the config entry for the requested repo. The full scan preserves
	// the original last-match semantics should duplicate ids ever appear.
	var repoProto *v1.Repo
	for _, r := range cfg.Repos {
		if r.GetId() == repoId {
			repoProto = r
		}
	}
	if repoProto == nil {
		return nil, ErrRepoNotFound
	}

	// Check if we already have a repo for this id, if we do return it.
	repo, ok := rp.repos[repoId]
	if ok && proto.Equal(repo.repoConfig, repoProto) {
		return repo, nil
	}
	// Config changed (or no cache entry): discard any stale instance.
	// gofmt fix: removed a stray trailing semicolon.
	delete(rp.repos, repoId)

	var opts []restic.GenericOption
	opts = append(opts, restic.WithPropagatedEnvVars(restic.EnvToPropagate...))
	if len(repoProto.GetEnv()) > 0 {
		opts = append(opts, restic.WithEnv(repoProto.GetEnv()...))
	}
	if len(repoProto.GetFlags()) > 0 {
		opts = append(opts, restic.WithFlags(repoProto.GetFlags()...))
	}

	// Otherwise create a new repo.
	repo = newRepoOrchestrator(repoProto, restic.NewRepo(repoProto, opts...))
	rp.repos[repoId] = repo
	return repo, nil
}