diff --git a/internal/queue/timepriorityqueue.go b/internal/queue/timepriorityqueue.go index b5f9d37..de511ac 100644 --- a/internal/queue/timepriorityqueue.go +++ b/internal/queue/timepriorityqueue.go @@ -3,13 +3,11 @@ package queue import ( "container/heap" "context" - "sync" "time" ) // TimePriorityQueue is a priority queue that dequeues elements at (or after) a specified time, and prioritizes elements based on a priority value. It is safe for concurrent use. type TimePriorityQueue[T equals[T]] struct { - mu sync.Mutex tqueue TimeQueue[priorityEntry[T]] ready genericHeap[priorityEntry[T]] } @@ -22,17 +20,13 @@ func NewTimePriorityQueue[T equals[T]]() *TimePriorityQueue[T] { } func (t *TimePriorityQueue[T]) Len() int { - t.mu.Lock() t.tqueue.mu.Lock() - defer t.mu.Unlock() defer t.tqueue.mu.Unlock() return t.tqueue.heap.Len() + t.ready.Len() } func (t *TimePriorityQueue[T]) Peek() T { - t.mu.Lock() t.tqueue.mu.Lock() - defer t.mu.Unlock() defer t.tqueue.mu.Unlock() if t.ready.Len() > 0 { @@ -46,11 +40,8 @@ func (t *TimePriorityQueue[T]) Peek() T { } func (t *TimePriorityQueue[T]) Reset() []T { - t.mu.Lock() t.tqueue.mu.Lock() - defer t.mu.Unlock() defer t.tqueue.mu.Unlock() - var res []T for t.ready.Len() > 0 { res = append(res, heap.Pop(&t.ready).(priorityEntry[T]).v) @@ -62,9 +53,7 @@ func (t *TimePriorityQueue[T]) Reset() []T { } func (t *TimePriorityQueue[T]) GetAll() []T { - t.mu.Lock() t.tqueue.mu.Lock() - defer t.mu.Unlock() defer t.tqueue.mu.Unlock() res := make([]T, 0, t.tqueue.heap.Len()+t.ready.Len()) for _, entry := range t.tqueue.heap { @@ -77,9 +66,7 @@ func (t *TimePriorityQueue[T]) GetAll() []T { } func (t *TimePriorityQueue[T]) Remove(v T) { - t.mu.Lock() t.tqueue.mu.Lock() - defer t.mu.Unlock() defer t.tqueue.mu.Unlock() for idx := 0; idx < t.tqueue.heap.Len(); idx++ { @@ -98,15 +85,12 @@ func (t *TimePriorityQueue[T]) Remove(v T) { } func (t *TimePriorityQueue[T]) Enqueue(at time.Time, priority int, v T) { - t.mu.Lock() t.tqueue.Enqueue(at, priorityEntry[T]{at, priority, v}) - t.mu.Unlock() } func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { - t.mu.Lock() + t.tqueue.mu.Lock() for { - t.tqueue.mu.Lock() for t.tqueue.heap.Len() > 0 { thead := t.tqueue.heap.Peek() // peek at the head of the time queue if thead.at.Before(time.Now()) { @@ -116,15 +100,14 @@ func (t *TimePriorityQueue[T]) Dequeue(ctx context.Context) T { break } } - t.tqueue.mu.Unlock() if t.ready.Len() > 0 { - defer t.mu.Unlock() + defer t.tqueue.mu.Unlock() return heap.Pop(&t.ready).(priorityEntry[T]).v } - t.mu.Unlock() + t.tqueue.mu.Unlock() // wait for the next element to be ready val := t.tqueue.Dequeue(ctx) - t.mu.Lock() + t.tqueue.mu.Lock() heap.Push(&t.ready, val) } } diff --git a/internal/queue/timequeue.go b/internal/queue/timequeue.go index 842f561..0e9b409 100644 --- a/internal/queue/timequeue.go +++ b/internal/queue/timequeue.go @@ -119,11 +119,12 @@ func (t *TimeQueue[T]) Dequeue(ctx context.Context) T { t.mu.Unlock() continue } - val, ok := heap.Pop(&t.heap).(timeQueueEntry[T]) - if !ok || val.at.After(time.Now()) { + val := t.heap.Peek() + if val.at.After(time.Now()) { t.mu.Unlock() continue } + heap.Pop(&t.heap) t.mu.Unlock() return val.v case <-notify: // new task was added, loop again to ensure we have the earliest task.