Akka supervisor does not catch exceptions from a Future

I am trying to develop an application using Futures and Akka supervisors, but when a Future fails inside an actor, the actor's supervisor never receives the exception.

Here is my code.

1) Supervisor actor

class TransaccionActorSupervisor() extends Actor with ActorLogging {

  val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")

  def receive = {
    case msg: Any => actor forward msg
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case exception =>
      println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
      Restart
  }

}

      

2) Supervised actor

class TransaccionActor() extends Actor with ActorLogging {

  implicit val _: ExecutionContext = context.dispatcher
  val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter.searchTransaction(msg.id).onComplete {
         case Success(transaction) => currentSender ! transaction
         case Failure(error) => throw error
      }

  }

}

      

What am I doing wrong?

Thanks everyone!



2 answers


I had the same problem and Ryan's answer helped. But since I'm new to Akka, it wasn't trivial to understand the answer, so I would like to provide some details.

First, I think onComplete won't work here at all. It only registers a callback, which may run on a completely separate thread, and it does not return a new Future. Therefore, any exception thrown inside the onComplete callback is lost.
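To see this outside of Akka, here is a minimal sketch using only the Scala standard library. The failingSearch method is a hypothetical stand-in for transaccionAdapter.searchTransaction, and the global ExecutionContext is assumed in place of the actor's dispatcher.

```scala
import java.util.concurrent.{CountDownLatch, TimeUnit}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success}
import scala.util.control.NonFatal

object OnCompleteDemo {

  // Hypothetical stand-in for transaccionAdapter.searchTransaction: always fails.
  def failingSearch(id: Long): Future[String] =
    Future.failed(new IllegalStateException(s"transaction $id not found"))

  // Returns true only if the exception thrown in the callback reaches the caller.
  def callerSawException(): Boolean = {
    val callbackRan = new CountDownLatch(1)
    var caught = false
    try {
      // onComplete returns Unit: there is no new Future to carry the exception.
      failingSearch(1L).onComplete {
        case Success(value) => println(value)
        case Failure(error) =>
          callbackRan.countDown()
          throw error // handed to the ExecutionContext's failure reporter
      }
    } catch {
      case NonFatal(_) => caught = true
    }
    callbackRan.await(5, TimeUnit.SECONDS) // wait until the callback has started
    caught
  }

  def main(args: Array[String]): Unit =
    println(s"caller saw exception: ${callerSawException()}")
}
```

The try/catch around onComplete never fires, because the callback runs later on a pool thread. The same thing happens inside an actor: the throw happens outside receive, so the supervisor has nothing to react to.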

Instead, use map, recover, recoverWith, or transform, as they return new Futures. Then you need to pipe the result to an actor, e.g. self; the receiving actor should handle the piped failure and rethrow the exception.
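As a rough illustration of why these combinators help, the sketch below shows that the Future returned by map still carries the original failure, while recover can turn it into a value. It uses only the standard library; failingSearch and outcome are hypothetical helpers, not part of the original code.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.Try

object CombinatorDemo {

  // Hypothetical stand-in for the adapter call: always fails.
  def failingSearch(id: Long): Future[String] =
    Future.failed(new IllegalStateException(s"transaction $id not found"))

  // map returns a new Future; the failure is propagated, the function is skipped.
  def mapped(id: Long): Future[Int] =
    failingSearch(id).map(_.length)

  // recover returns a new Future in which a matching failure becomes a value.
  def recovered(id: Long): Future[String] =
    failingSearch(id).recover {
      case e: IllegalStateException => s"fallback: ${e.getMessage}"
    }

  // Helper for inspection only: blocks and captures the outcome as a Try.
  def outcome[A](future: Future[A]): Try[A] =
    Try(Await.result(future, 5.seconds))

  def main(args: Array[String]): Unit = {
    println(outcome(mapped(1L)))
    println(outcome(recovered(1L)))
  }
}
```

Because the failure stays inside the returned Future, it can be handed to pipeTo, which delivers it to the target actor as an akka.actor.Status.Failure message.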



In other words, your supervised actor should look like this:

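import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.actor.Status.Failure
import akka.pattern.pipe

class TransaccionActor() extends Actor with ActorLogging {

  // Futures created below run on the actor's dispatcher.
  import context.dispatcher

  val transaccionAdapter = 
    (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {

    case msg: GetTransaccionById =>
      // Capture the sender now; sender() is not safe to call inside a Future callback.
      val currentSender: ActorRef = sender()
      transaccionAdapter searchTransaction msg.id map { transaction =>
         currentSender ! transaction
      } pipeTo self // a failed Future arrives at self as Status.Failure

    // Rethrow inside receive so the supervisor's strategy sees the exception.
    case Failure(throwable) => throw throwable

  }

}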

      



Exceptions thrown inside a Future within an actor are not caught by the actor. You need to pipe the exception to self and then rethrow it there if you want the supervising actor to handle it.
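A compact end-to-end sketch of that idea, assuming classic Akka actors on the classpath. Lookup, Found, Worker, and Supervisor are hypothetical names, the search method always fails for demonstration, and the Promise passed to the supervisor exists only so the outcome can be observed from outside.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, OneForOneStrategy, Props, Status, SupervisorStrategy}
import akka.pattern.pipe
import scala.concurrent.{Await, Future, Promise}
import scala.concurrent.duration._

// Hypothetical messages standing in for GetTransaccionById and its result.
final case class Lookup(id: Long)
final case class Found(replyTo: ActorRef, value: String)

class Worker extends Actor {
  import context.dispatcher

  // Hypothetical asynchronous lookup that always fails.
  private def search(id: Long): Future[String] =
    Future.failed(new IllegalStateException(s"transaction $id not found"))

  def receive: Receive = {
    case Lookup(id) =>
      val replyTo = sender() // capture before going asynchronous
      search(id).map(Found(replyTo, _)).pipeTo(self)
    case Found(replyTo, value) =>
      replyTo ! value
    case Status.Failure(cause) =>
      throw cause // thrown inside receive, so the supervisor is notified
  }
}

class Supervisor(seen: Promise[Throwable]) extends Actor {
  private val worker = context.actorOf(Props(new Worker), "worker")

  override val supervisorStrategy: SupervisorStrategy = OneForOneStrategy() {
    case e: IllegalStateException =>
      seen.trySuccess(e) // record what the supervisor observed
      SupervisorStrategy.Restart
  }

  def receive: Receive = { case msg => worker.forward(msg) }
}

object SupervisionDemo {
  def main(args: Array[String]): Unit = {
    val system = ActorSystem("demo")
    val seen = Promise[Throwable]()
    system.actorOf(Props(new Supervisor(seen)), "supervisor") ! Lookup(42L)
    println(s"supervisor saw: ${Await.result(seen.future, 5.seconds)}")
    system.terminate()
  }
}
```

One design choice here differs from simply replying inside map: the successful result is also sent to self as a Found message, so the actor only ever touches the original sender from within receive and no stray Unit value is piped back.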


