How can I set up multiple observables from an event in F #?
I am trying to learn about the Observable module in F # by writing a program that connects to a web juice, listens for messages, and then processes them on some set of Observables based streams. However, I find it difficult to understand the real behavior.
First, I set up websocket like this:
open System
open System.Net.WebSockets
open System.Threading
let connectFeed =
let feedUrl = "blah blah"
let buffer : byte array = Array.zeroCreate 1024
let segment = ArraySegment(buffer)
let socketEvent = new Event<string>()
let task = async {
let random = Random(DateTime.Now.Millisecond)
use socket = new ClientWebSocket()
let! token = Async.CancellationToken
do! Async.AwaitTask (socket.ConnectAsync(Uri(feedUrl), token))
while not token.IsCancellationRequested do
let! result = Async.AwaitTask (socket.ReceiveAsync(segment, token))
socketEvent.Trigger (Encoding.UTF8.GetString(buffer))
Array.fill buffer 0 buffer.Length 0uy
}
(task, socketEvent.Publish)
let deserializeMsg (raw:string) =
// returns a MsgType based on the received message
let tryGetData (msg:MsgType) =
// returns Some data for specific kind of message; None otherwise
[<EntryPoint>]
let main argv =
let feedProc, feedStream = connectFeed
let msgStream = feedStream |> Observable.map deserializeMsg
msgStream |> Observable.subscribe (fun m -> printfn "got msg: %A" m) |> ignore
let dataStream = feedStream |> Observable.choose tryGetData
dataStream |> Observable.subscribe (fun d -> printfn "got data: %A" d) |> ignore
Async.RunSynchronously feedProc
0
I expect to see a printout like:
got msg: { some: "field" }
got msg: { some: "other" }
got msg: { some: "data" }
got data: { // whatever }
got msg: ...
...
Instead, only "got msg" messages appear, even if there are messages that would result in a return tryGetData
Some
.
What's going on here? How to set up multiple streams Observable
from one event?
Refresh . I have updated the code as follows:
let isMsgA msg =
printfn "isMsgA"
match msg with
| MsgA -> true // where MsgA is a member of a DU defined elsewhere, and is the result of deserializeMsg
| _ -> false
let isStringMsgA msg =
printfn "isStringMsgA"
if msg.StartsWith("{ \"type\": \"msga\"") then true else false
[<EntryPoint>]
let main argv =
let feedProc, feedStream = connectFeed
let msgStream = feedStream |> Observable.map deserializeMsg
msgStream
|> Observable.filter isMsgA
|> Observable.subscribe (fun m -> printfn "got msg MsgA")
|> ignore
feedStream
|> Observable.filter isStringMsgA
|> Observable.subscribe (fun m -> printfn "got string MsgA")
|> ignore
And I get a screen full of "isStringMsgA" and "got string MsgA" messages, but exactly one of "isMsgA" and "got msg MsgA".
I'm puzzled.
Here's a stripped-down, reproducible example for anyone interested in this: https://github.com/aggieben/test-observable
Update 2 : I seem to be seeing this behavior due to an exception being thrown in the function deserializeMsg
. More digging ...
source to share
I don't see any obvious reason why this should be happening - can you add some entries in tryGetData
to check what input it receives and what results it returns?
When you use a module, Observable
you create a description of the processing pipeline, but you Observable.subscribe
create a concrete chain of listeners that do the work and attach handlers to the first event source. However, events are not "swallowed" - they should be sent to all observers.
For example, try playing with the following minimal demo:
let evt = Event<int>()
let e1 = evt.Publish |> Observable.choose (fun n ->
if n % 2 = 0 then Some "woop!" else None)
let e2 = evt.Publish |> Observable.map (fun n -> n * 10)
e1 |> Observable.subscribe (printfn "E1: %s")
e2 |> Observable.subscribe (printfn "E2: %d")
evt.Trigger(1)
evt.Trigger(2)
If you run this, it prints the expected output:
E2: 10
E1: woop!
E2: 20
source to share