Turn IObservable <IEnumerable <T>> to IEnumerable <IObservable <T>>

How could I list an enumerable xys

to an enumerable of observables yxs

where each observable yxs

focuses on a specific element of each time step xys

? What I want is similar to transpose for an enum.

Example:

IObservable<IEnumerable<int>> xys = Observable.Generate(0, _ => true, i => ++i, i => new[] {0 + i, 1 + i, 2 + i});
// xys = emits {0,1,2}, {1,2,3}, ...
IEnumerable<IObservable<int>> yxs = new[]
{
    Observable.Generate(0, i=>true, i=> ++i, i=>i),
    Observable.Generate(1, i=>true, i=> ++i, i=>i),
    Observable.Generate(2, i=>true, i=> ++i, i=>i),
};
// yxs = {emits 0, 1, ...}, {emits 1, 2, ...}, {emits 2, 3, ...}

      

I am especially interested in a function that is already part of Rx. As with the above example, infinite observables should be possible as well as infinite enumerations.

+3


source to share


2 answers


This is what I came up with. However, this is a standalone solution and not part of Rx. I would appreciate much more if someone pointed me to a solution that included library functions.

public static IEnumerable<IObservable<T>> ObserveElements<T>(this IObservable<IEnumerable<T>> obs)
{
    var i = 0;
    while (true)
    {
        var idx = i++;
        yield return from enumerable in obs
                     let x = enumerable.ElementAtOrDefault(idx)
                     where !Equals(x, default(T))
                     select x;
    }
}

      



Obviously, you should only .Take()

see as many observables as you want.

In Haskell terms, I believe this is actually a sequence

monad IObservable

-specific implementation for IEnumerable

.

-1


source


This will convert your types correctly. However, you cannot change the semantics of when / how elements will be delivered from below, so you are really just going to buffer and block no matter how you do it.



dest = source.ToEnumerable().Map(x => x.ToObservable());

      

+2


source







All Articles