How can I set up multiple observables from an event in F#?

I am trying to learn about the Observable module in F# by writing a program that connects to a WebSocket, listens for messages, and then processes them in a set of Observable-based streams. However, I am having trouble understanding the actual behavior.

First, I set up the WebSocket like this:

open System
open System.Net.WebSockets
open System.Text
open System.Threading

let connectFeed =
    let feedUrl = "blah blah"
    let buffer : byte array = Array.zeroCreate 1024
    let segment = ArraySegment(buffer)
    let socketEvent = new Event<string>()

    let task = async {
        let random = Random(DateTime.Now.Millisecond)
        use socket = new ClientWebSocket()
        let! token = Async.CancellationToken
        do! Async.AwaitTask (socket.ConnectAsync(Uri(feedUrl), token))

        while not token.IsCancellationRequested do
            let! result = Async.AwaitTask (socket.ReceiveAsync(segment, token))
            socketEvent.Trigger (Encoding.UTF8.GetString(buffer))
            Array.fill buffer 0 buffer.Length 0uy

    }

    (task, socketEvent.Publish)

let deserializeMsg (raw:string) =
    // returns a MsgType based on the received message

let tryGetData (msg:MsgType) =
    // returns Some data for specific kind of message; None otherwise

[<EntryPoint>]
let main argv =
    let feedProc, feedStream = connectFeed
    let msgStream = feedStream |> Observable.map deserializeMsg

    msgStream |> Observable.subscribe (fun m -> printfn "got msg: %A" m) |> ignore

    let dataStream = msgStream |> Observable.choose tryGetData
    dataStream |> Observable.subscribe (fun d -> printfn "got data: %A" d) |> ignore

    Async.RunSynchronously feedProc
    0

      

I expect to see a printout like:

got msg: { some: "field" }
got msg: { some: "other" }
got msg: { some: "data" }
got data: { // whatever }
got msg: ...
...

      

Instead, only "got msg" messages appear, even when there are messages for which tryGetData would return Some.

What's going on here? How do I set up multiple Observable streams from one event?

Update: I have changed the code as follows:

let isMsgA msg =
    printfn "isMsgA"
    match msg with
    | MsgA -> true // where MsgA is a member of a DU defined elsewhere, and is the result of deserializeMsg
    | _ -> false

let isStringMsgA (msg: string) =
    printfn "isStringMsgA"
    msg.StartsWith("{ \"type\": \"msga\"")

[<EntryPoint>]
let main argv =
    let feedProc, feedStream = connectFeed
    let msgStream = feedStream |> Observable.map deserializeMsg

    msgStream 
    |> Observable.filter isMsgA
    |> Observable.subscribe (fun m -> printfn "got msg MsgA")
    |> ignore

    feedStream 
    |> Observable.filter isStringMsgA
    |> Observable.subscribe (fun m -> printfn "got string MsgA")
    |> ignore

      

And I get a screen full of "isStringMsgA" and "got string MsgA" messages, but exactly one of "isMsgA" and "got msg MsgA".

I'm puzzled.

Here's a stripped-down, reproducible example for anyone interested in this: https://github.com/aggieben/test-observable

Update 2: I seem to be seeing this behavior because an exception is being thrown in the function deserializeMsg. More digging ...


1 answer


I don't see any obvious reason why this should be happening. Can you add some logging to tryGetData to check what input it receives and what results it returns?
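One way to add that logging without touching tryGetData itself is a small wrapper. This is only a sketch: the traced helper is hypothetical, and MsgType and tryGetData below are simplified stand-ins for the ones in the question, which are not shown.

```fsharp
// Hypothetical stand-ins for the question's MsgType and tryGetData.
type MsgType =
    | Data of int
    | Heartbeat

let tryGetData (msg: MsgType) =
    match msg with
    | Data n -> Some n
    | Heartbeat -> None

// Hypothetical helper: logs the input and the result of every call to f,
// and logs any exception before rethrowing it unchanged.
let traced (name: string) (f: 'a -> 'b) (input: 'a) : 'b =
    printfn "%s input: %A" name input
    try
        let result = f input
        printfn "%s result: %A" name result
        result
    with ex ->
        printfn "%s threw: %s" name ex.Message
        reraise ()

let evt = Event<MsgType>()
let received = ResizeArray<int>()

let subscription =
    evt.Publish
    |> Observable.choose (traced "tryGetData" tryGetData)
    |> Observable.subscribe (fun d -> received.Add d)

evt.Trigger(Data 42)
evt.Trigger(Heartbeat)
```

The same wrapper could be put around deserializeMsg, which would also show whether it ever throws.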

When you use the Observable module, you create a description of the processing pipeline, and when you call Observable.subscribe, you create a concrete chain of listeners that do the work and attach handlers to the original event source. However, events are not "swallowed"; they should be sent to all observers.
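The difference between describing a pipeline and subscribing to it can be seen by counting how often the mapping function runs. This is a minimal sketch with made-up names (mapCalls, mapped), not code from the question:

```fsharp
let evt = Event<int>()
let mapCalls = ref 0

// Building the pipeline only describes the work; no handler is attached to evt yet.
let mapped =
    evt.Publish
    |> Observable.map (fun n ->
        mapCalls.Value <- mapCalls.Value + 1
        n * 10)

evt.Trigger(1)
// Nothing has subscribed yet, so the mapping function has not run.
let callsBeforeSubscribe = mapCalls.Value

// Each subscribe call builds its own chain of listeners on the event.
let s1 = mapped |> Observable.subscribe (printfn "first: %d")
let s2 = mapped |> Observable.subscribe (printfn "second: %d")

evt.Trigger(2)
// Both chains saw the event, so the mapping function ran once per subscriber.
let callsAfterSubscribe = mapCalls.Value

printfn "before: %d, after: %d" callsBeforeSubscribe callsAfterSubscribe
```

Here callsBeforeSubscribe is 0 and callsAfterSubscribe is 2, because each subscription runs the mapping for itself.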

For example, try playing with the following minimal demo:



let evt = Event<int>()

let e1 = evt.Publish |> Observable.choose (fun n -> 
  if n % 2 = 0 then Some "woop!" else None)
let e2 = evt.Publish |> Observable.map (fun n -> n * 10)

e1 |> Observable.subscribe (printfn "E1: %s")
e2 |> Observable.subscribe (printfn "E2: %d")

evt.Trigger(1)
evt.Trigger(2)

      

If you run this, it prints the expected output:

E2: 10
E1: woop!
E2: 20
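Regarding Update 2 in the question: if the function passed to Observable.map can throw, one option is to catch the exception inside that function and turn failures into None, so the pipeline only ever sees a function that returns normally. The sketch below assumes a stand-in deserialize that parses integers; deserialize and tryDeserialize are hypothetical names, not the question's actual deserializeMsg.

```fsharp
// Hypothetical stand-in for deserializeMsg: throws on input it cannot parse.
let deserialize (raw: string) : int =
    System.Int32.Parse(raw)

// Wrap the throwing function so that a failure becomes None instead of an exception.
let tryDeserialize (raw: string) : int option =
    try
        Some (deserialize raw)
    with ex ->
        printfn "could not deserialize %A: %s" raw ex.Message
        None

let feed = Event<string>()
let parsed = ResizeArray<int>()

let subscription =
    feed.Publish
    |> Observable.choose tryDeserialize
    |> Observable.filter (fun n -> n > 0)
    |> Observable.subscribe (fun n -> parsed.Add n)

feed.Trigger("1")
feed.Trigger("not a number")   // logged and skipped
feed.Trigger("2")              // still delivered to the subscriber
```

After these three triggers, parsed contains 1 and 2; the unparseable message is logged and dropped rather than surfacing as an exception inside the stream.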

      
