From cb78298cffb492560717d5f8bdcd5941f7976f2e Mon Sep 17 00:00:00 2001 From: Gareth George Date: Thu, 28 Dec 2023 08:03:47 +0000 Subject: [PATCH] fix: tasks run late when laptops resume from sleep --- internal/orchestrator/orchestrator.go | 8 +++--- internal/orchestrator/scheduledtaskheap.go | 30 +++++++++++++++++----- 2 files changed, 27 insertions(+), 11 deletions(-) diff --git a/internal/orchestrator/orchestrator.go b/internal/orchestrator/orchestrator.go index 36eca249..e6d63956 100644 --- a/internal/orchestrator/orchestrator.go +++ b/internal/orchestrator/orchestrator.go @@ -54,11 +54,9 @@ func NewOrchestrator(resticBin string, cfg *v1.Config, oplog *oplog.OpLog) (*Orc config: cfg, // repoPool created with a memory store to ensure the config is updated in an atomic operation with the repo pool's config value. repoPool: newResticRepoPool(resticBin, &config.MemoryStore{Config: cfg}), - taskQueue: taskQueue{ - Now: func() time.Time { - return o.curTime() - }, - }, + taskQueue: newTaskQueue(func() time.Time { + return o.curTime() + }), } // verify the operation log and mark any incomplete operations as failed. diff --git a/internal/orchestrator/scheduledtaskheap.go b/internal/orchestrator/scheduledtaskheap.go index 8fc6f34d..405f884b 100644 --- a/internal/orchestrator/scheduledtaskheap.go +++ b/internal/orchestrator/scheduledtaskheap.go @@ -7,16 +7,28 @@ import ( "time" ) +var taskQueueDefaultPollInterval = 15 * time.Minute + type taskQueue struct { - dequeueMu sync.Mutex - mu sync.Mutex - heap scheduledTaskHeapByTime - notify chan struct{} - ready scheduledTaskHeapByPriorityThenTime + dequeueMu sync.Mutex + mu sync.Mutex + heap scheduledTaskHeapByTime + notify chan struct{} + ready scheduledTaskHeapByPriorityThenTime + pollInterval time.Duration Now func() time.Time } +func newTaskQueue(now func() time.Time) taskQueue { + return taskQueue{ + heap: scheduledTaskHeapByTime{}, + ready: scheduledTaskHeapByPriorityThenTime{}, + pollInterval: taskQueueDefaultPollInterval, + Now: now, + } +} + func (t *taskQueue) curTime() time.Time { if t.Now != nil { return t.Now() @@ -94,7 +106,13 @@ func (t *taskQueue) Dequeue(ctx context.Context) *scheduledTask { } t.mu.Unlock() - timer := time.NewTimer(first.runAt.Sub(now)) + d := first.runAt.Sub(now) + if t.pollInterval > 0 && d > t.pollInterval { + // A poll interval may be set to work around clock changes + // e.g. when a laptop wakes from sleep or the system clock is adjusted. + d = t.pollInterval + } + timer := time.NewTimer(d) select { case <-timer.C: