How can I parallelize GPars actors?

My understanding of GPars actors may be off, so please correct me if I'm wrong. I have a Groovy application that polls a web service for jobs. When one or more jobs are found, it submits each job to a DynamicDispatchActor I created, and the job runs. The jobs are completely self-contained and nothing needs to be returned to the main thread. When multiple jobs come in at once, I'd like them to be processed in parallel, but no matter what configuration I try, the actor processes them one at a time.

To give some example code:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

def actor = poolGroup.messageHandler {
    when {Integer msg -> 
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

integers.each {
    actor << it
}

This gives:

I'm number 1 on thread Actor Thread 31
I'm number 2 on thread Actor Thread 31
I'm number 3 on thread Actor Thread 31
I'm number 4 on thread Actor Thread 31
I'm number 5 on thread Actor Thread 31
I'm number 6 on thread Actor Thread 31
I'm number 7 on thread Actor Thread 31
I'm number 8 on thread Actor Thread 31
I'm number 9 on thread Actor Thread 31
I'm number 10 on thread Actor Thread 31

There is a short pause between printouts. Also note that every printout comes from the same actor thread.

What I would like to see here is the first 5 numbers printed instantly, because the thread pool size is 5, and then the next 5 numbers once those threads are freed. Am I completely off base here?


There are a few changes to make it run as you expect:

import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

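// Message-handler body used by every actor created below
def closure = {
    when {Integer msg ->
        println("I'm number ${msg} on thread ${Thread.currentThread().name}")
        Thread.sleep(1000)
        stop()  // terminate this actor after its single message so join() can return
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

// One actor per message; << returns the actor, so collect gathers the actors
def actors = integers.collect { poolGroup.messageHandler(closure) << it }
actors*.join()  // wait until every actor has stopped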

Full gist file: https://gist.github.com/wololock/7f1348e04f68710e42d2

Then the output will be:

I'm number 5 on thread Actor Thread 5
I'm number 4 on thread Actor Thread 4
I'm number 1 on thread Actor Thread 1
I'm number 3 on thread Actor Thread 3
I'm number 2 on thread Actor Thread 2
I'm number 6 on thread Actor Thread 3
I'm number 9 on thread Actor Thread 4
I'm number 7 on thread Actor Thread 2
I'm number 8 on thread Actor Thread 5
I'm number 10 on thread Actor Thread 1

Now let's see what has changed. First of all, your original example used only one actor. You correctly defined poolGroup, but then you created a single actor and sent all the work to that one instance. An actor processes its messages one at a time, so the pool size made no difference. To run the computations in parallel, rely on poolGroup instead: create one message handler per message and let the group take care of creating the actors and managing their lifecycle. This is what we do:

def actors = integers.collect { poolGroup.messageHandler(closure) << it }

This creates one actor per input value and sends it that value. All the actors share the group's thread pool, so at most 5 of them are processing a message at any moment. Then you need to join every actor, which Groovy's spread operator turns into a one-liner: actors*.join(). This makes the script wait until all actors have finished their work. It is also why stop() is called at the end of the when block of the message handler: without it, join() would never return, because an actor that has handled its message is not finished; it simply waits, for example, for another message.
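To see why stop() matters, here is a small sketch contrasting two message handlers: one calls stop() after its message and one does not. It assumes GPars 1.2.1 pulled in via @Grab (adjust to your version) and uses join with a timeout so the example itself cannot hang; the actor names stopping and waiting are purely illustrative.

```groovy
// Assumption: GPars 1.2.1 from Maven Central; adjust to the version you use.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.TimeUnit

def poolGroup = new DefaultPGroup(new DefaultPool(true, 2))

// Handler that stops itself after its message, so join() returns.
def stopping = poolGroup.messageHandler {
    when { Integer msg ->
        println "stopping actor got ${msg}"
        stop()
    }
}

// Same handler without stop(): after the message it just waits for more.
def waiting = poolGroup.messageHandler {
    when { Integer msg -> println "waiting actor got ${msg}" }
}

stopping << 1
waiting << 1

stopping.join()                           // returns once the actor has terminated
waiting.join(500, TimeUnit.MILLISECONDS)  // gives up after 500 ms instead of blocking forever

boolean stoppedTerminated = !stopping.isActive()
boolean waitingStillAlive = waiting.isActive()
println "stopping terminated: ${stoppedTerminated}, waiting still alive: ${waitingStillAlive}"

// Clean up so the example does not leave an idle actor behind.
waiting.stop()
waiting.join()
poolGroup.shutdown()
```

A plain waiting.join() in place of the timed join would block indefinitely, which is exactly what happens in the answer's script if stop() is left out.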

Alternative solution

We can also consider an alternative solution that uses GPars parallel collection processing (GParsPool):

import groovyx.gpars.GParsPool

// This example is a dummy, but let's assume this processor is a
// stateless component shared between threads.
class Processor {
    void process(int number) {
        println "${Thread.currentThread().name} starting with number ${number}"
        Thread.sleep(1000)
    }
}

def integers = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

Processor processor = new Processor()

GParsPool.withPool 5, {
    integers.eachParallel { processor.process(it) }
}

In this example, a single stateless Processor instance is shared by all pool threads, and eachParallel feeds it the inputs concurrently, using at most 5 threads.

I tried to work out the case you mentioned in the comment, namely whether a single actor can handle multiple messages at once. As far as I can tell it cannot: GPars lets an actor process only one message at a time, which is what keeps its internal state safe without locking. Making an actor stateless does not change that; it only means the actor does not modify its internal state while processing a message and does not keep other information around between messages. It would be great if someone could correct me if my reasoning is wrong :)
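If you want to keep a single entry-point actor, as in your polling application, one option is to let it act as a dispatcher: it receives each job and immediately hands it off as a separate task on the same pool group, so the actor itself stays sequential while the jobs run in parallel. This is a sketch under assumptions: GPars 1.2.1 via @Grab, and processJob, the 10 sample jobs, and the CountDownLatch used to wait for completion are hypothetical stand-ins, not part of your application.

```groovy
// Assumption: GPars 1.2.1 from Maven Central; adjust to the version you use.
@Grab('org.codehaus.gpars:gpars:1.2.1')
import groovyx.gpars.group.DefaultPGroup
import groovyx.gpars.scheduler.DefaultPool

import java.util.concurrent.ConcurrentLinkedQueue
import java.util.concurrent.CountDownLatch
import java.util.concurrent.TimeUnit

def poolGroup = new DefaultPGroup(new DefaultPool(true, 5))

int jobCount = 10
def done = new CountDownLatch(jobCount)             // counts finished jobs
def finished = new ConcurrentLinkedQueue<Integer>() // thread-safe record of finished jobs

// Hypothetical stand-in for the real, self-contained job.
def processJob = { Integer job ->
    println "Job ${job} on thread ${Thread.currentThread().name}"
    Thread.sleep(1000)
}

// The actor still handles one message at a time, but each message only
// schedules a task on the pool and returns, so the jobs themselves overlap.
def dispatcher = poolGroup.messageHandler {
    when { Integer job ->
        poolGroup.task {
            try {
                processJob(job)
                finished << job
            } finally {
                done.countDown()
            }
        }
    }
}

long start = System.currentTimeMillis()
(1..jobCount).each { dispatcher << it }

// Block the script until all jobs are done (or 30 s pass).
boolean allDone = done.await(30, TimeUnit.SECONDS)
long elapsed = System.currentTimeMillis() - start
println "All done: ${allDone}, elapsed: ${elapsed} ms"

dispatcher.stop()
poolGroup.shutdown()
```

With 10 one-second jobs and 5 pool threads this should take roughly two seconds rather than ten. The dispatcher borrows a pool thread too, but only for the short time it needs to schedule each task.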

Hope this helps you. Best!
