How do I run a list of blocking calls in parallel and get results in first-come-first-serve order?

Basically I have a list of functions to call

'(f1 f2 f3 f4)

Each one blocks, and each takes a different amount of time to return.

I would like a lazy sequence s whose first element is the result of whichever call returns first, so that calling

(first s)

only blocks until that fastest call returns. Likewise for the other elements.

Concrete example: if

  • f1 takes 10 seconds
  • f2 takes 5s
  • f3 takes 1s
  • f4 takes 2s

then calling

(first s)

will block for 1s and return the result of (f3), and

(into [] (take 2 s))

will block for 2s and return the results of (f3) and (f4), and so on.

I thought about wrapping each function in a future and delivering its result to a promise, but I don't know how to find out which promise will be delivered first.

Does anyone have any idea how to do this?



4 answers


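(require '[clojure.core.async
           :refer [chan
                   >!!
                   thread
                   <!!]])

(def c (chan))

(def fns [#(do (Thread/sleep 5000) :fn-1)
          #(do (Thread/sleep 2000) :fn-2)
          #(do (Thread/sleep 1000) :fn-3)])

;; Run each blocking fn on a real thread (blocking inside a go block would
;; tie up core.async's small, fixed go dispatch pool) and put its result
;; on c as soon as the fn returns.
(doseq [f fns]
  (thread (>!! c (f))))

;; Takes from c come back in completion order, so this prints :fn-3.
(println "First => " (<!! c))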
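For comparison, the JDK ships a class built for exactly this completion-order pattern: java.util.concurrent.ExecutorCompletionService, whose take() hands back each submitted task's Future as soon as that task finishes. Below is a minimal sketch without core.async, assuming one pool thread per function is acceptable (completion-order is a hypothetical helper name, not part of any library):

```clojure
(import '(java.util.concurrent Executors ExecutorCompletionService))

(defn completion-order
  "Hypothetical helper: runs each no-arg fn in fs on its own pool thread and
  returns a lazy seq of their results in the order the calls complete."
  [fs]
  (let [n    (count fs)
        pool (Executors/newFixedThreadPool (int (max 1 n))) ; pool size must be >= 1
        ecs  (ExecutorCompletionService. pool)]
    (doseq [f fs]
      ;; Clojure fns implement Callable, so this uses submit(Callable).
      (.submit ecs ^Callable f))
    ;; Accept no new tasks; already-submitted ones still run to completion,
    ;; after which the pool's threads exit.
    (.shutdown pool)
    (letfn [(step [i]
              (lazy-seq
                (when (< i n)
                  ;; .take blocks until the next task finishes; .get returns
                  ;; its value, or throws an ExecutionException wrapping
                  ;; whatever the fn threw.
                  (cons (.get (.take ecs)) (step (inc i))))))]
      (step 0))))

(doseq [r (completion-order [#(do (Thread/sleep 300) :a)
                             #(do (Thread/sleep 100) :b)
                             #(do (Thread/sleep 200) :c)])]
  (println r))
```

Because each result comes back through a Future, an exception thrown by one of the functions is rethrown (wrapped in an ExecutionException) when the sequence reaches it, rather than leaving the consumer waiting for a value that never arrives.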


A pure Clojure solution using promises, futures and one atom is certainly possible:

(defn parallelize
  [fs]
  (let [[h & r :as ps] (repeatedly (count fs) promise)
        queue (atom (cycle ps))]
    (doseq [f fs]
      (future
        (let [result (f)]
          (-> (swap! queue rest)
              (first)
              (deliver result)))))
    (map deref (concat r [h]))))

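It creates one promise per function and keeps them in an atom as a circular queue. When a function returns, its future atomically rotates the queue with swap!, takes the promise now at the head, and delivers the result to it. Because the very first rotation moves past h, the promises in r are filled first and h is filled last, which is why the result is built as (concat r [h]).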
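The same idea can also be expressed with a plain counter instead of a rotating cycle: each finishing future atomically claims the next free slot index with swap!, so a vector of promises is filled in completion order and no reordering is needed at the end. A minimal sketch of that variant (parallelize-indexed is a hypothetical name; like the original, it assumes none of the functions throw, since an exception would leave a promise undelivered and deref would block forever):

```clojure
;; Hypothetical variant of parallelize: a counter picks the slot instead of
;; rotating a cycle of promises.
(defn parallelize-indexed
  [fs]
  (let [ps        (vec (repeatedly (count fs) promise)) ; one slot per result
        next-slot (atom -1)]                            ; index of the last claimed slot
    (doseq [f fs]
      (future
        (let [result (f)]
          ;; swap! returns the incremented value atomically, so every
          ;; finishing future claims a distinct slot, in finish order.
          (deliver (ps (swap! next-slot inc)) result))))
    ;; Slot i is delivered by the (i+1)-th call to finish.
    (map deref ps)))

(defn g
  [ms]
  (fn []
    (Thread/sleep ms)
    ms))

(doseq [value (parallelize-indexed (map g [500 200 100 300]))]
  (prn value))
```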



Example:

(defn g
  [ms]
  (fn []
    (Thread/sleep ms)
    ms))

(doseq [value (parallelize (map g [500 200 100 300]))]
  (prn value))
;; 100
;; 200
;; 300
;; 500


If you don't want to use core.async, you can fall back to a plain blocking queue:

(import 'java.util.concurrent.LinkedBlockingQueue)

(defn fut [q f] ;; this will need some error handling
  (future
    (.add q (f))))

(defn take-blocking [q n]
  (when (pos? n)
    (lazy-seq
     (cons (.take q)
           (take-blocking q (dec n))))))

(defn in-parallel [fns]
  (let [queue (LinkedBlockingQueue. (count fns))]
    (doseq [f fns]
      (fut queue f))
    (take-blocking queue (count fns))))

To use it:

(defn slow [n]
  (fn []
    (Thread/sleep (* n 1000))
    n))

(doseq [r (in-parallel [(slow 5) (slow 9) (slow 1) (slow 3)])]
  (println (java.util.Date.) r))
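The "this will need some error handling" comment is worth taking seriously: if a function throws, its future stores the exception where nobody derefs it and the consumer blocks forever in .take; and because LinkedBlockingQueue rejects null elements, a function that returns nil makes .add throw as well. One way to handle both, sketched under the assumption that rethrowing a failure to the consumer is the desired behavior (fut-safe, take-blocking-safe and in-parallel-safe are hypothetical names), is to tag each outcome in a map:

```clojure
(import '(java.util.concurrent LinkedBlockingQueue))

;; Hypothetical error-handling variants of fut / take-blocking / in-parallel.
;; Each future puts a tagged map on the queue instead of the raw result.
(defn fut-safe [^LinkedBlockingQueue q f]
  (future
    (.put q (try {:ok (f)}                  ; a nil result is fine inside a map
                 (catch Throwable t {:error t})))))

(defn take-blocking-safe [^LinkedBlockingQueue q n]
  (lazy-seq
    (when (pos? n)
      ;; Blocks until the next call finishes, whichever one it is.
      (let [{:keys [ok error]} (.take q)]
        (if error
          (throw error)                     ; surface the failure to the consumer
          (cons ok (take-blocking-safe q (dec n))))))))

(defn in-parallel-safe [fns]
  (let [queue (LinkedBlockingQueue.)]       ; unbounded, so an empty fns is fine too
    (doseq [f fns]
      (fut-safe queue f))
    (take-blocking-safe queue (count fns))))

(doseq [r (in-parallel-safe [(constantly nil)
                             #(do (Thread/sleep 200) :slow)
                             #(do (Thread/sleep 100) :fast)])]
  (println r))
```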


Another good option for simple concurrency and workflow management is claypoole, [com.climate/claypoole "0.3.3"]. It mimics map and for, but runs in parallel, either ordered or unordered, and lets you control the thread pool size (unlike pmap, whose parallelism is fixed at (+ 2 cores)).

Here's an example with upmap, claypoole's unordered parallel version of map: each result appears in the output sequence as soon as it has been computed, so the fastest one comes first. The first argument is either a predefined thread pool or the number of threads to use.

(require '[com.climate.claypoole :as cp])

(defn wait-and-return
  [w]
  (Thread/sleep (* 1000 w))
  w)

(cp/upmap 4 wait-and-return [10 5 7 9])
=> (5 7 9 10)

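Be sure to specify a pool size large enough for the maximum number of waiting (I/O-bound) functions you want to run in parallel; with too few threads, some functions only start after others finish, so the results are no longer ordered by each function's duration.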

(def to-sort
  (shuffle (range 0 40 2)))

;not enough threads, so not returned in the right order
(def timesorted
  (time (doall (cp/upmap 10 wait-and-return to-sort))))
"Elapsed time: 52001.812056 msecs"

(apply < timesorted)
=> false

;enough threads
(def timesorted
  (time (doall (cp/upmap 20 wait-and-return to-sort))))
"Elapsed time: 38002.858901 msecs"

(apply < timesorted)
=> true

Plain futures don't have this problem, because the thread pool behind them grows on demand, up to Integer/MAX_VALUE threads. You can get the same behavior from claypoole: if, instead of a claypoole thread pool or a pool size, you pass the :builtin keyword, claypoole uses Clojure's own practically unbounded thread pool, the one used for futures and send-off agent actions.

Note that because you don't control how far the number of threads grows, creating and context-switching all of those threads can degrade performance, so use :builtin only for I/O-bound work, not CPU-bound work.

(def timesorted
  (time (doall (cp/upmap :builtin wait-and-return to-sort))))
"Elapsed time: 38001.348402 msecs"

(apply < timesorted)
=> true
