.NET - multithreaded producer-consumer using .NET reactive

I am working on creating a producer template ... using .NET reactive. The producer reads messages from the Kafka message bus. After the message is read, it must be passed to the consumer to process the message.

I was able to do this using .NET reactive. However, I noticed that the consumer processes the message on the same thread as the producer (see the code below). The goal is to have one producer thread that reads messages from the bus and passes them to consumers, which process them on separate threads. The code I have is:

// Producer Code
private Subject<LGMessage> _onMessageSubject = new Subject<LGMessage>();

private IObserver<LGMessage> messageBusObserver;

protected IObservable<LGMessage> _onMessageObservable
{
    get
    {
        return _onMessageSubject.AsObservable();
    }
}

public void AddObserver(IObserver<LGMessage> observer)
{
    _onMessageObservable.ObserveOn(NewThreadScheduler.Default).Subscribe(observer);
}

// Read is called when the message is read from the bus
public bool Read(Message<string, string> msg)
{
    // add the message to the observable
    _onMessageSubject.OnNext(msg.Value);
}

// Consumer Code
public virtual void OnNext(string value)
{
    Console.WriteLine("Thread {0} Consuming", Thread.CurrentThread.ManagedThreadId);

    Console.WriteLine("{1}: Message I got from bus: {0}", value.Key, this.Name);
    // Take Action
}

      



1 answer


It's hard to tell from your code, but it looks like you are not exposing the observable. That prevents consumers from applying Rx operators downstream. In your case, you want them to be able to use threading operators such as ObserveOn.

In the producer, instead of exposing AddObserver(IObserver<string> observer), I would expose something like this:

public IObservable<string> Messages => _onMessageSubject.AsObservable();

      

The consumer can then do something like this:

Messages
    .ObserveOn(NewThreadScheduler.Default)
    .Subscribe(consumerObserver);

      



EDIT

The following code works for me as intended:

using System;
using System.Reactive;             // AnonymousObserver<T>
using System.Reactive.Concurrency; // NewThreadScheduler
using System.Reactive.Linq;        // AsObservable, ObserveOn
using System.Reactive.Subjects;    // Subject<T>
using System.Threading;

var subject = new Subject<int>();

var observer1 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer1: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer2 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer2: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));
var observer3 = new AnonymousObserver<int>(i => Console.WriteLine($"Observer3: Observed {i} on thread {Thread.CurrentThread.ManagedThreadId}."));

// Each subscription gets its own ObserveOn, so each observer is notified on its own thread.
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
subject.AsObservable().ObserveOn(NewThreadScheduler.Default).Subscribe(observer3);

subject.OnNext(1);
subject.OnNext(2);
subject.OnNext(3);
subject.OnCompleted();

      

Here's the output (Observer1 got thread 14, Observer2 got thread 15, Observer3 got thread 16):

Observer1: Observed 1 on thread 14.
Observer2: Observed 1 on thread 15.
Observer1: Observed 2 on thread 14.
Observer1: Observed 3 on thread 14.
Observer2: Observed 2 on thread 15.
Observer2: Observed 3 on thread 15.
Observer3: Observed 1 on thread 16.
Observer3: Observed 2 on thread 16.
Observer3: Observed 3 on thread 16.
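
To tie this back to the producer in the question, here is a minimal sketch of the same idea with the stream exposed as a property. It assumes the System.Reactive NuGet package. The names Producer, Complete, Demo and Run are hypothetical, the payload is a plain string rather than LGMessage, and Read(string) is only a stand-in for the Kafka read callback.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

// Hypothetical producer: exposes the stream instead of accepting observers.
public sealed class Producer
{
    private readonly Subject<string> _onMessageSubject = new Subject<string>();

    // Consumers subscribe to this and choose their own scheduler.
    public IObservable<string> Messages => _onMessageSubject.AsObservable();

    // Stand-in for the callback invoked when a message is read from the bus.
    public void Read(string value) => _onMessageSubject.OnNext(value);

    public void Complete() => _onMessageSubject.OnCompleted();
}

public static class Demo
{
    // Returns the thread id that produced and the thread id that consumed.
    public static (int ProducerThreadId, int ConsumerThreadId) Run()
    {
        var producer = new Producer();
        var consumerThreadId = -1;
        var done = new ManualResetEventSlim();

        // ObserveOn moves the notifications onto a thread owned by the scheduler.
        var subscription = producer.Messages
            .ObserveOn(NewThreadScheduler.Default)
            .Subscribe(
                value =>
                {
                    consumerThreadId = Thread.CurrentThread.ManagedThreadId;
                    Console.WriteLine($"Thread {consumerThreadId} consuming: {value}");
                },
                () => done.Set());

        var producerThreadId = Thread.CurrentThread.ManagedThreadId;
        producer.Read("hello");
        producer.Read("world");
        producer.Complete();

        done.Wait(); // block until the consumer has seen OnCompleted
        subscription.Dispose();
        return (producerThreadId, consumerThreadId);
    }

    public static void Main()
    {
        var ids = Run();
        Console.WriteLine($"Producer thread {ids.ProducerThreadId}, consumer thread {ids.ConsumerThreadId}");
    }
}
```

With this shape the producer knows nothing about threading. Each consumer decides where it observes, and messages still arrive in order within a single subscription.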

      
