Broadcast Hash Join (BHJ) in Spark for full outer join (outer, full, fullouter)
How can I force a full outer join between DataFrames in Spark to use a Broadcast Hash Join? Here's a snippet of the code:
sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
org.apache.spark.sql.functions.broadcast(SmallTable),
Seq("X", "Y", "Z", "W", "V"),
"outer"
)
My SmallTable is much smaller than the autoBroadcastJoinThreshold set above. Also, if I use an inner, left_outer, or right_outer join, I can see from the DAG visualization that the join uses BroadcastHashJoin, as expected.
However, when I use "outer" as the join type, Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem? Based on the performance I see with the left outer join, BroadcastHashJoin would speed up my application several times over.
Spark decides to use SortMergeJoin for an unknown reason. Does anyone know how to solve this problem?
Reason: FullOuter (that is, any of the keywords outer, full, or fullouter) is not supported for Broadcast Hash Join (a.k.a. map-side join).
How to prove it?
Let's look at an example:
package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
 * Join example and some basic demonstrations using sample data.
 *
 * @author : Ram Ghadiyaram
 */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)

  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate

  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
   * main
   *
   * @param args Array[String]
   */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._
    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))

    /**
     * create 2 dataframes here using case classes: one is Person (df1) and the other is Profile (df2)
     */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))
    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )
    // you can alias the dataframes to refer to column names with aliases, to increase readability
    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")

    /**
     * Example shows how to join them at the dataframe level;
     * the next example demonstrates using SQL with createOrReplaceTempView.
     */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile),
      col("dfperson.personid") === col("dfprofile.personId"),
      "outer")
    val joined = joined_df.select(
      col("dfperson.name"),
      col("dfperson.age"),
      col("dfprofile.name"),
      col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show
  }
}
I tried to use a broadcast hint with the fullouter join, but the framework ignores it and uses SortMergeJoin anyway. Below is the explain plan for this. Result:
== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personId#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan [obj#3]
   +- *Sort [personId#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personId#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]

+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
From Spark 2.3, Sort Merge Join is the default join algorithm in Spark. However, this can be disabled with the internal parameter spark.sql.join.preferSortMergeJoin, which is set to true by default.
In cases other than the fullouter join... if you don't want Spark to use SortMergeJoin, you can set the property below.
sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")
The relevant logic lives in SparkStrategies.scala (the strategy that converts a logical plan into zero or more SparkPlans), which is where the SortMergeJoin you are trying to avoid gets chosen.
Important notes:
This property spark.sql.join.preferSortMergeJoin, when true, prefers a sort merge join over a shuffle hash join (via the PREFER_SORTMERGEJOIN property).
Setting it to false does not mean Spark will necessarily select a broadcast hash join; it can still pick something else (for example, a shuffle hash join).
The documentation below is in SparkStrategies.scala, at the top of object JoinSelection extends Strategy with PredicateHelper...
- Broadcast: if one side of the join has an estimated physical size smaller than the user-configurable threshold [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], or if that side has an explicit broadcast hint (for example, the user applied [[org.apache.spark.sql.functions.broadcast()]] to a DataFrame), then that side of the join will be broadcast and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcast, then ...
- Shuffle hash join: if the average size of a single partition is small enough to build a hash table.
- Sort merge: if the matching join keys are sortable.
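The selection order above can be sketched as plain Scala. This is a simplified illustrative model, not Spark's actual JoinSelection code (which inspects plan statistics and hints); the function and parameter names here are made up for the sketch:

```scala
// Simplified model of the join-strategy choice described above.
// NOT Spark's real code: the real JoinSelection works on logical plans and statistics.
sealed trait Strategy
case object BroadcastHashJoin extends Strategy
case object ShuffleHashJoin extends Strategy
case object SortMergeJoin extends Strategy

def chooseStrategy(joinTypeSupportsBroadcast: Boolean, // FullOuter does not (see below)
                   sideSizeBytes: Long,
                   broadcastThreshold: Long,
                   hasBroadcastHint: Boolean,
                   avgPartitionSmall: Boolean,
                   keysSortable: Boolean): Strategy =
  if (joinTypeSupportsBroadcast && (sideSizeBytes < broadcastThreshold || hasBroadcastHint))
    BroadcastHashJoin           // 1) broadcast wins when the type allows it and a side is small/hinted
  else if (avgPartitionSmall)
    ShuffleHashJoin             // 2) hash table per partition is feasible
  else if (keysSortable)
    SortMergeJoin               // 3) otherwise sort merge, when keys are sortable
  else
    sys.error("no applicable strategy in this sketch")
```

Note how a broadcast hint is simply irrelevant when the join type itself rules broadcasting out, which matches the behaviour seen in the explain plan above.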
Broadcast join does not support a full outer join. It only supports the following types:
InnerLike | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin | RightOuter
See JoinSelection in SparkStrategies.scala for details.
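That restriction can be expressed as a small Scala sketch. It is a paraphrase of the join-type checks behind broadcast eligibility, not Spark's actual source; the point is simply that FullOuter appears in neither list:

```scala
// Paraphrase of the join-type eligibility check for broadcast hash join.
// Not Spark's actual source; it mirrors the list of supported types quoted above.
sealed trait JoinType
case object Inner extends JoinType // stands in for InnerLike
case object LeftOuter extends JoinType
case object LeftSemi extends JoinType
case object LeftAnti extends JoinType
case object ExistenceJoin extends JoinType
case object RightOuter extends JoinType
case object FullOuter extends JoinType

// can the right side be built (broadcast) as a hash table for this join type?
def canBuildRight(t: JoinType): Boolean = t match {
  case Inner | LeftOuter | LeftSemi | LeftAnti | ExistenceJoin => true
  case _ => false
}

// can the left side be built (broadcast)?
def canBuildLeft(t: JoinType): Boolean = t match {
  case Inner | RightOuter => true
  case _ => false
}
```

For FullOuter neither side can be built as a hash table (every row on both sides must be emitted, matched or not), so the broadcast hint is ignored and Spark falls back to SortMergeJoin.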