PriorityBlockingQueue stream in Java 8 not working

These two code snippets produce output in a different order. First part:

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    } else {
        return;
    }
}

      

Second part:

jobQueue.stream()
        .filter(TimeoutJobRequest::isReady)
        .peek(jobQueue::remove)
        .forEach(this::execute);

      

Please note that jobQueue is a PriorityBlockingQueue.

The reordering only happens when this::execute takes relatively long (a couple of seconds, for example).



4 answers


The stream of a PriorityBlockingQueue follows the order of its Iterator, which, according to the documentation:

The Iterator provided in method iterator() is not guaranteed to traverse the elements of the PriorityBlockingQueue in any particular order.

If you want priority order, you need to poll the items from the PriorityBlockingQueue.



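import java.util.concurrent.PriorityBlockingQueue;
import java.util.stream.IntStream;

PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);

// Streaming follows the iterator, whose order is unspecified.
System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);

// Polling returns the elements in priority order and empties the queue.
System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);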

      

Output (this may depend on the Java implementation):

-- Try 1 --
3
8
5
-- Try 2 --
3
5
8

      



If you want to create a stream that follows the priority order of the queue, you can try the following code (it empties the queue):



Stream.generate(jobQueue::poll).limit(jobQueue.size())
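
A minimal sketch of how that one-liner behaves, with one caveat spelled out: limit(queue.size()) uses the size at the moment the stream is built, so if another thread polls from the queue while the stream is consumed, poll() can return null. The class name PollingStreamSketch and both helper names are made up for illustration, and the second helper is only one possible way to cope with concurrent consumers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical helper class, not part of the original answer.
final class PollingStreamSketch {

    // Same idea as the one-liner: a lazy stream that polls the queue
    // as many times as it had elements when this method was called.
    static <T> Stream<T> drainByGenerate(PriorityBlockingQueue<T> queue) {
        return Stream.<T>generate(queue::poll).limit(queue.size());
    }

    // Alternative: poll until the queue reports empty, so a concurrent
    // consumer cannot cause null elements in the result.
    static <T> List<T> drainByLoop(PriorityBlockingQueue<T> queue) {
        List<T> drained = new ArrayList<>();
        T item;
        while ((item = queue.poll()) != null) {
            drained.add(item);
        }
        return drained;
    }

    public static void main(String[] args) {
        PriorityBlockingQueue<Integer> queue = new PriorityBlockingQueue<>();
        queue.add(5);
        queue.add(8);
        queue.add(3);

        // Elements come out in priority order; the queue is empty afterwards.
        List<Integer> polled = drainByGenerate(queue).collect(Collectors.toList());
        System.out.println(polled + " remaining=" + queue.size());
    }
}
```

Because the stream is lazy, nothing is removed from the queue until a terminal operation such as forEach or collect runs.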

      



The first snippet is not equivalent to the second. When job.isReady() returns false, the first snippet returns, whereas the second keeps going, because the stream's filter only skips the elements that do not match.

You can change the first snippet to:

while(!jobQueue.isEmpty()) {
    TimeoutJobRequest job = jobQueue.peek();
    if(job.isReady()) {
        execute(job);
        jobQueue.poll();
    }
    // Caution: while the head job is not ready, this loop keeps re-checking it (busy-wait).
}
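
To make the difference between the two behaviours concrete, here is a small sketch under a few assumptions: Job is a hypothetical stand-in for TimeoutJobRequest with a fixed ready flag, "executing" a job just records its priority, and only one thread consumes the queue. The second method is one possible non-spinning way to run every ready job in priority order; it is not the code from the question.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;

// Hypothetical demo class, not part of the original answer.
final class ReadyJobsSketch {

    // Stand-in for TimeoutJobRequest: ordered by priority, readiness is fixed.
    static final class Job implements Comparable<Job> {
        final int priority;
        final boolean ready;

        Job(int priority, boolean ready) {
            this.priority = priority;
            this.ready = ready;
        }

        boolean isReady() {
            return ready;
        }

        @Override
        public int compareTo(Job other) {
            return Integer.compare(priority, other.priority);
        }
    }

    // Like the question's loop: stop at the first job that is not ready.
    static List<Integer> runUntilNotReady(PriorityBlockingQueue<Job> queue) {
        List<Integer> executed = new ArrayList<>();
        Job head;
        while ((head = queue.peek()) != null && head.isReady()) {
            queue.poll();
            executed.add(head.priority);
        }
        return executed;
    }

    // Like the filter-based version: run every ready job, skip the rest.
    static List<Integer> runAllReady(PriorityBlockingQueue<Job> queue) {
        List<Integer> executed = new ArrayList<>();
        List<Job> notReady = new ArrayList<>();
        Job job;
        while ((job = queue.poll()) != null) {
            if (job.isReady()) {
                executed.add(job.priority);
            } else {
                notReady.add(job);
            }
        }
        queue.addAll(notReady); // put the skipped jobs back
        return executed;
    }
}
```

With jobs of priority 1 (ready), 2 (not ready) and 3 (ready), the first method executes only job 1, while the second executes jobs 1 and 3 and leaves job 2 in the queue.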

      



Unfortunately, iteration order != priority order.

I have prepared two copy-paste solutions that use the Stream API to traverse a PriorityQueue in priority order:

static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    return Stream.generate(queue::poll)
      .limit(queue.size());
}

static <T> Stream<T> asStream(PriorityQueue<T> queue) {
    Objects.requireNonNull(queue);
    Comparator<? super T> comparator = queue.comparator();
    return comparator != null
      ? queue.stream().sorted(comparator)
      : queue.stream().sorted();
}

      

drainToStream empties the queue, whereas asStream leaves it intact.
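
The helpers above take a PriorityQueue, while the question uses a PriorityBlockingQueue. Since PriorityBlockingQueue also exposes comparator(), the non-draining variant can be adapted as in the sketch below. The class name SortedViewSketch is made up for illustration, and the sort works on whatever elements the stream happens to see, so it is a snapshot rather than a live view if other threads modify the queue.

```java
import java.util.Comparator;
import java.util.List;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical adaptation of asStream for PriorityBlockingQueue.
final class SortedViewSketch {

    // Streams the elements in priority order without removing them.
    static <T> Stream<T> asStream(PriorityBlockingQueue<T> queue) {
        Comparator<? super T> comparator = queue.comparator();
        return comparator != null
                ? queue.stream().sorted(comparator) // queue's own comparator
                : queue.stream().sorted();          // natural ordering
    }

    public static void main(String[] args) {
        // Highest number first, to show that the queue's comparator is honoured.
        PriorityBlockingQueue<Integer> queue =
                new PriorityBlockingQueue<>(11, Comparator.<Integer>reverseOrder());
        queue.add(5);
        queue.add(8);
        queue.add(3);

        List<Integer> ordered = asStream(queue).collect(Collectors.toList());
        System.out.println(ordered + " remaining=" + queue.size());
    }
}
```

Sorting costs O(n log n) per call, so this fits occasional inspection better than a hot processing loop.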
