mirror of
https://github.com/garethgeorge/backrest.git
synced 2025-12-17 11:05:38 +00:00
fix: bugs in refactored task queue and improved coverage
This commit is contained in:
@@ -59,7 +59,7 @@ func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) {
|
|||||||
func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T {
|
func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
for {
|
for {
|
||||||
for t.tqueue.Len() > 0 {
|
for t.tqueue.heap.Len() > 0 {
|
||||||
thead := t.tqueue.Peek() // peek at the head of the time queue
|
thead := t.tqueue.Peek() // peek at the head of the time queue
|
||||||
if thead.at.Before(time.Now()) {
|
if thead.at.Before(time.Now()) {
|
||||||
tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]])
|
tqe := heap.Pop(&t.tqueue.heap).(timeQueueEntry[priorityEntry[T]])
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package queue
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
@@ -53,3 +54,33 @@ func TestTPQMixedReadinessStates(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTPQStress(t *testing.T) {
|
||||||
|
tpq := NewTimePriorityQueue[int]()
|
||||||
|
start := time.Now()
|
||||||
|
|
||||||
|
totalEnqueued := 0
|
||||||
|
totalEnqueuedSum := 0
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
ctx, _ := context.WithDeadline(context.Background(), start.Add(1*time.Second))
|
||||||
|
for ctx.Err() == nil {
|
||||||
|
v := rand.Intn(100)
|
||||||
|
tpq.Enqueue(time.Now().Add(time.Duration(rand.Intn(1000)-500)*time.Millisecond), rand.Intn(5), v)
|
||||||
|
totalEnqueuedSum += v
|
||||||
|
totalEnqueued++
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
ctx, _ := context.WithDeadline(context.Background(), start.Add(3*time.Second))
|
||||||
|
totalDequeued := 0
|
||||||
|
sum := 0
|
||||||
|
for ctx.Err() == nil || totalDequeued < totalEnqueued {
|
||||||
|
sum += tpq.Dequeue(ctx)
|
||||||
|
totalDequeued++
|
||||||
|
}
|
||||||
|
|
||||||
|
if sum != totalEnqueuedSum {
|
||||||
|
t.Errorf("expected sum to be %d, got %d", totalEnqueuedSum, sum)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import (
|
|||||||
"container/heap"
|
"container/heap"
|
||||||
"context"
|
"context"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@@ -13,7 +14,7 @@ type TimeQueue[T any] struct {
|
|||||||
|
|
||||||
dequeueMu sync.Mutex
|
dequeueMu sync.Mutex
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
notify chan struct{}
|
notify atomic.Pointer[chan struct{}]
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTimeQueue[T any]() *TimeQueue[T] {
|
func NewTimeQueue[T any]() *TimeQueue[T] {
|
||||||
@@ -25,10 +26,13 @@ func NewTimeQueue[T any]() *TimeQueue[T] {
|
|||||||
func (t *TimeQueue[T]) Enqueue(at time.Time, v T) {
|
func (t *TimeQueue[T]) Enqueue(at time.Time, v T) {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
heap.Push(&t.heap, timeQueueEntry[T]{at, v})
|
heap.Push(&t.heap, timeQueueEntry[T]{at, v})
|
||||||
if t.notify != nil {
|
|
||||||
t.notify <- struct{}{}
|
|
||||||
}
|
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
if n := t.notify.Load(); n != nil {
|
||||||
|
select {
|
||||||
|
case *n <- struct{}{}:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *TimeQueue[T]) Len() int {
|
func (t *TimeQueue[T]) Len() int {
|
||||||
@@ -63,30 +67,24 @@ func (t *TimeQueue[T]) Dequeue(ctx context.Context) T {
|
|||||||
t.dequeueMu.Lock()
|
t.dequeueMu.Lock()
|
||||||
defer t.dequeueMu.Unlock()
|
defer t.dequeueMu.Unlock()
|
||||||
|
|
||||||
t.mu.Lock()
|
notify := make(chan struct{}, 1)
|
||||||
t.notify = make(chan struct{}, 1)
|
t.notify.Store(¬ify)
|
||||||
defer func() {
|
defer t.notify.Store(nil)
|
||||||
t.mu.Lock()
|
|
||||||
close(t.notify)
|
|
||||||
t.notify = nil
|
|
||||||
t.mu.Unlock()
|
|
||||||
}()
|
|
||||||
t.mu.Unlock()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
t.mu.Lock()
|
t.mu.Lock()
|
||||||
|
|
||||||
var wait time.Duration
|
var wait time.Duration
|
||||||
if t.heap.Len() == 0 {
|
if t.heap.Len() > 0 {
|
||||||
wait = 3 * time.Minute
|
|
||||||
} else {
|
|
||||||
val := t.heap.Peek()
|
val := t.heap.Peek()
|
||||||
wait = time.Until(val.at)
|
wait = time.Until(val.at)
|
||||||
if wait <= 0 {
|
if wait <= 0 {
|
||||||
t.mu.Unlock()
|
defer t.mu.Unlock()
|
||||||
return heap.Pop(&t.heap).(timeQueueEntry[T]).v
|
return heap.Pop(&t.heap).(timeQueueEntry[T]).v
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
if wait == 0 || wait > 3*time.Minute {
|
||||||
|
wait = 3 * time.Minute
|
||||||
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
|
|
||||||
timer := time.NewTimer(wait)
|
timer := time.NewTimer(wait)
|
||||||
@@ -101,7 +99,7 @@ func (t *TimeQueue[T]) Dequeue(ctx context.Context) T {
|
|||||||
}
|
}
|
||||||
t.mu.Unlock()
|
t.mu.Unlock()
|
||||||
return val.v
|
return val.v
|
||||||
case <-t.notify: // new task was added, loop again to ensure we have the earliest task.
|
case <-notify: // new task was added, loop again to ensure we have the earliest task.
|
||||||
if !timer.Stop() {
|
if !timer.Stop() {
|
||||||
<-timer.C
|
<-timer.C
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user