What is the IObservable.Subscribe (IObserver <T> observer) overload?
When I write expressions .Subscribe
I often find that Resharper chose the following overload for me, located in mscorlib, Version = 4.0.0.0:
namespace System
{
public interface IObservable<out T>
{
IDisposable Subscribe(IObserver<T> observer);
}
}
This seems very different from most of the overloads that accept Action
, and also from mscorlib, not System.Reactive.*
where I expect most of the Reactive stuff to be.
What does this overload do? How to use it? How does it IObserver<T>
relate to Action
? And why is this the only overload in mscorlib?
source to share
To be clear, this is not an overload of what the Rx core really is. All the other methods Subscribe
and all the other operators you are used to are actually extension methods that ultimately call it.
If you look at the early documentation or Rx, you'll see that the creators viewed it as the push-based side of LINQ. So many things are mirror images of what you see in LINQ. IObservable
is a mirror IEnumerable
and IObserver
is a mirror IEnumerator
.
However, since push is the opposite of pull, so the Rx versions are the opposite of their pull-based counterpart:
-
IEnumerable
defines one method that createsIEnumerator
.IObservable
defines one method that acceptsIObserver
. - If you think of
IEnumerator.MoveNext()
+IEnumerator.Current
as a single operation that can return in one of three ways: the next item is returned, the end of the collection, or an exception. Similarly, itIObserver
should handle three cases: next item (OnNext
), end of stream (OnCompleted
), or exception (OnError
).
The more familiar "overloads" Subscribe
are really just extension methods that look something like this:
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext)
{
return Subscribe(onNext, e => {/*onError */}, () => {/*onCompleted*/);
}
public static IDisposable Subscribe<T>(this IObservable<T> source, Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
source.Subscribe(new AnonymousObserver<T>(onNext, onError, onCompleted));
}
source to share
If you look at IObserver
, you can see the reason. The interface contains three methods (or Action
s), the most "used" of which are OnNext
.
Now let's look at the overload implementation for Action
here . The extension method actually generates IObserver
for you, passing in the supplied Action
semantics OnNext
.
You can provide Action
for all implementations of an interface method if you want to handle notifications OnError
and OnCompleted
.
source to share