How to split an RDD

I have a text file consisting of a large number of random values separated by spaces. I am loading this file into an RDD in Scala. How does this RDD get partitioned?

Also, is there a method for creating custom partitions, so that all partitions hold an equal number of elements, along with an index for each partition?

val dRDD = sc.textFile("hdfs://master:54310/Data/input*")
val keyval = dRDD.map(x => process(x.trim().split(' ').map(_.toDouble), query_norm, m, r))


Here I am loading some text files from HDFS, and process is the function I am calling. Can I get a solution using mapPartitionsWithIndex, along with how I can access that index inside the process function? The map shuffles the partitions.
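Something like the following rough sketch is what I am imagining (process2 is a hypothetical variant of my process function that also receives the partition index; untested):

// index is the partition's index; iterator yields the elements of that partition
val keyval = dRDD.mapPartitionsWithIndex { (index, iterator) =>
  iterator.map(line =>
    process2(index, line.trim().split(' ').map(_.toDouble), query_norm, m, r))
}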



3 answers


You can create custom partitions using the coalesce function:



coalesce(numPartitions: Int, shuffle: Boolean = false): RDD[T]
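For example, a minimal sketch using the dRDD from the question (the partition count 8 is arbitrary):

// Redistribute dRDD across 8 partitions; shuffle = true performs a full
// shuffle, which tends to even out the number of elements per partition
val balanced = dRDD.coalesce(8, shuffle = true)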




A loaded RDD is partitioned by the default partitioner: hash code. To specify a custom partitioner, use rdd.partitionBy(), supplying your own partitioner.
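For example, a minimal sketch (ModuloPartitioner is a made-up name, and it assumes the data has been keyed by an Int; partitionBy is only defined on pair RDDs):

import org.apache.spark.Partitioner

// Hypothetical partitioner: route each Int key to partition (key % numPartitions)
class ModuloPartitioner(override val numPartitions: Int) extends Partitioner {
  def getPartition(key: Any): Int = key.asInstanceOf[Int] % numPartitions
}

// partitionBy is defined on RDD[(K, V)], so key the data first,
// for instance by each element's index
val pairs = dRDD.zipWithIndex().map { case (line, i) => (i.toInt, line) }
val partitioned = pairs.partitionBy(new ModuloPartitioner(8))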



I don't think it is correct to use coalesce() here: per the API docs, coalesce() can only be used to reduce the number of partitions, and we cannot specify a custom partitioner with coalesce().



How does the RDD get partitioned?

By default, one partition is created for each HDFS block, which is 64 MB by default. More details here.

How do I balance my data across partitions?

First, take a look at the three ways to repartition your data:

1) Pass a second parameter, the desired minimum number of partitions for your RDD, into textFile(), but be careful:

In [14]: lines = sc.textFile("data")

In [15]: lines.getNumPartitions()
Out[15]: 1000

In [16]: lines = sc.textFile("data", 500)

In [17]: lines.getNumPartitions()
Out[17]: 1434

In [18]: lines = sc.textFile("data", 5000)

In [19]: lines.getNumPartitions()
Out[19]: 5926


As you can see, the call in [16] does not do what you would expect, since the number of partitions the RDD already has is greater than the minimum number of partitions we are requesting.

2) Use repartition(), like this:

In [22]: lines = lines.repartition(10)

In [23]: lines.getNumPartitions()
Out[23]: 10


Warning: this will trigger a shuffle, and it is what to use when you want to increase the number of partitions your RDD has.

From the docs:

The shuffle is Spark's mechanism for re-distributing data so that it is grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.

3) Use coalesce(), like this:

In [25]: lines = lines.coalesce(2)

In [26]: lines.getNumPartitions()
Out[26]: 2


Here, Spark knows that you will be shrinking the RDD and can take advantage of it. Learn more about repartition() vs coalesce().


But will all of this guarantee that your data ends up perfectly balanced across your partitions? Not quite, as I describe in How do I balance my data across partitions?
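One way to see how balanced you actually are is to count the elements in each partition with mapPartitionsWithIndex, which the question also asked about. A sketch in Scala, matching the question's language (lines is assumed to be the RDD loaded above):

// Emit (partition index, number of elements in that partition) per partition
val sizes = lines.mapPartitionsWithIndex { (index, iterator) =>
  Iterator((index, iterator.size))
}.collect()

// Print one line per partition
sizes.foreach { case (index, count) =>
  println(s"partition $index: $count elements")
}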


