Streaming data with Akka streams, Play Framework and MongoDB Reactive not working

Following this question: Play Framework 2.5 Delayed Streaming Content

I am trying to stream data from MongoDB Reactive via Akka Streams and Play Framework. The problem is that for some reason the data is collected first and then sent back to the complete collection instead of streaming each item separately.

Mongo driver code:

public FindPublisher<Document> findAll(){

    FindPublisher iterator = collection.find();
    return iterator;
}

      

Mapping from Mongo Publisher to Source

public Source<Rating, NotUsed> findAll(){
    return Source.fromPublisher(mongoConnection.findAll()).map(doc -> new Rating().fromDocument(doc));
}

      

Return from Play Framework Controller

 public Result findAll(){

    Source<Rating, NotUsed> ratingsStream = ratingRepository.findAll();
    Source<ByteString, NotUsed> byteStringStream = ratingsStream
            .map(rating -> ByteString.fromString(rating.toDocument().toJson() + "\n"))
            .delay(FiniteDuration.create(100, TimeUnit.MILLISECONDS), DelayOverflowStrategy.backpressure());
    HttpEntity.Streamed streamed =
            new HttpEntity.Streamed(byteStringStream, Optional.empty(), Optional.of("text/event-stream"));
    return ok().sendEntity(streamed);
}

      

The delay here only delays the entire thread (e.g. initialDelay ()) and then returns all Mongo data.

Following on from my previous question, this method works when creating a new source:

 public Result test(){
    Source<ByteString, NotUsed> delay = Source.range(0, 99999)
            .map(i -> ByteString.fromString(i.toString() + "\n"))
            .delay(FiniteDuration.create(200, TimeUnit.MILLISECONDS), DelayOverflowStrategy.backpressure());
    HttpEntity http = new HttpEntity.Streamed(delay
            , Optional.empty(), Optional.of("text/event-stream"));

    return ok().sendEntity(http);
}

      

I have checked the data and the item does stand out separately. At first I thought Mongo could return the entire collection instead of streaming, but that was not the case. Matching each item works, so should deferring them work just as well?

I can only think that it has something to do with the static method fromPublisher(Publisher<T>)

. I am using Play Framework 2.5, MongoDB Driver Reactive 1.3.0 and Akka 2.5.2

Any help is appreciated!

+3


source to share





All Articles