Subscribe as the last method

Is there a way to subscribe to a method even though it will be called last when onNext is added?

m_subject.Subscribe(() => Console.writeLine("firstSubscription");
m_subject.SubscribeLast(() => Console.writeLine("secondSubscription");
m_subject.Subscribe(() => Console.writeLine("thirdSubscription");

m_subject.OnNext();

// prints:
// firstSubscription
// thirdSubscription
// secondSubscription

      

+3


source to share


2 answers


You can't subscribe last, but you can wrap all your calls into one subscription.

Something like that:



Action action = () => {};
Action lastAction = () => {};

m_subject.Subscribe(() => 
{
    action();
    lastAction();
});

action += (() => Console.writeLine("firstSubscription");
lastAction += (() => Console.writeLine("secondSubscription");
action += (() => Console.writeLine("thirdSubscription");

m_subject.OnNext();

// prints:
// firstSubscription
// thirdSubscription
// secondSubscription

      

+5


source


You can also do this by specifying a custom Subject<T>

one that internally has a default object as well as the last object.

Update

I added congestion ObserveOn

for storage IScheduler

and SynchronizationContext

then applied them during subscription. A similar technique can be used to enable the function SubscribeOn

.

public class SubscribeLastSubject<T> : ISubject<T>, IDisposable
{
    private readonly Subject<T> subject = new Subject<T>();
    private readonly Subject<T> lastSubject = new Subject<T>();
    private IScheduler observeScheduler;
    private SynchronizationContext observerContext;

    public void OnNext(T value)
    {
        subject.OnNext(value);
        lastSubject.OnNext(value);
    }

    public void OnError(Exception error)
    {
        subject.OnError(error);
        lastSubject.OnError(error);
    }

    public void OnCompleted()
    {
        subject.OnCompleted();
        lastSubject.OnCompleted();
    }

    public IDisposable Subscribe(IObserver<T> observer)
    {
        return GetObservable().Subscribe(observer);           
    }

    public IDisposable SubscribeLast(IObserver<T> observer)
    {
        return GetLastObservable().Subscribe(observer);     
    }

    public IDisposable SubscribeLast(Action<T> action)
    {
        return GetLastObservable().Subscribe(action);
    }

    public SubscribeLastSubject<T> ObserveOn(IScheduler scheduler)
    {
        observeScheduler = scheduler;
        return this;
    }

    public SubscribeLastSubject<T> ObserveOn(SynchronizationContext context)
    {
        observerContext = context;
        return this;
    }

    public void Dispose()
    {
        subject.Dispose();
        lastSubject.Dispose();
    }

    private IObservable<T> GetObservable()
    {
        if (observerContext != null)
        {
            return subject.ObserveOn(observerContext);
        }

        if (observeScheduler != null)
        {
            return subject.ObserveOn(observeScheduler);
        }

        return subject;
    }

    private IObservable<T> GetLastObservable()
    {
        if (observerContext != null)
        {
            return lastSubject.ObserveOn(observerContext);
        }

        if (observeScheduler != null)
        {
            return lastSubject.ObserveOn(observeScheduler);
        }

        return lastSubject;
    }
}

      



Using

var m_subject = new SubscribeLastSubject<string>();

m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("firstSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).SubscribeLast(s => Console.WriteLine("secondSubscription"));
m_subject.ObserveOn(Scheduler.CurrentThread).Subscribe(s => Console.WriteLine("thirdSubscription"));

m_subject.OnNext("1");

Console.ReadKey();

      

Output

firstSubscription
thirdSubscription
secondSubscription

      

+1


source







All Articles