Creating an I/O-bound observable with Rx
I have a worker thread making blocking calls (ReadFrame) to read incoming data from a socket (I/O bound). The thread runs a loop that pushes the data into a Subject, which consumers can observe.
private void ReadLoop()
{
    while (!IsDisposed)
    {
        var frame = _Socket.ReadFrame();
        _ReceivedFrames.OnNext(frame);
    }
}
I wonder whether there is a more idiomatic Rx way to do this.
Here is my attempt (a toy example):
var src = Observable
    .Repeat(Unit.Default)
    .Select(_ =>
    {
        Thread.Sleep(1000); // simulated blocking ReadFrame call
        return "data read from socket";
    })
    .SubscribeOn(ThreadPoolScheduler.Instance) // avoid blocking when subscribing
    .ObserveOn(NewThreadScheduler.Default) // spin up new thread (?)
    .Publish()
    .RefCount();
var d = src.Subscribe(s => s.Dump()); // simulated consumer
Console.ReadLine(); // simulated main running code
d.Dispose(); // tear down
I am struggling with the correct use of ObserveOn, SubscribeOn and schedulers. The toy example seems to work, but I'm not sure whether the thread's lifetime is being managed correctly.
Is the read stream closed when d.Dispose() is called?
Do I need to create a new thread?
Should I use Observable.Create instead? If so, how?
Here is the additional information requested by @Enigmativity:
The ReadLoop() method is part of a class that implements the following interface:
public interface ICanSocket : IDisposable
{
    IObservable<CanFrame> ReceivedFrames { get; }
    IObserver<CanFrame> FramesToSend { get; }
}
Its _Socket member is disposed (closed) when the parent ICanSocket is disposed.
The most "Rx-y" way to do this is to use Rxx, which has observable methods for asynchronous I/O.
It looks like your main concerns are:
- When subscribing, do not block the subscriber's thread (in other words, run the I/O on a background thread)
- When the caller unsubscribes, stop the I/O
One way to address both is to use the asynchronous overload of Observable.Create:
// just use Task.Run to "background" the work
var src = Observable
    .Create<CanFrame>((observer, cancellationToken) => Task.Run(() =>
    {
        while (!cancellationToken.IsCancellationRequested)
        {
            var frame = _Socket.ReadFrame();
            if (frame == null) // end of stream?
            {
                // will send a Completed event
                return;
            }
            observer.OnNext(frame);
        }
    }));
var d = src.Subscribe(s => s.Dump());
Console.ReadLine();
d.Dispose();
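Note that the loop above only checks the token between reads, so a ReadFrame call that is already blocked will not return until a frame arrives or the socket is closed. If several consumers need the same stream, as with ReceivedFrames in the question, the Create-based source can also be shared with Publish().RefCount(). The following is only a rough sketch under stated assumptions: a BlockingCollection<string> named incoming is a hypothetical stand-in for the socket (it is not part of the original code), and the program uses C# top-level statements with the System.Reactive package.

```csharp
using System;
using System.Collections.Concurrent;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the real socket: Take() blocks until a frame arrives.
var incoming = new BlockingCollection<string>();

var frames = Observable
    .Create<string>((observer, ct) => Task.Run(() =>
    {
        try
        {
            while (!ct.IsCancellationRequested)
            {
                // Unlike a plain ReadFrame(), this blocking call is interrupted
                // when the token is cancelled.
                observer.OnNext(incoming.Take(ct));
            }
        }
        catch (OperationCanceledException)
        {
            // The last subscriber unsubscribed: leave the loop quietly.
        }
    }))
    .Publish()    // share a single read loop among all subscribers
    .RefCount();  // start on the first subscription, stop after the last disposal

var a = frames.Subscribe(f => Console.WriteLine($"A: {f}"));
var b = frames.Subscribe(f => Console.WriteLine($"B: {f}"));

incoming.Add("frame 1");  // simulated incoming data
Thread.Sleep(100);        // give the background loop time to deliver it

a.Dispose();
b.Dispose();              // disposing the last subscription cancels the token
```

With RefCount, the read loop runs only while at least one subscription is active, so no dedicated thread or scheduler has to be managed by hand.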