Spark Join failed due to out of memory
My cluster: 9 slaves, each with 100GB of memory and a 320GB hard drive. Each host has 16 cores. I run 15 Spark executors on each host, so the available memory per executor is 6GB. My application:
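(For reference, that layout would roughly correspond to a SparkConf like the sketch below; the property values are just my reading of the numbers above, not copied from the real job.)

import org.apache.spark.{SparkConf, SparkContext}

// Rough equivalent of the cluster layout described above (values inferred, not the actual config)
val conf = new SparkConf()
  .setAppName("big-join")
  .set("spark.executor.instances", "135") // 9 hosts x 15 executors per host
  .set("spark.executor.cores", "1")       // ~16 cores split across 15 executors per host
  .set("spark.executor.memory", "6g")     // ~100GB per host / 15 executors
val sc = new SparkContext(conf)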
import org.apache.spark.HashPartitioner

val rdd1 = sc.textFile("a big file in S3, about 200GB, ~14M rows")
val rdd2 = sc.textFile("another big file in S3, about 200GB, ~14M rows")
val par = new HashPartitioner(150)
val rdd1Paired = rdd1.map(/* regex to get one key string from each row */).filter(/* drop non-matched rows */).partitionBy(par)
val rdd2Paired = rdd2.map(/* regex to get one key string from each row */).filter(/* drop non-matched rows */).partitionBy(par)
val rdd3 = rdd1Paired.join(rdd2Paired, par)
rdd3.count()
I can see in the Spark UI that the job is planned in three stages: filtering for rdd1, filtering for rdd2, and the count. Although both filter stages succeed, the count always fails with OOM. The strange thing is that the job always hangs at the count stage (149/150). I checked the executor assigned to TID 150 and saw a sharp increase in shuffle read; after a while it crashes with OOM. I also saw GC happening frequently on that executor.
My question is: why is only one executor fetching all the data (I checked, and the last operation that executor was doing was starting a fetch)? As I understand it, when I use the same partitioner for the two RDDs, they are co-partitioned, and scheduling them in the same task guarantees co-location of the data, so the join should happen on every executor. The first 149 tasks finish quickly and don't seem to do anything, and it looks like the last task is doing all the work.
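To check whether the 150 partitions are actually balanced, I could count records per partition with something like this sketch (it counts records only, not shuffle bytes, and assumes rdd1Paired from the code above):

// How many records land in each partition? One huge partition would point at key skew.
val partitionCounts = rdd1Paired
  .mapPartitionsWithIndex { (idx, it) => Iterator((idx, it.size)) }
  .collect()

// Print the ten heaviest partitions
partitionCounts.sortBy { case (_, n) => -n }.take(10).foreach(println)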
My guess is that the distribution of the keys you generate from your data is skewed, so that many of them fall into the same partition.
The reason the failure happens in the last task is that the skewed partition is the largest, so it takes the longest to process and ends up being the last one still running.
To solve this, increase the number of partitions and/or the number of servers.
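Before re-running, it may be worth confirming the skew by counting records per join key; a minimal sketch, assuming the rdd1Paired and rdd2Paired pair RDDs from your code:

// Find the heaviest join keys; a handful of keys with millions of rows would explain
// why a single reduce task reads almost all of the shuffle data.
val keyCounts = rdd1Paired
  .map { case (k, _) => (k, 1L) }
  .reduceByKey(_ + _)
keyCounts.sortBy({ case (_, n) => n }, ascending = false).take(20).foreach(println)

// If the data is just too coarsely partitioned (rather than a few hot keys),
// joining with a finer-grained partitioner spreads the load over more tasks:
val rdd3 = rdd1Paired.join(rdd2Paired, new HashPartitioner(1000))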