How can I parallelize GPars actors?
My understanding of GPars actors may be off, so please correct me if I'm wrong. I have a Groovy application that polls a web service for jobs. When one or more jobs are found, it sends each job to a DynamicDispatchActor I've created, and the job is run.
The jobs are completely self-contained and nothing needs to be returned to the main thread. When multiple jobs come in at once, I'd like them to be processed in parallel, but no matter what configuration I try, the actor processes them one at a time, first in, first out.
Here is some example code:
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
def actor = poolGroup.messageHandler {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
integers.each {
    actor << it
}
This gives:
I'm number 1 on thread Actor Thread 31
I'm number 2 on thread Actor Thread 31
I'm number 3 on thread Actor Thread 31
I'm number 4 on thread Actor Thread 31
I'm number 5 on thread Actor Thread 31
I'm number 6 on thread Actor Thread 31
I'm number 7 on thread Actor Thread 31
I'm number 8 on thread Actor Thread 31
I'm number 9 on thread Actor Thread 31
I'm number 10 on thread Actor Thread 31
There is a short pause between lines. Also note that every line comes from the same actor thread.
What I would like to see here is the first 5 numbers printed immediately, since the thread pool size is 5, and then the next 5 numbers once those threads are freed. Am I completely off base here?
A few changes are needed to make it behave as you expect:
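import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))
// Handler body, reused for every actor created below
def closure = {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()  // terminate this actor once its message has been handled
    }
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
// One actor per message; the group's 5 threads run at most 5 of them at a time
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
// Wait until every actor has stopped
actors*.join()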
Full gist file: https://gist.github.com/wololock/7f1348e04f68710e42d2
The output is then similar to the following (the exact order varies between runs):
I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1
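A quick way to confirm the speed-up without reading thread names is to time the run. The sketch below uses the same pattern as the code above, with the println removed and a stopwatch added. It assumes GPars 1.2.1 can be fetched through Grape via the @Grab line (drop that line if GPars is already on your classpath); the variable names handler and elapsedMs are illustrative. With ten one-second jobs on five threads, the elapsed time should be roughly 2 seconds rather than the roughly 10 seconds a single actor needs.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version; omit if GPars is on the classpath
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))   // 5 daemon threads

// Illustrative handler body: one simulated 1-second job, then stop the actor
def handler = {
    when { Integer msg ->
        Thread.sleep(1000)
        stop()
    }
}

long start = System.currentTimeMillis()
// One actor per message, each receiving exactly one message
def actors = (1..10).collect { poolGroup.messageHandler(handler) << it }
actors*.join()                                   // block until every actor has stopped
long elapsedMs = System.currentTimeMillis() - start
poolGroup.shutdown()

println "Elapsed: ${elapsedMs} ms"
```

Because each thread can sleep through only one job at a time, ten jobs on five threads cannot finish in less than 2 seconds; anything close to that means the actors really ran in parallel.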
Now let's look at what has changed. First of all, your original example used only one actor. You defined poolGroup correctly, but then you created a single actor and sent all of the work to that one instance, so the messages queued up in its mailbox and were handled one after another. For the computations to run in parallel, rely on poolGroup instead: send each message to its own message handler and let the group take care of creating the actors and managing their lifecycle. This is what we do:
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
This creates one actor per input value and sends that value to it. The pool group ensures that the configured pool size (5 threads here) is never exceeded. Then you should join every actor, which Groovy's spread operator makes easy: actors*.join(). This makes the script wait until all actors have finished their work. That is also why we call stop() at the end of the when block in the message handler body: without it, the actors never terminate and join() never returns, because as far as GPars knows, each actor might still be waiting for another message.
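To see why stop() matters, the sketch below creates a handler without it and uses a timed join so the script does not hang. It assumes the GPars 1.x Actor methods join(long, TimeUnit), which returns once the timeout elapses, and isActive(), and it fetches GPars 1.2.1 through @Grab (an assumed version). The names group, actor and stillActive are illustrative.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version; omit if GPars is on the classpath
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
import java.util.concurrent.TimeUnit

def group = new DefaultPGroup(new DefaultPool(true, 2))

// Deliberately no stop(): after handling a message the actor waits for the next one
def actor = group.messageHandler {
    when { Integer msg -> println "handled ${msg}" }
}
actor << 1

actor.join(500, TimeUnit.MILLISECONDS)   // give up after 0.5 s instead of blocking forever
boolean stillActive = actor.isActive()    // expected true: nothing told the actor to stop
actor.stop()                              // stop it explicitly from outside
actor.join()                              // now join() can return
group.shutdown()

println "Active after the timed join: ${stillActive}"
```

Calling stop() from outside works too, but putting it inside the when block, as in the answer, lets each actor end itself as soon as its single job is done.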
Alternative solution
We can also consider an alternative solution that uses GPars parallel collections:
import groovyx.gpars.GParsPool
// This is a dummy example, but let's assume Processor is a stateless
// component that can safely be shared between threads.
class Processor {
    void process(int number) {
        println "${Thread.currentThread().name} starting with number ${number}"
        Thread.sleep(1000)
    }
}
def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
Processor processor = new Processor()
// withPool runs the closure with a 5-thread pool; eachParallel spreads the items over it
GParsPool.withPool 5, {
    integers.eachParallel { processor.process(it) }
}
In this example, Processor is a stateless component, and the parallel computation uses a single shared Processor instance for many inputs.
I tried to work out the case you mentioned in your comment, but I'm not sure a single actor can handle multiple messages at once. An actor being stateless only means that it does not change its internal state while processing a message and does not store any other information in its scope. If my reasoning is wrong, corrections are very welcome :)
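A small experiment supports the reasoning above. The sketch below sends ten messages to a single handler, as in the question, and records how many handler invocations ever run at the same time. GPars actors process their mailbox one message at a time, so the maximum should be 1 even though the group has five threads. It assumes GPars 1.2.1 fetched via @Grab; inFlight, maxInFlight and done are illustrative names.

```groovy
@Grab('org.codehaus.gpars:gpars:1.2.1')   // assumed version; omit if GPars is on the classpath
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit
import java.util.concurrent.atomic.AtomicInteger
import java.util.function.IntBinaryOperator

def group = new DefaultPGroup(new DefaultPool(true, 5))   // five threads available
def inFlight = new AtomicInteger()      // illustrative: handlers running right now
def maxInFlight = new AtomicInteger()   // illustrative: highest inFlight value observed
def done = new CountDownLatch(10)       // released after all ten messages are handled

// A single actor, exactly as in the question
def actor = group.messageHandler {
    when { Integer msg ->
        int now = inFlight.incrementAndGet()
        maxInFlight.accumulateAndGet(now, { int a, int b -> Math.max(a, b) } as IntBinaryOperator)
        Thread.sleep(100)               // simulate a short job
        inFlight.decrementAndGet()
        done.countDown()
    }
}

(1..10).each { actor << it }
done.await(10, TimeUnit.SECONDS)        // wait for the whole mailbox to drain
actor.stop()
group.shutdown()

println "Max handlers running at once: ${maxInFlight.get()}"
```

So to process several jobs concurrently you need several actors, as in the first solution, or a parallel construct such as eachParallel, as in the alternative.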
Hope this helps you. Best!