How to treat a dynamic collection like a stream?

Java 8 Collections provide functions to get a collection as a stream. However, as soon as we call the stream () method, we get the current contents of the collection as a stream. What if my collection grows while processing a stream? Operations on a stream can update a collection with a lot of data. Is there a simple and effective way to handle this situation?

(I tried to execute Stream.concat () from a stream processing operation, but I get an exception: Exception in thread "main" java.lang.IllegalStateException: Stream has already started or is closed)

Taking a concrete example, let's say I have a parallel url queue.

Queue<Url> concurrentUrlQue= initUrlQueue();

      

Now I want to get the stream of this url queue and process urls one by one. The process involves removing the URL from the queue, reading the web pages that the URL points to, fetching the URLs from the page, and adding those URLs to the parallel queue.

concurrentUrlQue.stream().forEach((url)->readAndExtractUrls(url, concurrentUrlQue));

      

I want to be able to handle the above dynamically growing queue as a thread. (Also, I want to be able to handle this dynamic queue using a parallel thread)

Is there an easy way to achieve this using java streams?

+3


source to share


1 answer


You need to write a spliterator that blocks waiting for a new element.

class QueueSpliterator<T> extends Spliterators.AbstractSpliterator<T> {

  private final BlockingQueue<T> queue;

  public QueueSpliterator(BlockingQueue<T> queue) {
    super(Long.MAX_VALUE, 0);
    this.queue = queue;
  }

  public boolean tryAdvance(Consumer<? super T> action) {
    try {
      T element = queue.take();
      action.accept(element);
      return true;
    } catch (InterruptedException e) {
      return false;
    }
  }
}

      



Then you create a stream using this separator and treat it like a normal endless stream.

public class Main {
  public static void main(String... args) {
    BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1000);

    new Thread(() -> {
      for (int i = 0; i < 1000; ++i) {
        try {
          queue.put(i);
        } catch (InterruptedException e) {
          throw new RuntimeException(e);
        }
      }
    }).start();


    Spliterator<Integer> queueSpliterator = new QueueSpliterator<>(queue);
    Stream<Integer> stream = StreamSupport.stream(queueSpliterator, false);

    stream.forEach(System.out::println);
  }
}

      

+4


source







All Articles