How does Spark SQL decide the number of partitions to use when loading data from a Hive table?
This question is the same as Number of partitions of a Spark dataframe created by reading data from a Hive table.
But I don't think that question got the right answer. Note that it asks how many partitions will be created when a dataframe is created as the result of executing a SQL query against a Hive table using SparkSession.sql.
IIUC, that question is different from asking how many partitions will be created when a dataframe is created by code such as spark.read.json("examples/src/main/resources/people.json"), which loads data directly from the filesystem (which might be HDFS). I think the answer to this latter question is given by spark.sql.files.maxPartitionBytes:
spark.sql.files.maxPartitionBytes (default: 134217728, i.e. 128 MB) — the maximum number of bytes to pack into a single partition when reading files.
Experimentally, I tried creating a dataframe from a Hive table, and the number of partitions I got is not explained by total data in Hive table / spark.sql.files.maxPartitionBytes.
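For reference, a minimal sketch of what the file-based read path would predict. This is a simplification: it ignores spark.sql.files.openCostInBytes and the parallelism-based sizing Spark also applies, and assumes a single splittable file. The function name is my own, not a Spark API.

```python
import math

def estimate_file_partitions(total_bytes, max_partition_bytes=134217728):
    # Simplified model: one partition per maxPartitionBytes-sized chunk.
    # The real packing also folds in openCostInBytes per file and the
    # default parallelism, so actual counts can differ.
    return max(1, math.ceil(total_bytes / max_partition_bytes))

# A 1 GB input at the default 128 MB maxPartitionBytes
print(estimate_file_partitions(1024 * 1024 * 1024))  # 8
```

The Hive read path does not follow this formula, which is what this question is about.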
Also, adding to the OP's question: it would be nice to know how the number of partitions can be controlled, i.e. how to force Spark to use a different number than the default.
TL;DR: By default, the number of partitions when reading data from Hive is determined by the HDFS blockSize. The number of partitions can be increased by setting mapreduce.job.maps to an appropriate value, and decreased by setting mapreduce.input.fileinputformat.split.minsize to an appropriate value.
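As a hypothetical PySpark sketch of passing those two knobs through (untested; the spark.hadoop.* prefix forwards settings to the Hadoop Configuration used for the read, and the table name is made up):

```python
from pyspark.sql import SparkSession

# Sketch: forward the Hadoop input-format settings discussed below
# via the spark.hadoop.* prefix. Values are illustrative only.
spark = (SparkSession.builder
         .enableHiveSupport()
         .config("spark.hadoop.mapreduce.job.maps", "32")
         .config("spark.hadoop.mapreduce.input.fileinputformat.split.minsize",
                 str(256 * 1024 * 1024))
         .getOrCreate())

df = spark.sql("SELECT * FROM some_db.some_table")  # hypothetical table
print(df.rdd.getNumPartitions())
```

Which of the two settings actually takes effect depends on the input sizes involved, per the clamping logic explained below.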
Spark SQL creates a HadoopRDD instance when loading data from a Hive table:
An RDD that provides basic functionality for reading data stored in Hadoop (for example, files in HDFS, sources in HBase or S3) using the older MapReduce APIs (org.apache.hadoop.mapred).
HadoopRDD, in turn, splits the input files according to the computeSplitSize method defined in org.apache.hadoop.mapreduce.lib.input.FileInputFormat (new API) and org.apache.hadoop.mapred.FileInputFormat (old API).
New API:
protected long computeSplitSize(long blockSize, long minSize,
                                long maxSize) {
  return Math.max(minSize, Math.min(maxSize, blockSize));
}
Old API:
protected long computeSplitSize(long goalSize, long minSize,
                                long blockSize) {
  return Math.max(minSize, Math.min(goalSize, blockSize));
}
computeSplitSize splits files according to the HDFS blockSize, but if blockSize is less than minSize or greater than maxSize, the split size is clamped to those bounds. The HDFS blockSize can be obtained from:
hdfs getconf -confKey dfs.blocksize
According to Hadoop: The Definitive Guide (Table 8.5), minSize is derived from mapreduce.input.fileinputformat.split.minsize, while maxSize is derived from mapreduce.input.fileinputformat.split.maxsize.
However, the book also notes about mapreduce.input.fileinputformat.split.maxsize that:
This property is not present in the old MapReduce API (except for CombineFileInputFormat). Instead, it is computed indirectly as the size of the total input for the job divided by the guide number of map tasks specified by mapreduce.job.maps (or the setNumMapTasks() method on JobConf).
This post also calculates maxSize as the total input size divided by the number of map tasks.
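Putting that together with the old-API formula, a sketch of how the two knobs move the split size (and hence the partition count) in opposite directions. The goalSize derivation (total input / mapreduce.job.maps) follows the quote above; sizes are illustrative:

```python
def compute_split_size_old(goal_size, min_size, block_size):
    # Old-API clamp, transcribed from FileInputFormat above
    return max(min_size, min(goal_size, block_size))

MB = 1024 * 1024
total_input = 1024 * MB   # a hypothetical 1 GB table
block_size = 128 * MB
min_size = 1              # effectively unset

# mapreduce.job.maps = 16 -> goalSize = 64 MB, below the block size,
# so splits shrink and the partition count rises
goal = total_input // 16
split = compute_split_size_old(goal, min_size, block_size)
print(split // MB, total_input // split)  # 64 16

# split.minsize = 256 MB overrides both goalSize and blockSize,
# so splits grow and the partition count falls
split = compute_split_size_old(total_input // 8, 256 * MB, block_size)
print(split // MB, total_input // split)  # 256 4
```

This matches the TL;DR: mapreduce.job.maps raises the partition count (via a smaller goalSize), while mapreduce.input.fileinputformat.split.minsize lowers it (via a larger floor).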