The upper limit for the number of jobs in the do block?

Here is the code:

(ns typedclj.async
  (:require [clojure.core.async
             :as a
             :refer [>! <! >!! <!!
                     go chan buffer
                     close! thread
                     alts! alts!! timeout]]
            [clj-http.client :as -cc]))


(time (dorun
        (let [c (chan)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))

      

And I have an error:

Exception in thread "async-dispatch-12" java.lang.AssertionError: Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)
    at clojure.core.async.impl.channels.ManyToManyChannel.put_BANG_(channels.clj:150)
    at clojure.core.async.impl.ioc_macros$put_BANG_.invoke(ioc_macros.clj:959)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817$fn__11819.invoke(async.clj:19)
    at typedclj.async$eval11807$fn__11816$state_machine__6185__auto____11817.invoke(async.clj:19)
    at clojure.core.async.impl.ioc_macros$run_state_machine.invoke(ioc_macros.clj:940)
    at clojure.core.async.impl.ioc_macros$run_state_machine_wrapped.invoke(ioc_macros.clj:944)
    at typedclj.async$eval11807$fn__11816.invoke(async.clj:19)
    at clojure.lang.AFn.run(AFn.java:22)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)...

      

According to http://martintrojer.github.io/clojure/2013/07/07/coreasync-and-blocking-io/

... This will destroy 1 job = 1 thread, so this parking thread will allow us to scale the number of jobs, limiting the flow on the platform (usually around 1000 per JVM).

core.async provides (blocking) pipes and a new (unlimited) thread pool when using a "thread". This is (essentially) just sugar behind the java threads (or clojure flags) and BlockingQueues from java.util.concurrent. The main feature is go blocks in which threads can be parked and resumed while (potentially) blocking calls with core.async pipes ...

Is 1e4 assignments too big? What is the upper limit?

+3


source to share


2 answers


I don't usually say that, so I hope you will forgive me for this one violation:

In a more prefectural world, every programmer would repeat himself "there is no such thing as an unlimited queue" five times before bed and the first thing after waking up. This way of thinking requires practicing how the backpressure will be handled in your system, so when there is a slowdown somewhere in the process, the parts that have a way to know about it will slow down in response. In core.async, the default backpressure is immediate because the default buffer size is zero. No blocking block succeeds in putting something in the vat until someone is ready to destroy it.

chans basically looks like this:

"queue of pending puts" --> buffer --> "queue of pending takes"

      



The putter and taker queues are designed to allow two processes communicating through this channel to schedule themselves so that progress can be made. Without them, there would be no room for threads to schedule and a deadlock would occur. They are NOT intended to be used as a buffer. this is what the buffer is in the middle, and it was a design that it is the only one that has an explicit size. explicitly set the buffer size for your system by setting the buffer size in chan:

user> (time (dorun
        (let [c (chan 1e6)]
          (doseq [i (range 10 1e4)]
            (go (>! c i))))))
"Elapsed time: 83.526679 msecs"
nil

      

In this case, I "calculated" that my system as a whole will be in good shape if there are up to a million pending jobs. Of course, your life experiments will be different and very unique to your situation.

Thank you for your patience,

+4


source


The unused puts limit is the size of the channel buffer plus the size of the queue.



The queue size in core.async is capped at 1024, but you should not rely on this.

+2


source







All Articles