feat: ensure instance ID is set for all operations

This commit is contained in:
garethgeorge
2024-05-03 00:46:49 -07:00
committed by Gareth
parent f0ee20f53d
commit 65d4a1df0e
5 changed files with 39 additions and 13 deletions

View File

@@ -22,12 +22,14 @@ var (
)
type HookExecutor struct {
config *v1.Config
oplog *oplog.OpLog
logStore *rotatinglog.RotatingLog
}
func NewHookExecutor(oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog) *HookExecutor {
func NewHookExecutor(config *v1.Config, oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog) *HookExecutor {
return &HookExecutor{
config: config,
oplog: oplog,
logStore: bigOutputStore,
}
@@ -37,10 +39,11 @@ func NewHookExecutor(oplog *oplog.OpLog, bigOutputStore *rotatinglog.RotatingLog
// Hooks are pulled both from the provided plan and from the repo config.
func (e *HookExecutor) ExecuteHooks(flowID int64, repo *v1.Repo, plan *v1.Plan, events []v1.Hook_Condition, vars HookVars) error {
operationBase := v1.Operation{
Status: v1.OperationStatus_STATUS_INPROGRESS,
PlanId: plan.GetId(),
RepoId: repo.GetId(),
FlowId: flowID,
Status: v1.OperationStatus_STATUS_INPROGRESS,
PlanId: plan.GetId(),
RepoId: repo.GetId(),
InstanceId: e.config.Instance,
FlowId: flowID,
}
vars.Repo = repo

View File

@@ -13,6 +13,7 @@ import (
var migrations = []func(*OpLog, *bbolt.Tx) error{
migration001FlowID,
migration002InstanceID,
}
var CurrentVersion = int64(len(migrations))
@@ -108,3 +109,14 @@ func migration001FlowID(oplog *OpLog, tx *bbolt.Tx) error {
return nil
})
}
func migration002InstanceID(oplog *OpLog, tx *bbolt.Tx) error {
return transformOperations(oplog, tx, func(op *v1.Operation) error {
if op.InstanceId != "" {
return nil
}
op.InstanceId = "__unassociated__"
return nil
})
}

View File

@@ -66,10 +66,9 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
OpLog: oplog,
config: cfg,
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
repoPool: newResticRepoPool(resticBin, cfg),
taskQueue: queue.NewTimePriorityQueue[stContainer](),
hookExecutor: hook.NewHookExecutor(oplog, logStore),
logStore: logStore,
repoPool: newResticRepoPool(resticBin, cfg),
taskQueue: queue.NewTimePriorityQueue[stContainer](),
logStore: logStore,
}
// verify the operation log and mark any incomplete operations as failed.
@@ -383,6 +382,7 @@ func (o *Orchestrator) scheduleTaskHelper(t tasks.Task, priority int, curTime ti
}
if stc.Op != nil {
stc.Op.InstanceId = o.config.Instance
stc.Op.PlanId = t.PlanID()
stc.Op.RepoId = t.RepoID()
stc.Op.Status = v1.OperationStatus_STATUS_PENDING

View File

@@ -13,8 +13,9 @@ type taskRunnerImpl struct {
orchestrator *Orchestrator
t tasks.Task
op *v1.Operation
repo *v1.Repo // cache, populated on first call to Repo()
plan *v1.Plan // cache, populated on first call to Plan()
repo *v1.Repo // cache, populated on first call to Repo()
plan *v1.Plan // cache, populated on first call to Plan()
config *v1.Config // cache, populated on first call to Config()
}
var _ tasks.TaskRunner = &taskRunnerImpl{}
@@ -46,10 +47,12 @@ func newTaskRunnerImpl(orchestrator *Orchestrator, task tasks.Task, op *v1.Opera
}
func (t *taskRunnerImpl) CreateOperation(op *v1.Operation) error {
op.InstanceId = t.orchestrator.config.Instance
return t.orchestrator.OpLog.Add(op)
}
func (t *taskRunnerImpl) UpdateOperation(op *v1.Operation) error {
op.InstanceId = t.orchestrator.config.Instance
return t.orchestrator.OpLog.Update(op)
}
@@ -84,7 +87,8 @@ func (t *taskRunnerImpl) ExecuteHooks(events []v1.Hook_Condition, vars hook.Hook
if t.op != nil {
flowID = t.op.FlowId
}
return t.orchestrator.hookExecutor.ExecuteHooks(flowID, repo, plan, events, vars)
executor := hook.NewHookExecutor(t.Config(), t.orchestrator.OpLog, t.orchestrator.logStore)
return executor.ExecuteHooks(flowID, repo, plan, events, vars)
}
func (t *taskRunnerImpl) GetRepo(repoID string) (*v1.Repo, error) {
@@ -104,5 +108,9 @@ func (t *taskRunnerImpl) ScheduleTask(task tasks.Task, priority int) error {
}
func (t *taskRunnerImpl) Config() *v1.Config {
return t.orchestrator.Config()
if t.config != nil {
return t.config
}
t.config = t.orchestrator.Config()
return t.config
}

View File

@@ -22,6 +22,9 @@ func ValidateOperation(op *v1.Operation) error {
if op.PlanId == "" {
return errors.New("operation.plan_id is required")
}
if op.InstanceId == "" {
return errors.New("operation.instance_id is required")
}
if op.SnapshotId != "" {
if err := restic.ValidateSnapshotId(op.SnapshotId); err != nil {
return fmt.Errorf("operation.snapshot_id is invalid: %w", err)