Multiple future calls in an actor receive method

I am trying to make two external calls (to the Redis database) inside the Actor method receive

. Both calls return Future

and I want the result of the first Future

in the second. I wrap both calls inside a Redis transaction so that no one changes the value in the database while I read it.

The internal state of the actor is updated based on the value of the second Future

.

This is what my current code looks like, which I am wrong because I am updating the internal state of the actor inside the callback Future.onComplete

.

I cannot use a template PipeTo

because I need both Futures to be in a transaction. If I use Await

for the former Future

, then my receive method will be blocked. Any idea how to fix this?

My second question has to do with how I use Future

s. Is this use correct Future

? Is there a better way to deal with multiple futures in general? Imagine if there were 3 or 4 Future depending on the previous one.

import akka.actor.{Props, ActorLogging, Actor}
import akka.util.ByteString
import redis.RedisClient

import scala.concurrent.Future
import scala.util.{Failure, Success}


object GetSubscriptionsDemo extends App {
  val akkaSystem = akka.actor.ActorSystem("redis-demo")
  val actor = akkaSystem.actorOf(Props(new SimpleRedisActor("localhost", "dummyzset")), name = "simpleactor")
  actor ! UpdateState
}

case object UpdateState

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  //required by Future
  implicit val ctx = context dispatcher

  var rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => {
      log.info("Start of UpdateState ...")

      val tran = rClient.transaction()

      val zf: Future[Long] = tran.zcard(key)  //FIRST Future 
      zf.onComplete {

        case Success(z) => {
          //SECOND Future, depends on result of FIRST Future 
          val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 
          rf.onComplete {
            case Success(x) => {
              //convert ByteString to UTF8 String
              val v = x.map(_.utf8String)
              log.info(s"Updating state with $v ")
              //update actor internal state inside callback for a Future
              //IS THIS CORRECT ?
              mutableState ++ v
            }
            case Failure(e) => {
              log.warning("ZRANGE future failed ...", e)
            }
          }
        }
        case Failure(f) => log.warning("ZCARD future failed ...", f)
      }
      tran.exec()

    }
  }

}

      

Compiles, but when I run it hits.

2014-08-07  INFO [redis-demo-akka.actor.default-dispatcher-3] a.e.s.Slf4jLogger - Slf4jLogger started
2014-08-07 04:38:35.106UTC INFO [redis-demo-akka.actor.default-dispatcher-3] e.c.s.e.a.g.SimpleRedisActor - Start of UpdateState ...
2014-08-07 04:38:35.134UTC INFO [redis-demo-akka.actor.default-dispatcher-8] r.a.RedisClientActor - Connect to localhost/127.0.0.1:6379
2014-08-07 04:38:35.172UTC INFO [redis-demo-akka.actor.default-dispatcher-4] r.a.RedisClientActor - Connected to localhost/127.0.0.1:6379

      

UPDATE 1

To use the template PipeTo

, I will need access to tran

and FIRST Future ( zf

) in the actor where I write Future

, because SECOND Future

depends on the value of ( z

) FIRST.

    //SECOND Future, depends on result of FIRST Future 
      val rf: Future[Seq[ByteString]] = tran.zrange(key, z - 1, z) 

      

+3


source to share


2 answers


Without knowing too much about the redis client you are using, I can come up with an alternative solution that should be cleaner and will have no problem closing more volatile state. The idea is to take advantage of a master / worker situation where the master (SimpleRedisActor) receives a request to do the work and then passes to the delegate the worker who does the work and responds to the state that needs to be updated. This solution will look something like this:

object SimpleRedisActor{
  case object UpdateState
  def props(ip:String, key:String) = Props(classOf[SimpleRedisActor], ip, key)
}

class SimpleRedisActor(ip: String, key: String) extends Actor with ActorLogging {
  import SimpleRedisActor._
  import SimpleRedisWorker._

  //mutable state that is updated on a periodic basis
  var mutableState: Set[String] = Set.empty

  val rClient = RedisClient(ip)(context.system)

  def receive = {
    case UpdateState => 
      log.info("Start of UpdateState ...")      
      val worker = context.actorOf(SimpleRedisWorker.props)
      worker ! DoWork(rClient, key)

    case WorkResult(result) =>
      mutableState ++ result

    case FailedWorkResult(ex) =>
      log.error("Worker got failed work result", ex)
  }
}

object SimpleRedisWorker{
  case class DoWork(client:RedisClient, key:String)
  case class WorkResult(result:Seq[String])
  case class FailedWorkResult(ex:Throwable)
  def props = Props[SimpleRedisWorker]
}

class SimpleRedisWorker extends Actor{
  import SimpleRedisWorker._
  import akka.pattern.pipe
  import context._

  def receive = {
    case DoWork(client, key) =>
      val trans = client.transaction()
      trans.zcard(key) pipeTo self
      become(waitingForZCard(sender, trans, key) orElse failureHandler(sender, trans))
  }

  def waitingForZCard(orig:ActorRef, trans:RedisTransaction, key:String):Receive = {      
    case l:Long =>
      trans.zrange(key, l -1, l) pipeTo self
      become(waitingForZRange(orig, trans) orElse failureHandler(orig, trans))
  }

  def waitingForZRange(orig:ActorRef, trans:RedisTransaction):Receive = {
    case s:Seq[ByteString] =>
      orig ! WorkResult(s.map(_.utf8String))
      finishAndStop(trans)
  }

  def failureHandler(orig:ActorRef, trans:RedisTransaction):Receive = {
    case Status.Failure(ex) => 
      orig ! FailedWorkResult(ex)
      finishAndStop(trans)   
  }

  def finishAndStop(trans:RedisTransaction) {
    trans.exec()
    context stop self
  }
}

      



The worker starts a transaction and then makes calls to redis and eventually ends the transaction before stopping himself. When it calls redis, it gets the future and returns to itself to continue processing, changing the receive method as the mechanism for displaying progress through its states. In a model like this (which I suppose is somewhat similar to the kernel error pattern), the master owns and protects the state by delegating the "risky" work to the child, who can figure out what the state should change, but the master still owns the change.

Now again, I have no idea about the capabilities of the redis client you are using and if it is secure enough to even do this sort of thing, but that is not entirely true. The point was to demonstrate a safer framework for doing something like this, including futures and states that need to be safely changed.

+1


source


Using a callback to change internal state is not a good idea, excerpt from akka docs :

When using future callbacks such as onComplete, onSuccess, and onFailure, within the members, you must carefully avoid closing the reference to the contained members, i.e. do not call methods or access mutable state on the surrounding actor from the callback.

Why are you worried about pipeTo and transactions? Not sure how redis transactions work, but I would assume the transaction will not include an onComplete callback in the second future.

I would put the state in a separate actor, which you will also forget in the future. Thus, you have a separate mailbox, and the order there will be the same as the order of messages that came to change the state. Also if any read requests come in, they will be placed in the correct order as well.



Edit to edit the edited question: Ok, so you don't want to pass the first future, which makes sense and shouldn't be a problem since the first callback is harmless. The second future callback is a problem as it manages state. But this future can be a channel without the need for transaction access.

So basically my suggestion:

val firstFuture = tran.zcard
firstFuture.onComplete {
   val secondFuture = tran.zrange
   secondFuture pipeTo stateActor
}

      

C stateActor

containing mutable state.

0


source







All Articles