From 1d0489847e6fee5baed807117379738aceca4a2d Mon Sep 17 00:00:00 2001 From: garethgeorge Date: Mon, 8 Apr 2024 00:16:28 -0700 Subject: [PATCH] feat: use new task queue implementation in orchestrator --- internal/orchestrator/orchestrator.go | 61 +++-- internal/orchestrator/orchestrator_test.go | 13 +- internal/orchestrator/scheduledtaskheap.go | 220 ------------------ .../orchestrator/scheduledtaskheap_test.go | 173 -------------- internal/queue/timepriorityqueue.go | 13 ++ internal/queue/timequeue.go | 11 + 6 files changed, 53 insertions(+), 438 deletions(-) delete mode 100644 internal/orchestrator/scheduledtaskheap.go delete mode 100644 internal/orchestrator/scheduledtaskheap_test.go diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 6a95e10b..b0dc1071 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -13,6 +13,7 @@ import ( "github.com/garethgeorge/backrest/internal/config" "github.com/garethgeorge/backrest/internal/hook" "github.com/garethgeorge/backrest/internal/oplog" + "github.com/garethgeorge/backrest/internal/queue" "github.com/garethgeorge/backrest/internal/rotatinglog" "go.uber.org/zap" "google.golang.org/protobuf/proto" @@ -40,14 +41,10 @@ type Orchestrator struct { config *v1.Config OpLog *oplog.OpLog repoPool *resticRepoPool - taskQueue taskQueue + taskQueue *queue.TimePriorityQueue[scheduledTask] hookExecutor *hook.HookExecutor logStore *rotatinglog.RotatingLog - - // now for the purpose of testing; used by Run() to get the current time. - now func() time.Time - - runningTask atomic.Pointer[taskExecutionInfo] + runningTask atomic.Pointer[taskExecutionInfo] } func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logStore *rotatinglog.RotatingLog) (*Orchestrator, error) { @@ -59,10 +56,8 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt OpLog: oplog, config: cfg, // repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value. - repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}), - taskQueue: newTaskQueue(func() time.Time { - return o.curTime() - }), + repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}), + taskQueue: queue.NewTimePriorityQueue[scheduledTask](), hookExecutor: hook.NewHookExecutor(oplog, logStore), logStore: logStore, } @@ -104,13 +99,6 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog, logSt return o, nil } -func (o *Orchestrator) curTime() time.Time { - if o.now != nil { - return o.now() - } - return time.Now() -} - func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error { o.mu.Lock() defer o.mu.Unlock() @@ -121,12 +109,13 @@ func (o *Orchestrator) ApplyConfig(cfg *v1.Config) error { return fmt.Errorf("failed to update repo pool config: %w", err) } - return o.ScheduleDefaultTasks(cfg) + return o.scheduleDefaultTasks(cfg) } // rescheduleTasksIfNeeded checks if any tasks need to be rescheduled based on config changes. -func (o *Orchestrator) ScheduleDefaultTasks(config *v1.Config) error { +func (o *Orchestrator) scheduleDefaultTasks(config *v1.Config) error { zap.L().Info("scheduling default tasks, waiting for task queue reset.") + removedTasks := o.taskQueue.Reset() for _, t := range removedTasks { if err := t.task.Cancel(v1.OperationStatus_STATUS_SYSTEM_CANCELLED); err != nil { @@ -195,27 +184,21 @@ func (o *Orchestrator) CancelOperation(operationId int64, status v1.OperationSta } tasks := o.taskQueue.Reset() - remaining := make([]scheduledTask, 0, len(tasks)) - for _, t := range tasks { if t.task.OperationId() == operationId { if err := t.task.Cancel(status); err != nil { return fmt.Errorf("cancel task %q: %w", t.task.Name(), err) } - // check if the task has a next after it's current 'runAt' time, if it does then we will schedule the next run. - if nextTime := t.task.Next(t.runAt); nextTime != nil { - remaining = append(remaining, scheduledTask{ - task: t.task, - runAt: *nextTime, - }) + nextTime := t.task.Next(t.runAt) + if nextTime == nil { + continue } - } else { - remaining = append(remaining, *t) - } - } - o.taskQueue.Push(remaining...) + t.runAt = *nextTime + } + o.taskQueue.Enqueue(t.runAt, t.priority, t) // requeue the task. + } return nil } @@ -231,7 +214,7 @@ func (o *Orchestrator) Run(mainCtx context.Context) { } t := o.taskQueue.Dequeue(mainCtx) - if t == nil { + if t.task == nil { continue } @@ -267,12 +250,12 @@ func (o *Orchestrator) Run(mainCtx context.Context) { } func (o *Orchestrator) ScheduleTask(t Task, priority int, callbacks ...func(error)) { - nextRun := t.Next(o.curTime()) + nextRun := t.Next(time.Now()) if nextRun == nil { return } zap.L().Info("scheduling task", zap.String("task", t.Name()), zap.String("runAt", nextRun.Format(time.RFC3339))) - o.taskQueue.Push(scheduledTask{ + o.taskQueue.Enqueue(*nextRun, priority, scheduledTask{ task: t, runAt: *nextRun, priority: priority, @@ -341,3 +324,11 @@ type taskExecutionInfo struct { operationId int64 cancel func() } + +type scheduledTask struct { + task Task + runAt time.Time + priority int + callbacks []func(error) + config *v1.Config +} diff --git a/internal/orchestrator/orchestrator_test.go b/internal/orchestrator/orchestrator_test.go index 0e6a1dd9..1f58fa86 100644 --- a/internal/orchestrator/orchestrator_test.go +++ b/internal/orchestrator/orchestrator_test.go @@ -149,19 +149,14 @@ func TestSchedulerWait(t *testing.T) { t.Parallel() // Arrange - curTime := time.Now() orch, err := NewOrchestrator("", config.NewDefaultConfig(), nil, nil) if err != nil { t.Fatalf("failed to create orchestrator: %v", err) } - orch.now = func() time.Time { - return curTime - } - ran := make(chan struct{}) orch.ScheduleTask(&testTask{ onNext: func(t time.Time) *time.Time { - t = t.Add(5 * time.Millisecond) + t = t.Add(150 * time.Millisecond) return &t }, onRun: func() error { @@ -180,12 +175,10 @@ func TestSchedulerWait(t *testing.T) { t.Errorf("expected task to not run yet") } - curTime = time.Now() - // Schedule another task just to trigger a queue refresh orch.ScheduleTask(&testTask{ onNext: func(t time.Time) *time.Time { - t = t.Add(5 * time.Millisecond) + t = t.Add(1000 * time.Second) return &t }, onRun: func() error { @@ -195,7 +188,7 @@ func TestSchedulerWait(t *testing.T) { }, TaskPriorityDefault) select { - case <-time.NewTimer(1000 * time.Millisecond).C: + case <-time.NewTimer(200 * time.Millisecond).C: t.Errorf("expected task to run") case <-ran: } diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go deleted file mode 100644 index 03abf5d0..00000000 --- a/internal/orchestrator/scheduledtaskheap.go +++ /dev/null @@ -1,220 +0,0 @@ -package orchestrator - -import ( - "container/heap" - "context" - "sync" - "time" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -var taskQueueDefaultPollInterval = 3 * time.Minute - -type taskQueue struct { - dequeueMu sync.Mutex - mu sync.Mutex - heap scheduledTaskHeapByTime - notify chan struct{} - ready scheduledTaskHeapByPriorityThenTime - pollInterval time.Duration - - Now func() time.Time -} - -func newTaskQueue(now func() time.Time) taskQueue { - return taskQueue{ - heap: scheduledTaskHeapByTime{}, - ready: scheduledTaskHeapByPriorityThenTime{}, - pollInterval: taskQueueDefaultPollInterval, - Now: now, - } -} - -func (t *taskQueue) curTime() time.Time { - if t.Now != nil { - return t.Now() - } - return time.Now() -} - -func (t *taskQueue) Push(tasks ...scheduledTask) { - t.mu.Lock() - defer t.mu.Unlock() - - for _, task := range tasks { - task := task - if task.task == nil { - panic("task cannot be nil") - } - heap.Push(&t.heap, &task) - } - - if t.notify != nil { - t.notify <- struct{}{} - } -} - -func (t *taskQueue) Reset() []*scheduledTask { - t.mu.Lock() - defer t.mu.Unlock() - - oldTasks := t.heap.tasks - oldTasks = append(oldTasks, t.ready.tasks...) - t.heap.tasks = nil - t.ready.tasks = nil - - if t.notify != nil { - t.notify <- struct{}{} - } - return oldTasks -} - -func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { - t.dequeueMu.Lock() - defer t.dequeueMu.Unlock() - - t.mu.Lock() - defer t.mu.Unlock() - t.notify = make(chan struct{}, 10) - defer func() { - close(t.notify) - t.notify = nil - }() - - for { - first, ok := t.heap.Peek().(*scheduledTask) - if !ok { // no tasks in heap. - if t.ready.Len() > 0 { - return heap.Pop(&t.ready).(*scheduledTask) - } - t.mu.Unlock() - select { - case <-ctx.Done(): - t.mu.Lock() - return nil - case <-t.notify: - } - t.mu.Lock() - continue - } - - now := t.curTime() - - // if there's a task in the ready queue AND the first task isn't ready yet then immediately return the ready task. - ready, ok := t.ready.Peek().(*scheduledTask) - if ok && now.Before(first.runAt) { - heap.Pop(&t.ready) - return ready - } - - t.mu.Unlock() - d := first.runAt.Sub(now) - if t.pollInterval > 0 && d > t.pollInterval { - // A poll interval may be set to work around clock changes - // e.g. when a laptop wakes from sleep or the system clock is adjusted. - d = t.pollInterval - } - timer := time.NewTimer(d) - - select { - case <-timer.C: - t.mu.Lock() - if t.heap.Len() == 0 { - break - } - - for { - first, ok := t.heap.Peek().(*scheduledTask) - if !ok { - break - } - if first.runAt.After(t.curTime()) { - // task is not yet ready to run - break - } - heap.Pop(&t.heap) // remove the task from the heap - heap.Push(&t.ready, first) - } - - if t.ready.Len() == 0 { - break - } - return heap.Pop(&t.ready).(*scheduledTask) - case <-t.notify: // new task was added, loop again to ensure we have the earliest task. - t.mu.Lock() - if !timer.Stop() { - <-timer.C - } - case <-ctx.Done(): - t.mu.Lock() - if !timer.Stop() { - <-timer.C - } - return nil - } - } -} - -type scheduledTask struct { - task Task - runAt time.Time - priority int - callbacks []func(error) - config *v1.Config -} - -type scheduledTaskHeap struct { - tasks []*scheduledTask - comparator func(i, j *scheduledTask) bool -} - -func (h *scheduledTaskHeap) Len() int { - return len(h.tasks) -} - -func (h *scheduledTaskHeap) Swap(i, j int) { - h.tasks[i], h.tasks[j] = h.tasks[j], h.tasks[i] -} - -func (h *scheduledTaskHeap) Push(x interface{}) { - h.tasks = append(h.tasks, x.(*scheduledTask)) -} - -func (h *scheduledTaskHeap) Pop() interface{} { - old := h.tasks - n := len(old) - x := old[n-1] - h.tasks = old[0 : n-1] - return x -} - -func (h *scheduledTaskHeap) Peek() interface{} { - if len(h.tasks) == 0 { - return nil - } - return h.tasks[0] -} - -type scheduledTaskHeapByTime struct { - scheduledTaskHeap -} - -var _ heap.Interface = &scheduledTaskHeapByTime{} - -func (h *scheduledTaskHeapByTime) Less(i, j int) bool { - return h.tasks[i].runAt.Before(h.tasks[j].runAt) -} - -type scheduledTaskHeapByPriorityThenTime struct { - scheduledTaskHeap -} - -var _ heap.Interface = &scheduledTaskHeapByPriorityThenTime{} - -func (h *scheduledTaskHeapByPriorityThenTime) Less(i, j int) bool { - if h.tasks[i].priority != h.tasks[j].priority { - return h.tasks[i].priority > h.tasks[j].priority - } - return h.tasks[i].runAt.Before(h.tasks[j].runAt) -} diff --git a/internal/orchestrator/scheduledtaskheap_test.go b/internal/orchestrator/scheduledtaskheap_test.go deleted file mode 100644 index 93a540da..00000000 --- a/internal/orchestrator/scheduledtaskheap_test.go +++ /dev/null @@ -1,173 +0,0 @@ -package orchestrator - -import ( - "context" - "math/rand" - "reflect" - "runtime" - "sort" - "strconv" - "testing" - "time" - - v1 "github.com/garethgeorge/backrest/gen/go/v1" -) - -type heapTestTask struct { - name string -} - -var _ Task = &heapTestTask{} - -func (t *heapTestTask) Name() string { - return t.name -} - -func (t *heapTestTask) Next(now time.Time) *time.Time { - return nil -} - -func (t *heapTestTask) Run(ctx context.Context) error { - return nil -} - -func (t *heapTestTask) Cancel(withStatus v1.OperationStatus) error { - return nil -} - -func (t *heapTestTask) OperationId() int64 { - return 0 -} - -func TestTaskQueueOrdering(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("test is flaky on Windows") - } - - h := taskQueue{} - - h.Push(scheduledTask{runAt: time.Now().Add(1 * time.Millisecond), task: &heapTestTask{name: "1"}}) - h.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "2"}}) - h.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "3"}}) - - wantSeq := []string{"1", "2", "3"} - seq := []string{} - for i := 0; i < 3; i++ { - task := h.Dequeue(context.Background()) - if task == nil || task.task == nil { - t.Fatal("expected task") - } - seq = append(seq, task.task.Name()) - } - - if !reflect.DeepEqual(seq, wantSeq) { - t.Errorf("got %v, want %v", seq, wantSeq) - } -} - -func TestLiveTaskEnqueue(t *testing.T) { - h := taskQueue{} - - go func() { - time.Sleep(1 * time.Millisecond) - h.Push(scheduledTask{runAt: time.Now().Add(1 * time.Millisecond), task: &heapTestTask{name: "1"}}) - }() - - t1 := h.Dequeue(context.Background()) - if t1.task.Name() != "1" { - t.Errorf("got %s, want 1", t1.task.Name()) - } -} - -func TestTaskQueueReset(t *testing.T) { - h := taskQueue{} - - h.Push(scheduledTask{runAt: time.Now().Add(1 * time.Millisecond), task: &heapTestTask{name: "1"}}) - h.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "2"}}) - h.Push(scheduledTask{runAt: time.Now().Add(2 * time.Millisecond), task: &heapTestTask{name: "3"}}) - - if h.Dequeue(context.Background()).task.Name() != "1" { - t.Fatal("expected 1") - } - h.Reset() - - ctx, cancel := context.WithCancel(context.Background()) - go func() { - time.Sleep(1 * time.Millisecond) - cancel() - }() - - if h.Dequeue(ctx) != nil { - t.Fatal("expected nil task") - } -} - -func TestTasksOrderedByPriority(t *testing.T) { - h := taskQueue{} - - now := time.Now() - h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "4"}, priority: 1}) - h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "3"}, priority: 2}) - h.Push(scheduledTask{runAt: now.Add(10 * time.Millisecond), task: &heapTestTask{name: "5"}, priority: 3}) - h.Push(scheduledTask{runAt: now, task: &heapTestTask{name: "2"}, priority: 3}) - h.Push(scheduledTask{runAt: now.Add(-10 * time.Millisecond), task: &heapTestTask{name: "1"}, priority: 3}) - - wantSeq := []string{"1", "2", "3", "4", "5"} - - seq := []string{} - - for i := 0; i < 5; i++ { - task := h.Dequeue(context.Background()) - if task == nil || task.task == nil { - t.Fatal("expected task") - } - seq = append(seq, task.task.Name()) - } - - if !reflect.DeepEqual(seq, wantSeq) { - t.Errorf("got %v, want %v", seq, wantSeq) - } -} - -func TestFuzzTaskQueue(t *testing.T) { - if runtime.GOOS == "windows" { - t.Skip("test does not pass on Windows") - } - - h := taskQueue{} - - count := 100 - - // Setup a bunch of tasks with random priorities and run times. - tasks := make([]scheduledTask, count) - for i := 0; i < count; i++ { - at := time.Now().Add(time.Duration(rand.Intn(200)-50) * time.Millisecond) - tasks[i] = scheduledTask{runAt: at, priority: 0, task: &heapTestTask{name: strconv.Itoa(i)}} - h.Push(tasks[i]) - } - - seq := []string{} - for i := 0; i < count; i++ { - task := h.Dequeue(context.Background()) - if task == nil || task.task == nil { - t.Fatal("expected task") - } - seq = append(seq, task.task.Name()) - } - - var expectOrdering []string - sort.SliceStable(tasks, func(i, j int) bool { - if tasks[i].runAt.Equal(tasks[j].runAt) { - return tasks[i].priority < tasks[j].priority - } - return tasks[i].runAt.Before(tasks[j].runAt) - }) - - for _, task := range tasks { - expectOrdering = append(expectOrdering, task.task.Name()) - } - - if !reflect.DeepEqual(seq, expectOrdering) { - t.Errorf("got %v, want %v", seq, expectOrdering) - } -} diff --git a/internal/queue/timepriorityqueue.go b/internal/queue/timepriorityqueue.go index 18e7b5e2..4c924197 100644 --- a/internal/queue/timepriorityqueue.go +++ b/internal/queue/timepriorityqueue.go @@ -37,6 +37,19 @@ func (t *TimePriorityQueue[T]) Peek() T { return t.tqueue.Peek().v } +func (t *TimePriorityQueue[T]) Reset() []T { + t.mu.Lock() + defer t.mu.Unlock() + var res []T + for t.ready.Len() > 0 { + res = append(res, heap.Pop(&t.ready).(priorityEntry[T]).v) + } + for t.tqueue.Len() > 0 { + res = append(res, heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]]).v.v) + } + return res +} + func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) { t.mu.Lock() t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v}) diff --git a/internal/queue/timequeue.go b/internal/queue/timequeue.go index 448d1f57..45653b43 100644 --- a/internal/queue/timequeue.go +++ b/internal/queue/timequeue.go @@ -48,6 +48,17 @@ func (t *TimeQueue[T]) Peek() T { return t.heap.Peek().v } +func (t *TimeQueue[T]) Reset() []T { + t.mu.Lock() + defer t.mu.Unlock() + + var res []T + for t.heap.Len() > 0 { + res = append(res, heap.Pop(&t.heap).(timeQueueEntry[T]).v) + } + return res +} + func (t *TimeQueue[T]) Dequeue(ctx context.Context) T { t.dequeueMu.Lock() defer t.dequeueMu.Unlock()