Akka Persistence custom Java plugin
I am currently writing my own plugin against the Akka SyncWriteJournal API to implement a connection to HSQLDB.
The problem is that I don't understand the requirements for the method doAsyncReplayMessages. It says that the method needs to return a Future and that replayCallback should be invoked for every message.
Let's say that I have a query that returns a list of messages, List<Message> messages. Can anyone provide a minimal example (with explanation) of how to use replayCallback and Future to implement the method correctly with this list? How do replayCallback and Future work together, and what should doAsyncReplayMessages return?
Thanks!
- Edit -
Based on some comments, I hacked together an implementation that is not complete but includes the suggested idea:
public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
        final Procedure<PersistentRepr> replayCallback) {
    final ExecutionContext ec = context().system().dispatcher();
    // Run the blocking query inside a Future so the journal actor is not blocked.
    final Future<Void> future = Futures.future(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            final List<Message> messages = getMessages();
            // Invoke the callback once per message (the loop bound was feedbackList.size()).
            for (int i = 0; i < messages.size(); i++) {
                replayCallback.apply(
                        new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
            }
            // Void has no instances, so null is the only value that can be returned here.
            return null;
        }
    }, ec);
    return future;
}
As you may have noticed, this skips over a few key concepts that I am still missing. PersistentImpl needs one more argument, Seq<String> confirm, which would be one more null. And, perhaps more importantly, I am returning null since the future expects Void as its return type, and I am not sure how to implement this properly. It currently throws an NPE:
[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
at akka.actor.ActorCell.invoke(ActorCell.scala:487)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
at akka.dispatch.Mailbox.run(Mailbox.scala:220)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
at java.lang.Thread.run(Thread.java:724)
You can simply wrap the blocking operation in a Future, for example Future { fetchStuff() }.
You can refer to JdbcSyncWriteJournal in dnvriend/akka-persistence-jdbc for a full-blown synchronous journal implementation.
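To make the "wrap the blocking call in a Future and invoke the callback per message" idea concrete, here is a minimal sketch of the pattern. It is not the Akka API: so that it runs with only the JDK, it uses java.util.concurrent.CompletableFuture in place of Akka's Future and a Consumer<String> in place of Procedure<PersistentRepr>. The class ReplaySketch, the method replay, and getMessages are hypothetical names, and the sketch assumes sequence numbers start at 1 and follow list order.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Consumer;

final class ReplaySketch {

    // Hypothetical stand-in for the blocking database query from the question.
    static List<String> getMessages() {
        return Arrays.asList("first", "second", "third");
    }

    // Runs the blocking query on the given executor and calls replayCallback
    // once per message in [fromSequenceNr, toSequenceNr], at most max times.
    // The returned future completes only after every callback has run.
    static CompletableFuture<Void> replay(long fromSequenceNr, long toSequenceNr, long max,
                                          Consumer<String> replayCallback, Executor executor) {
        return CompletableFuture.runAsync(() -> {
            List<String> messages = getMessages();
            long replayed = 0;
            for (int i = 0; i < messages.size() && replayed < max; i++) {
                long sequenceNr = i + 1L; // assumption: 1-based, in list order
                if (sequenceNr >= fromSequenceNr && sequenceNr <= toSequenceNr) {
                    replayCallback.accept(messages.get(i));
                    replayed++;
                }
            }
        }, executor);
    }
}
```

The caller only sees the future: its completion signals that replay has finished, and a failure inside the loop fails the future. The value carried by a Void future is always null, so returning null from the Callable in the question is expected. Note also that the stack trace above points at asyncReadHighestSequenceNr rather than at the replay method, so the NPE may come from doAsyncReadHighestSequenceNr returning null instead of a Future; that is a guess based on the trace alone.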