Integrating streaming events from EventStore with Akka using Akka Streams in Java

I'm using GetEventStore as the log provider (journal) for events persisted by akka-persistence, and akka.persistence.query.javadsl to read events back from the event store. The actor system and the log provider are configured with Spring.

The eventstore configuration is as follows:

eventstore {
  # IP & port of Event Store
  address {
    host = "xxxx"
    port = 1113
  }

  http {
    protocol = "http"
    port = 2113
    prefix = ""
  }

  # The desired connection timeout
  connection-timeout = 10s

  # Maximum number of reconnections before backing, -1 to reconnect forever
  max-reconnections = 100

  reconnection-delay {
    # Delay before first reconnection
    min = 250ms
    # Maximum delay on reconnections
    max = 1s
  }

  # The default credentials to use for operations where others are not explicitly supplied.
  credentials {
    login = "admin"
    password = "changeit"
  }

  heartbeat {
    # The interval at which to send heartbeat messages.
    interval = 500ms
    # The interval after which an unacknowledged heartbeat will cause the connection to be considered faulted and disconnect.
    timeout = 5s
  }

  operation {
    # The maximum number of operation retries
    max-retries = 10
    # The amount of time before an operation is considered to have timed out
    timeout = 500s
  }

  # Whether to resolve LinkTo events automatically
  resolve-linkTos = false

  # Whether or not to require EventStore to refuse serving read or write request if it is not master
  require-master = true

  # Number of events to be retrieved by client as single message
  read-batch-size = 990

  # The size of the buffer in element count
  buffer-size = 100000

  # Strategy that is used when elements cannot fit inside the buffer
  # Possible values DropHead, DropTail, DropBuffer, DropNew, Fail
  buffer-overflow-strategy = "DropHead"

  # The number of serialization/deserialization functions to be run in parallel
  serialization-parallelism = 8

  # Serialization done asynchronously and these futures may complete in any order,
  # but results will be used with preserved order if set to true
  serialization-ordered = true

  cluster {
    # Endpoints for seeding gossip
    # For example: ["127.0.0.1:1", "127.0.0.2:2"]
    gossip-seeds = []

    # The DNS name to use for discovering endpoints
    dns = null

    # The time given to resolve dns
    dns-lookup-timeout = 2s

    # The well-known endpoint on which cluster managers are running
    external-gossip-port = 30778

    # Maximum number of attempts for discovering endpoints
    max-discover-attempts = 10

    # The interval between cluster discovery attempts
    discover-attempt-interval = 500ms

    # The interval at which to keep discovering cluster
    discovery-interval = 1s

    # Timeout for cluster gossip
    gossip-timeout = 1s
  }

  persistent-subscription {
    # Whether to resolve LinkTo events automatically
    resolve-linkTos = false

    # Where the subscription should start from (position)
    start-from = last

    # Whether or not in depth latency statistics should be tracked on this subscription.
    extra-statistics = false

    # The amount of time after which a message should be considered to be timedout and retried.
    message-timeout = 30s

    # The maximum number of retries (due to timeout) before a message get considered to be parked
    max-retry-count = 500

    # The size of the buffer listening to live messages as they happen
    live-buffer-size = 500

    # The number of events read at a time when paging in history
    read-batch-size = 100

    # The number of events to cache when paging through history
    history-buffer-size = 20

    # The amount of time to try to checkpoint after
    checkpoint-after = 2s

    # The minimum number of messages to checkpoint
    min-checkpoint-count = 10

    # The maximum number of messages to checkpoint if this number is a reached a checkpoint will be forced.
    max-checkpoint-count = 1000

    # The maximum number of subscribers allowed
    max-subscriber-count = 0

    # The [[ConsumerStrategy]] to use for distributing events to client consumers
    # Known are RoundRobin, DispatchToSingle
    # however you can provide a custom one, just make sure it is supported by server
    consumer-strategy = RoundRobin
  }
}

      

The log provider code looks like this:

package com.org.utils;

import static akka.stream.ActorMaterializer.create;
import static java.util.concurrent.CompletableFuture.allOf;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.function.Predicate;
import akka.japi.function.Procedure;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.AllPersistenceIdsQuery;
import akka.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import akka.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import akka.persistence.query.javadsl.EventsByPersistenceIdQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import lombok.extern.log4j.Log4j;

@Service
@Log4j
public class JournalProvider {
  private ActorSystem system;
  private ReadJournal readJournal;

  @Autowired
  public JournalProvider(ActorSystem system) {
    super();
    this.system = system;
  }

  @SuppressWarnings({ "rawtypes", "unchecked" })
  public ReadJournal journal(ActorSystem system) {
    if (readJournal == null) {
      String queryJournalClass = system.settings().config().getString("queryJournalClass");
      String queryIdentifier = system.settings().config().getString("queryIdentifier");

      if (queryJournalClass == null || queryIdentifier == null) {
        throw new RuntimeException(
            "Please set queryIdentifier and queryJournalClass variables in application.conf or reference.conf");
      }

      try {
        Class clasz = Class.forName(queryJournalClass);
        readJournal = PersistenceQuery.get(system).getReadJournalFor(clasz, queryIdentifier);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("Caught exception : " + e);
      }
    }

    return readJournal;
  }

  public CompletableFuture<Void> runForEachId(Procedure<EventEnvelope> function,
      Map<String, Long> idsWithStartSequenceNr) {
    List<CompletableFuture<Done>> allFutures = new ArrayList<>();

    for (String id : idsWithStartSequenceNr.keySet()) {
      Long fromSequenceNr = idsWithStartSequenceNr.get(id);

      CompletionStage<Done> mapPreparedCompletionStage = runForEachEvent(id, fromSequenceNr, function);
      allFutures.add(mapPreparedCompletionStage.toCompletableFuture());
    }
    CompletableFuture<Void> combinedFuture = allOf(allFutures.toArray(new CompletableFuture[0]));
    return combinedFuture;
  }

  public CompletionStage<Done> runForEachEvent(String id, long sequenceNr, Procedure<EventEnvelope> function) {
    ActorMaterializer materializer = ActorMaterializer.create(system);
    Source<EventEnvelope, NotUsed> eventsForId = ((CurrentEventsByPersistenceIdQuery) journal(system))
        .currentEventsByPersistenceId(id, sequenceNr, Long.MAX_VALUE);
    return eventsForId.runForeach(function, materializer);
  }

  public final List<Object> fetchEventsByPersistenceId1(String id, Predicate<EventEnvelope> filter) {
    List<Object> allEvents = new ArrayList<>();
    try {
      ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE)
          .filter(filter).runForeach((event) -> allEvents.add(event.event()), create(system)).toCompletableFuture()
          .get();
    } catch (InterruptedException | ExecutionException e) {
      log.error(" Error while getting currentEventsForPersistenceId for id " + id, e);
    }
    return allEvents;
  }

  public List<Object> fetchEventsByPersistenceId(String id) {
    List<Object> allEvents = new ArrayList<>();
    try {
      ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE)
          .runForeach((event) -> allEvents.add(event.event()), create(system)).toCompletableFuture()
          .get();
    } catch (InterruptedException | ExecutionException e) {
      log.error(" Error while getting currentEventsForPersistenceId for id " + id, e);
    }
    return allEvents;
  }


  @SafeVarargs
  public final List<String> currentPersistenceIds(Materializer materializer, Predicate<String>... filters)
      throws InterruptedException, ExecutionException {
    Source<String, NotUsed> currentPersistenceIds = ((CurrentPersistenceIdsQuery) journal(system))
        .currentPersistenceIds();

    for (Predicate<String> filter : filters)
      currentPersistenceIds = currentPersistenceIds.filter(filter);

    List<String> allIds = new ArrayList<String>();
    CompletionStage<Done> allIdCompletionStage = currentPersistenceIds.runForeach(id -> allIds.add(id), materializer);
    allIdCompletionStage.toCompletableFuture().get();
    return allIds;
  }

  @SafeVarargs
  public final Source<String, NotUsed> allPersistenceIds(Predicate<String>... filters) {
    Source<String, NotUsed> allPersistenceIds = ((AllPersistenceIdsQuery) journal(system)).allPersistenceIds();

    for (Predicate<String> filter : filters)
      allPersistenceIds = allPersistenceIds.filter(filter);

    return allPersistenceIds;
  }

  public final Source<EventEnvelope, NotUsed> currentEventsSourceForPersistenceId(String id) {
    return ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE);
  }

  public final Source<EventEnvelope, NotUsed> allEventsSourceForPersistenceId(String id) {
    return allEventsSourceForPersistenceId(id, 0, Long.MAX_VALUE);
  }

  public final Source<EventEnvelope, NotUsed> allEventsSourceForPersistenceId(String id, long from, long to) {
    return ((EventsByPersistenceIdQuery) journal(system)).eventsByPersistenceId(id, from, to);
  }
}

      

The event store is populated with the appropriate events through the actor system. The following code listens for incoming events and passes them to an actor, which consumes them as the receiver.

The problem I am facing is that some messages are discarded: not all events are fed to the stream's mapping function.

package com.org.utils;

import static com.wt.utils.akka.SpringExtension.SpringExtProvider;

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.stereotype.Service;

import com.org.domain.Ad;
import com.wt.domain.Px;
import com.wt.domain.repo.AdRepo;
import com.wt.domain.write.events.AdCalc;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

@Service("Tester")
public class Tester {
  private AdRepo adRepo;
  private ActorRef readerActor;
  private ActorMaterializer materializer;
  private JournalProvider provider;

  @Autowired
  public Tester(JournalProvider provider, AdRepo adRepo, ActorSystem system) {
    super();
    this.provider = provider;
    this.adRepo= adRepo;
    this.readerActor = system.actorOf(SpringExtProvider.get(system).props("ReaderActor"), "reader-actor");
    this.materializer = ActorMaterializer.create(system);
  }

  public void testerFunction() throws InterruptedException, ExecutionException {
    // Retrieve events of type Event1 from the event store.
    Source<Event1, NotUsed> event1 = provider.allEventsSourceForPersistenceId("persistence-id")
        .filter(evt -> evt.event() instanceof Event1)
        .map(evt -> (Event1) evt.event());

    // Fetch a list of domain objects of type Ad from the repository.
    List<Ad> adSym = adRepo.findBySymbol("symbol-name");

    Ad ad = adSym.stream().findAny().get();

    // Map the Event1 source stream to an AdCalc domain event source stream.
    // The ad.calculator function returns a source of AdCalc domain events.
    // Here lies the issue: not all Event1 elements are converted to
    // AdCalc domain events; some are dropped.
    Source<AdCalc, NotUsed> adCalcResult = event1.map(evt -> ad.calculator(evt, evt.getData()));

    Sink<AdCalc, NotUsed> consumer = Sink.actorRef(readerActor, PoisonPill.getInstance());

    RunnableGraph<Sink<AdCalc, NotUsed>> runnableGraph = MergeHub
        .of(AdCalc.class).to(consumer);

    Sink<AdCalc, NotUsed> resultAggregator = runnableGraph.run(materializer);

    adCalcResult.runWith(resultAggregator, materializer);
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    try (AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(CoreAppConfiguration.class)) {
      Tester tester = (Tester) ctx
          .getBean("Tester");
      tester.testerFunction();
    }

  }

}

      

Here is the actor doing the processing

package com.org.utils;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import com.wt.domain.write.events.AdCalc;

import akka.actor.UntypedActor;

@Scope("prototype")
@Service("ReaderActor")
public class ReaderActor extends UntypedActor {

  public void onReceive(Object message) throws Exception {
    if (message instanceof AdCalc) {
      final AdCalc adCalculation = (AdCalc) message;

      // The event above also carries a timestamp. From that, and from the
      // persistence id of the events in the event store, I can tell that
      // not all events are processed; some are dropped.
      System.out.println(adCalculation);
    } else
      unhandled(message);

    context().system().stop(getSelf());
  }
}

      

To summarize the problems with the code snippets above:

  • The incoming source stream discards many events, so some events are never passed on to the actor.
  • I need help with the syntax for integrating mapAsync into the stream, as the example in the documentation gives a compilation error.
  • The syntax for actorRefWithAck, again for the actor integration, would be very helpful. The Akka documentation doesn't have this.

Thanks a ton!
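
On the mapAsync point in the list above: in the Akka Streams Java DSL (2.4 and later), mapAsync takes a parallelism value and a function that returns a java.util.concurrent.CompletionStage rather than a Scala Future, and a mismatch there may be one reason for a compilation error. What follows is only a minimal JDK-only sketch of that function shape, not the code from the question: Event1Stub, AdCalcStub, calculateAsync and calculateAll are hypothetical stand-ins, the doubling is a placeholder calculation, and the Akka call itself appears only in a comment because it is not exercised here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

public class MapAsyncShapeSketch {

  // Hypothetical stand-in for the question's Event1.
  static final class Event1Stub {
    final long data;
    Event1Stub(long data) { this.data = data; }
  }

  // Hypothetical stand-in for the question's AdCalc.
  static final class AdCalcStub {
    final long value;
    AdCalcStub(long value) { this.value = value; }
  }

  // The shape a mapAsync function argument needs: one element in,
  // a CompletionStage of the result out. Doubling is a placeholder.
  static CompletionStage<AdCalcStub> calculateAsync(Event1Stub evt) {
    return CompletableFuture.supplyAsync(() -> new AdCalcStub(evt.data * 2));
  }

  // JDK-only imitation of an order-preserving asynchronous map:
  // start every calculation first, then collect results in input order.
  static List<Long> calculateAll(List<Long> inputs) {
    List<CompletableFuture<AdCalcStub>> pending = new ArrayList<>();
    for (Long in : inputs) {
      pending.add(calculateAsync(new Event1Stub(in)).toCompletableFuture());
    }
    List<Long> results = new ArrayList<>();
    for (CompletableFuture<AdCalcStub> f : pending) {
      results.add(f.join().value);
    }
    return results;
  }

  public static void main(String[] args) {
    // Assumption, not compiled here: with Akka Streams on the classpath
    // the same function would be passed to the stream as
    //   source.mapAsync(4, evt -> calculateAsync(evt))
    System.out.println(calculateAll(Arrays.asList(1L, 2L, 3L))); // prints [2, 4, 6]
  }
}
```

With Akka on the classpath, a function like calculateAsync could be passed as the second argument of mapAsync; the first argument bounds how many calculations are in flight at once, while results are still emitted downstream in input order.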
