Akka Supervisor Does Not Catch Future Failure
I am trying to develop an application using Futures and Akka supervisors, but when a Future inside an actor completes with a Failure, its supervisor does not receive the exception.
Here is my code.
1) Supervisor actor
class TransaccionActorSupervisor() extends Actor with ActorLogging {
  val actor: ActorRef = context.actorOf(Props[TransaccionActor].withRouter(RoundRobinPool(nrOfInstances = 5)), "transaccion-actor")

  def receive = {
    case msg: Any => actor forward msg
  }

  override val supervisorStrategy = OneForOneStrategy() {
    case exception =>
      println("<<<<<<<<<<<<<<<<<<< IN SUPERVISOR >>>>>>>>>>>>>>>>>>>>>>>>>>>>")
      Restart
  }
}
2) Supervised actor
class TransaccionActor() extends Actor with ActorLogging {
  implicit val _: ExecutionContext = context.dispatcher
  val transaccionAdapter = (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {
    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender()
      transaccionAdapter.searchTransaction(msg.id).onComplete {
        case Success(transaction) => currentSender ! transaction
        case Failure(error) => throw error
      }
  }
}
What am I doing wrong?
Thanks everyone!
I had the same problem and Ryan's answer helped. But since I'm new to Akka, it wasn't trivial to understand the answer, so I would like to provide some details.
First, I think throwing inside onComplete will not work at all. onComplete just registers a callback that may be called on a completely separate thread, and it does not return a new Future. Therefore, any exception thrown inside onComplete never reaches the actor and is lost as far as the supervisor is concerned.
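To illustrate the point about callbacks, here is a minimal sketch that uses only the Scala standard library, with no Akka involved. The `search` function is a hypothetical stand-in for `transaccionAdapter.searchTransaction` and always fails; `OnCompleteDemo`, `viaOnComplete`, and `viaMap` are names made up for this example. It contrasts onComplete, which returns Unit, with map, which returns a new Future carrying the failure.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}

object OnCompleteDemo {
  // Hypothetical stand-in for transaccionAdapter.searchTransaction: always fails.
  def search(id: Long): Future[String] =
    Future.failed(new RuntimeException(s"transaction $id not found"))

  // onComplete returns Unit, so the caller has nothing to observe.
  def viaOnComplete(id: Long): Unit =
    search(id).onComplete {
      case Success(_)     => ()
      // Thrown on an ExecutionContext thread; it is reported to the
      // ExecutionContext and never reaches the code that called viaOnComplete.
      case Failure(error) => throw error
    }

  // map returns a new Future; a failed source yields a failed result.
  def viaMap(id: Long): Future[Int] = search(id).map(_.length)

  def main(args: Array[String]): Unit = {
    viaOnComplete(1L) // returns normally despite the throw in the callback

    // Await.ready waits for completion without rethrowing the failure.
    val outcome: Try[Int] = Await.ready(viaMap(1L), 1.second).value.get
    println(outcome.isFailure) // prints "true"
  }
}
```

Because the Future returned by map still holds the failure, it can be handed on to something that reacts to it, which is what piping the result to an actor does.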
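Instead, use map, recover, recoverWith, or transform, as they return new Futures. Then you need to pipe the result to an actor, e.g. self. When the Future fails, pipeTo delivers the exception to that actor wrapped in an akka.actor.Status.Failure message, so the receiving actor should match on it and rethrow the exception inside receive, where the supervisor can see it.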
In other words, your supervised actor should look like this:
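import akka.actor.{Actor, ActorLogging, ActorRef}
import akka.actor.Status.Failure
import akka.pattern.pipe

class TransaccionActor() extends Actor with ActorLogging {
  import context.dispatcher // ExecutionContext used by map and pipeTo

  val transaccionAdapter =
    (new TransaccionComponentImpl with TransaccionRepositoryComponentImpl).adapter

  def receive = {
    case msg: GetTransaccionById =>
      val currentSender: ActorRef = sender() // capture before going asynchronous
      // On success the reply goes to the original sender and the resulting
      // Unit is piped to self, where it is left unhandled.
      transaccionAdapter searchTransaction msg.id map { transaction =>
        currentSender ! transaction
      } pipeTo self

    // A failed Future arrives as Status.Failure; rethrowing it inside receive
    // makes the actor fail, so the supervisor strategy is applied.
    case Failure(throwable) => throw throwable
  }
}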