Creating an IO-bound observable with Rx

I have a worker thread making blocking calls (ReadFrame) to read incoming data from a socket (IO bound). The thread runs a loop that feeds the data into a Subject, which can be observed by consumers.

private void ReadLoop()
{
    while (!IsDisposed)
    {
        var frame = _Socket.ReadFrame();
        _ReceivedFrames.OnNext(frame);
    }
}

      

I wonder if there is a more idiomatic Rx way to do this.

Here is my attempt (a toy example):

var src = Observable
         .Repeat(Unit.Default)
         .Select(_ =>
         {
             Thread.Sleep(1000);              // simulated blocking ReadFrame call
             return "data read from socket";
         })
         .SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
         .ObserveOn(NewThreadScheduler.Default)     // spin up new thread (?)
         .Publish()
         .RefCount();

var d = src.Subscribe(s => s.Dump()); // simulated consumer

Console.ReadLine();  // simulated main running code

d.Dispose();  // tear down

      

I am struggling with the correct use of ObserveOn, SubscribeOn and schedulers. The toy example seems to work, but I'm not sure whether the thread's lifetime is managed correctly.

Is the reading thread shut down when d.Dispose() is called?
Do I need to create a new thread at all?
Should I use Observable.Create instead? How?

Below is more information requested by @Enigmativity:

The ReadLoop() method is part of a class that implements the following interface:

public interface ICanSocket : IDisposable
{
    IObservable<CanFrame> ReceivedFrames { get; }
    IObserver<CanFrame>   FramesToSend   { get; }
}

      

Its member _Socket is disposed (closed) when the parent ICanSocket is disposed.



1 answer


The most "Rxy" way to do this is to use Rxx, which has observable methods for asynchronous I/O.

It looks like your main concerns are:

  • When subscribing, do not block the subscriber's thread (i.e. run the I/O loop on a background thread)
  • When the caller unsubscribes, stop the I/O thread


One way to solve these problems is to simply use the async Create method:

// just use Task.Run to "background" the work
var src = Observable
    .Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var frame = _Socket.ReadFrame();
            if (frame == null) // end of stream?
            {
                // will send a Completed event
                return;
            }

            observer.OnNext(frame);
        }
    }));

var d = src.Subscribe(s => s.Dump());
Console.ReadLine();
d.Dispose();
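
If several consumers should share a single read loop, as with the Subject in the question, the same Create-based source can be combined with Publish().RefCount(). The following is only a sketch under stated assumptions: FakeSocket is a hypothetical stand-in for the real socket (its ReadFrame blocks until a frame arrives and returns null once the socket is closed), and frames are plain strings rather than CanFrame.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real socket (an assumption, not the asker's class).
sealed class FakeSocket : IDisposable
{
    private readonly BlockingCollection<string> _frames = new BlockingCollection<string>();

    public void Push(string frame) => _frames.Add(frame);

    // Blocks until a frame is available; returns null once the socket is closed.
    public string ReadFrame()
    {
        try { return _frames.Take(); }
        catch (InvalidOperationException) { return null; }
    }

    // Closing the socket unblocks a pending ReadFrame call.
    public void Dispose() => _frames.CompleteAdding();
}

static class Program
{
    static void Main()
    {
        using (var socket = new FakeSocket())
        {
            var frames = Observable
                .Create<string>((observer, ct) => Task.Run(() =>
                {
                    // The token is only checked between reads.
                    while (!ct.IsCancellationRequested)
                    {
                        var frame = socket.ReadFrame();
                        if (frame == null) return;   // end of stream: sequence completes
                        observer.OnNext(frame);
                    }
                }))
                .Publish()
                .RefCount();   // first subscriber starts the loop, last one cancels it

            var d = frames.Subscribe(Console.WriteLine);
            socket.Push("frame 1");
            Thread.Sleep(100);   // give the background loop time to deliver the frame

            d.Dispose();         // last subscriber gone: the token is cancelled
        }                        // closing the socket releases the blocked ReadFrame
    }
}
```

Note that d.Dispose() only requests cancellation. Because the token is checked between reads, a loop that is currently blocked inside ReadFrame exits only after that call returns, which in this sketch happens when the socket is disposed.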

      
