Go - wait for the next item in the priority queue, if empty

I am trying to implement a priority queue to send json objects over a priority based network socket. I am using a package container/heap

to implement a queue. I came up with something like this:

for {
    if pq.Len() > 0 {
        item := heap.Pop(&pq).(*Item)
        jsonEncoder.Encode(&item)
    } else {
        time.Sleep(10 * time.Millisecond)
    }
}

      

Are there better ways to wait for a new item than just polling the priority queue?

+3


source to share


2 answers


One way is to use sync.Cond

:

Cond implements a condition variable, a rendezvous point for gorutts waiting for or announcing the occurrence of an event.

The example from the package can be modified as follows (for the consumer):

c.L.Lock()
for heap.Len() == 0 {
    c.Wait() // Will wait until signalled by pushing routine
}
item := heap.Pop(&pq).(*Item)
c.L.Unlock()
// Do stuff with the item

      

And the producer could just do:



c.L.Lock()
heap.Push(x)
c.L.Unlock()
c.Signal()

      

(It might be a good idea to wrap these functions and use runs.)

Here is an example of a threaded (naive) heap that pops a method to wait until an element is available:

package main

import (
    "fmt"
    "sort"
    "sync"
    "time"
    "math/rand"
)

type Heap struct {
    b []int
    c *sync.Cond
}

func NewHeap() *Heap {
    return &Heap{c: sync.NewCond(new(sync.Mutex))}
}

// Pop (waits until anything available)
func (h *Heap) Pop() int {
    h.c.L.Lock()
    defer h.c.L.Unlock()
    for len(h.b) == 0 {
        h.c.Wait()
    }
    // There is definitely something in there
    x := h.b[len(h.b)-1]
    h.b = h.b[:len(h.b)-1]
    return x
}

func (h *Heap) Push(x int) {
    defer h.c.Signal() // will wake up a popper
    h.c.L.Lock()
    defer h.c.L.Unlock()
    // Add and sort to maintain priority (not really how the heap works)
    h.b = append(h.b, x)
    sort.Ints(h.b)
}

func main() {
    heap := NewHeap()

    go func() {
        for range time.Tick(time.Second) {
            for n := 0; n < 3; n++ {
                x := rand.Intn(100)
                fmt.Println("push:", x)
                heap.Push(x)
            }
        }
    }()

    for {
        item := heap.Pop()
        fmt.Println("pop: ", item)
    }
}

      

(Note that this does not work in playground due to looping for range time.Tick

. Run it locally.)

+1


source


I would probably use a queue couple in the queue. Starting with the data structures in the PriorityQueue example , I would build a function like this:

http://play.golang.org/p/hcNFX8ehBW

func queue(in <-chan *Item, out chan<- *Item) {
    // Make us a queue!
    pq := make(PriorityQueue, 0)
    heap.Init(&pq)

    var currentItem *Item       // Our item "in hand"
    var currentIn = in          // Current input channel (may be nil sometimes)
    var currentOut chan<- *Item // Current output channel (starts nil until we have something)

    defer close(out)

    for {
        select {
        // Read from the input
        case item, ok := <-currentIn:
            if !ok {
                // The input has been closed. Don't keep trying to read it
                currentIn = nil
                // If there nothing pending to write, we're done
                if currentItem == nil {
                    return
                }
                continue
            }

            // Were we holding something to write? Put it back.
            if currentItem != nil {
                heap.Push(&pq, currentItem)
            }

            // Put our new thing on the queue
            heap.Push(&pq, item)

            // Turn on the output queue if it not turned on
            currentOut = out

            // Grab our best item. We know there at least one. We just put it there.
            currentItem = heap.Pop(&pq).(*Item)

            // Write to the output
        case currentOut <- currentItem:
            // OK, we wrote. Is there anything else?
            if len(pq) > 0 {
                // Hold onto it for next time
                currentItem = heap.Pop(&pq).(*Item)
            } else {
                // Oh well, nothing to write. Is the input stream done?
                if currentIn == nil {
                    // Then we're done
                    return
                }

                // Otherwise, turn off the output stream for now.
                currentItem = nil
                currentOut = nil
            }
        }
    }
}

      



Here's an example of using it:

func main() {
    // Some items and their priorities.
    items := map[string]int{
        "banana": 3, "apple": 2, "pear": 4,
    }

    in := make(chan *Item, 10) // Big input buffer and unbuffered output should give best sort ordering.
    out := make(chan *Item)    // But the system will "work" for any particular values

    // Start the queuing engine!
    go queue(in, out)

    // Stick some stuff on in another goroutine
    go func() {
        i := 0
        for value, priority := range items {
            in <- &Item{
                value:    value,
                priority: priority,
                index:    i,
            }
            i++
        }
        close(in)
    }()

    // Read the results
    for item := range out {
        fmt.Printf("%.2d:%s ", item.priority, item.value)
    }
    fmt.Println()
}

      

Note that if you run this example, the order will be slightly different each time. This was, of course, expected. It depends on how fast the input and output channels are.

+1


source







All Articles