Doing a simple task on another thread in Scala
I was wondering whether there is a way to run very simple tasks on another thread in Scala without a lot of overhead.
Basically, I would like to create a global "executor" that can handle the execution of an arbitrary number of tasks. Then I can use the executor to create additional constructs.
Also, it would be nice if clients didn't have to care whether a task is blocking or non-blocking.
I know that the Scala actors library is built on top of Doug Lea's FJ (fork/join) framework, and that it supports, to a limited extent, what I'm trying to accomplish. However, from my understanding, I would have to pre-allocate an "Actor Pool" for execution.
I would like to avoid creating a global thread pool for this, since from what I understand it is not well suited to fine-grained parallelism.
Here's a simple example:
import concurrent.SyncVar

object SimpleExecutor {
  import actors.Actor._

  def exec[A](task: => A): SyncVar[A] = {
    // What goes here?
    // This is what I currently have
    val x = new concurrent.SyncVar[A]
    // The overhead of making the actor appears to be a killer
    actor {
      x.set(task)
    }
    x
  }

  // Not really sure what to stick here
  def execBlocker[A](task: => A): SyncVar[A] = exec(task)
}
and now an example using exec:
object Examples {
  // Benchmarks a task
  def benchmark(blk: => Unit) = {
    val start = System.nanoTime
    blk
    System.nanoTime - start
  }

  // Benchmarks and compares 2 tasks
  def cmp(a: => Any, b: => Any) = {
    val at = benchmark(a)
    val bt = benchmark(b)
    println(at + " " + bt + " " + at.toDouble / bt)
  }

  // Simple example for simple non-blocking comparison
  import SimpleExecutor._
  def paraAdd(hi: Int) = (0 until hi) map (i => exec(i + 5)) foreach (_.get)
  def singAdd(hi: Int) = (0 until hi) foreach (i => i + 5)

  // Simple example for the blocking performance
  import Thread.sleep
  def paraSle(hi: Int) = (0 until hi) map (i => exec(sleep(i))) foreach (_.get)
  def singSle(hi: Int) = (0 until hi) foreach (i => sleep(i))
}
Finally, to run the examples (you might want to do this a few times so that HotSpot can warm up):
import Examples._
cmp(paraAdd(10000), singAdd(10000))
cmp(paraSle(100), singSle(100))
That's what Futures was created for. Just import scala.actors.Futures._, use future to create new futures, methods such as awaitAll to wait on the results for a while, apply or respond to block until the result is received, isSet to see whether it's ready or not, and so on.
You don't need to create a thread pool either. Or, at least, not normally. Why do you think you do?
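For readers on a current Scala version, here is a minimal sketch of the same idea. It is written against scala.concurrent.Future rather than the scala.actors.Futures API named above, on the assumption that the actors library is no longer available. The names FutureSketch, exec, execBlocker and awaitAll are hypothetical and only mirror the ones used in the question and answer.

```scala
object FutureSketch {
  import scala.concurrent.{Await, Future, blocking}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Start a task on the shared global pool; the call returns immediately.
  def exec[A](task: => A): Future[A] = Future(task)

  // Same, but marks the task as potentially blocking so the global pool
  // may compensate with extra threads.
  def execBlocker[A](task: => A): Future[A] = Future(blocking(task))

  // Wait for every result; throws if they are not all ready within the timeout.
  // The 10 second default is an arbitrary choice for this sketch.
  def awaitAll[A](futures: Seq[Future[A]], timeout: FiniteDuration = 10.seconds): Seq[A] =
    Await.result(Future.sequence(futures), timeout)
}
```

With this, the sleep benchmark from the question could be written as FutureSketch.awaitAll((0 until 100).map(i => FutureSketch.execBlocker(Thread.sleep(i)))). No pool is set up by hand, because the global execution context supplies one.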
EDIT
You can't gain performance by parallelizing something as simple as an integer addition, because that is even faster than a function call. Concurrency only improves performance by avoiding time lost to blocking I/O and by using multiple CPU cores to execute tasks in parallel. In the latter case, the task must be computationally expensive enough to offset the cost of dividing the workload and merging the results.
Another reason to use concurrency is to improve the responsiveness of the application. That doesn't make it faster, it makes it respond faster to the user, and one way of doing that is to offload even relatively fast operations to another thread so that the threads handling what the user sees or does stay free. But I digress.
There is a serious problem with your code:
def paraAdd(hi: Int) = (0 until hi) map (i=>exec(i+5)) foreach (_.get)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
Or, translated into futures:
def paraAdd(hi: Int) = (0 until hi) map (i=>future(i+5)) foreach (_.apply)
def singAdd(hi: Int) = (0 until hi) foreach (i=>i+5)
You might think that paraAdd is executing the tasks in parallel, but it isn't, because Range has a non-strict implementation of map (that holds up to Scala 2.7; starting with Scala 2.8.0, Range is strict). You can look it up in other Scala questions. What happens is this:
1. A range is created from 0 until hi.
2. A range projection is created that maps each element i of the range to a function that returns future(i+5) when called.
3. For each element of the range projection (i => future(i+5)), the element is evaluated (foreach is strict) and then apply is called on it.
So, because future is not called in step 2 but only in step 3, you wait for each future to complete before starting the next one. You can fix this with:
def paraAdd(hi: Int) = (0 until hi).force map (i=>future(i+5)) foreach (_.apply)
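The difference between a lazy and a strict map can be observed directly. The following is a minimal sketch, assuming Scala 2.13 or later, where Range.map is already strict and the lazy behaviour has to be requested with .view. LazyMapSketch and startedBeforeTraversal are hypothetical names, and a counter stands in for "a future was started".

```scala
object LazyMapSketch {
  import java.util.concurrent.atomic.AtomicInteger

  // Returns how many times the mapping function had run
  // before any element of the mapped collection was consumed.
  def startedBeforeTraversal(lazily: Boolean, n: Int): Int = {
    val started = new AtomicInteger(0)
    val mapped: Iterable[Int] =
      if (lazily) (0 until n).view.map { i => started.incrementAndGet(); i + 5 }
      else (0 until n).map { i => started.incrementAndGet(); i + 5 }
    val count = started.get() // read the counter before touching `mapped`
    mapped.foreach(_ => ())   // only now consume the elements
    count
  }
}
```

In the lazy case nothing has run by the time map returns, so each piece of work would start only when foreach reaches it. In the strict case all n pieces of work have already been started, which is what running tasks in parallel requires.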
Forcing the range will give you better performance, but it will never be as good as a plain immediate addition. On the other hand, suppose you do this:
def repeat(n: Int, f: => Any) = (0 until n) foreach (_ => f)
def paraRepeat(n: Int, f: => Any) =
(0 until n).force map (_ => future(f)) foreach (_.apply)
And then compare:
cmp(repeat(100, singAdd(100000)), paraRepeat(100, singAdd(100000)))
You may start seeing gains (this will depend on the number of cores and the processor speed).
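A rough sketch of the same coarse-grained comparison on a current Scala version follows. It assumes scala.concurrent.Future in place of scala.actors.Futures; CoarseGrainedSketch, sumTo and the one-minute timeout are hypothetical choices made for illustration.

```scala
object CoarseGrainedSketch {
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // One task: many additions, so the scheduling cost is small by comparison.
  def sumTo(n: Int): Long = {
    var acc = 0L
    var i = 0
    while (i < n) { acc += i + 5; i += 1 }
    acc
  }

  // Runs the task n times on the calling thread.
  def repeat(n: Int, size: Int): Seq[Long] =
    (0 until n).map(_ => sumTo(size))

  // Starts all n tasks first (map is strict here), then waits for all results.
  def paraRepeat(n: Int, size: Int): Seq[Long] = {
    val futures = (0 until n).map(_ => Future(sumTo(size)))
    Await.result(Future.sequence(futures), 1.minute)
  }
}
```

Both functions return the same results, so they can be passed to the cmp helper from the question, for example cmp(CoarseGrainedSketch.repeat(100, 100000), CoarseGrainedSketch.paraRepeat(100, 100000)). Whether the parallel version wins still depends on the machine.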