Turn IObservable<IEnumerable<T>> into IEnumerable<IObservable<T>>
How can I turn an observable of enumerables, xys, into an enumerable of observables, yxs, where each observable in yxs follows one specific element position across the time steps of xys? What I want is similar to a transpose for an enumerable of enumerables.
Example:
IObservable<IEnumerable<int>> xys = Observable.Generate(0, _ => true, i => ++i, i => new[] {0 + i, 1 + i, 2 + i});
// xys = emits {0,1,2}, {1,2,3}, ...
IEnumerable<IObservable<int>> yxs = new[]
{
    Observable.Generate(0, i => true, i => ++i, i => i),
    Observable.Generate(1, i => true, i => ++i, i => i),
    Observable.Generate(2, i => true, i => ++i, i => i),
};
// yxs = {emits 0, 1, ...}, {emits 1, 2, ...}, {emits 2, 3, ...}
I am especially interested in a function that is already part of Rx. As in the example above, infinite observables should be supported, as well as infinite enumerables.
This is what I came up with. However, it is a standalone solution and not part of Rx. I would much prefer it if someone pointed me to a solution built from library functions.
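public static IEnumerable<IObservable<T>> ObserveElements<T>(this IObservable<IEnumerable<T>> obs)
{
    var i = 0;
    // Infinite sequence: one observable per element position 0, 1, 2, ...
    while (true)
    {
        // Copy the index so each query captures its own position.
        var idx = i++;
        yield return from enumerable in obs
                     let x = enumerable.ElementAtOrDefault(idx)
                     // Skips time steps where the position is missing.
                     // Note: this also drops real elements equal to default(T), e.g. 0 for int.
                     where !Equals(x, default(T))
                     select x;
    }
}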
Obviously, since the resulting enumerable is infinite, you should only .Take() as many observables as you need.
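As a possible variation, here is a minimal sketch that builds each position's observable from Rx's SelectMany overload for enumerable selectors, combined with LINQ's Skip and Take. It is not a built-in Rx operator, and the name ObserveElementsAt is hypothetical. Unlike the ElementAtOrDefault filter above, it should not drop real elements that happen to equal default(T), such as the 0 in the question's first batch. The usage part assumes a finite variant of xys so that the program terminates.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // from the System.Reactive NuGet package

public static class ObservableTranspose
{
    // Hypothetical helper name; not part of Rx.
    public static IEnumerable<IObservable<T>> ObserveElementsAt<T>(
        this IObservable<IEnumerable<T>> source)
    {
        for (var i = 0; ; i++)
        {
            var idx = i; // per-iteration copy captured by the lambda below
            // Skip(idx).Take(1) yields the element at position idx, or nothing
            // when the inner enumerable is too short for that position.
            yield return source.SelectMany(xs => xs.Skip(idx).Take(1));
        }
    }
}

public static class Program
{
    public static void Main()
    {
        // Finite variant of the question's xys: {0,1,2}, {1,2,3}, {2,3,4}
        IObservable<IEnumerable<int>> xys =
            Observable.Range(0, 3).Select(i => Enumerable.Range(i, 3));

        // Take only as many positions as needed; the enumerable itself is infinite.
        foreach (var column in xys.ObserveElementsAt().Take(3))
        {
            // Collect the finite observable into a list and block until it completes.
            var values = column.ToList().Wait();
            Console.WriteLine(string.Join(",", values));
        }
        // prints:
        // 0,1,2
        // 1,2,3
        // 2,3,4
    }
}
```

Each returned observable subscribes to the source separately, so with a cold source such as the one above the source sequence is replayed once per position.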
In Haskell terms, I believe this is actually an IObservable-specific implementation of the monadic sequence function for IEnumerable.