Akka Stream: what does mapMaterializedValue mean?

I have read "Akka Stream Materialization Concept" and understand that stream materialization is:

the process of taking a description of the stream (the graph) and allocating all the resources it needs in order to run.

I followed an example to build an Akka stream that uses mapMaterializedValue to post a message to a queue. The purpose of the code is to send a message to the queue after the blueprint of the stream has been created. The code works, but I really don't understand what mapMaterializedValue does in it:

Promise<SourceQueueWithComplete<String>> promise = new Promise.DefaultPromise<>();

Source<String, SourceQueueWithComplete<String>> source = Source
    .<String>queue(100, OverflowStrategy.fail())
    .mapMaterializedValue(queue -> {
        // side effect: hand the materialized queue to the outer promise
        promise.trySuccess(queue);
        // the function must return the (possibly transformed) materialized value
        return queue;
    });

source.toMat(Sink.foreach(x -> System.out.println(x)), Keep.left()).run(materializer);

promise.<SourceQueueWithComplete<String>>future().map(mapMapperFunction(), actorSystem.dispatcher());

      



1 answer


The purpose of mapMaterializedValue is to transform the materialized value immediately after it is materialized. For example, suppose you have a third-party library that accepts a callback like this:

interface Callback<T> {
    void onNext(T next);
    void onError(Throwable t);
    void onComplete();
}

      

Then you can create a method that returns Source<T, Callback<T>>, whose materialized value you can pass straight to this third-party library once the stream is actually run:

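<T> Source<T, Callback<T>> callbackSource() {
    return Source.<T>queue(1024, OverflowStrategy.fail())
        .mapMaterializedValue(queue -> new Callback<T>() {
            // an implementation of Callback which pushes the data
            // to the queue
            @Override public void onNext(T next) { queue.offer(next); }
            @Override public void onError(Throwable t) { queue.fail(t); }
            @Override public void onComplete() { queue.complete(); }
        });
}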

Source<Integer, Callback<Integer>> source = callbackSource();

Callback<Integer> callback = source
    .toMat(Sink.foreach(System.out::println), Keep.left())
    .run(materializer);

thirdPartyApiObject.runSomethingWithCallback(callback);

      



As you can see, this simplifies code that has to work with this kind of third-party API, because the queue -> callback transformation is written only once and is encapsulated in a method.
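To make the timing of that transformation concrete, here is a minimal plain-Java sketch of the idea. It is a toy model and not Akka's implementation: Blueprint, QueueCallback, callbackBlueprint and the wraps counter are all hypothetical names invented for this illustration, and a List stands in for the source queue. It only tries to show that the function given to mapMaterializedValue is stored in the blueprint and executed once per run(), so every materialization gets its own transformed value.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Toy model only: NOT Akka's implementation. All names are hypothetical.
class BlueprintDemo {

    // A reusable description that produces a fresh "materialized value" per run().
    static final class Blueprint<M> {
        private final Supplier<M> materialize;

        Blueprint(Supplier<M> materialize) {
            this.materialize = materialize;
        }

        // Mirrors the idea of mapMaterializedValue: returns a new blueprint
        // and defers f until run() is called.
        <N> Blueprint<N> mapMaterializedValue(Function<? super M, ? extends N> f) {
            return new Blueprint<>(() -> f.apply(materialize.get()));
        }

        // "Materializes" the blueprint and returns the materialized value.
        M run() {
            return materialize.get();
        }
    }

    // Reduced stand-in for the third-party callback from the answer.
    interface Callback<T> {
        void onNext(T next);
    }

    // Callback that pushes every element into the toy queue it wraps.
    static final class QueueCallback implements Callback<String> {
        final List<String> queue;

        QueueCallback(List<String> queue) {
            this.queue = queue;
        }

        @Override
        public void onNext(String next) {
            queue.add(next);
        }
    }

    // Counts how many times the mapping function has actually executed.
    static int wraps = 0;

    // The queue -> callback transformation, written once and encapsulated.
    static Blueprint<QueueCallback> callbackBlueprint() {
        Blueprint<List<String>> queueBlueprint = new Blueprint<>(ArrayList::new);
        return queueBlueprint.mapMaterializedValue(queue -> {
            wraps++;
            return new QueueCallback(queue);
        });
    }

    public static void main(String[] args) {
        Blueprint<QueueCallback> blueprint = callbackBlueprint();
        System.out.println("wraps after building the blueprint: " + wraps);

        QueueCallback first = blueprint.run();
        QueueCallback second = blueprint.run();
        first.onNext("hello");

        System.out.println("wraps after two runs: " + wraps);
        System.out.println("first queue: " + first.queue + ", second queue: " + second.queue);
    }
}
```

In this sketch the counter does not move while the blueprint is being built; it only increases when run() is called, and the two runs end up with independent queues. That is the same reason a real Source returned from a method like callbackSource() can be materialized more than once, with each run producing its own callback.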

In your case, however, you really don't need this. You are using mapMaterializedValue to complete an outer promise with the materialized value, which is completely unnecessary, since you can simply use the materialized value directly once the stream has been run:

Source<String, SourceQueueWithComplete<String>> source = Source
    .<String>queue(100, OverflowStrategy.fail());

// run(...) returns the materialized value selected by Keep.left(): the queue
SourceQueueWithComplete<String> queue = source
    .toMat(Sink.foreach(x -> System.out.println(x)), Keep.left())
    .run(materializer);

mapMapperFunction().apply(queue);

      
