Why doesn't this simple Spark program use multiple cores?

So, I am running this simple program on a 16-core machine. I launch it with the following command:

spark-submit --master local[*] pi.py

      

And the code for this program is as follows.

#"""pi.py"""
from pyspark import SparkContext
import random

N = 12500000

def sample(p):
    x, y = random.random(), random.random()
    return 1 if x*x + y*y < 1 else 0

sc = SparkContext("local", "Test App")
count = sc.parallelize(xrange(0, N)).map(sample).reduce(lambda a, b: a + b)
print "Pi is roughly %f" % (4.0 * count / NUM_SAMPLES)

      

When I use top to view CPU consumption, only 1 core is used. Why is this so? Secondly, the Spark documentation says the default parallelism is contained in the spark.default.parallelism property. How can I read this property from my Python program?

+3




4 answers


Probably because the sc.parallelize call puts all the data into a single partition. You can specify the desired number of partitions as the 2nd argument to parallelize:

part = 16
count = sc.parallelize(xrange(N), part).map(sample).reduce(lambda a, b: a + b)
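
To double-check that the partitioning took effect, you can ask the RDD directly; getNumPartitions is a standard PySpark RDD method, so a quick sanity check along these lines should do:

rdd = sc.parallelize(xrange(N), part)
print rdd.getNumPartitions()   # expect 16 here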

      

Note that this will still generate all 12.5 million points on a single CPU in the driver, and only then ship them out to the 16 partitions to perform the reduce step.

An improved approach does most of the work after partitioning: for example, the following generates only a tiny array in the driver and then lets each remote task generate the actual random numbers and the subsequent pi approximation:



part = 16
count = (sc.parallelize([0] * part, part)
           .flatMap(lambda blah: [sample(p) for p in xrange(N / part)])
           .reduce(lambda a, b: a + b))

      

Finally (because the lazier we get, the better), Spark's MLlib actually ships with random data generation that is already nicely parallelized; take a look here: http://spark.apache.org/docs/1.1.0/mllib-statistics.html#random-data-generation . So maybe the following is close to what you are trying to do (not tested => may not work, but hopefully it will be close):

from pyspark.mllib.random import RandomRDDs

count = (RandomRDDs.uniformRDD(sc, N, part)
         .zip(RandomRDDs.uniformRDD(sc, N, part))
         .filter(lambda (x, y): x*x + y*y < 1)
         .count())

      

+3




Since none of the above really worked for me (possibly because I didn't really understand it), here are my two cents.

I started my job with spark-submit program.py, and inside the file I had sc = SparkContext("local", "Test"). I checked the number of cores Spark sees using sc.defaultParallelism, and it turned out to be 1. When I changed the context initialization to sc = SparkContext("local[*]", "Test"), it became 16 (the number of cores on my system) and my program was using all the cores.



I'm new to Spark, but my understanding is that "local" defaults to using one core, and since it is set inside the program, it overrides the other settings (in my case it overrode the values coming from config files and environment variables).
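
On the second part of the original question: sc.defaultParallelism is the easiest way to read the effective parallelism from Python, and if your PySpark version exposes SparkContext.getConf you can also query the raw property (it only appears in the conf if it has been set explicitly):

from pyspark import SparkContext

sc = SparkContext("local[*]", "Test")
print sc.defaultParallelism                                    # parallelism Spark actually uses
print sc.getConf().get("spark.default.parallelism", "not set") # raw config value, only present if set explicitly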

+11




To change how many CPU cores are consumed, set the number of cores to be used by the workers in the file spark-env.sh under spark-installation-directory/conf. This is done with the SPARK_EXECUTOR_CORES attribute in spark-env.sh; the default is 1.
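
As a rough sketch (the path is the one mentioned above and the value 8 is just an example), the relevant entry could look like this:

# spark-installation-directory/conf/spark-env.sh
export SPARK_EXECUTOR_CORES=8   # number of cores each executor may use (example value)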

+1




I tried the method mentioned by @Svend, but it still didn't work.

The following works for me:

Do NOT use the local URL, like:

sc = SparkContext("local", "Test App")
...

Use the master URL instead, like this:

sc = SparkContext("spark://your_spark_master_url:port", "Test App")

+1








