Apache Spark - shuffle writes more data than the size of the input
I am using Spark 2.1 in local mode and I am running this simple application.
import org.apache.spark.sql.functions.col

val N = 10 << 20  // ~10.5 million rows

// Lower the broadcast threshold and disable the sort-merge preference so the join is a shuffled hash join.
sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")
df1.join(df2, col("k1") === col("k2")).count()
Here range(N) creates a dataset of Long values (all unique), so I assume the sizes are
- df1 = N * 8 bytes ~ 80MB
- df2 = N / 5 * 8 bytes ~ 16 MB
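A quick back-of-the-envelope check of those estimates (a standalone sketch that only counts the raw 8-byte long values and ignores any per-row overhead Spark adds):

// Rough size estimate: 8 bytes per long value, no per-row overhead assumed.
val N = 10 << 20                      // 10,485,760 rows
val df1Bytes = N.toLong * 8           // ~80MB
val df2Bytes = (N / 5).toLong * 8     // ~16MB
println(s"df1 ~ ${df1Bytes >> 20} MB, df2 ~ ${df2Bytes >> 20} MB")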
Now let's take df1 as an example. df1 consists of 8 partitions and the resulting ShuffledRDD has 5, so I assume that
- # of mappers (M) = 8
- # of reducers (R) = 5
Since the number of partitions is small, Spark will use the Hash Shuffle, which creates M * R files on disk, but I haven't understood whether every file contains all the data (so each_file_size = data_size, giving M * R * data_size written in total) or whether all the files together add up to data_size (all_files_size = data_size).
However, when running this application, the shuffle write for df1 = 160MB, which doesn't match any of the cases above.
What am I missing here? Why has the shuffle write data doubled in size?
First of all, let's see what data size total (min, med, max) means:

According to SQLMetrics.scala#L88 and ShuffleExchange.scala#L43, the data size total (min, med, max) we see is the final value of the dataSize shuffle metric. Then, how is it updated? It is updated every time a record is serialized, in UnsafeRowSerializer.scala#L66:

dataSize.add(row.getSizeInBytes)

(UnsafeRow is the internal representation of records in Spark SQL.)
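Conceptually, the per-record write path looks roughly like the sketch below (a simplified illustration, not the actual UnsafeRowSerializer code; the writeRow helper and its parameters are hypothetical, with addToDataSize standing in for the SQLMetric accumulator):

import java.io.DataOutputStream
import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Hypothetical sketch: for each record, add the row's byte length to the metric
// behind "data size total", then copy the row's bytes to the shuffle stream.
def writeRow(row: UnsafeRow, addToDataSize: Long => Unit, out: DataOutputStream): Unit = {
  addToDataSize(row.getSizeInBytes.toLong)       // feeds the dataSize metric
  out.writeInt(row.getSizeInBytes)               // length prefix for the record
  row.writeToStream(out, new Array[Byte](4096))  // raw copy of the backing byte[]
}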
Internally, an UnsafeRow is backed by a byte[] that is copied directly to the underlying output stream during serialization; its getSizeInBytes() method simply returns the length of that byte[]. So the initial question translates to: why is the byte representation twice as big as the single long column a record has? The UnsafeRow.scala doc gives us the answer:
Each tuple has three parts: [null bit set] [values] [variable length portion]
The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.
Since it is aligned to 8-byte word boundaries, the single null bit occupies another 8 bytes, the same width as the long column. Therefore, each UnsafeRow represents your one-long-column record using 16 bytes.
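You can check this directly by building an UnsafeRow for a single long column with Spark's internal UnsafeProjection (an internal Catalyst API, so treat this as an illustrative sketch rather than a stable interface):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, LongType}

// Build an UnsafeRow holding one long column and inspect its size in bytes.
val toUnsafe = UnsafeProjection.create(Array[DataType](LongType))
val unsafeRow = toUnsafe(InternalRow(42L))
println(unsafeRow.getSizeInBytes)  // 16 = 8-byte null bit set + 8-byte long value

// 16 bytes per record over the ~10.5 million records of df1 gives ~160MB,
// which matches the reported shuffle data size.
println(((10L << 20) * 16) >> 20)  // 160 (MB)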