Akka persistent custom java plugin

I am currently writing my own API plugin Akka

SyncWriteJournal

to implement a connection to HSQLDB

.

The problem is I don't understand the requirements for the method doAsyncReplayMessages

. It says that he needs to return the future and that all messages should be invoked replayCallback

.

Let's say that I have a query that returns a list of posts List<Message> messages

. Can anyone provide a minimal example (with explanation) of usage replayCallback

and Future

for correct method implementation using this list? How do replayCallback

both Future

work together and what should be returned by the method doAsyncReplayMessages

?

Thank!

-Edit -

With some comments, I hacked into an implementation that is not complete but includes the suggested idea:

public Future<Void> doAsyncReplayMessages(final String persistenceId, long fromSequenceNr, long toSequenceNr, long max,
        final Procedure<PersistentRepr> replayCallback) {
    final ExecutionContext ec = context().system().dispatcher();

    final Future<Void> future = Futures.future(new Callable<Void>() {
        @Override
        public Void call() throws Exception {
            final List<Message> messages = getMessages();
            for (int i = 0; i < feedbackList.size(); i++) {
                replayCallback.apply(
                        new PersistentImpl(messages.get(i), i, persistenceId, false, null, null));
            }
            return null;
        }
    }, ec);

    return future;
}

      

As you may have noticed, this missed a few key concepts that I am still missing. PersistentImpl needs one more argument Seq<String> confirm

, which one more null

. And, perhaps more importantly, I am returning null

since the future expects Void

both a return type and I am not sure how to implement this. It is currently throwing NPE:

[ERROR] [08/28/2014 12:31:19.582] [akkaSystem-akka.actor.default-dispatcher-7] [akka://akkaSystem/system/journal] null
java.lang.NullPointerException
    at akka.persistence.journal.japi.AsyncRecovery.asyncReadHighestSequenceNr(AsyncRecovery.scala:26)
    at akka.persistence.journal.SyncWriteJournal$$anonfun$receive$1.applyOrElse(SyncWriteJournal.scala:53)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465)
    at akka.persistence.journal.japi.SyncWriteJournal.aroundReceive(SyncWriteJournal.scala:16)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516)
    at akka.actor.ActorCell.invoke(ActorCell.scala:487)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238)
    at akka.dispatch.Mailbox.run(Mailbox.scala:220)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
    at java.lang.Thread.run(Thread.java:724)

      

+3


source to share


1 answer


You can simply complete the locking operation in the future, for example Future { fetchStuff() }

.



You can refer to dnvriend / akka-persistence-jdbc: JdbcSyncWriteJournal for a full blown synchronous journal implementation.

+2


source







All Articles