Scala parallel unordered iterator

I have an Iterable of "work units" that need to be performed in no particular order, and which can easily run in parallel without interfering with one another.

Unfortunately, running too many of them at a time will exceed my available RAM, so I need to make sure that only a handful is running simultaneously at any given time.

At the most basic, I want a function with this type signature:

parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B]

such that the output Iterator is not necessarily in the same order as the input (if I want to keep track of which input a result came from, I can emit a pair with the input, or whatever). The consumer can then consume the resulting iterator incrementally, without exhausting the machine's entire memory, while maintaining as much parallelism as possible.
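To make the contract concrete, here is roughly how I would expect to call such a function (parMap itself is the missing piece; the undefined body and the squaring function are just stand-ins):

// The function I'm looking for; the body is intentionally left undefined.
def parMap[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B] = ???

val results = parMap(Iterator.range(0, 1000), (x: Int) => x * x, chunkSize = 8)
// Results may arrive in any order, with at most ~chunkSize units in
// flight, and can be consumed incrementally rather than all at once.
results.foreach(println)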

Furthermore, I want the function to be as efficient as possible. An initial idea I had was to do something along the following lines:

xs.iterator.grouped(chunkSize).flatMap(_.toSet.par.map(f).iterator)

where I was hoping the toSet call would tell Scala's parallel collections that elements may be produced from the resulting iterator as soon as they are ready, in any order, and the grouped call was there to limit the number of simultaneous workers. Unfortunately, the toSet call does not appear to achieve the desired effect (in my experiments, the results come back in the same order they would have without the par call), and grouped is sub-optimal. For example, with a group size of 100, if 99 of the tasks finish immediately on a dozen cores but one of them is particularly slow, most of the cores will sit idle until we can move on to the next group. It would be much cleaner to have an "adaptive window" that is at most as big as my chunk size but doesn't get held up by slow workers.

I can envision writing something like this myself with a work-stealing (de)queue or the like, but I imagine that a lot of the hard work of dealing with the concurrency primitives has already been done for me at some level in Scala's parallel collections library. Does anyone know which parts of it I could reuse to build this functionality, or have any other suggestions on how to implement such an operation?

+3




1 answer


The parallel collections framework allows you to specify the maximum number of threads to be used for a given task. Using scala-2.10, you would want to do:

import scala.collection.parallel.ForkJoinTaskSupport

def parMap[A, B](x: Iterable[A], f: A => B, chunkSize: Int) = {
  val px = x.par
  // Cap the underlying fork/join pool at chunkSize threads, so at most
  // chunkSize work units are executing at any one time.
  px.tasksupport = new ForkJoinTaskSupport(new scala.concurrent.forkjoin.ForkJoinPool(chunkSize))
  px map f
}



This will prevent more than chunkSize operations from running at any one time. Underneath, it uses a work-stealing strategy to keep the workers busy, so it doesn't suffer from the same problem as your grouped example above.
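For example, a quick way to see the throttling in action (slowSquare here is just a stand-in for a real work unit):

// A deliberately slow work unit; with chunkSize = 4, at most four of
// these sleeps overlap, so 20 inputs take roughly 5 * 100 ms in total.
def slowSquare(i: Int): Int = { Thread.sleep(100); i * i }

val squares = parMap(1 to 20, slowSquare, chunkSize = 4)
println(squares) // note the results are still in input order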

Doing this, however, will not reorder the results into first-completed order. For that, I would suggest something like turning your operation into an actor, having a small pool of actors perform the operations, and then having them send the results to you as they complete.
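As a rough illustration of that idea, here is a minimal sketch using a plain fixed-size thread pool and a blocking queue instead of actors (parMapUnordered and everything in it are made up for the example; it eagerly submits every task and doesn't propagate exceptions from f, both of which a real version would need to address):

import java.util.concurrent.{Executors, LinkedBlockingQueue}

// Applies f with at most chunkSize threads and yields results in
// completion order rather than input order.
def parMapUnordered[A, B](xs: Iterator[A], f: A => B, chunkSize: Int): Iterator[B] = {
  val pool = Executors.newFixedThreadPool(chunkSize)
  val done = new LinkedBlockingQueue[B]()
  var submitted = 0
  xs.foreach { a =>
    submitted += 1
    pool.execute(new Runnable { def run(): Unit = done.put(f(a)) })
  }
  pool.shutdown() // accept no new tasks; already-queued ones still run
  // Block once per submitted task; whichever finishes first comes out first.
  Iterator.fill(submitted)(done.take())
}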

+3

