Doing a simple task on another thread in scala

I was wondering if there is a way to accomplish very simple tasks in another thread in scala that doesn't have a lot of overhead?

Basically, I would like to create a global "executor" that can handle the execution of an arbitrary number of tasks. Then I can use the executor to create additional constructs.

Also, it would be nice if clients didn't have to think about whether a task is blocking or non-blocking.

I know that the Scala actor library is built on top of Doug Lea's FJ (fork/join) framework, and that it supports, to a limited extent, what I'm trying to accomplish. However, from my understanding, I would have to pre-allocate an "Actor Pool" for execution.

I would like to avoid creating a global thread pool for this, since from what I understand it is not all that good for fine-grained parallelism.

Here's a simple example:

import concurrent.SyncVar
object SimpleExecutor {
  import actors.Actor._
  def exec[A](task:  => A) : SyncVar[A] = {
    //what goes here?
    //This is what I currently have
    val x = new concurrent.SyncVar[A]
    //The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }
  //Not really sure what to stick here
  def execBlocker[A](task: => A) : SyncVar[A] = exec(task)

}

      

and now an example using exec:

object Examples {
  //Benchmarks a task
  def benchmark(blk : => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  //Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " +at.toDouble / bt)
  }

  //Simple example for simple non blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

  //Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi : Int) = (0 until hi) map (i=>exec(sleep(i))) foreach (_.get)
  def singSle(hi : Int) = (0 until hi) foreach (i=>sleep(i))
}

      

Finally, to run the examples (you might want to do this a few times so that HotSpot can warm up):

import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))

      



1 answer


This is what Futures was made for. Just import scala.actors.Futures._, use future to create new futures, methods such as awaitAll to wait for the results for a while, apply or respond to block until a result is received, isSet to see if it's ready or not, etc.

You also don't need to create a thread pool. Or at least not usually. Why do you think you do?
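As a rough sketch of what exec from the question could look like with futures: the scala.actors package is no longer shipped with current Scala versions, so this example swaps in scala.concurrent.Future from the standard library instead of scala.actors.Futures. The object name FutureSketch, the helpers get and getAll, and the 10-second timeout are assumptions made for illustration only.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical helper object; the names are illustrative, not from the question.
object FutureSketch {
  // Counterpart of the question's `exec`: run `task` on the shared global pool.
  def exec[A](task: => A): Future[A] = Future(task)

  // Block for a single result (roughly the role `apply` plays on a scala.actors future).
  // The 10-second timeout is an arbitrary assumption.
  def get[A](f: Future[A]): A = Await.result(f, 10.seconds)

  // Wait for a whole batch under one timeout (roughly the role of `awaitAll`).
  def getAll[A](fs: Seq[Future[A]]): Seq[A] =
    Await.result(Future.sequence(fs), 10.seconds)
}
```

No pool is created by hand here: the imported global execution context supplies the threads. A call such as FutureSketch.get(FutureSketch.exec(1 + 5)) blocks until the result is available.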

EDIT

You can't gain performance by parallelizing something as simple as an integer addition, because that is even faster than a function call. Concurrency only improves performance by avoiding time lost to blocking I/O and by using multiple CPU cores to execute tasks in parallel. In the latter case, each task must be computationally expensive enough to offset the cost of splitting the workload and merging the results.

Another reason to go for concurrency is to improve the responsiveness of the application. That doesn't make it faster; it makes it respond faster to the user. One way to do that is to offload even relatively fast operations to another thread, so that the threads handling what the user sees or does stay free. But I digress.

There is a serious problem in your code:

  def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

      

Or, translated into futures,

  def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
  def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)

      



You might think that paraAdd is executing the tasks in parallel, but it isn't, because Range has a non-strict implementation of map (this applies up to Scala 2.7; starting with Scala 2.8.0, Range is strict). You can look it up in other Scala questions. What happens is this:

  • A range is created from 0 until hi.
  • A range projection is created, mapping each element i of the range to a function that returns future(i+5) when called.
  • For each element of the range projection (i => future(i+5)), the element is evaluated (foreach is strict) and then apply is called on it.

So, because future is not called in step 2 but only in step 3, you wait for each future to complete before starting the next one. You can fix this with:

  def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)
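To make the ordering problem visible on a current Scala version, here is a small sketch that uses .view to reproduce the non-strict map that Range had in Scala 2.7. It only records the order of events and does not start any threads; the names LazyVsStrict, trace, submit and waitFor are hypothetical and stand in for future and apply.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical demo object: logs when a task would be submitted and awaited.
object LazyVsStrict {
  def trace(lazily: Boolean): List[String] = {
    val log = ArrayBuffer.empty[String]
    // Stands in for `future(i+5)`: records that task i was started.
    def submit(i: Int): Int = { log += s"start $i"; i }
    // Stands in for `_.apply`: records that we waited on task i.
    def waitFor(i: Int): Unit = log += s"wait $i"

    if (lazily) (0 until 3).view.map(submit).foreach(waitFor) // non-strict map
    else (0 until 3).map(submit).foreach(waitFor)             // strict map
    log.toList
  }
}
```

With lazily = true the log interleaves as start 0, wait 0, start 1, wait 1, start 2, wait 2, so each task is awaited before the next one starts. With lazily = false all three start entries come first, which is the behaviour the fix is after.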

      

This will give you better performance, but it will never be as fast as a simple immediate addition. On the other hand, suppose you do this:

def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) = 
  (0 until n).force map (_ => future(f)) foreach (_.apply)

      

And then compare:

cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))

      

You may start seeing gains (this will depend on the number of cores and the processor speed).
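For readers on a current Scala version, the same coarse-grained comparison can be sketched with scala.concurrent.Future, again as a swap-in for scala.actors.Futures. The object name CoarseGrained, the helper sumTo and the one-minute timeout are assumptions for illustration. Unlike the versions above, these variants return the summed results so that they can be checked against each other.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Hypothetical demo object; not part of the original answer.
object CoarseGrained {
  // One task's worth of work: a loop of n additions, similar in spirit to singAdd.
  def sumTo(n: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < n) { acc += i + 5; i += 1 }
    acc
  }

  // Sequential baseline: evaluate the task n times on the calling thread.
  def repeat(n: Int, f: => Long): Long = (0 until n).map(_ => f).sum

  // Parallel version: submit all n tasks first, then wait for every result.
  def paraRepeat(n: Int, f: => Long): Long = {
    val futures = (0 until n).map(_ => Future(f)) // strict map: all tasks are submitted here
    Await.result(Future.sequence(futures), 1.minute) // timeout is an arbitrary assumption
      .sum
  }
}
```

Both functions return the same total, so they can be passed to cmp as before, for example cmp(CoarseGrained.repeat(100, CoarseGrained.sumTo(100000)), CoarseGrained.paraRepeat(100, CoarseGrained.sumTo(100000))). Whether the parallel version wins still depends on the machine.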
