NullPointerException in the MapReduce sorter

I know the sort comparator is used to order the map output by key. To better understand the MapReduce framework, I wrote a custom SortComparator. This is my WordCount class with the custom SortComparator class.

package bananas;

import java.io.FileWriter;
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class WordCount {


  public static class TokenizerMapper
       extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word = new Text();

    public void map(Object key, Text value, Context context
                    ) throws IOException, InterruptedException {
      StringTokenizer itr = new StringTokenizer(value.toString());

      while (itr.hasMoreTokens()) {
        word.set(itr.nextToken());
        context.write(word, one);

      }
    }
  }

  public static class IntSumReducer
       extends Reducer<Text,IntWritable,Text,IntWritable> {
    private IntWritable result = new IntWritable();    
    public void reduce(Text key, Iterable<IntWritable> values,
                       Context context
                       ) throws IOException, InterruptedException {


      int sum = 0;
      for (IntWritable val : values) {
        sum += val.get();
      }

      result.set(sum);
      context.write(key, result);
    }
  }

  public static class MyPartitoner extends Partitioner<Text, IntWritable>{

    @Override
    public int getPartition(Text key, IntWritable value, int numPartitions) {


        return Math.abs(key.hashCode()) % numPartitions;
    }  
  }

  public static class MySortComparator2 extends WritableComparator {

    protected MySortComparator2() {
      super();
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
      return 0;
    }
  }

  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count");
    job.setJarByClass(WordCount.class);
    job.setSortComparatorClass(MySortComparator2.class);
    job.setMapperClass(TokenizerMapper.class);
    job.setCombinerClass(IntSumReducer.class);
    job.setReducerClass(IntSumReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}


But when I execute this, I get this error:

Error: java.lang.NullPointerException
    at org.apache.hadoop.io.WritableComparator.compare(WritableComparator.java:157)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.compare(MapTask.java:1265)
    at org.apache.hadoop.util.QuickSort.fix(QuickSort.java:35)
    at org.apache.hadoop.util.QuickSort.sortInternal(QuickSort.java:87)
    at org.apache.hadoop.util.QuickSort.sort(QuickSort.java:63)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.sortAndSpill(MapTask.java:1593)
    at org.apache.hadoop.mapred.MapTask$MapOutputBuffer.flush(MapTask.java:1482)
    at org.apache.hadoop.mapred.MapTask$NewOutputCollector.close(MapTask.java:720)
    at org.apache.hadoop.mapred.MapTask.runNewMapper(MapTask.java:790)
    at org.apache.hadoop.mapred.MapTask.run(MapTask.java:341)
    at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:163)
    at java.security.AccessController.doPrivileged(Native Method)
    at javax.security.auth.Subject.doAs(Subject.java:415)
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1628)
    at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:158) 


My SortComparator class looks fine to me. After the mapping is done, MySortComparator2's compare method should receive the Text keys as input, and since I return 0, the sort should have no effect. That is what I expected to observe. I followed these tutorials:

http://codingjunkie.net/secondary-sort/

http://blog.zaloni.com/secondary-sorting-in-hadoop

http://www.bigdataspeak.com/2013/02/hadoop-how-to-do-secondary-sort-on_25.html

Thanks in advance for your help.

3 answers


You also need to override this method:

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    // per your desired no-sort logic
    return 0;
}



I think your comparator is being constructed in such a way that the fields used by the superclass implementation are left null (and that byte-level method, not the one you wrote above, is the one actually called during the sort). That is why you are getting a NullPointerException. By overriding the method with an implementation that does not touch those fields, you avoid the exception.
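A minimal sketch of that workaround, keeping the question's intent of returning 0 for every pair (written here as a standalone class rather than the nested one in the question):

```java
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch: with the no-arg super(), WritableComparator's internal key buffers
// stay null, so its default byte-level compare() throws a NullPointerException.
// Overriding that overload sidesteps those fields entirely.
public class MySortComparator2 extends WritableComparator {

    protected MySortComparator2() {
        super();
    }

    // This byte-level overload is the one the map-side sort actually invokes.
    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        return 0; // "no sort" behavior, as in the question
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        return 0;
    }
}
```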



There is actually a problem with the MySortComparator2 constructor. The code should look like this:

protected MySortComparator2() {
    super(Text.class, true);
}

where the first parameter is your key class, and the second parameter, set to true, ensures that WritableComparator is instantiated in a way that allows WritableComparator.compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) to call MySortComparator2.compare(WritableComparable a, WritableComparable b).
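Put together, a corrected version of the question's comparator would then read (still returning 0, per the original intent):

```java
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

// Sketch of the corrected comparator: passing (Text.class, true) to super()
// makes WritableComparator allocate the key instances and buffer it needs to
// deserialize the raw bytes before delegating to compare() below.
public class MySortComparator2 extends WritableComparator {

    protected MySortComparator2() {
        super(Text.class, true);
    }

    @SuppressWarnings("rawtypes")
    @Override
    public int compare(WritableComparable w1, WritableComparable w2) {
        return 0;
    }
}
```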



As Chris Gerken said, when extending WritableComparator you need to override this method (or implement RawComparator instead of extending WritableComparator):

public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
    return 0;
}


And, as you said, you wanted to see the sort fail. But if you return 0, then every time MapReduce compares two keys it sees them as equal, so you end up with only one key/value pair: the key is whichever key the map task emits first, and the value is the total number of words in the input file. I hope you see what I mean. If your input is something like this:

why are rockets cylindrical


your reduce output will be

why  4


because it treats every word as the same key. Hope this helps.
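This collapsing effect is not Hadoop-specific: any sorted structure whose comparator returns 0 for every pair behaves the same way. A small plain-Java illustration (no Hadoop involved, using the example sentence above as hypothetical input):

```java
import java.util.Map;
import java.util.TreeMap;

public class CollapsedKeysDemo {
    public static void main(String[] args) {
        // A comparator that claims every pair of keys is equal...
        Map<String, Integer> counts = new TreeMap<>((a, b) -> 0);

        // ...collapses all insertions into the first key seen:
        // "are", "rockets", "cylindrical" all compare equal to "why",
        // so their counts accumulate under that single key.
        for (String word : "why are rockets cylindrical".split(" ")) {
            counts.merge(word, 1, Integer::sum);
        }

        System.out.println(counts); // prints {why=4}
    }
}
```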
