In D, how can I aggregate the results in a parallel foreach efficiently without using TaskPool.reduce?
Often in D, I want to write something similar to:
int result = 0;
foreach(someclass c; parallel(someclass_array)){
result += somefunction(c);
}
In some cases, I can rewrite this to something like:
taskPool.reduce!("a+b")(std.algorithm.map!(somefunction)(someclass_array));
but in other cases this is not possible, for example:
int result = 0;
someotherclass d;
int otherArg = 5;
foreach(someclass c; parallel(someclass_array)){
result += d.somefunction(c, otherArg);
}
which won't work, because somefunction would then have to be passed to map as a delegate (to capture d and otherArg), and delegates currently don't mix with this pointers in D.
What I would really like is something like:
int result = 0;
foreach(someclass c; parallel(someclass_array)){
int tmp = somefunction(c);
... //something to indicate that this section is atomic
result += tmp;
... //end of atomic section.
}
I can see that D has semaphores, but using them here seems too complicated. I also tried atomicOp!("+="), but it appears to be undefined. Is there an idiomatic way to do this in D?
You can use a synchronized statement.
int result = 0;
foreach(someclass c; parallel(someclass_array)){
int tmp = somefunction(c);
synchronized result += tmp;
}
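A complete version of this might look like the sketch below. The body of somefunction, the int[] input and the names sumSynchronized and sumAtomic are stand-ins I made up so the example compiles on its own. The second function is only for comparison: it uses atomicOp from core.atomic, which I assume is what the question was reaching for. It needs the core.atomic import and a shared accumulator.

```d
import core.atomic : atomicLoad, atomicOp;
import std.parallelism : parallel;
import std.stdio : writeln;

// Hypothetical stand-in for the question's somefunction.
int somefunction(int c)
{
    return c * c;
}

// Aggregation guarded by a synchronized statement.
int sumSynchronized(int[] items)
{
    int result = 0;
    foreach (c; parallel(items))
    {
        // Do the expensive work outside the critical section.
        immutable tmp = somefunction(c);
        // Only one thread at a time executes this block.
        synchronized
        {
            result += tmp;
        }
    }
    return result;
}

// Alternative: lock-free aggregation with an atomic add.
int sumAtomic(int[] items)
{
    shared int result = 0;
    foreach (c; parallel(items))
    {
        immutable tmp = somefunction(c);
        atomicOp!"+="(result, tmp);
    }
    return atomicLoad(result);
}

void main()
{
    auto data = [1, 2, 3, 4, 5];
    writeln(sumSynchronized(data)); // prints 55
    writeln(sumAtomic(data));       // prints 55
}
```

Either way, every iteration touches one shared variable, so this pays off mainly when somefunction is expensive compared to the addition.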
Another option is to use a parallel receiver that collects all the results. As an example, consider this little program about calculating the factorial of a number in parallel: https://github.com/SommerEngineering/ParallelFactorial01
On line 81, the receiver is set up by spawning a new thread. Line 90 shows how this thread waits for and reads the intermediate results from the parallel foreach threads. Line 124 shows the other side, where the body of the parallel foreach sends its results to the receiver.
Finally, line 130 shows how the main thread, i.e. the main program, waits for all threads to complete.
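The same idea can be sketched in a few lines with std.concurrency. This is not the code from the linked repository, only a minimal illustration of the pattern. The names receiver, sumViaReceiver and somefunction, and the int[] input, are assumptions made for the example.

```d
import std.concurrency : receiveOnly, send, spawn, thisTid, Tid;
import std.parallelism : parallel;
import std.stdio : writeln;

// Hypothetical stand-in for the per-element work.
int somefunction(int c)
{
    return c * c;
}

// Receiver thread: collects `count` partial results,
// then reports the total back to the owner thread.
void receiver(Tid owner, size_t count)
{
    int total = 0;
    foreach (_; 0 .. count)
    {
        total += receiveOnly!int();
    }
    send(owner, total);
}

int sumViaReceiver(int[] items)
{
    // Start the receiver before any worker produces a result.
    auto collector = spawn(&receiver, thisTid, items.length);

    foreach (c; parallel(items))
    {
        // Each iteration sends its partial result to the receiver.
        send(collector, somefunction(c));
    }

    // Block until the receiver has seen every result.
    return receiveOnly!int();
}

void main()
{
    writeln(sumViaReceiver([1, 2, 3, 4, 5])); // prints 55
}
```

Here only the receiver thread ever touches the accumulator, so no lock is needed. The price is one message per element.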