Go - wait for the next item in the priority queue, if empty
I am trying to implement a priority queue to send json objects over a priority based network socket. I am using a package container/heap
to implement a queue. I came up with something like this:
for {
if pq.Len() > 0 {
item := heap.Pop(&pq).(*Item)
jsonEncoder.Encode(&item)
} else {
time.Sleep(10 * time.Millisecond)
}
}
Are there better ways to wait for a new item than just polling the priority queue?
source to share
One way is to use sync.Cond
:
Cond implements a condition variable, a rendezvous point for gorutts waiting for or announcing the occurrence of an event.
The example from the package can be modified as follows (for the consumer):
c.L.Lock()
for heap.Len() == 0 {
c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item
And the producer could just do:
c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()
(It might be a good idea to wrap these functions and use runs.)
Here is an example of a threaded (naive) heap that pops a method to wait until an element is available:
package main
import (
"fmt"
"sort"
"sync"
"time"
"math/rand"
)
type Heap struct {
b []int
c *sync.Cond
}
func NewHeap() *Heap {
return &Heap{c: sync.NewCond(new(sync.Mutex))}
}
// Pop (waits until anything available)
func (h *Heap) Pop() int {
h.c.L.Lock()
defer h.c.L.Unlock()
for len(h.b) == 0 {
h.c.Wait()
}
// There is definitely something in there
x := h.b[len(h.b)-1]
h.b = h.b[:len(h.b)-1]
return x
}
func (h *Heap) Push(x int) {
defer h.c.Signal() // will wake up a popper
h.c.L.Lock()
defer h.c.L.Unlock()
// Add and sort to maintain priority (not really how the heap works)
h.b = append(h.b, x)
sort.Ints(h.b)
}
func main() {
heap := NewHeap()
go func() {
for range time.Tick(time.Second) {
for n := 0; n < 3; n++ {
x := rand.Intn(100)
fmt.Println("push:", x)
heap.Push(x)
}
}
}()
for {
item := heap.Pop()
fmt.Println("pop: ", item)
}
}
(Note that this does not work in playground due to looping for range time.Tick
. Run it locally.)
source to share
I would probably use a queue couple in the queue. Starting with the data structures in the PriorityQueue example , I would build a function like this:
http://play.golang.org/p/hcNFX8ehBW
func queue(in <-chan *Item, out chan<- *Item) {
// Make us a queue!
pq := make(PriorityQueue, 0)
heap.Init(&pq)
var currentItem *Item // Our item "in hand"
var currentIn = in // Current input channel (may be nil sometimes)
var currentOut chan<- *Item // Current output channel (starts nil until we have something)
defer close(out)
for {
select {
// Read from the input
case item, ok := <-currentIn:
if !ok {
// The input has been closed. Don't keep trying to read it
currentIn = nil
// If there nothing pending to write, we're done
if currentItem == nil {
return
}
continue
}
// Were we holding something to write? Put it back.
if currentItem != nil {
heap.Push(&pq, currentItem)
}
// Put our new thing on the queue
heap.Push(&pq, item)
// Turn on the output queue if it not turned on
currentOut = out
// Grab our best item. We know there at least one. We just put it there.
currentItem = heap.Pop(&pq).(*Item)
// Write to the output
case currentOut <- currentItem:
// OK, we wrote. Is there anything else?
if len(pq) > 0 {
// Hold onto it for next time
currentItem = heap.Pop(&pq).(*Item)
} else {
// Oh well, nothing to write. Is the input stream done?
if currentIn == nil {
// Then we're done
return
}
// Otherwise, turn off the output stream for now.
currentItem = nil
currentOut = nil
}
}
}
}
Here's an example of using it:
func main() {
// Some items and their priorities.
items := map[string]int{
"banana": 3, "apple": 2, "pear": 4,
}
in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
out := make(chan *Item) // But the system will "work" for any particular values
// Start the queuing engine!
go queue(in, out)
// Stick some stuff on in another goroutine
go func() {
i := 0
for value, priority := range items {
in <- &Item{
value: value,
priority: priority,
index: i,
}
i++
}
close(in)
}()
// Read the results
for item := range out {
fmt.Printf("%.2d:%s ", item.priority, item.value)
}
fmt.Println()
}
Note that if you run this example, the order will be slightly different each time. This was, of course, expected. It depends on how fast the input and output channels are.
source to share