Hadoop MapReduce hashing program

In Hadoop I wrote a MapReduce program to hash all the records in a file, add the hashed value as an extra attribute to each record, and then write the output to the Hadoop file system. This is the code I wrote:

public class HashByMapReduce
{
public static class LineMapper extends Mapper<Text, Text, Text, Text>
{
    private Text word = new Text();

    public void map(Text key, Text value, Context context) throws IOException, InterruptedException
    {
        key.set("single");
        String line = value.toString();
        word.set(line);
        context.write(key, word);
    }
}
public static class LineReducer extends Reducer<Text, Text, Text, Text>
{
    private Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException
    {
        String translations = "";
        for (Text val : values)
        {
            translations = val.toString() + "," + String.valueOf(hash64(val.toString())); //Point of Error

            result.set(translations);
            context.write(key, result);
        }
    }
}
public static void main(String[] args) throws Exception
{
    Configuration conf = new Configuration();
    Job job = new Job(conf, "Hashing");
    job.setJarByClass(HashByMapReduce.class);
    job.setMapperClass(LineMapper.class);
    job.setReducerClass(LineReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);
    job.setInputFormatClass(KeyValueTextInputFormat.class);
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}

I wrote this code with the logic that each row is read by the map method, which assigns the entire value to a single key; all values therefore go to the same reduce method, which passes each value to the hash64() function.

But I can see that it's passing a null (empty) value to the hash function, and I can't figure out why. Thanks in advance.

1 answer


The cause of the problem is most likely related to the use of KeyValueTextInputFormat. From the Yahoo tutorial:

  InputFormat:          Description:       Key:                     Value:

  TextInputFormat       Default format;    The byte offset          The line contents 
                        reads lines of     of the line                            
                        text files

  KeyValueInputFormat   Parses lines       Everything up to the     The remainder of                      
                        into key,          first tab character      the line
                        val pairs 

      

It splits your input lines on the tab character. I suppose there is no tab in your lines. As a result, the key in LineMapper contains the whole line, while nothing is passed as the value (I'm not sure whether it is null or empty).

From your code, I think you are better off using TextInputFormat as your input format, which produces the line offset as the key and the complete line as the value. This should fix your problem.
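
To make the difference concrete, here is a minimal sketch of what each input format hands to the mapper for a line that contains no tab, plus the separator property you could set if your records were actually delimited by some other character. The sample line, the ',' separator, and the class name InputFormatSketch are illustrative; the property name is mapreduce.input.keyvaluelinerecordreader.key.value.separator on Hadoop 2.x, while older releases use key.value.separator.in.input.line.

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;

 public class InputFormatSketch {
     public static void main(String[] args) throws Exception {
         // For a sample input line without a tab, e.g. "rec1,fieldA,fieldB":
         //
         //   TextInputFormat         -> key   = 0 (byte offset, LongWritable)
         //                              value = "rec1,fieldA,fieldB"
         //   KeyValueTextInputFormat -> key   = "rec1,fieldA,fieldB" (no tab found)
         //                              value = "" (empty Text)
         //
         // If the records really were delimited by ',', you could keep
         // KeyValueTextInputFormat and change its separator instead
         // (property name below assumes Hadoop 2.x):
         Configuration conf = new Configuration();
         conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", ",");
         Job job = new Job(conf, "Hashing");
         job.setInputFormatClass(KeyValueTextInputFormat.class);
     }
 }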

EDIT: I ran your code with the following changes and it seems to work fine:



  • Changed the input format to TextInputFormat and changed the Mapper declaration accordingly.
  • Added the proper setMapOutputKeyClass and setMapOutputValueClass calls to the job. They are optional, but they often cause problems at startup.
  • Removed your key.set("single") and added a private key field to the Mapper.
  • Since you didn't provide the details of the hash64 method, I used String.toUpperCase for testing.

If the problem persists, then I'm pretty sure your hash method is failing on a null or empty input.
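
As a quick check, a minimal defensive version of the reduce body that skips null or empty values could look like the sketch below; String.hashCode() and the class name GuardedLineReducer are only stand-ins for your unspecified hash64() and class.

 import java.io.IOException;

 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Reducer;

 public class GuardedLineReducer extends Reducer<Text, Text, Text, Text> {
     private final Text result = new Text();

     @Override
     public void reduce(Text key, Iterable<Text> values, Context context)
             throws IOException, InterruptedException {
         for (Text val : values) {
             String s = val.toString();
             // Skip null or empty values instead of handing them to the hash
             if (s == null || s.isEmpty()) {
                 continue;
             }
             // String.hashCode() is only a stand-in for the original hash64()
             result.set(s + "," + String.valueOf(s.hashCode()));
             context.write(key, result);
         }
     }
 }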

Complete code:

 import java.io.IOException;

 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.hadoop.mapreduce.Reducer;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

 public class HashByMapReduce {

     public static class LineMapper extends
             Mapper<LongWritable, Text, Text, Text> {
         private Text word = new Text();
         private Text outKey = new Text("single");

         public void map(LongWritable key, Text value, Context context)
                 throws IOException, InterruptedException {
             String line = value.toString();
             word.set(line);
             context.write(outKey, word);
         }
     }

     public static class LineReducer extends Reducer<Text, Text, Text, Text> {
         private Text result = new Text();

         public void reduce(Text key, Iterable<Text> values, Context context)
                 throws IOException, InterruptedException {
             String translations = "";
             for (Text val : values) {
                 translations = val.toString() + ","
                         + val.toString().toUpperCase(); // Point of Error

                 result.set(translations);
                 context.write(key, result);
             }
         }
     }

     public static void main(String[] args) throws Exception {
         Configuration conf = new Configuration();
         Job job = new Job(conf, "Hashing");
         job.setJarByClass(HashByMapReduce.class);
         job.setMapperClass(LineMapper.class);
         job.setReducerClass(LineReducer.class);
         job.setMapOutputKeyClass(Text.class);
         job.setMapOutputValueClass(Text.class);
         job.setOutputKeyClass(Text.class);
         job.setOutputValueClass(Text.class);
         job.setInputFormatClass(TextInputFormat.class);
         FileInputFormat.addInputPath(job, new Path(args[0]));
         FileOutputFormat.setOutputPath(job, new Path(args[1]));
         System.exit(job.waitForCompletion(true) ? 0 : 1);
     }
 }
