How do I correctly send and receive multiple futures in one Java stream?

I am trying to submit and get 10 Future

on the same thread. They each take 1 second to process and I would like to run them in parallel.

My first try is takes_10_sec()

which is done sequentially and takes 10 seconds.

My second attempt takes_1_sec()

, which runs in parallel and takes 1 sec. However, it uses an intermediate .collect(Collectors.toList()).stream()

which I believe is not a good way to do it.

Is there any other recommended way?

public class FutureStream {
    private ExecutorService executor = Executors.newFixedThreadPool(10);;

    @Test
    public void takes_10_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    @Test
    public void takes_1_sec() {
        IntStream.range(0, 10)
                .mapToObj(i -> longTask())
                .collect(Collectors.toList())
                .stream()
                .map(task -> {
                    try {
                        return task.get();
                    } catch (InterruptedException | ExecutionException e) {
                        throw new RuntimeException(e);
                    }
                })
                .forEach(System.out::println);
    }

    private Future<String> longTask() {
        return executor.submit(() -> {
            Thread.sleep(1000);
            return Thread.currentThread().getName();
        });
    }
}

      

+1


source to share


1 answer


Threads are lazy and will only process the elements necessary for the terminal to work. For each item, the entire pipeline is processed before starting the next item (except for parallel threads). This method allows, for example, short-circuit operations.

Since you have an intermediate operation map()

that blocks the outcome of the created future, processing will wait for each future to complete before creating the next.



Collecting them all, just like you, make sure all futures are created first. This is a good solution as you need to make sure the entire stream is processed before processing the results.

+3


source







All Articles