How to implement Observable.take using FSharp.Control.Reactive to observe calculations?

I am trying to figure out how to observe calculations from FSharp.Control.Reactive, so I am reprogramming the Observable.take combinator

This is my first try:

let myTake (n : int) (source : IObservable<'a>) : IObservable<'a> =
    let rec go counter : IObservable<'a> =
        observe {
            let! v = source
            if counter < n
            then yield v
                 yield! go (counter + 1)
        }
    go 0

      

However, when I run the test below:

use subcription =
    subj
    |> myTake 2
    |> Observable.subscribe (printfn "next:%d")

subj.OnNext 1
subj.OnNext 2
subj.OnNext 3

waitForKey "my take"

      

I am getting output:

next:1 
next:2 
next:2 
next:3 
next:3 
next:3 

      

How can I fix this?

I also tried running myTake on an observable created with an observable .Seq and it failed even worse, i.e. it just produced a sequence of input repeated several times. I'm guessing this is because ofSeq returns a cold observable, but doesn't fully understand the behavior.

How can I make it work with cold observables too?

thank!

+3


source to share


2 answers


I agree with Thomas that the built-in linker operators don't support what you want. However, my general conclusion differs from his - I don't understand why observables are more computationally problematic than sequence.

First of all, consider how you write take

in a sequence expression. There let!

is not supported; you need to use a loop for

. But the analogue of your code:

let rec go counter = seq {
    for v in source do
        if counter < n then
            yield v
            yield! go (counter + 1)
}

      

is just as broken for taking from a sequence as it is for taking from an observable: the counter effectively tells you how many times to iterate over the Cartesian product, which you don't want. One solution is to make the counter mutable and move it inside the expression (which should then no longer be recursive):

seq {
    let mutable counter = 0
    for v in source do
        if counter < n then
            yield v
            counter <- counter + 1
}

      



So this works for sequences, but what about observables? observe

also supports loops for

, but only for looping through a sequence, not an observable. However, we can remove this limitation by overloading the builder method for

:

type FSharp.Control.Reactive.Builders.ObservableBuilder with
    member __.For(o, f) =
        FSharp.Control.Reactive.Observable.bind f o

      

And now we can write take

as easily for observables as for sequences:

let myTake (n : int) (source : IObservable<'a>) : IObservable<'a> =
    observe {
        let mutable counter = 0
        for v in source do
            if counter < n then
                yield v
                counter <- counter + 1
    }

      

and you can check that it works.

+2


source


I'm not very familiar with Calculation Designer observable

, but a quick look at the source code shows that the operation Bind

(which is behind let!

) is implemented using the Rx SelectMany operation . The SelectMany

activity starts the rest of the workflow for each event event, so the behavior you see is as expected.

The image from this article illustrates the behavior well :

enter image description here



I'm not sure what would be a good way to implement Observable.take

with a computation constructor observable

- frankly, I've always thought that observables are not particularly suitable for F # computations, because the usual intuition that comes with computation expressions just doesn't work for push-based observables ...

I think observables work really well if you can manage to just write inline operations, but they're not particularly enjoyable when you need to implement your own primitives - and most of the time I just use the F # agent to implement the logic and wrap it up and observe ...

+3


source







All Articles