Subscribe as the last method
Is there a way to subscribe to a method even though it will be called last when onNext is added?
m_subject.Subscribe(() => Console.writeLine("firstSubscription");
m_subject.SubscribeLast(() => Console.writeLine("secondSubscription");
m_subject.Subscribe(() => Console.writeLine("thirdSubscription");
m_subject.OnNext();
// prints:
// firstSubscription
// thirdSubscription
// secondSubscription
source to share
You can't subscribe last, but you can wrap all your calls into one subscription.
Something like that:
Action action = () => {};
Action lastAction = () => {};
m_subject.Subscribe(() =>
{
action();
lastAction();
});
action += (() => Console.writeLine("firstSubscription");
lastAction += (() => Console.writeLine("secondSubscription");
action += (() => Console.writeLine("thirdSubscription");
m_subject.OnNext();
// prints:
// firstSubscription
// thirdSubscription
// secondSubscription
source to share
You can also do this by specifying a custom Subject<T>
one that internally has a default object as well as the last object.
Update
I added congestion ObserveOn
for storage IScheduler
and SynchronizationContext
then applied them during subscription. A similar technique can be used to enable the function SubscribeOn
.
public class SubscribeLastSubject<T> : ISubject<T>, IDisposable
{
private readonly Subject<T> subject = new Subject<T>();
private readonly Subject<T> lastSubject = new Subject<T>();
private IScheduler observeScheduler;
private SynchronizationContext observerContext;
public void OnNext(T value)
{
subject.OnNext(value);
lastSubject.OnNext(value);
}
public void OnError(Exception error)
{
subject.OnError(error);
lastSubject.OnError(error);
}
public void OnCompleted()
{
subject.OnCompleted();
lastSubject.OnCompleted();
}
public IDisposable Subscribe(IObserver<T> observer)
{
return GetObservable().Subscribe(observer);
}
public IDisposable SubscribeLast(IObserver<T> observer)
{
return GetLastObservable().Subscribe(observer);
}
public IDisposable SubscribeLast(Action<T> action)
{
return GetLastObservable().Subscribe(action);
}
public SubscribeLastSubject<T> ObserveOn(IScheduler scheduler)
{
observeScheduler = scheduler;
return this;
}
public SubscribeLastSubject<T> ObserveOn(SynchronizationContext context)
{
observerContext = context;
return this;
}
public void Dispose()
{
subject.Dispose();
lastSubject.Dispose();
}
private IObservable<T> GetObservable()
{
if (observerContext != null)
{
return subject.ObserveOn(observerContext);
}
if (observeScheduler != null)
{
return subject.ObserveOn(observeScheduler);
}
return subject;
}
private IObservable<T> GetLastObservable()
{
if (observerContext != null)
{
return lastSubject.ObserveOn(observerContext);
}
if (observeScheduler != null)
{
return lastSubject.ObserveOn(observeScheduler);
}
return lastSubject;
}
}
Using
var m_subject = new SubscribeLastSubject<string>();
m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("firstSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).SubscribeLast(s => Console.WriteLine("secondSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("thirdSubscription"));
m_subject.OnNext("1");
Console.ReadKey();
Output
firstSubscription
thirdSubscription
secondSubscription
source to share