Broadcast Hash Join (BHJ) in Spark for full outer join (outer, full, fullouter)

How do I force a full outer join between DataFrames in Spark to use a Broadcast Hash Join? Here's a snippet of the code:

sparkConfiguration.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")
val Result = BigTable.join(
  org.apache.spark.sql.functions.broadcast(SmallTable),
  Seq("X", "Y", "Z", "W", "V"),
  "outer"
)

My SmallTable is much smaller than the autoBroadcastJoinThreshold set above. Also, if I use an inner, left_outer, or right_outer join, I can see from the DAG visualization that the join uses BroadcastHashJoin, as expected.

However, when I use "outer" as the join type, Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem? Based on the performance I see with the left outer join, BroadcastHashJoin would speed up my application several-fold.
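
For reference, this is how I check which physical join was chosen (a minimal check using the Result dataframe from the snippet above; explain and queryExecution are standard Dataset API):

// Print the physical plan; look for BroadcastHashJoin vs SortMergeJoin in the output.
Result.explain()

// Or check the executed plan programmatically:
val usesBHJ = Result.queryExecution.executedPlan.toString.contains("BroadcastHashJoin")
println(s"BroadcastHashJoin used: $usesBHJ")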



2 answers


Spark decides to use SortMergeJoin for some unknown reason. Does anyone know how to solve this problem?

Reason: a full outer join (i.e. any of the keywords outer, full, fullouter) is not supported by broadcast hash join (a.k.a. map-side join).

How to prove it? Let's take an example:

package com.examples

import org.apache.log4j.{Level, Logger}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

/**
  * Join example and some basic demonstration using sample data.
  *
  * @author : Ram Ghadiyaram
  */
object JoinExamples extends Logging {
  // switch off unnecessary logs
  Logger.getLogger("org").setLevel(Level.OFF)
  val spark: SparkSession = SparkSession.builder.config("spark.master", "local").getOrCreate

  case class Person(name: String, age: Int, personid: Int)

  case class Profile(name: String, personId: Int, profileDescription: String)

  /**
    * main
    *
    * @param args Array[String]
    */
  def main(args: Array[String]): Unit = {
    spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
    import spark.implicits._

    spark.sparkContext.getConf.getAllWithPrefix("spark.sql").foreach(x => logInfo(x.toString()))
    /**
      * create 2 dataframes here using case classes: one is Person (df1) and the other is Profile (df2)
      */
    val df1 = spark.sqlContext.createDataFrame(
      spark.sparkContext.parallelize(
        Person("Sarath", 33, 2)
          :: Person("KangarooWest", 30, 2)
          :: Person("Ravikumar Ramasamy", 34, 5)
          :: Person("Ram Ghadiyaram", 42, 9)
          :: Person("Ravi chandra Kancharla", 43, 9)
          :: Nil))

    val df2 = spark.sqlContext.createDataFrame(
      Profile("Spark", 2, "SparkSQLMaster")
        :: Profile("Spark", 5, "SparkGuru")
        :: Profile("Spark", 9, "DevHunter")
        :: Nil
    )

    // alias the dataframes so columns can be referenced by name, for readability
    val df_asPerson = df1.as("dfperson")
    val df_asProfile = df2.as("dfprofile")

    /**
      * This example shows how to join at the dataframe level;
      * the next example demonstrates using SQL with createOrReplaceTempView.
      */
    val joined_df = df_asPerson.join(
      broadcast(df_asProfile)
      , col("dfperson.personid") === col("dfprofile.personid")
      , "outer")
    val joined = joined_df.select(
      col("dfperson.name")
      , col("dfperson.age")
      , col("dfprofile.name")
      , col("dfprofile.profileDescription"))
    joined.explain(false) // it will show which join was used
    joined.show

  }

}

I tried the broadcast hint with the fullouter join type, but the framework ignores it and uses SortMergeJoin. Below is the explain plan for it. Result:

== Physical Plan ==
*Project [name#4, age#5, name#11, profileDescription#13]
+- SortMergeJoin [personid#6], [personid#12], FullOuter
   :- *Sort [personid#6 ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(personid#6, 200)
   :     +- *SerializeFromObject [staticinvoke(class org.apache.spark.unsafe.types.UTF8String, StringType, fromString, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).name, true) AS name#4, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).age AS age#5, assertnotnull(input[0, com.examples.JoinExamples$Person, true]).personid AS personid#6]
   :        +- Scan ExternalRDDScan[obj#3]
   +- *Sort [personid#12 ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(personid#12, 200)
         +- LocalTableScan [name#11, personId#12, profileDescription#13]
+--------------------+---+-----+------------------+
|                name|age| name|profileDescription|
+--------------------+---+-----+------------------+
|  Ravikumar Ramasamy| 34|Spark|         SparkGuru|
|      Ram Ghadiyaram| 42|Spark|         DevHunter|
|Ravi chandra Kanc...| 43|Spark|         DevHunter|
|              Sarath| 33|Spark|    SparkSQLMaster|
|        KangarooWest| 30|Spark|    SparkSQLMaster|
+--------------------+---+-----+------------------+
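
For contrast, a quick sketch: changing the join type in the same example from "outer" to "inner" (or "left_outer") lets the broadcast hint take effect, and explain then shows BroadcastHashJoin:

// Same dataframes as in the example above; only the join type changes.
val innerJoined = df_asPerson.join(
  broadcast(df_asProfile),
  col("dfperson.personid") === col("dfprofile.personid"),
  "inner")
innerJoined.explain(false) // the plan now contains BroadcastHashJoin instead of SortMergeJoin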

From Spark 2.3, sort-merge join is the default join algorithm in Spark. However, it can be de-prioritized via the internal parameter spark.sql.join.preferSortMergeJoin, which is set to true by default.

For join types other than fullouter: if you don't want Spark to use SortMergeJoin, you can set the property below.



sparkSession.conf.set("spark.sql.join.preferSortMergeJoin", "false")

This instructs the code in SparkStrategies.scala (which is responsible for converting the logical plan into zero or more SparkPlans) that you don't want to use SortMergeJoin.

Important notes:

The property spark.sql.join.preferSortMergeJoin (PREFER_SORTMERGEJOIN): when true, the planner prefers sort-merge join over shuffle hash join.

Setting it to false does not mean Spark can only select broadcast hash join; it can be anything else as well (for example, shuffle hash join).
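
A small sketch of that behaviour (toy dataframes of my own; whether ShuffledHashJoin is actually chosen still depends on size estimates and the Spark version):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder.config("spark.master", "local").getOrCreate()
import spark.implicits._

// Rule out broadcast entirely so the remaining choice (sort-merge vs shuffle hash) is visible.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")

val dfA = Seq((1, "a"), (2, "b")).toDF("id", "va")
val dfB = Seq((1, "x"), (3, "y")).toDF("id", "vb")

// With preferSortMergeJoin=false the planner may emit ShuffledHashJoin,
// provided a single partition is small enough to build a hash table.
dfA.join(dfB, Seq("id"), "inner").explain()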

The doc comment below is from SparkStrategies.scala, i.e. above object JoinSelection extends Strategy with PredicateHelper ...


  • Broadcast: if one side of the join has an estimated physical size that is smaller than the user-configurable threshold [[SQLConf.AUTO_BROADCASTJOIN_THRESHOLD]], or if that side has an explicit broadcast hint (e.g. the user applied [[org.apache.spark.sql.functions.broadcast()]] to a DataFrame), then that side of the join will be broadcast and the other side will be streamed, with no shuffling performed. If both sides of the join are eligible to be broadcast, then ...

  • Shuffle hash join: if the average size of a single partition is small enough to build a hash table.

  • Sort merge: if the matching join keys are sortable.
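
Since broadcast hash join is simply not available for a full outer join, a possible workaround, sketched below (my own sketch, not something Spark does for you; it assumes an equi-join on the listed key columns, that non-key column names do not collide between the two tables, and unionByName from Spark 2.3+), is to emulate the full outer join as a broadcast left outer join plus the unmatched rows of the small side:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{broadcast, col, lit}

// Emulate bigTable FULL OUTER JOIN smallTable on `keys` with a broadcast-friendly plan.
// bigTable, smallTable and keys are placeholders for your own dataframes and join columns.
def fullOuterViaBroadcast(bigTable: DataFrame, smallTable: DataFrame, keys: Seq[String]): DataFrame = {
  // 1) All big-side rows, matched against the broadcast small side where possible (BHJ-eligible).
  val leftPart = bigTable.join(broadcast(smallTable), keys, "left_outer")

  // 2) Small-side rows whose key never occurs in the big table. Null keys never match,
  //    just as in a real full outer join. Note this step still joins against the big
  //    table, but only against its deduplicated key projection.
  val unmatchedSmall = smallTable.join(bigTable.select(keys.map(col): _*).distinct(), keys, "left_anti")

  // 3) Pad the unmatched small rows with typed nulls for the big-only columns, then union.
  val bigOnly = bigTable.columns.filterNot(keys.contains)
  val padded = bigOnly.foldLeft(unmatchedSmall) { (df, c) =>
    df.withColumn(c, lit(null).cast(bigTable.schema(c).dataType))
  }
  leftPart.unionByName(padded)
}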



Broadcast join does not support full outer join. It only supports the following types:

InnerLike | LeftOuter | LeftSemi | LeftAnti | Existence | RightOuter



See JoinStrategy for details.
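
For reference, the relevant checks in Spark's SparkStrategies.scala look roughly like this (paraphrased from the Spark 2.x source from memory; the exact code varies by version, so check your release). FullOuter appears in neither list, so neither side may serve as the build side of a broadcast hash join:

import org.apache.spark.sql.catalyst.plans._

// Approximate shape of the checks in object JoinSelection (SparkStrategies.scala):
def canBuildRight(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | LeftOuter | LeftSemi | LeftAnti | _: ExistenceJoin => true
  case _ => false // FullOuter falls through here: the right side cannot be the build side
}

def canBuildLeft(joinType: JoinType): Boolean = joinType match {
  case _: InnerLike | RightOuter => true
  case _ => false // FullOuter falls through here too: the left side cannot be built either
}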







