Parallel dispatch of groupBy groups in Reactor

I am learning Reactor and am wondering how to achieve a certain behavior. Let's say I have a stream of incoming messages. Each message is associated with a specific entity and contains some data.

interface Message {
    String getEntityId();
    Data getData();
}

Messages for different entities can be processed in parallel. However, messages for any particular entity must be processed one at a time: processing of message 2 for entity "abc" cannot begin until processing of message 1 for entity "abc" has finished. While a message is being processed, further messages for that entity must be buffered, but messages for other entities must not be held up. You can think of it as if each entity had its own thread running this code:

public void run() {
    for (;;) {
        // Blocks until there is a message available
        Message msg = messageQueue.nextMessageFor(this.entityId);

        // Blocks until processing is finished
        processMessage(msg);
    }
}

How can I achieve this with Reactor without blocking? The overall message rate may be high, but the rate for any single entity will be very low. The set of entities can be very large and is not necessarily known in advance.
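To make the requirement concrete without any reactive library, here is a minimal JDK-only sketch (a swapped-in technique, not Reactor). Each entity keeps the future of its last queued task, and a new task is chained onto it with `thenRunAsync`, so tasks for one entity never overlap while tasks for different entities share one thread pool. No thread sits waiting for messages, and an entity's entry is dropped once it goes idle. `KeyedSerialExecutor` and `submit` are hypothetical names, not part of Reactor or the JDK.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executor;

// Hypothetical helper: tasks sharing a key run one after another,
// tasks with different keys may run concurrently on the shared pool.
class KeyedSerialExecutor<K> {
    // Future of the last queued task per key; nothing here blocks a thread.
    private final Map<K, CompletableFuture<Void>> tails = new ConcurrentHashMap<>();
    private final Executor pool;

    KeyedSerialExecutor(Executor pool) {
        this.pool = pool;
    }

    CompletableFuture<Void> submit(K key, Runnable task) {
        // compute() is atomic per key, so each new task is chained after the
        // previous one; exceptionally() keeps one failed task from stalling the key.
        CompletableFuture<Void> next = tails.compute(key, (k, tail) ->
                (tail == null ? CompletableFuture.<Void>completedFuture(null)
                              : tail.exceptionally(e -> null))
                        .thenRunAsync(task, pool));
        // Forget the key once its last queued task is done, so an open-ended
        // set of entities does not grow the map forever.
        next.whenComplete((r, e) -> tails.remove(key, next));
        return next;
    }
}
```

With this, `executor.submit(msg.getEntityId(), () -> processMessage(msg))` would stand in for the per-entity `run()` loop above.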

I think it might look something like this, but I'm not sure:

{
    incomingMessages()
            .groupBy(Message::getEntityId)
            .flatMap(entityStream -> entityStream
                    /* ... */
                    .map(msg -> /* process the message */ msg))
            /* ... */;
}

public static Stream<Message> incomingMessages() { /* ... */ }

1 answer


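With Project Reactor, you can solve it like this: group the messages by entity and move each group onto a shared scheduler with `publishOn`. Because `publishOn` delivers a group's elements one at a time on a single worker, messages for the same entity stay ordered and never overlap, while different groups can run on different workers in parallel.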

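import org.junit.Test;
import reactor.core.publisher.Flux;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

import java.util.stream.IntStream;
import java.util.stream.Stream;

// Members of a JUnit 4 test class:

@Test
public void testMessages() {
    // One shared pool for all groups (creating a new scheduler per group leaks threads)
    Scheduler groupByPool = Schedulers.newParallel("groupByPool", 16);
    try {
        Flux.fromStream(incomingMessages())
                .groupBy(Message::getEntityId)
                // publishOn hands each group to a single worker, so one entity's messages
                // are processed in order, while different groups can run in parallel
                .flatMap(group -> group
                        .publishOn(groupByPool)
                        .doOnNext(this::processMessage))
                .blockLast(); // wait until every message has been processed
    } finally {
        groupByPool.dispose();
    }
}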

public Stream<Message> incomingMessages() {
    return IntStream.range(0, 100).mapToObj(i -> new Message(i, i % 10));
}

public void processMessage(Message message) {
    System.out.println(String.format("Message: %s processed by the thread: %s", message, Thread.currentThread().getName()));
}

private static class Message {
    private final int id;
    private final int entityId;

    public Message(int id, int entityId) {
        this.id = id;
        this.entityId = entityId;
    }

    public int getId() {
        return id;
    }

    public int getEntityId() {
        return entityId;
    }

    @Override
    public String toString() {
        return "Message{" +
                "id=" + id +
                ", entityId=" + entityId +
                '}';
    }
}
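The `processMessage` above is synchronous. If processing is itself asynchronous (for example, it returns a `Mono`), calling it from `doOnNext` would only start the work, so message 2 could begin before message 1 finishes. A minimal sketch under that assumption uses `concatMap` inside each group, which subscribes to the next inner `Mono` only after the previous one completes, while the outer `flatMap` keeps all groups running concurrently. `processAsync`, its artificial `Mono.delay`, and the class name are hypothetical stand-ins, not part of the original answer.

```java
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

class PerEntityConcatMap {

    // Hypothetical asynchronous handler: completes after a short, non-blocking delay.
    static Mono<String> processAsync(int entityId, int id) {
        return Mono.delay(Duration.ofMillis(id % 3))
                .map(ignored -> entityId + ":" + id);
    }

    static List<String> run() {
        List<String> processed = new CopyOnWriteArrayList<>();
        Flux.range(0, 100)
                // same keying as the answer: message i belongs to entity i % 10
                .groupBy(i -> i % 10)
                // flatMap subscribes to all groups at once, so entities proceed concurrently;
                // concatMap waits for each Mono before starting the next one in the group
                .flatMap(group -> group.concatMap(id -> processAsync(group.key(), id)))
                .doOnNext(processed::add)
                .blockLast();
        return processed;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```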

I think a similar solution is possible in RxJava.
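As a hedged sketch of that idea, assuming RxJava 3 (`io.reactivex.rxjava3`) is on the classpath: the same shape uses `groupBy`, then `observeOn` per group, merged with `flatMap`. `observeOn` drains a group's items one at a time on one worker from `Schedulers.computation()`, much like `publishOn` in Reactor. The class name and the `entity:id` strings are illustrative only.

```java
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.schedulers.Schedulers;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

class RxPerEntity {

    static List<String> run() {
        List<String> processed = new CopyOnWriteArrayList<>();
        Flowable.range(0, 100)
                // message i belongs to entity i % 10, as in the Reactor answer
                .groupBy(i -> i % 10)
                // observeOn gives each group its own worker, so a group's items are
                // handled one at a time and in order, while groups run in parallel
                .flatMap(group -> group
                        .observeOn(Schedulers.computation())
                        .map(id -> group.getKey() + ":" + id))
                .doOnNext(processed::add)
                .blockingSubscribe();
        return processed;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```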
