Is there a way in RX to bind the source stream to the dest stream so that the source can be changed without affecting the customer's subscription?
I am using RX and I want to bind / map the source stream to the destination stream so that the original stream can be dynamically changed without affecting any subscription to the target stream.
I am planning my (naive) solution here in the hope that someone can show me a better solution.
I hope existing extension methods will be created to achieve this. And if not, I hope to create my own extension method that will simplify my solution.
/// <summary>
/// Used to bind a source stream to destination stream
/// Clients can subscribe to the destination stream before the source stream has been bound.
/// The source stream can be changed as desired without affecting the subscription to the destination stream.
/// </summary>
public class BindableStream<T>
{
/// <summary>
/// The source stream that is only set when we bind it.
/// </summary>
private IObservable<T> sourceStream;
/// <summary>
/// Used to unsubscribe from the source stream.
/// </summary>
private IDisposable sourceStreamDisposer;
/// <summary>
/// Subject used as the destination stream.
/// For passing data from source to dest stream.
/// </summary>
private Subject<T> destStream = new Subject<T>();
/// <summary>
/// Get the destination stream. Clients can subscribe to this to receive data that is passed on from the source stream.
/// Later on we can set or change the underlying source stream without affecting the destination stream.
/// </summary>
public IObservable<T> GetDestStream()
{
return destStream;
}
/// <summary>
/// Bind to a source stream that is to be propagated to the destination stream.
/// </summary>
public void Bind(IObservable<T> sourceStream)
{
Unbind();
this.sourceStream = sourceStream;
this.sourceStreamDisposer = sourceStream.Subscribe(dataItem =>
{
//
// Pass the source item on to the client via the subject.
//
destStream.OnNext(dataItem);
});
}
/// <summary>
/// Unsubscribe from the source stream.
/// </summary>
public void Unbind()
{
if (sourceStreamDisposer != null)
{
sourceStreamDisposer.Dispose();
}
sourceStreamDisposer = null;
sourceStream = null;
}
}
Here's a very simple example of how this is used:
static void Main(string[] args)
{
var bindableStream = new BindableStream<long>();
// Subscribe before binding the source stream.
bindableStream.GetDestStream().Subscribe(i => Console.WriteLine(i));
Thread.Sleep(1000);
// Bind a source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(5000);
// Bind a new source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.ReadKey();
}
source to share
You can use the operator Observable.Switch(...)
to get what you want.
The switch creates a "rolling" subscription. As a new observable is checked out, it disposes of its subscription to the previous observable and subscribes to the new one.
static void Main(string[] args)
{
var streams = new Subject<IObservable<long>>();
// Subscribe before binding the source stream.
streams.Switch().Subscribe(Console.WriteLine);
Thread.Sleep(1000);
// Bind a source stream.
streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(5000);
// Bind a new source stream.
streams.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Console.ReadKey();
}
Or if you know where your "streams" come from ...
static void Main(string[] args)
{
var interval = Observable.IntervalTimeSpan.FromSeconds(1));
var sourcesOvertime = new [] {
// Yield the first source after one second
Observable.Return(interval).Delay(TimeSpan.FromSeconds(1)),
// Yield the second source after five seconds
Observable.Return(interval).Delay(TimeSpan.FromSeconds(5))
};
sourcesOvertime
// merge these together so we end up with a "stream" of our source observables
.Merge()
// Now only listen to the latest one.
.SwitchLatest()
// Feed the values from the latest source to the console.
.Subscribe(Console.WriteLine);
Console.ReadKey();
}
EDIT:
As a simplification for the class BindableStream
...
static void Main(string[] args)
{
// var bindableStream = new BindableStream<long>();
var bindableStream = new Subject<IObservable<long>>();
var dest = bindableStream.Switch();
// Subscribe before binding the source stream.
// bindableStream.Subscribe(i => Console.WriteLine(i));
dest.Subscribe(i => Console.WriteLine(i));
Thread.Sleep(1000);
// Bind a source stream.
// bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(5000);
// Bind a new source stream.
// bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
bindableStream.OnNext(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(4000);
Console.WriteLine("Unbound!");
// Unbind the source and dest streams.
// bindableStream.Unbind();
bindableStream.OnNext(Observable.Empty<long>());
Console.ReadKey();
}
Or if it's too wordy ...
public static class SubjectEx
{
public static class OnNextEmpty<T>(this ISubject<IObservable<T>> subject)
{
subject.OnNext(Observable.Empty<T>());
}
}
source to share
Following input from @ChristopherHarris, I revisited my original solution. I think this is much better than my original example, although I would still like to weld this down to a custom extension method.
If you can figure out how to simplify this please post an answer.
NOTE. Using a switch simplifies my initial decision and eliminates the need to manually subscribe to the original sequence.
/// <summary>
/// Used to bind a source stream to destination stream
/// Clients can subscribe to the destination stream before the source stream has been bound.
/// The source stream can be changed as desired without affecting the subscription to the destination stream.
/// </summary>
public class BindableStream<T> : IObservable<T>
{
/// <summary>
/// Subject used as the destination stream.
/// For passing data from source to dest stream.
/// This is a stream of streams.
/// When a new stream is added it replaces whichever stream was previously added.
/// </summary>
private Subject<IObservable<T>> destStream = new Subject<IObservable<T>>();
/// <summary>
/// Subscribe to the destination stream.
/// Clients can subscribe to this to receive data that is passed on from the source stream.
/// Later on we can set or change the underlying source stream without affecting the destination stream.
/// </summary>
public IDisposable Subscribe(IObserver<T> observer)
{
return destStream.Switch().Subscribe(observer);
}
/// <summary>
/// Bind to a new source stream that is to be propagated to the destination stream.
/// </summary>
public void Bind(IObservable<T> sourceStream)
{
destStream.OnNext(sourceStream);
}
/// <summary>
/// Unbind the source stream.
/// </summary>
public void Unbind()
{
destStream.OnNext(Observable.Empty<T>());
}
}
An example of using "BindableStream":
static void Main(string[] args)
{
var bindableStream = new BindableStream<long>();
// Subscribe before binding the source stream.
bindableStream.Subscribe(i => Console.WriteLine(i));
Thread.Sleep(1000);
// Bind a source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(5000);
// Bind a new source stream.
bindableStream.Bind(Observable.Interval(TimeSpan.FromSeconds(1)));
Thread.Sleep(4000);
Console.WriteLine("Unbound!");
// Unbind the source and dest streams.
bindableStream.Unbind();
Console.ReadKey();
}
source to share