Spring Benefits for reactors when the number of publisher threads is much higher than the number of consumers

I have the following use case:

  • N threads publish data (N can range from 10 to 1000 threads), these threads can do HTTP request, jdbc calls, pure java processing using only local machine.
  • 1 to M stream consumes it, doing IO (send HTTP requests, write to database ... in bulk maybe), these streams shouldn't slow down publishers. M must not exceed 10 threads.

N threads can publish data much faster than the consumer consumes it, but the idea is to minimize the slowdown on the publisher.

I've taken an ArrayBlockingQueue based approach where the publisher writes and the thread that takes the data from the queue and processes it works, but the results are not great.

So I'm looking into the Reactor pattern and in particular Spring-Reactor to see if it might be the answer for my use case. This is true?

I read:

In my situation, where the number of publisher threads is much higher than the number of consumers, is this the right choice?

+3


source to share


2 answers


It looks like you can look at the Reactor PersistentQueue

object
and separate your publisher from your subscriber. This is a common implementation Queue

, but it uses a chronic queue for save, discard, and reuse. It's also extremely, very fast.

Basically you will have a publisher pushing data into a PersistentQueue on one side, and a set of subscribers pulling from it on the other. This could mean replacing your current use if you are already using Queue

.



I need to write a wiki page to show some basic usage patterns.

+4


source


I faced a similar problem using a custom container class. It uses a double-buffered methodology through CAS object, which allows you to read all accumulated objects in one action without blocking.

I have no idea how effective it is, but the simplicity should ensure it is there with good ones.



Please note that most of the code below is test code - you can remove all of the code below the comment //TESTING

without affecting functionality.

/**
 * Lock free - thread-safe.
 *
 * Write from many threads - read with fewer threads.
 *
 * Write items of type T.
 *
 * Read items of type List<T>.
 *
 * @author OldCurmudgeon
 * @param <T> - Th etype we plan to write/read.
 */
public class DoubleBufferedList<T> {

    /**
     * Atomic reference so I can atomically swap it through.
     *
     * Mark = true means I am adding to it so momentarily unavailable for iteration.
     */
    private final AtomicMarkableReference<List<T>> list = new AtomicMarkableReference<>(newList(), false);

    // Factory method to create a new list - may be best to abstract this.
    protected List<T> newList() {
        return new ArrayList<>();
    }

    /**
     * Get and replace the current list.
     *
     * Used by readers.
     *
     * @return List<T> of a number (possibly 0) of items of type T.
     */
    public List<T> get() {
        // The list that was there.
        List<T> it;
        // Replace an unmarked list with an empty one.
        if (!list.compareAndSet(it = list.getReference(), newList(), false, false)) {
            // Mark was not false - Failed to replace!
            // It is probably marked as being appended to but may have been replaced by another thread.
            // Return empty and come back again soon.
            return Collections.<T>emptyList();
        }
        // Successfull replaced an unmarked list with an empty list!
        return it;
    }

    /**
     * Grab and lock the list in preparation for append.
     *
     * Used by add.
     */
    private List<T> grab() {
        List<T> it;
        // We cannot fail so spin on get and mark.
        while (!list.compareAndSet(it = list.getReference(), it, false, true)) {
            // Spin on mark - waiting for another grabber to release (which it must).
        }
        return it;
    }

    /**
     * Release the grabbed list.
     *
     * Opposite of grab.
     */
    private void release(List<T> it) {
        // Unmark it - should this be a compareAndSet(it, it, true, false)?
        if (!list.attemptMark(it, false)) {
            // Should never fail because once marked it will not be replaced.
            throw new IllegalMonitorStateException("It changed while we were adding to it!");
        }
    }

    /**
     * Add an entry to the list.
     *
     * Used by writers.
     *
     * @param entry - The new entry to add.
     */
    public void add(T entry) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entry.
            it.add(entry);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add many entries to the list.
     *
     * @param entries - The new entries to add.
     */
    public void add(List<T> entries) {
        List<T> it = grab();
        try {
            // Successfully marked! Add my new entries.
            it.addAll(entries);
        } finally {
            // Always release after a grab.
            release(it);
        }
    }

    /**
     * Add a number of entries.
     *
     * @param entries - The new entries to add.
     */
    @SafeVarargs
    public final void add(T... entries) {
        // Make a list of them.
        add(Arrays.<T>asList(entries));
    }

    // TESTING.
    // How many testers to run.
    static final int N = 10;
    // The next one we're waiting for.
    static final AtomicInteger[] seen = new AtomicInteger[N];
    // The ones that arrived out of order.
    static final ConcurrentSkipListSet<Widget>[] queued = Generics.<ConcurrentSkipListSet<Widget>>newArray(N);

    static class Generics {

        // A new Generics method for when we switch to Java 7.
        @SafeVarargs
        static <E> E[] newArray(int length, E... array) {
            return Arrays16.copyOf(array, length);
        }
    }

    static {
        // Populate the arrays.
        for (int i = 0; i < N; i++) {
            seen[i] = new AtomicInteger();
            queued[i] = new ConcurrentSkipListSet<>();
        }
    }

    // Thing that is produced and consumed.
    private static class Widget implements Comparable<Widget> {

        // Who produced it.
        public final int producer;
        // Its sequence number.
        public final int sequence;

        public Widget(int producer, int sequence) {
            this.producer = producer;
            this.sequence = sequence;
        }

        @Override
        public String toString() {
            return producer + "\t" + sequence;
        }

        @Override
        public int compareTo(Widget o) {
            // Sort on producer
            int diff = Integer.compare(producer, o.producer);
            if (diff == 0) {
                // And then sequence
                diff = Integer.compare(sequence, o.sequence);
            }
            return diff;
        }
    }

    // Produces Widgets and feeds them to the supplied DoubleBufferedList.
    private static class TestProducer implements Runnable {

        // The list to feed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // The sequence we're at
        int sequence = 0;
        // Set this at true to stop me.
        public volatile boolean stop = false;

        public TestProducer(DoubleBufferedList<Widget> list, int id) {
            this.list = list;
            this.id = id;
        }

        @Override
        public void run() {
            // Just pump the list.
            while (!stop) {
                list.add(new Widget(id, sequence++));
            }
        }
    }

    // Consumes Widgets from the suplied DoubleBufferedList
    private static class TestConsumer implements Runnable {

        // The list to bleed.
        final DoubleBufferedList<Widget> list;
        // My ID
        final int id;
        // Set this at true to stop me.
        public volatile boolean stop = false;

        public TestConsumer(DoubleBufferedList<Widget> list, int id) {
            this.list = list;
            this.id = id;
        }

        @Override
        public void run() {
            // The list I am working on.
            List<Widget> l = list.get();
            // Stop when stop == true && list is empty
            while (!(stop && l.isEmpty())) {
                // Record all items in list as arrived.
                arrived(l);
                // Grab another list.
                l = list.get();
            }
        }

        private void arrived(List<Widget> l) {
            for (Widget w : l) {
                // Mark each one as arrived.
                arrived(w);
            }
        }

        // A Widget has arrived.
        private static void arrived(Widget w) {
            // Which one is it?
            AtomicInteger n = seen[w.producer];
            // Don't allow multi-access to the same producer data or we'll end up confused.
            synchronized (n) {
                // Is it the next to be seen?
                if (n.compareAndSet(w.sequence, w.sequence + 1)) {
                    // It was the one we were waiting for! See if any of the ones in the queue can now be consumed.
                    for (Iterator<Widget> i = queued[w.producer].iterator(); i.hasNext();) {
                        Widget it = i.next();
                        // Is it in sequence?
                        if (n.compareAndSet(it.sequence, it.sequence + 1)) {
                            // Done with that one too now!
                            i.remove();
                        } else {
                            // Found a gap! Stop now.
                            break;
                        }
                    }
                } else {
                    // Out of sequence - Queue it.
                    queued[w.producer].add(w);
                }
            }
        }
    }

    // Main tester
    public static void main(String args[]) {
        try {
            System.out.println("DoubleBufferedList:Test");
            // Create my test buffer.
            DoubleBufferedList<Widget> list = new DoubleBufferedList<>();
            // All running threads - Producers then Consumers.
            List<Thread> running = new LinkedList<>();
            // Start some producer tests.
            List<TestProducer> producers = new ArrayList<>();
            for (int i = 0; i < N; i++) {
                TestProducer producer = new TestProducer(list, i);
                Thread t = new Thread(producer);
                t.setName("Producer " + i);
                t.start();
                producers.add(producer);
                running.add(t);
            }

            // Start the same number of consumers (could do less or more if we wanted to).
            List<TestConsumer> consumers = new ArrayList<>();
            for (int i = 0; i < N; i++) {
                TestConsumer consumer = new TestConsumer(list, i);
                Thread t = new Thread(consumer);
                t.setName("Consumer " + i);
                t.start();
                consumers.add(consumer);
                running.add(t);
            }
            // Wait for a while.
            Thread.sleep(5000);
            // Close down all.
            for (TestProducer p : producers) {
                p.stop = true;
            }
            for (TestConsumer c : consumers) {
                c.stop = true;
            }
            // Wait for all to stop.
            for (Thread t : running) {
                System.out.println("Joining " + t.getName());
                t.join();
            }
            // What results did we get?
            int totalMessages = 0;
            for (int i = 0; i < N; i++) {
                // How far did the producer get?
                int gotTo = producers.get(i).sequence;
                // The consumer state
                int seenTo = seen[i].get();
                totalMessages += seenTo;
                Set<Widget> queue = queued[i];
                if (seenTo == gotTo && queue.isEmpty()) {
                    System.out.println("Producer " + i + " ok.");
                } else {
                    // Different set consumed as produced!
                    System.out.println("Producer " + i + " Failed: gotTo=" + gotTo + " seenTo=" + seenTo + " queued=" + queue);
                }
            }
            System.out.println("Total messages " + totalMessages);

        } catch (InterruptedException ex) {
            ex.printStackTrace();
        }
    }
}

      

+1


source







All Articles