How can I create a MapFile using Spark and access it?

I am trying to create a MapFile from a Spark RDD but cannot find enough information. Here are my steps so far:

I started with:

rdd.saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

which threw an exception, because a MapFile needs to be sorted. So I changed to:

rdd.sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

which worked great, and my MapFile was generated. The next step was to access the file. Using the name of the directory where the parts were created failed, saying that it could not find the data file. Back on Google, I found that to access the MapFile parts I needed to use:

Object ret = new Object();//My actual WritableComparable impl
Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
Partitioner<K,V> p = new HashPartitioner<>();
Writable e = MapFileOutputFormat.getEntry(readers, p, key, ret);

      

Naively, I ignored the HashPartitioner bit and expected this to find my entry, but no luck. So my next step was to loop over the readers and do a get(..) on each one. This solution did work, but it was very slow, as the files were created by 128 tasks, resulting in 128 part files.

So I researched the importance of HashPartitioner and found that getEntry uses it internally to determine which reader to use, but it seems that Spark doesn't use the same partitioning logic. So I changed to:

rdd.partitionBy(new org.apache.spark.HashPartitioner(128)).sortByKey().saveAsNewAPIHadoopFile(....MapFileOutputFormat.class)

But again, the two HashPartitioners didn't match. So, the questions:

  • Is there a way to combine the MapFiles efficiently (since this would bypass the partitioning logic)?
  • MapFileOutputFormat.getReaders(new Path(file), new Configuration()); is very slow. Can I identify the right reader more efficiently?
  • I am using MapR-FS as the underlying DFS. Will it use the same HashPartitioner implementation?
  • Is there a way to avoid repartitioning, or should the data be sorted across the whole file (as opposed to sorted within each partition)?
  • I am also getting the exception "_SUCCESS/data does not exist". Do I need to delete this file manually?

Any links about this would be greatly appreciated.

PS. If the records are sorted, how can the HashPartitioner be used to find the correct Reader? This would mean that the data parts are hash-partitioned and then sorted by key. So I also tried rdd.repartitionAndSortWithinPartitions(new HashPartitioner(280)), but again with no luck.



1 answer


Digging into this question, I found that the Spark HashPartitioner and the Hadoop HashPartitioner use different logic.
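To make the difference concrete, here is a minimal stand-alone sketch of the two modulo rules as I understand them from the respective sources: Hadoop clears the sign bit of the hash code before taking the remainder, while Spark takes a non-negative modulo of the raw hash code. The class and method names (PartitionRules, hadoopPartition, sparkPartition) are made up for illustration and are not part of either library.

```java
// Stand-alone comparison of the two rules; no Hadoop or Spark dependency.
final class PartitionRules {
    // Rule attributed to Hadoop's HashPartitioner: clear the sign bit, then take the remainder.
    static int hadoopPartition(int hashCode, int numPartitions) {
        return (hashCode & Integer.MAX_VALUE) % numPartitions;
    }

    // Rule attributed to Spark's HashPartitioner: non-negative modulo of the raw hash code.
    static int sparkPartition(int hashCode, int numPartitions) {
        int rawMod = hashCode % numPartitions;
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }

    public static void main(String[] args) {
        int[] hashes = {42, -1, -12345};
        for (int h : hashes) {
            System.out.println(h + " -> hadoop=" + hadoopPartition(h, 280)
                    + ", spark=" + sparkPartition(h, 280));
        }
    }
}
```

Under these two rules, non-negative hash codes land in the same partition. Negative hash codes can land in different ones when the partition count is not a power of two: with 280 partitions, a hash code of -1 maps to 127 under the first rule and to 279 under the second.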

So the brute-force solution that I tried, and that works, is the following.

Save the MapFile with rdd.repartitionAndSortWithinPartitions(new org.apache.spark.HashPartitioner(num_of_partitions)).saveAsNewAPIHadoopFile(....MapFileOutputFormat.class);

Search with:



  • Reader[] readers = MapFileOutputFormat.getReaders(new Path(file), new Configuration());
  • org.apache.spark.HashPartitioner p = new org.apache.spark.HashPartitioner(readers.length);
  • readers[p.getPartition(key)].get(key, val);
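The idea behind these steps can be sketched without a cluster. The following is only an in-memory stand-in, not the Hadoop or Spark API: each TreeMap plays the role of one sorted part file, and the names PartitionedLookupSketch, write and lookup are hypothetical. It shows that a lookup only needs to touch one part, as long as it uses the same partitioning rule that was used when writing.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// In-memory stand-in: each TreeMap plays the role of one sorted part file.
final class PartitionedLookupSketch {
    // Non-negative modulo of the key's hash code (the Spark-style rule).
    static int partitionOf(Object key, int numPartitions) {
        int rawMod = key.hashCode() % numPartitions;
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }

    // "Write" side: route each record to its partition; keys stay sorted within it.
    static List<TreeMap<String, String>> write(String[][] records, int numPartitions) {
        List<TreeMap<String, String>> parts = new ArrayList<>();
        for (int i = 0; i < numPartitions; i++) {
            parts.add(new TreeMap<>());
        }
        for (String[] r : records) {
            parts.get(partitionOf(r[0], numPartitions)).put(r[0], r[1]);
        }
        return parts;
    }

    // "Read" side: recompute the partition from the key and query only that part.
    static String lookup(List<TreeMap<String, String>> parts, String key) {
        return parts.get(partitionOf(key, parts.size())).get(key);
    }

    public static void main(String[] args) {
        String[][] records = {{"alpha", "1"}, {"beta", "2"}, {"gamma", "3"}};
        List<TreeMap<String, String>> parts = write(records, 4);
        System.out.println(lookup(parts, "beta"));
    }
}
```

If the read side used a different rule from the write side, lookup would sometimes query the wrong part and return nothing even though the key exists, which is the symptom described in the question.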

This approach is "dirty" because MapFile access is now tied to the Spark partitioner rather than the intuitive Hadoop HashPartitioner. I could improve on this by implementing a Spark partitioner that uses the Hadoop HashPartitioner logic.

This also does not address the problem of slow access caused by the relatively large number of reducers. I could make it even "dirtier" by generating the part file number from the partitioner, but I'm looking for a clean solution, so please post if there is a better approach to this problem.
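For what it's worth, the "dirtier" variant could look roughly like the sketch below. It assumes the part files follow the part-r-NNNNN naming pattern, which should be checked against the actual output directory. PartFileLocator and its methods are hypothetical names. A single MapFile.Reader could then be opened on that one path instead of calling getReaders on the whole directory.

```java
import java.util.Locale;

// Derives the name of the one part file that should hold a key.
final class PartFileLocator {
    // Non-negative modulo of the key's hash code (the Spark-style rule used when writing).
    static int sparkPartition(Object key, int numPartitions) {
        int rawMod = key.hashCode() % numPartitions;
        return rawMod < 0 ? rawMod + numPartitions : rawMod;
    }

    // Assumption: files are named part-r-00000, part-r-00001, ...; verify in the output directory.
    static String partFileName(int partitionIndex) {
        return String.format(Locale.ROOT, "part-r-%05d", partitionIndex);
    }

    static String partFileFor(Object key, int numPartitions) {
        return partFileName(sparkPartition(key, numPartitions));
    }

    public static void main(String[] args) {
        // An Integer key is used here only because its hash code is its own value.
        System.out.println(partFileFor(Integer.valueOf(130), 128));
    }
}
```

This only works if the number of partitions at lookup time equals the number used when the MapFile was written, so that value would have to be stored or inferred from the directory listing.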
