feat: use new task queue implementation in orchestrator

This commit is contained in:
garethgeorge
2024-04-08 00:16:28 -07:00
parent 8b9280ed57
commit 1d0489847e
6 changed files with 53 additions and 438 deletions
+26 -35
View File
@@ -13,6 +13,7 @@ import (
"github.com/garethgeorge/backrest/internal/config"
"github.com/garethgeorge/backrest/internal/hook"
"github.com/garethgeorge/backrest/internal/oplog"
"github.com/garethgeorge/backrest/internal/queue"
"github.com/garethgeorge/backrest/internal/rotatinglog"
"go.uber.org/zap"
"google.golang.org/protobuf/proto"
@@ -40,14 +41,10 @@ type Orchestrator struct {
config *v1.Config
OpLog *oplog.OpLog
repoPool *resticRepoPool
taskQueue taskQueue
taskQueue *queue.TimePriorityQueue[scheduledTask]
hookExecutor *hook.HookExecutor
logStore *rotatinglog.RotatingLog
// now for the purpose of testing; used by Run() to get the current time.
now func() time.Time
runningTask atomic.Pointer[taskExecutionInfo]
runningTask atomic.Pointer[taskExecutionInfo]
}
func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) {
@@ -59,10 +56,8 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
OpLog: oplog,
config: cfg,
// repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value.
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: newTaskQueue(func() time.Time {
return o.curTime()
}),
repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}),
taskQueue: queue.NewTimePriorityQueue[scheduledTask](),
hookExecutor: hook.NewHookExecutor(oplog, logStore),
logStore: logStore,
}
@@ -104,13 +99,6 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt
return o, nil
}
// curTime reports the time the orchestrator should treat as "now": the value
// produced by the injected now hook when one is set, the wall clock otherwise.
func (o *Orchestrator) curTime() time.Time {
	if o.now == nil {
		return time.Now()
	}
	return o.now()
}
func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
o.mu.Lock()
defer o.mu.Unlock()
@@ -121,12 +109,13 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error {
return fmt.Errorf("failed to update repo pool config: %w", err)
}
return o.ScheduleDefaultTasks(cfg)
return o.scheduleDefaultTasks(cfg)
}
// scheduleDefaultTasks resets the task queue, cancelling the tasks it removes, and then schedules the default tasks for the given config.
func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error {
func (o *Orchestrator) scheduleDefaultTasks(config *v1.Config) error {
zap.L().Info("scheduling default tasks, waiting for task queue reset.")
removedTasks := o.taskQueue.Reset()
for _, t := range removedTasks {
if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil {
@@ -195,27 +184,21 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta
}
tasks := o.taskQueue.Reset()
remaining := make([]scheduledTask, 0, len(tasks))
for _, t := range tasks {
if t.task.OperationId() == operationId {
if err := t.task.Cancel(status); err != nil {
return fmt.Errorf("cancel task %q: %w", t.task.Name(), err)
}
// check if the task has a next run after its current 'runAt' time; if it does then we will schedule the next run.
if nextTime := t.task.Next(t.runAt); nextTime != nil {
remaining = append(remaining, scheduledTask{
task: t.task,
runAt: *nextTime,
})
nextTime := t.task.Next(t.runAt)
if nextTime == nil {
continue
}
} else {
remaining = append(remaining, *t)
}
}
o.taskQueue.Push(remaining...)
t.runAt = *nextTime
}
o.taskQueue.Enqueue(t.runAt, t.priority, t) // requeue the task.
}
return nil
}
@@ -231,7 +214,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}
t := o.taskQueue.Dequeue(mainCtx)
if t == nil {
if t.task == nil {
continue
}
@@ -267,12 +250,12 @@ func (o *Orchestrator) Run(mainCtx context.Context) {
}
func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) {
nextRun := t.Next(o.curTime())
nextRun := t.Next(time.Now())
if nextRun == nil {
return
}
zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339)))
o.taskQueue.Push(scheduledTask{
o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{
task: t,
runAt: *nextRun,
priority: priority,
@@ -341,3 +324,11 @@ type taskExecutionInfo struct {
operationId int64
cancel func()
}
// scheduledTask is the value the orchestrator stores in its task queue: a task
// together with the scheduling metadata it was enqueued with.
type scheduledTask struct {
// task is the Task this entry refers to.
task Task
// runAt is the time the task is scheduled for; ScheduleTask passes the same
// value to the queue as the enqueue time.
runAt time.Time
// priority is the priority the task was enqueued with.
priority int
// callbacks holds the callbacks supplied to ScheduleTask.
// NOTE(review): presumably invoked with the task's result once it has run — confirm in Run.
callbacks []func(error)
// NOTE(review): looks like the config in effect for this task — confirm against the code that sets it.
config *v1.Config
}
+3 -10
View File
@@ -149,19 +149,14 @@ func TestSchedulerWait(t *testing.T) {
t.Parallel()
// Arrange
curTime := time.Now()
orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil)
if err != nil {
t.Fatalf("failed to create orchestrator: %v", err)
}
orch.now = func() time.Time {
return curTime
}
ran := make(chan struct{})
orch.ScheduleTask(&testTask{
onNext: func(t time.Time) *time.Time {
t = t.Add(5 * time.Millisecond)
t = t.Add(150 * time.Millisecond)
return &t
},
onRun: func() error {
@@ -180,12 +175,10 @@ func TestSchedulerWait(t *testing.T) {
t.Errorf("expected task to not run yet")
}
curTime = time.Now()
// Schedule another task just to trigger a queue refresh
orch.ScheduleTask(&testTask{
onNext: func(t time.Time) *time.Time {
t = t.Add(5 * time.Millisecond)
t = t.Add(1000 * time.Second)
return &t
},
onRun: func() error {
@@ -195,7 +188,7 @@ func TestSchedulerWait(t *testing.T) {
}, TaskPriorityDefault)
select {
case <-time.NewTimer(1000 * time.Millisecond).C:
case <-time.NewTimer(200 * time.Millisecond).C:
t.Errorf("expected task to run")
case <-ran:
}
-220
View File
@@ -1,220 +0,0 @@
package orchestrator
import (
"container/heap"
"context"
"sync"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
)
// taskQueueDefaultPollInterval is the poll interval newTaskQueue assigns to
// every queue it creates; Dequeue never sleeps longer than the queue's poll
// interval before re-checking the clock.
var taskQueueDefaultPollInterval = 3 * time.Minute
// taskQueue holds scheduled tasks in two heaps: tasks waiting for their run
// time, and tasks whose run time has passed and that are waiting to be
// returned by Dequeue.
type taskQueue struct {
// dequeueMu is held for the whole of Dequeue, so only one Dequeue runs at a time.
dequeueMu sync.Mutex
// mu is held by Push and Reset while they touch heap, ready and notify, and by
// Dequeue whenever it is not blocked waiting.
mu sync.Mutex
// heap holds pending tasks ordered by runAt (earliest first).
heap scheduledTaskHeapByTime
// notify is created by Dequeue and set back to nil when it returns; Push and
// Reset signal on it (when non-nil) so a waiting Dequeue re-examines the heaps.
notify chan struct{}
// ready holds tasks that Dequeue found to be due, ordered by priority
// (higher first) and then by runAt.
ready scheduledTaskHeapByPriorityThenTime
// pollInterval, when positive, caps how long Dequeue waits on a single timer.
pollInterval time.Duration
// Now optionally overrides the clock; when nil, time.Now is used.
Now func() time.Time
}
// newTaskQueue builds an empty task queue that reads the current time from
// now (time.Now is used when now is nil) and uses the default poll interval.
func newTaskQueue(now func() time.Time) taskQueue {
	return taskQueue{
		Now:          now,
		pollInterval: taskQueueDefaultPollInterval,
		ready:        scheduledTaskHeapByPriorityThenTime{},
		heap:         scheduledTaskHeapByTime{},
	}
}
// curTime returns the queue's current time: the Now hook's value when a hook
// is configured, time.Now otherwise.
func (t *taskQueue) curTime() time.Time {
	if t.Now == nil {
		return time.Now()
	}
	return t.Now()
}
// Push adds tasks to the time-ordered heap and, if a Dequeue call is in
// progress, signals it so it re-examines which task is due first.
//
// Push panics if any entry has a nil task, since that is a programming error.
// Entries preceding the offending one have already been added when it panics.
func (t *taskQueue) Push(tasks ...scheduledTask) {
	t.mu.Lock()
	defer t.mu.Unlock()

	for _, task := range tasks {
		task := task // per-iteration copy so every heap entry points at its own value
		if task.task == nil {
			panic("task cannot be nil")
		}
		heap.Push(&t.heap, &task)
	}

	if t.notify != nil {
		// The send must not block: mu is held here and Dequeue needs mu before it
		// can drain the channel again, so blocking on a full buffer would deadlock
		// both sides. Dropping the signal is safe because a full buffer means a
		// wake-up is already pending, and Dequeue re-reads the heap under mu after
		// every wake-up.
		select {
		case t.notify <- struct{}{}:
		default:
		}
	}
}
// Reset empties the queue and returns everything that was in it: the tasks
// still waiting for their run time followed by the tasks that were already
// ready. If a Dequeue call is in progress it is signalled so it re-examines
// the (now empty) heaps.
func (t *taskQueue) Reset() []*scheduledTask {
	t.mu.Lock()
	defer t.mu.Unlock()

	oldTasks := t.heap.tasks
	oldTasks = append(oldTasks, t.ready.tasks...)
	t.heap.tasks = nil
	t.ready.tasks = nil

	if t.notify != nil {
		// The send must not block: mu is held here and Dequeue needs mu before it
		// can drain the channel again, so blocking on a full buffer would deadlock
		// both sides. A full buffer already guarantees a pending wake-up.
		select {
		case t.notify <- struct{}{}:
		default:
		}
	}
	return oldTasks
}
// Dequeue blocks until a task is due and returns it, or returns nil once ctx
// is done.
//
// Tasks wait in the time-ordered heap until their runAt has passed; they are
// then moved to the ready heap, from which they are returned by priority
// (higher first) and then by runAt. Only one Dequeue runs at a time
// (dequeueMu). While it runs, t.notify is a live channel that Push and Reset
// signal on; it is closed and set back to nil when Dequeue returns.
//
// mu is released around every blocking wait and re-acquired before each
// return, so the deferred Unlock always has a held lock to release.
func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask {
t.dequeueMu.Lock()
defer t.dequeueMu.Unlock()
t.mu.Lock()
defer t.mu.Unlock()
// Publish a notification channel for the duration of this call. The deferred
// cleanup runs before the deferred mu.Unlock above, i.e. while mu is still held.
t.notify = make(chan struct{}, 10)
defer func() {
close(t.notify)
t.notify = nil
}()
for {
// Peek returns a nil interface when the heap is empty, which makes the type
// assertion report !ok.
first, ok := t.heap.Peek().(*scheduledTask)
if !ok { // no tasks in heap.
if t.ready.Len() > 0 {
return heap.Pop(&t.ready).(*scheduledTask)
}
// Nothing is queued at all: wait for a Push/Reset signal or cancellation.
t.mu.Unlock()
select {
case <-ctx.Done():
t.mu.Lock()
return nil
case <-t.notify:
}
t.mu.Lock()
continue
}
now := t.curTime()
// if there's a task in the ready queue AND the first task isn't ready yet then immediately return the ready task.
ready, ok := t.ready.Peek().(*scheduledTask)
if ok && now.Before(first.runAt) {
heap.Pop(&t.ready)
return ready
}
// Otherwise wait until the earliest pending task is due. d is zero or
// negative when that task is already due, so the timer fires immediately.
t.mu.Unlock()
d := first.runAt.Sub(now)
if t.pollInterval > 0 && d > t.pollInterval {
// A poll interval may be set to work around clock changes
// e.g. when a laptop wakes from sleep or the system clock is adjusted.
d = t.pollInterval
}
timer := time.NewTimer(d)
select {
case <-timer.C:
t.mu.Lock()
if t.heap.Len() == 0 {
// The heap was emptied while waiting; this break leaves the select and the
// outer loop starts over.
break
}
// Move every task that is now due from the time heap to the ready heap.
for {
first, ok := t.heap.Peek().(*scheduledTask)
if !ok {
break
}
if first.runAt.After(t.curTime()) {
// task is not yet ready to run
break
}
heap.Pop(&t.heap) // remove the task from the heap
heap.Push(&t.ready, first)
}
if t.ready.Len() == 0 {
// Nothing became due (e.g. the wait was capped by pollInterval); loop again.
break
}
return heap.Pop(&t.ready).(*scheduledTask)
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
t.mu.Lock()
// Stop the timer, draining its channel if it fired in the meantime.
if !timer.Stop() {
<-timer.C
}
case <-ctx.Done():
t.mu.Lock()
if !timer.Stop() {
<-timer.C
}
return nil
}
}
}
// scheduledTask is a single entry in the task queue: a task plus the
// scheduling metadata the queue orders it by.
type scheduledTask struct {
// task is the Task this entry refers to; Push panics when it is nil.
task Task
// runAt is the time at which the task becomes due; the pending heap is
// ordered by it.
runAt time.Time
// priority orders tasks that are already due: higher values are returned first.
priority int
// NOTE(review): not read by the queue code in this file; presumably invoked by
// the orchestrator with the task's result — confirm.
callbacks []func(error)
// NOTE(review): not read by the queue code in this file; looks like the config
// in effect for the task — confirm against the caller.
config *v1.Config
}
// scheduledTaskHeap is the slice-backed storage shared by the task heaps. It
// provides every heap.Interface method except Less; the embedding types
// supply Less to choose the ordering.
type scheduledTaskHeap struct {
	tasks []*scheduledTask
	// NOTE(review): comparator is not referenced by any method here; the
	// embedding types define Less directly — confirm whether it is still needed.
	comparator func(i, j *scheduledTask) bool
}

// Len returns the number of tasks stored.
func (h *scheduledTaskHeap) Len() int {
	return len(h.tasks)
}

// Swap exchanges the tasks at indexes i and j.
func (h *scheduledTaskHeap) Swap(i, j int) {
	h.tasks[i], h.tasks[j] = h.tasks[j], h.tasks[i]
}

// Push appends x, which must be a *scheduledTask. It is meant to be called
// through container/heap, which restores the heap ordering afterwards.
func (h *scheduledTaskHeap) Push(x interface{}) {
	h.tasks = append(h.tasks, x.(*scheduledTask))
}

// Pop removes and returns the last element of the slice. It is meant to be
// called through container/heap, which first moves the minimum element there.
func (h *scheduledTaskHeap) Pop() interface{} {
	old := h.tasks
	n := len(old)
	x := old[n-1]
	// Clear the vacated slot so the backing array does not keep the popped task
	// (and everything it references) reachable.
	old[n-1] = nil
	h.tasks = old[:n-1]
	return x
}

// Peek returns the element at the root of the heap without removing it, or a
// nil interface value when the heap is empty.
func (h *scheduledTaskHeap) Peek() interface{} {
	if len(h.tasks) == 0 {
		return nil
	}
	return h.tasks[0]
}
// scheduledTaskHeapByTime is a task heap whose root is the task with the
// earliest runAt.
type scheduledTaskHeapByTime struct {
	scheduledTaskHeap
}

var _ heap.Interface = &scheduledTaskHeapByTime{}

// Less reports whether the task at index i is scheduled strictly before the
// task at index j.
func (h *scheduledTaskHeapByTime) Less(i, j int) bool {
	earlier, later := h.tasks[i], h.tasks[j]
	return earlier.runAt.Before(later.runAt)
}
// scheduledTaskHeapByPriorityThenTime is a task heap whose root is the task
// with the highest priority; ties are broken by the earliest runAt.
type scheduledTaskHeapByPriorityThenTime struct {
	scheduledTaskHeap
}

var _ heap.Interface = &scheduledTaskHeapByPriorityThenTime{}

// Less reports whether the task at index i should come out before the task at
// index j: a higher priority wins, and equal priorities fall back to runAt.
func (h *scheduledTaskHeapByPriorityThenTime) Less(i, j int) bool {
	a, b := h.tasks[i], h.tasks[j]
	if a.priority == b.priority {
		return a.runAt.Before(b.runAt)
	}
	return a.priority > b.priority
}
@@ -1,173 +0,0 @@
package orchestrator
import (
"context"
"math/rand"
"reflect"
"runtime"
"sort"
"strconv"
"testing"
"time"
v1 "github.com/garethgeorge/backrest/gen/go/v1"
)
// heapTestTask is a do-nothing Task used by the queue tests; only its name
// carries information.
type heapTestTask struct {
	name string
}

var _ Task = (*heapTestTask)(nil)

// Name returns the name the task was created with.
func (h *heapTestTask) Name() string { return h.name }

// Next always returns nil.
func (h *heapTestTask) Next(now time.Time) *time.Time { return nil }

// Run does nothing and always succeeds.
func (h *heapTestTask) Run(ctx context.Context) error { return nil }

// Cancel does nothing and always succeeds.
func (h *heapTestTask) Cancel(withStatus v1.OperationStatus) error { return nil }

// OperationId always returns 0.
func (h *heapTestTask) OperationId() int64 { return 0 }
// TestTaskQueueOrdering checks that tasks are dequeued in the order of their
// scheduled run times.
func TestTaskQueueOrdering(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("test is flaky on Windows")
	}

	var q taskQueue
	for _, in := range []struct {
		name  string
		delay time.Duration
	}{
		{"1", 1 * time.Millisecond},
		{"2", 2 * time.Millisecond},
		{"3", 2 * time.Millisecond},
	} {
		q.Push(scheduledTask{runAt: time.Now().Add(in.delay), task: &heapTestTask{name: in.name}})
	}

	want := []string{"1", "2", "3"}
	got := []string{}
	for range want {
		next := q.Dequeue(context.Background())
		if next == nil || next.task == nil {
			t.Fatal("expected task")
		}
		got = append(got, next.task.Name())
	}

	if !reflect.DeepEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestLiveTaskEnqueue checks that a Dequeue that is already waiting picks up a
// task pushed afterwards from another goroutine.
func TestLiveTaskEnqueue(t *testing.T) {
	var q taskQueue

	go func() {
		time.Sleep(1 * time.Millisecond)
		q.Push(scheduledTask{
			runAt: time.Now().Add(1 * time.Millisecond),
			task:  &heapTestTask{name: "1"},
		})
	}()

	got := q.Dequeue(context.Background())
	if name := got.task.Name(); name != "1" {
		t.Errorf("got %s, want 1", name)
	}
}
// TestTaskQueueReset checks that Reset discards the remaining tasks, so a
// following Dequeue returns nil once its context is cancelled.
func TestTaskQueueReset(t *testing.T) {
	var q taskQueue
	q.Push(scheduledTask{runAt: time.Now().Add(1 * time.Millisecond), task: &heapTestTask{name: "1"}})
	q.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "2"}})
	q.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "3"}})

	first := q.Dequeue(context.Background())
	if first.task.Name() != "1" {
		t.Fatal("expected 1")
	}

	q.Reset()

	ctx, cancel := context.WithCancel(context.Background())
	go func() {
		time.Sleep(1 * time.Millisecond)
		cancel()
	}()

	if got := q.Dequeue(ctx); got != nil {
		t.Fatal("expected nil task")
	}
}
// TestTasksOrderedByPriority checks that tasks that are already due come out
// highest priority first, with ties broken by run time, and that a task that
// is not yet due comes out last.
func TestTasksOrderedByPriority(t *testing.T) {
	var q taskQueue
	now := time.Now()

	for _, in := range []struct {
		name     string
		offset   time.Duration
		priority int
	}{
		{"4", 0, 1},
		{"3", 0, 2},
		{"5", 10 * time.Millisecond, 3},
		{"2", 0, 3},
		{"1", -10 * time.Millisecond, 3},
	} {
		q.Push(scheduledTask{
			runAt:    now.Add(in.offset),
			task:     &heapTestTask{name: in.name},
			priority: in.priority,
		})
	}

	want := []string{"1", "2", "3", "4", "5"}
	got := []string{}
	for range want {
		next := q.Dequeue(context.Background())
		if next == nil || next.task == nil {
			t.Fatal("expected task")
		}
		got = append(got, next.task.Name())
	}

	if !reflect.DeepEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}
}
// TestFuzzTaskQueue pushes tasks with random run times (some already in the
// past) and checks that they are dequeued in run-time order.
func TestFuzzTaskQueue(t *testing.T) {
	if runtime.GOOS == "windows" {
		t.Skip("test does not pass on Windows")
	}

	const count = 100
	var q taskQueue

	// Setup a bunch of tasks with random run times; every task has priority 0.
	tasks := make([]scheduledTask, count)
	for i := range tasks {
		offset := time.Duration(rand.Intn(200)-50) * time.Millisecond
		tasks[i] = scheduledTask{
			runAt:    time.Now().Add(offset),
			priority: 0,
			task:     &heapTestTask{name: strconv.Itoa(i)},
		}
		q.Push(tasks[i])
	}

	got := []string{}
	for i := 0; i < count; i++ {
		next := q.Dequeue(context.Background())
		if next == nil || next.task == nil {
			t.Fatal("expected task")
		}
		got = append(got, next.task.Name())
	}

	// The expected order is by run time, with priority as the tie-breaker.
	sort.SliceStable(tasks, func(i, j int) bool {
		a, b := tasks[i], tasks[j]
		if a.runAt.Equal(b.runAt) {
			return a.priority < b.priority
		}
		return a.runAt.Before(b.runAt)
	})
	var want []string
	for _, task := range tasks {
		want = append(want, task.task.Name())
	}

	if !reflect.DeepEqual(got, want) {
		t.Errorf("got %v, want %v", got, want)
	}
}
+13
View File
@@ -37,6 +37,19 @@ func (t *TimePriorityQueue[T]) Peek() T {
return t.tqueue.Peek().v
}
// Reset removes every value from the queue and returns them: first the values
// in the ready heap, in the order that heap pops them, then the values still
// waiting in the time queue, in the order TimeQueue.Reset returns them.
func (t *TimePriorityQueue[T]) Reset() []T {
	t.mu.Lock()
	defer t.mu.Unlock()

	var res []T
	for t.ready.Len() > 0 {
		res = append(res, heap.Pop(&t.ready).(priorityEntry[T]).v)
	}
	// Drain the time queue through its own Reset so its heap is only modified
	// while the time queue's mutex is held, rather than popping its heap
	// directly from here. Enqueue already takes the two locks in this order.
	for _, entry := range t.tqueue.Reset() {
		res = append(res, entry.v)
	}
	return res
}
func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) {
t.mu.Lock()
t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v})
+11
View File
@@ -48,6 +48,17 @@ func (t *TimeQueue[T]) Peek() T {
return t.heap.Peek().v
}
// Reset removes every value from the queue and returns them in the order the
// heap pops them. The result is nil when the queue was already empty.
func (t *TimeQueue[T]) Reset() []T {
	t.mu.Lock()
	defer t.mu.Unlock()

	var drained []T
	for t.heap.Len() > 0 {
		entry := heap.Pop(&t.heap).(timeQueueEntry[T])
		drained = append(drained, entry.v)
	}
	return drained
}
func (t *TimeQueue[T]) Dequeue(ctx context.Context) T {
t.dequeueMu.Lock()
defer t.dequeueMu.Unlock()