Hoes Does Spark Schedule To Join?

I am connecting two RDDs rddA

and rddB

.

rddA

has 100 sections and rddB

has 500 sections.

I am trying to understand the mechanics of the operation join

. By default, regardless of the join order, I get the same section structure; those. rddA.join(rddB

) and rddB.join(rddA)

gives the same number of partitions, while observing it uses the smaller partition size, 100. I know that I can increase the partition size with rddA.join(rddB,500)

, but I'm more interested in what's going on under the hood and why the lower size is chosen. From observation, even if I redo a small one rdd

, its subdivision will still be used; does Spark do any kind of heuristic analysis regarding key size?

Another problem I am facing is the level of skewness I am getting. My smaller partition ends up with 3,314 entries, and the larger one ends up with 1,139,207 out of a total size of 599,911,729 (keys). Both RDDs use a default delimiter, so how is the data fetch determined? I vaguely recall that if one rdd

has a set of delimiters, then that will use the delimiter. This is true? Is it "recommended" for this?

Finally, note that both of mine are rdd

relatively large (~ 90GB), so streaming wouldn't help. Instead, perhaps some way will be provided to provide some information about the job join

.

PS. Any information about the mechanism on the left and right will be additional bonuses :)

+3


source to share


1 answer


While I haven't been able to explain how the partitioning works yet, I figured out how the data is shuffled (which was my original problem). The compound has several side effects:

Rearranged / Markup: Spark will hash "RDD" sections and move / distribute to "Workers". Each set of values โ€‹โ€‹for a given key (for example, 5) will fall into one "Worker" / JVM. This means that if your "connection" has a ratio of 1..N, and N is badly mangled, you will end up with skewed JVM partitions and heaps (i.e. one "partition" can have Max (N) and another Min (N) ). The only way to avoid this is to use "broadcast" if possible, or tolerate this behavior. Since your data will be evenly distributed at first, the amount of shuffling will depend on the hash key.

Re-partitions: Following the skewed connection, the re-do call seems to distribute data evenly across the partitions. So it's okay if you have imminent asymmetry issues. Note that this transformation will cause heavy shuffling, but subsequent operations will be much faster. The disadvantage of this is the uncontrolled creation of the object (see below)



Object creation / heap pollution: You were able to join your data, thinking that redistribution would be a good idea to balance your cluster, but for some reason, "redistribution" triggers "OOME". It happens that the initially merged data reuses the merged objects. When you run "repartition" or any other "action" that involves shuffling, for example. additional connection, or "groupBy" (followed by "Action"), the data becomes serialized, so you lose object reuse. When objects are de-serialized, they are now new instances. Also note that reuse is lost during serialization, so the souffle will be pretty heavy. So in my case the connection is 1 ..1,000,000 (where 1 is my "heavy" object) will fail after any action that causes a shuffle.

Workarounds / Debug:

  • I used 'mapPartitionsWithIndex' to debug the sizes of the partitions, returning one 'Iterable>' element with each partition count. This is very useful as you can see the effect of the "rework" and the state of your sections after the "action".
  • You can use "countByKeyApprox" or "countByKey" in your RDD joins to get an idea of โ€‹โ€‹the cardinality and then apply the join in two steps. Use "broadcast" for high power keys and "join" for low power keys. Combining these operations into a "rdd.cache ()" and "rdd.unpersist ()" block will speed up this process significantly. While this may complicate your code a little, it will provide much better performance, especially if you are doing subsequent operations. Also note that if you use the "broadcast" on each "map" to search, you will also greatly reduce the size of the shuffle.
  • Calling to "reallocate" other operations that affect the number of partitions can be very useful, but keep in mind that a (random) large number of partitions will cause a lot of boredom as your large sets for a given key will create large partitions, but other partitions will be small or 0. Creating a debug method to get the size of the split will help you choose a good size.
+5


source







All Articles