Priority BlockingQueue stream in Java 8 not working
These two code snippets have different output order. First part:
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
} else {
return;
}
}
Second part:
jobQueue.stream()
.filter(TimeoutJobRequest::isReady)
.peek(jobQueue::remove)
.forEach(this::execute);
Please note that jobQueue
is PriorityBlockingQueue
.
Reordering only happens when this::execute
relatively long (like a couple of seconds).
source to share
stream
of PriorityBlockingQueue
follows the order Iterator
, which according to the documentation :
The iterator provided in the iterator () method is not guaranteed to traverse the PriorityBlockingQueue elements in any particular order.
If you want order of precedence, you need poll
items from PriorityBlockingQueue
.
PriorityBlockingQueue<Integer> pq = new PriorityBlockingQueue<>();
pq.add(5);
pq.add(8);
pq.add(3);
System.out.println("-- Try 1 --");
pq.stream().forEach(System.out::println);
System.out.println("-- Try 2 --");
IntStream.range(0, pq.size()).map(i -> pq.poll()).forEach(System.out::println);
Conclusion (this may depend on Java implementation):
-- Try 1 --
3
8
5
-- Try 2 --
3
5
8
source to share
The first part of the code is not equal to the second, when the function job.isReady()
returns false
, the first ends and the second is still executed, the filter
stream function is only a filtering operation
you can change the first piece of code to
while(!jobQueue.isEmpty()) {
TimeoutJobRequest job = jobQueue.peek();
if(job.isReady()) {
execute(job);
jobQueue.poll();
}
}
source to share
Unfortunately, iteration order! = Priority order.
I have prepared two copy-paste solutions to use the Stream API to traverse PriorityQueue
using priority order:
static <T> Stream<T> drainToStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
return Stream.generate(queue::poll)
.limit(queue.size());
}
static <T> Stream<T> asStream(PriorityQueue<T> queue) {
Objects.requireNonNull(queue);
Comparator<? super T> comparator = queue.comparator();
return comparator != null
? queue.stream().sorted(comparator)
: queue.stream().sorted();
}
draintToStream
empties the queue and asStream
remains intact.
source to share