How can I emulate Rx `withLatestFrom` with core.async channels?
For example, given one channel of operations and another channel of data, how do I write a go block
that applies each operation to the latest value seen on the data channel?
(go-loop []
  (let [op   (<! op-ch)
        data (<! data-ch)]
    (put! result-ch (op data)))
  (recur))
Obviously this does not work, because it requires both channels to produce values at the same rate.
By using alts!, you can accomplish what you want.
The with-latest-from shown below implements the same behavior as withLatestFrom from RxJS (I think :P).
(require '[clojure.core.async :as async])

(def op-ch (async/chan))
(def data-ch (async/chan))

(defn with-latest-from [chs f]
  (let [result-ch (async/chan)
        latest    (vec (repeat (count chs) nil))
        index     (into {} (map vector chs (range)))]
    (async/go-loop [latest latest]
      (let [[value ch] (async/alts! chs)
            latest     (assoc latest (index ch) value)]
        (when-not (some nil? latest)
          (async/put! result-ch (apply f latest)))
        (when value
          (recur latest))))
    result-ch))

(def result-ch (with-latest-from [op-ch data-ch] str))

(async/go-loop []
  (prn (async/<! result-ch))
  (recur))

(async/put! op-ch :+)
;= true
(async/put! data-ch 1)
;= true
; ":+1"
(async/put! data-ch 2)
;= true
; ":+2"
(async/put! op-ch :-)
;= true
; ":-2"
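Note that the with-latest-from above emits whenever any of its channels delivers a value, whereas RxJS withLatestFrom emits only when the source emits. If only the operations channel should trigger output, a variant along these lines might work. It is a minimal sketch, not a definitive implementation: the name with-latest-from-source and the ::none sentinel are hypothetical, and closing the output as soon as either input closes is a simplifying assumption.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper (not part of core.async): emits (f src-value latest)
;; only when src-ch delivers a value, and only after other-ch has delivered
;; at least one value.
(defn with-latest-from-source [src-ch other-ch f]
  (let [out (async/chan)]
    (async/go-loop [latest ::none]
      (let [[value ch] (async/alts! [src-ch other-ch])]
        (cond
          ;; Either input closed: close the output and stop (a simplification).
          (nil? value)      (async/close! out)
          ;; New data value: remember it, emit nothing.
          (= ch other-ch)   (recur value)
          ;; Source value arrived before any data: drop it.
          (= latest ::none) (recur latest)
          ;; Source value with data available: emit the combination.
          :else             (do (async/>! out (f value latest))
                                (recur latest)))))
    out))

;; Usage, with blocking puts and takes from a REPL thread:
(comment
  (def ops  (async/chan))
  (def data (async/chan))
  (def out  (with-latest-from-source ops data str))
  (async/>!! data 1)
  (async/>!! ops :+)
  (async/<!! out)) ;= ":+1"
```

Here a new data value only updates the remembered state, so nothing is emitted until the next operation arrives.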
There is also the :priority true option of alts!, which makes it try the channels in the order given.
An expression that always returns the last seen value in some channel would look something like this:
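(require '[clojure.core.async :refer [chan go alts! poll! >!]])

(def in-chan (chan))
(def mem (chan 1)) ;; buffer of 1 so the remembered value can sit in the channel

(go (let [[value ch] (alts! [in-chan mem] :priority true)] ;; alts! returns [value channel]
      (poll! mem)    ;; clear mem (poll! is a non-blocking take)
      (>! mem value) ;; put the new (or old) value in the mem
      value))        ;; the go block returns a chan with the value in it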
It's untested and probably inefficient (a volatile would probably be better). The go block returns a channel containing only the value, but the idea can be extended to some kind of "memoized" channel.
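The remark about keeping the last value in a mutable reference instead of a second channel could be sketched roughly as follows. This is only an illustration under stated assumptions: latest-value is a hypothetical helper name, and an atom is used in place of a volatile because the value is read from threads other than the go block.

```clojure
(require '[clojure.core.async :as async])

;; Hypothetical helper: returns an atom that holds the last value seen on ch
;; (nil until the first value arrives).
(defn latest-value [ch]
  (let [state (atom nil)]
    (async/go-loop []
      ;; when-some stops the loop once ch is closed (<! returns nil)
      (when-some [v (async/<! ch)]
        (reset! state v)
        (recur)))
    state))

;; Usage: deref the atom whenever an operation arrives.
(comment
  (def data-ch (async/chan))
  (def latest  (latest-value data-ch))
  (async/>!! data-ch 42)
  @latest) ;; 42 once the go block has processed the put
```

Reading the atom never blocks, so an operation arriving before any data sees nil and the caller has to decide how to handle that case.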