How does Spark SQL decide the number of partitions to use when loading data from a Hive table?

This question is the same as Number of sections of spark infoframe created by reading data from Hive table

But I think this question didn't get the right answer. Note that the question asks how many partitions will be created when the framework is created as a result of executing a sql query on the HIVE table using SparkSession.sql .

IIUC, the question above is different from the question of how many partitions will be created when the dataframe is created as a result of executing some type of code spark.read.json("examples/src/main/resources/people.json")

that loads data directly from the filesystem, which might be HDFS. I think the answer to this last question is given by spark.sql.files.maxPartitionBytes

spark.sql.files.maxPartitionBytes 134217728 (128 MB) Maximum number of bytes to pack into one partition when reading files.

Experimentally, I tried to create a dataframe from the HIVE table and the number of partitions I am getting is not explained total data in hive table / spark.sql.files.maxPartitionBytes

Also adding to the OP , it would be nice to know how the number of partitions can be controlled i.e. when you want to force the spark to use a different number than the default.

Literature:

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/SparkSession.scala

https://github.com/apache/spark/blob/master/sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala

+3


source to share


1 answer


TL; The DR: . By default, the number of partitions when reading data from Hive will be determined by the HDFS blockSize. The number of sections can be increased by setting mapreduce.job.maps to the appropriate value and can be decreased by setting mapreduce.input.fileinputformat.split.minsize to the appropriate value

Spark SQL creates a HadoopRDD instance when loading data from a hive table.

An RDD that provides basic functionality for reading data stored in Hadoop (for example, files in HDFS, sources in HBase or S3) using the older MapReduce APIs (org.apache.hadoop.mapred).

enter image description here

HadoopRDD, in turn, splits the input files according to the method computeSplitSize

defined in org.apache.hadoop.mapreduce.lib.input.FileInputFormat (new API) and org.apache.hadoop.mapred.FileInputFormat (old API).

New API:

protected long computeSplitSize(long blockSize, long minSize,
                                  long maxSize) {
    return Math.max(minSize, Math.min(maxSize, blockSize));
  }

      

Old API:



protected long computeSplitSize(long goalSize, long minSize,
                                       long blockSize) {
    return Math.max(minSize, Math.min(goalSize, blockSize));
  }

      

computeSplitSize splits files according to HDFS blockSize, but if blockSize is less minSize

or greater than maxSize

then it is clamped to those extremes. HDFS blockSize can be obtained from

hdfs getconf -confKey dfs.blocksize

      

According to Hadoop's definitive manual Table 8.5 is minSize

derived from mapreduce.input.fileinputformat.split.minsize

, but maxSize

derived from mapreduce.input.fileinputformat.split.maxsize

.

enter image description here

However, the book also mentions mapreduce.input.fileinputformat.split.maxsize

that:

This property is not present in the old MapReduce API (except for CombineFileInputFormat). Instead, it is computed indirectly as the size of the total input for the job divided by the direction number of the map jobs specified by the mapreduce.job.maps (or the setNumMapTasks () method on JobConf).

this post also calculates the maxSize using the total input size divided by the number of map tasks.

+3


source







All Articles