Integrating streaming events from EventStore with Akka actors using Akka Streams in Java

I'm using GetEventStore as the journal for events persisted by akka-persistence, and I query the events back out through akka.persistence.query.javadsl. The actor system and the journal provider are wired up with Spring.

The eventstore configuration is as follows:

eventstore {
  # IP & port of Event Store
  address {
    host = "xxxx"
    port = 1113
  }

  http {
    protocol = "http"
    port = 2113
    prefix = ""
  }

  # The desired connection timeout
  connection-timeout = 10s

  # Maximum number of reconnections before backing off, -1 to reconnect forever
  max-reconnections = 100

  reconnection-delay {
    # Delay before first reconnection
    min = 250ms
    # Maximum delay on reconnections
    max = 1s
  }

  # The default credentials to use for operations where others are not explicitly supplied.
  credentials {
    login = "admin"
    password = "changeit"
  }

  heartbeat {
    # The interval at which to send heartbeat messages.
    interval = 500ms
    # The interval after which an unacknowledged heartbeat will cause the connection to be considered faulted and disconnect.
    timeout = 5s
  }

  operation {
    # The maximum number of operation retries
    max-retries = 10
    # The amount of time before an operation is considered to have timed out
    timeout = 500s
  }

  # Whether to resolve LinkTo events automatically
  resolve-linkTos = false

  # Whether or not to require EventStore to refuse serving read or write requests if it is not master
  require-master = true

  # Number of events to be retrieved by client as single message
  read-batch-size = 990

  # The size of the buffer in element count
  buffer-size = 100000

  # Strategy that is used when elements cannot fit inside the buffer
  # Possible values DropHead, DropTail, DropBuffer, DropNew, Fail
  buffer-overflow-strategy = "DropHead"

  # The number of serialization/deserialization functions to be run in parallel
  serialization-parallelism = 8

  # Serialization done asynchronously and these futures may complete in any order,
  # but results will be used with preserved order if set to true
  serialization-ordered = true

  cluster {
    # Endpoints for seeding gossip
    # For example: ["127.0.0.1:1", "127.0.0.2:2"]
    gossip-seeds = []

    # The DNS name to use for discovering endpoints
    dns = null

    # The time given to resolve dns
    dns-lookup-timeout = 2s

    # The well-known endpoint on which cluster managers are running
    external-gossip-port = 30778

    # Maximum number of attempts for discovering endpoints
    max-discover-attempts = 10

    # The interval between cluster discovery attempts
    discover-attempt-interval = 500ms

    # The interval at which to keep discovering cluster
    discovery-interval = 1s

    # Timeout for cluster gossip
    gossip-timeout = 1s
  }

  persistent-subscription {
    # Whether to resolve LinkTo events automatically
    resolve-linkTos = false

    # Where the subscription should start from (position)
    start-from = last

    # Whether or not in-depth latency statistics should be tracked on this subscription.
    extra-statistics = false

    # The amount of time after which a message is considered to have timed out and is retried.
    message-timeout = 30s

    # The maximum number of retries (due to timeout) before a message is considered to be parked
    max-retry-count = 500

    # The size of the buffer listening to live messages as they happen
    live-buffer-size = 500

    # The number of events read at a time when paging in history
    read-batch-size = 100

    # The number of events to cache when paging through history
    history-buffer-size = 20

    # The amount of time to try to checkpoint after
    checkpoint-after = 2s

    # The minimum number of messages to checkpoint
    min-checkpoint-count = 10

    # The maximum number of messages to checkpoint; if this number is reached, a checkpoint will be forced.
    max-checkpoint-count = 1000

    # The maximum number of subscribers allowed
    max-subscriber-count = 0

    # The consumer strategy used for distributing events to client consumers.
    # Known strategies are RoundRobin and DispatchToSingle,
    # but you can provide a custom one as long as the server supports it
    consumer-strategy = RoundRobin
  }
}
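
For completeness, the actor system that picks up this configuration is created in a Spring @Configuration class, roughly as sketched below. CoreAppConfiguration is the class referenced in the main method further down; its contents here are my reconstruction, and the system name is a placeholder.

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.typesafe.config.ConfigFactory;

import akka.actor.ActorSystem;

@Configuration
public class CoreAppConfiguration {

  // ConfigFactory.load() picks up application.conf / reference.conf from the
  // classpath, including the "eventstore { ... }" block shown above.
  @Bean(destroyMethod = "terminate")
  public ActorSystem actorSystem() {
    return ActorSystem.create("eventstore-reader", ConfigFactory.load());
  }
}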

The log provider code looks like this:

package com.org.utils;

import static akka.stream.ActorMaterializer.create;
import static java.util.concurrent.CompletableFuture.allOf;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

import com.typesafe.config.Config;

import akka.Done;
import akka.NotUsed;
import akka.actor.ActorSystem;
import akka.japi.function.Predicate;
import akka.japi.function.Procedure;
import akka.persistence.query.EventEnvelope;
import akka.persistence.query.PersistenceQuery;
import akka.persistence.query.javadsl.AllPersistenceIdsQuery;
import akka.persistence.query.javadsl.CurrentEventsByPersistenceIdQuery;
import akka.persistence.query.javadsl.CurrentPersistenceIdsQuery;
import akka.persistence.query.javadsl.EventsByPersistenceIdQuery;
import akka.persistence.query.javadsl.ReadJournal;
import akka.stream.ActorMaterializer;
import akka.stream.Materializer;
import akka.stream.javadsl.Source;
import lombok.extern.log4j.Log4j;

@Service
@Log4j
public class JournalProvider {
  private ActorSystem system;
  private ReadJournal readJournal;

  @Autowired
  public JournalProvider(ActorSystem system) {
    this.system = system;
  }

  @SuppressWarnings("unchecked")
  public ReadJournal journal(ActorSystem system) {
    if (readJournal == null) {
      Config config = system.settings().config();

      // config.getString throws if the path is missing, so check first
      if (!config.hasPath("queryJournalClass") || !config.hasPath("queryIdentifier")) {
        throw new IllegalStateException(
            "Please set the queryIdentifier and queryJournalClass entries in application.conf or reference.conf");
      }

      String queryJournalClass = config.getString("queryJournalClass");
      String queryIdentifier = config.getString("queryIdentifier");

      try {
        Class<? extends ReadJournal> clazz = (Class<? extends ReadJournal>) Class.forName(queryJournalClass);
        readJournal = PersistenceQuery.get(system).getReadJournalFor(clazz, queryIdentifier);
      } catch (ClassNotFoundException e) {
        throw new RuntimeException("Read journal class not found: " + queryJournalClass, e);
      }
    }

    return readJournal;
  }

  public CompletableFuture<Void> runForEachId(Procedure<EventEnvelope> function,
      Map<String, Long> idsWithStartSequenceNr) {
    List<CompletableFuture<Done>> allFutures = new ArrayList<>();

    for (String id : idsWithStartSequenceNr.keySet()) {
      Long fromSequenceNr = idsWithStartSequenceNr.get(id);

      CompletionStage<Done> mapPreparedCompletionStage = runForEachEvent(id, fromSequenceNr, function);
      allFutures.add(mapPreparedCompletionStage.toCompletableFuture());
    }
    CompletableFuture<Void> combinedFuture = allOf(allFutures.toArray(new CompletableFuture[0]));
    return combinedFuture;
  }

  public CompletionStage<Done> runForEachEvent(String id, long sequenceNr, Procedure<EventEnvelope> function) {
    ActorMaterializer materializer = ActorMaterializer.create(system);
    Source<EventEnvelope, NotUsed> eventsForId = ((CurrentEventsByPersistenceIdQuery) journal(system))
        .currentEventsByPersistenceId(id, sequenceNr, Long.MAX_VALUE);
    return eventsForId.runForeach(function, materializer);
  }

  public final List<Object> fetchEventsByPersistenceId(String id, Predicate<EventEnvelope> filter) {
    List<Object> allEvents = new ArrayList<>();
    try {
      ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE)
          .filter(filter).runForeach((event) -> allEvents.add(event.event()), create(system)).toCompletableFuture()
          .get();
    } catch (InterruptedException | ExecutionException e) {
      log.error(" Error while getting currentEventsForPersistenceId for id " + id, e);
    }
    return allEvents;
  }

  public List<Object> fetchEventsByPersistenceId(String id) {
    List<Object> allEvents = new ArrayList<>();
    try {
      ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE)
          .runForeach((event) -> allEvents.add(event.event()), create(system)).toCompletableFuture()
          .get();
    } catch (InterruptedException | ExecutionException e) {
      log.error(" Error while getting currentEventsForPersistenceId for id " + id, e);
    }
    return allEvents;
  }

  @SafeVarargs
  public final List<String> currentPersistenceIds(Materializer materializer, Predicate<String>... filters)
      throws InterruptedException, ExecutionException {
    Source<String, NotUsed> currentPersistenceIds = ((CurrentPersistenceIdsQuery) journal(system))
        .currentPersistenceIds();

    for (Predicate<String> filter : filters)
      currentPersistenceIds = currentPersistenceIds.filter(filter);

    List<String> allIds = new ArrayList<String>();
    CompletionStage<Done> allIdCompletionStage = currentPersistenceIds.runForeach(id -> allIds.add(id), materializer);
    allIdCompletionStage.toCompletableFuture().get();
    return allIds;
  }

  @SafeVarargs
  public final Source<String, NotUsed> allPersistenceIds(Predicate<String>... filters) {
    Source<String, NotUsed> allPersistenceIds = ((AllPersistenceIdsQuery) journal(system)).allPersistenceIds();

    for (Predicate<String> filter : filters)
      allPersistenceIds = allPersistenceIds.filter(filter);

    return allPersistenceIds;
  }

  public final Source<EventEnvelope, NotUsed> currentEventsSourceForPersistenceId(String id) {
    return ((CurrentEventsByPersistenceIdQuery) journal(system)).currentEventsByPersistenceId(id, 0, Long.MAX_VALUE);
  }

  public final Source<EventEnvelope, NotUsed> allEventsSourceForPersistenceId(String id) {
    return allEventsSourceForPersistenceId(id, 0, Long.MAX_VALUE);
  }

  public final Source<EventEnvelope, NotUsed> allEventsSourceForPersistenceId(String id, long from, long to) {
    return ((EventsByPersistenceIdQuery) journal(system)).eventsByPersistenceId(id, from, to);
  }
}
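
The journal(ActorSystem) method above expects two entries in application.conf or reference.conf. They look roughly like this; both values are placeholders here and must name the javadsl read-journal class and the plugin id of whichever EventStore akka-persistence plugin is on the classpath:

queryJournalClass = "fully.qualified.name.of.the.javadsl.ReadJournal"
queryIdentifier = "read-journal-plugin-id"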

The eventstore is populated with the appropriate events through the actor system, and the following code reads those events back and consumes them through an actor acting as the receiver.

The problem I am facing is that some messages are discarded and not all events are fed to the stream mapping function.

package com.org.utils;

import static com.wt.utils.akka.SpringExtension.SpringExtProvider;

import java.util.List;
import java.util.concurrent.ExecutionException;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.AnnotationConfigApplicationContext;
import org.springframework.stereotype.Service;

import com.org.domain.Ad;
import com.wt.domain.Px;
import com.wt.domain.repo.AdRepo;
import com.wt.domain.write.events.AdCalc;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.PoisonPill;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.MergeHub;
import akka.stream.javadsl.RunnableGraph;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

@Service("Tester")
public class Tester {
  private AdRepo adRepo;
  private ActorRef readerActor;
  private ActorMaterializer materializer;
  private JournalProvider provider;

  @Autowired
  public Tester(JournalProvider provider, AdRepo adRepo, ActorSystem system) {
    this.provider = provider;
    this.adRepo = adRepo;
    this.readerActor = system.actorOf(SpringExtProvider.get(system).props("ReaderActor"), "reader-actor");
    this.materializer = ActorMaterializer.create(system);
  }

  public void testerFunction() throws InterruptedException, ExecutionException {
    // retrieve events of type Event1 from the eventstore
    Source<Event1, NotUsed> event1 = provider.allEventsSourceForPersistenceId("persistence-id")
        .filter(evt -> evt.event() instanceof Event1)
        .map(evt -> (Event1) evt.event());

    // fetch a list of domain objects of type Ad from the repository
    List<Ad> adSym = adRepo.findBySymbol("symbol-name");

    Ad ad = adSym.stream().findAny().get();

    // map the Event1 stream to a stream of AdCalc domain events;
    // ad.calculator returns a single AdCalc domain event.
    // Here lies the issue: not all of the Event1 objects are converted to
    // AdCalc domain events; some are being dropped.
    Source<AdCalc, NotUsed> adCalcResult = event1.map(evt -> ad.calculator(evt, evt.getData()));

    Sink<AdCalc, NotUsed> consumer = Sink.actorRef(readerActor, PoisonPill.getInstance());

    RunnableGraph<Sink<AdCalc, NotUsed>> runnableGraph = MergeHub.of(AdCalc.class).to(consumer);

    Sink<AdCalc, NotUsed> resultAggregator = runnableGraph.run(materializer);

    adCalcResult.runWith(resultAggregator, materializer);
  }

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    try (AnnotationConfigApplicationContext ctx = new AnnotationConfigApplicationContext(CoreAppConfiguration.class)) {
      Tester tester = (Tester) ctx.getBean("Tester");
      tester.testerFunction();
    }
  }

}
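
For context, the MergeHub above is there so that more than one source can feed the same reader actor. Once the graph has been run, any additional source can attach to the materialized sink; for example, with anotherAdCalcSource being a hypothetical second Source<AdCalc, NotUsed>:

anotherAdCalcSource.runWith(resultAggregator, materializer);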

Here is the actor doing the processing:

package com.org.utils;

import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Service;

import com.wt.domain.write.events.AdCalc;

import akka.actor.UntypedActor;

@Scope("prototype")
@Service("ReaderActor")
public class ReaderActor extends UntypedActor {

  @Override
  public void onReceive(Object message) throws Exception {
    if (message instanceof AdCalc) {
      final AdCalc adCalculation = (AdCalc) message;

      // The event carries a timestamp and, of course, the persistence id of
      // the events in the eventstore; comparing those against the store shows
      // that not all events are processed here; some are being dropped.
      System.out.println(adCalculation);
    } else
      unhandled(message);

    context().system().stop(getSelf());
  }
}

The problems in the above code snippets:

  • The incoming source stream is discarding many events, so not all of them are passed on to the actor.
  • I need help with the syntax for integrating a mapAsync stage; the example from the docs gives me a compilation error (see the sketch after this list).
  • The syntax for Sink.actorRefWithAck (actor integration with backpressure) would also be very helpful; the Akka documentation doesn't have a full Java example of it.
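
Here is roughly what I have been attempting, written as a sketch against the Tester class above. It assumes an Akka version that provides Sink.actorRefWithAck; the parallelism value, the protocol strings, and the AckingReaderActor class are placeholders of mine, not existing project code (the Event1 import is omitted; it is the same event type as in Tester).

package com.org.utils;

import java.util.concurrent.CompletableFuture;

import com.org.domain.Ad;
import com.wt.domain.write.events.AdCalc;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.actor.UntypedActor;
import akka.stream.ActorMaterializer;
import akka.stream.javadsl.Sink;
import akka.stream.javadsl.Source;

public class BackpressuredTesterSketch {
  // Hypothetical stream-protocol messages; plain strings keep the sketch short.
  static final String STREAM_INIT = "stream-init";
  static final String STREAM_ACK = "stream-ack";
  static final String STREAM_COMPLETE = "stream-complete";

  // mapAsync runs up to 4 calculator calls at a time and emits the results in
  // upstream order; the mapping function must return a CompletionStage.
  Source<AdCalc, NotUsed> toAdCalc(Source<Event1, NotUsed> event1, Ad ad) {
    return event1.mapAsync(4,
        evt -> CompletableFuture.supplyAsync(() -> ad.calculator(evt, evt.getData())));
  }

  // actorRefWithAck sends the next element only after the actor replies with
  // the ack message, so the actor backpressures the stream instead of having
  // elements silently dropped.
  void runBackpressured(Source<AdCalc, NotUsed> adCalcResult, ActorRef readerActor,
      ActorMaterializer materializer) {
    Sink<AdCalc, NotUsed> ackingConsumer = Sink.actorRefWithAck(
        readerActor, STREAM_INIT, STREAM_ACK, STREAM_COMPLETE, failure -> failure);
    adCalcResult.runWith(ackingConsumer, materializer);
  }

  // The receiving actor takes part in the ack protocol and does not stop itself.
  public static class AckingReaderActor extends UntypedActor {
    @Override
    public void onReceive(Object message) throws Exception {
      if (STREAM_INIT.equals(message)) {
        getSender().tell(STREAM_ACK, getSelf()); // signal readiness
      } else if (message instanceof AdCalc) {
        System.out.println(message); // process the event
        getSender().tell(STREAM_ACK, getSelf()); // ask for the next element
      } else if (!STREAM_COMPLETE.equals(message)) {
        unhandled(message);
      }
    }
  }
}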

Thanks a ton!

java spring get-event-store akka-stream