Stream data from a Play Enumerator through Spray using chunked responses

I have data being pulled from Reactive Mongo that I need to push through a Spray REST API. I was hoping to do this with chunked responses. However, I found that the Enumerator that comes back from Reactive Mongo can push data into Spray faster than the network connection can handle. As a result, the connection is terminated.

I was able to work around this by using the Spray Ack feature in an intermediate actor. This, together with a blocking Await, allowed me to create back pressure on the Enumerator. However, I really don't want the Await. I would like to figure out a way to stream the data through Spray in a non-blocking fashion.

Is this possible? I have a few ideas that might work if I can fill in the missing pieces.

1) Create back pressure on the Enumerator in a non-blocking way (not sure how to do this. Suggestions?)

2) Break the enumerator into smaller enumerators. Start consuming each enumerator only after the previous one has completed. I can do this with an actor. What I'm missing is a way to break the larger enumerator into smaller enumerators.

3) Use something like the "Enumeratee.take" method, where I would take some number of records from the Enumerator and then, when I'm ready, take some more. This is really the same solution as 2), but from a slightly different perspective. It would require the enumerator to maintain state, though. Is there a way to use Enumeratee.take multiple times against the same enumerator without restarting from the beginning each time?

Can anyone offer any alternative suggestions that might work? Or, if this is not possible, please let me know.

I am using Play Enumerators 2.3.5



2 answers


After a lot of experimentation (and help from Stack Overflow), I was able to find a solution that seems to work. It uses Spray chunked responses and builds an Iteratee around them.

Below are the relevant code snippets:

ChunkedResponder.scala

package chunkedresponses

import akka.actor.{Actor, ActorRef}
import spray.http.HttpHeaders.RawHeader
import spray.http._

object ChunkedResponder {
  case class Chunk(data: HttpData)
  case object Shutdown
  case object Ack
}

class ChunkedResponder(contentType: ContentType, responder: ActorRef) extends Actor {
  import ChunkedResponder._
  def receive:Receive = {
    case chunk: Chunk =>
      responder.forward(ChunkedResponseStart(HttpResponse(entity = HttpEntity(contentType, chunk.data))).withAck(Ack))
      context.become(chunking)
    case Shutdown =>
      responder.forward(HttpResponse(headers = List(RawHeader("Content-Type", contentType.value))).withAck(Ack))
      context.stop(self)
  }

  def chunking:Receive = {
    case chunk: Chunk =>
      responder.forward(MessageChunk(chunk.data).withAck(Ack))
    case Shutdown =>
      responder.forward(ChunkedMessageEnd().withAck(Ack))
      context.stop(self)
  }
}

      



ChunkIteratee.scala

package chunkedresponses

import akka.actor.ActorRef
import akka.util.Timeout
import akka.pattern.ask
import play.api.libs.iteratee.{Done, Step, Input, Iteratee}
import spray.http.HttpData
import scala.concurrent.duration._

import scala.concurrent.{ExecutionContext, Future}

class ChunkIteratee(chunkedResponder: ActorRef) extends Iteratee[HttpData, Unit] {
  import ChunkedResponder._
  private implicit val timeout = Timeout(30.seconds)

  def fold[B](folder: (Step[HttpData, Unit]) => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    def waitForAck(future: Future[Any]):Iteratee[HttpData, Unit] = Iteratee.flatten(future.map(_ => this))

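    def step(input: Input[HttpData]):Iteratee[HttpData, Unit] = input match {
      // Ask the responder to send the chunk; continue only once the Ack comes back
      case Input.El(e) => waitForAck(chunkedResponder ? Chunk(e))
      // Nothing to send, so continue immediately (use the unit value (), not the Unit companion object)
      case Input.Empty => waitForAck(Future.successful(()))
      case Input.EOF =>
        chunkedResponder ! Shutdown
        Done((), Input.EOF)
    }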

    folder(Step.Cont(step))
  }
}

      

package.scala

import akka.actor.{ActorContext, ActorRefFactory, Props}
import play.api.libs.iteratee.Enumerator
import spray.http.{HttpData, ContentType}
import spray.routing.RequestContext

import scala.concurrent.ExecutionContext

package object chunkedresponses {
  implicit class ChunkedRequestContext(requestContext: RequestContext) {
    def completeChunked(contentType: ContentType, enumerator: Enumerator[HttpData])
                       (implicit executionContext: ExecutionContext, actorRefFactory: ActorRefFactory): Unit = {
      val chunkedResponder = actorRefFactory.actorOf(Props(new ChunkedResponder(contentType, requestContext.responder)))
      val iteratee = new ChunkIteratee(chunkedResponder)
      enumerator.run(iteratee)
    }
  }
}
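
For context, here is a rough sketch of how the `completeChunked` extension above might be called from a route. The `NumbersRoute` trait, the `numberLines` helper, and the `numbers` path are hypothetical names invented for this illustration; in the real application the enumerator would come from Reactive Mongo and be mapped to `HttpData`. It assumes spray-routing, play-iteratees, and the three files above are on the classpath.

```scala
import akka.actor.ActorRefFactory
import play.api.libs.iteratee.Enumerator
import spray.http.{ContentTypes, HttpData}
import spray.routing.{Directives, Route}
import scala.concurrent.ExecutionContext

import chunkedresponses._ // brings the completeChunked extension method into scope

// Hypothetical route trait; mix it into whatever runs your routes.
trait NumbersRoute extends Directives {
  implicit def actorRefFactory: ActorRefFactory
  implicit def executionContext: ExecutionContext

  // Stand-in for the Reactive Mongo enumerator: one line of text per element.
  def numberLines(count: Int): Enumerator[HttpData] =
    Enumerator.enumerate((1 to count).map(i => HttpData(s"$i\n")))

  def numbersRoute: Route =
    path("numbers") {
      get { ctx =>
        // Each chunk is sent only after the previous one has been acknowledged.
        ctx.completeChunked(ContentTypes.`text/plain(UTF-8)`, numberLines(100000))
      }
    }
}
```

Because the route takes the `RequestContext` directly, nothing else should complete the request; the `ChunkedResponder` actor owns the response from that point on.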

      



I think the idea is that you implement an Iteratee whose fold method only invokes the supplied callback after receiving the Spray Ack. Something like:

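// Sketch only: the case patterns and the elided parts are placeholders
def handleData(input: Input[String]): Iteratee[String, Unit] = new Iteratee[String, Unit] {
  def fold[B](folder: Step[String, Unit] => Future[B])(implicit ec: ExecutionContext): Future[B] = {
    // The folder is called only after sprayActor has replied
    (sprayActor ? input).flatMap {
      case success => folder(Step.Cont(handleData))
      case error => folder(Step.Error(...))
      case done => ...
    }
  }
}

val initialIteratee = new Iteratee[String, Unit] {
  def fold[B](folder: Step[String, Unit] => Future[B])(implicit ec: ExecutionContext) = folder(Step.Cont(handleData))
}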

enumerator.run(initialIteratee)

      



This should be non-blocking, but ensures that the next chunk is only sent after the previous chunk has succeeded.
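
The principle can be shown without Play or Spray at all. The sketch below uses only the Scala standard library; `AckChaining`, `sendSequentially`, and `send` are hypothetical names for this illustration, with `send` standing in for something like `sprayActor ? chunk`, whose Future completes when the Ack arrives. Each send is chained onto the Future of the previous one with `flatMap`, so no thread ever waits.

```scala
import scala.concurrent.{ExecutionContext, Future}

object AckChaining {
  // Sends the chunks one at a time and returns how many were sent.
  // `send` is assumed to return a Future that completes on acknowledgement.
  def sendSequentially[A](chunks: Iterator[A])(send: A => Future[Any])
                         (implicit ec: ExecutionContext): Future[Int] = {
    def loop(sent: Int): Future[Int] =
      if (chunks.hasNext)
        // The next element is pulled only inside the callback of the previous ack
        send(chunks.next()).flatMap(_ => loop(sent + 1))
      else
        Future.successful(sent)
    loop(0)
  }
}
```

An Iteratee that waits for the Ack inside `fold` plays the same role as `loop` here: the Enumerator is not asked for the next element until the previous Future has completed.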
