Apache Spark - shuffle writes more data than the size of the input

I am using Spark 2.1 in local mode and I am running this simple application:

import org.apache.spark.sql.functions.col

val N = 10 << 20  // 10 * 2^20 = 10,485,760 rows

sparkSession.conf.set("spark.sql.shuffle.partitions", "5")
sparkSession.conf.set("spark.sql.autoBroadcastJoinThreshold", (N + 1).toString)
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val df1 = sparkSession.range(N).selectExpr(s"id as k1")
val df2 = sparkSession.range(N / 5).selectExpr(s"id * 3 as k2")

df1.join(df2, col("k1") === col("k2")).count()


Here range(N) creates a dataset of Long values (all unique), so I assume the sizes are roughly as follows (a quick arithmetic check comes right after the list):

  • df1 = N * 8 bytes ~ 80 MB
  • df2 = N / 5 * 8 bytes ~ 16 MB
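
A quick check of those estimates (assuming a plain 8 bytes per long value and no per-row overhead):

val bytesPerLong = 8L
val df1Estimate = N * bytesPerLong        // 10,485,760 * 8 = 83,886,080 bytes ≈ 80 MB
val df2Estimate = (N / 5) * bytesPerLong  // 2,097,152 * 8 = 16,777,216 bytes ≈ 16 MB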

Now let's take df1 as an example. df1 has 8 partitions and the shuffled RDD has 5, so I assume that (the snippet after this list shows how to check these counts):

  • # of mappers (M) = 8
  • # of reducers (R) = 5
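
A quick way to double-check those counts against the running sparkSession (the mapper count simply reflects the local[*] parallelism on my machine):

// Assumes df1 and sparkSession from the snippet above.
println(df1.rdd.getNumPartitions)                               // 8 here (= number of local cores)
println(sparkSession.conf.get("spark.sql.shuffle.partitions"))  // 5, set explicitly above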

Since the number of partitions is small, Spark will use the Hash Shuffle, which creates M * R files on disk. What I haven't understood is whether every file holds all the data, i.e. each_file_size = data_size, resulting in M * R * data_size written in total, or whether all the files together add up to data_size (all_files = data_size).

However, when running this application, the shuffle write for df1 is 160 MB, which doesn't match either of the cases above.

[Spark UI screenshot]

What am I missing here? Why is the shuffle write data doubled in size?



1 answer


First of all, let's see what data size total (min, med, max) means:

According to SQLMetrics.scala#L88 and ShuffleExchange.scala#L43, the data size total (min, med, max) we see is the final value of the dataSize shuffle metric. Then how is it updated? It is updated every time a record is serialized (UnsafeRowSerializer.scala#L66): dataSize.add(row.getSizeInBytes). (UnsafeRow is the internal representation of records in Spark SQL.)
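
As a rough illustration (a simplified sketch, not Spark's actual serializer code), the accumulation boils down to this:

import org.apache.spark.sql.catalyst.expressions.UnsafeRow

// Simplified stand-in for the dataSize SQLMetric: the serializer copies each row's
// backing byte[] to the shuffle output and bumps the counter by the same amount.
var dataSize = 0L
def writeRow(row: UnsafeRow): Unit = {
  // out.write(row.getBytes, 0, row.getSizeInBytes)  // actual byte copy, omitted here
  dataSize += row.getSizeInBytes                     // this is what the UI reports
}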

Internally, an UnsafeRow is backed by a byte[] that is copied directly to the underlying output stream during serialization; its getSizeInBytes() method simply returns the length of that byte[]. So the original question becomes: why is the byte representation twice as large as the single long column each record holds? The UnsafeRow.scala doc gives us the answer:



Each tuple has three parts: [null bit set] [values] [variable length portion]

The bit set is used for null tracking and is aligned to 8-byte word boundaries. It stores one bit per field.

Since it is 8-byte word aligned, the single null bit takes up another 8 bytes, the same width as the long column. Therefore each UnsafeRow represents your one-long-column record using 16 bytes.
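
You can observe those 16 bytes directly with the catalyst internals (my own check; these are internal APIs and may change between versions):

import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.UnsafeProjection
import org.apache.spark.sql.types.{DataType, LongType}

// Build an UnsafeRow with a single long column and inspect its serialized size.
val toUnsafe = UnsafeProjection.create(Array[DataType](LongType))
val row = toUnsafe(InternalRow(42L))
println(row.getSizeInBytes)  // 16 = 8-byte null-tracking bit set + 8 bytes for the long value

And that matches the observed shuffle write: 10,485,760 rows * 16 bytes ≈ 160 MB, twice the naive 8-bytes-per-row estimate.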
