Hadoop MapReduce hashing program
In Hadoop, I wrote a MapReduce program that hashes every record in a file, appends the hashed value to each record as an extra attribute, and writes the result to the Hadoop filesystem. This is the code I wrote:
public class HashByMapReduce
{
    public static class LineMapper extends Mapper<Text, Text, Text, Text>
    {
        private Text word = new Text();

        public void map(Text key, Text value, Context context) throws IOException, InterruptedException
        {
            key.set("single");
            String line = value.toString();
            word.set(line);
            context.write(key, word);
        }
    }

    public static class LineReducer
            extends Reducer<Text,Text,Text,Text>
    {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values,
                Context context
                ) throws IOException, InterruptedException
        {
            String translations = "";
            for (Text val : values)
            {
                translations = val.toString()+","+String.valueOf(hash64(val.toString())); //Point of Error
                result.set(translations);
                context.write(key, result);
            }
        }
    }

    public static void main(String[] args) throws Exception
    {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Hashing");
        job.setJarByClass(HashByMapReduce.class);
        job.setMapperClass(LineMapper.class);
        job.setReducerClass(LineReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
I wrote this code with the following logic: each line is read by the map method, which assigns every value to the same key so that all values go to the same reduce call, which then passes each value to the hash64() function.
But I can see that it is passing an empty (null) value to the hash function, and I can't figure out why. Thanks in advance.
The cause of the problem is most likely your use of KeyValueTextInputFormat. From the Yahoo tutorial:
InputFormat:          Description:                 Key:                          Value:
TextInputFormat       Default format; reads        The byte offset of the line   The line contents
                      lines of text files
KeyValueInputFormat   Parses lines into key,       Everything up to the first    The remainder of the line
                      val pairs                    tab character
It splits your input lines on the tab character. I suppose there is no tab in your lines. As a result, the key in LineMapper holds the whole line, while nothing is passed as the value (not sure whether it is null or empty).
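To make the splitting rule concrete, here is a minimal plain-Java sketch of "everything up to the first tab is the key, the rest is the value". It does not use Hadoop at all; the class and method names (TabSplitDemo, splitOnFirstTab) are made up for illustration, and returning an empty value when there is no tab is an assumption about how the input format behaves in that case.

```java
import java.util.Arrays;

// Plain-Java illustration (no Hadoop dependency) of splitting a line on its first tab.
// TabSplitDemo and splitOnFirstTab are hypothetical names, not Hadoop APIs.
class TabSplitDemo {

    // Returns {key, value}. Assumption: with no tab, the whole line becomes
    // the key and the value is left empty.
    static String[] splitOnFirstTab(String line) {
        int pos = line.indexOf('\t');
        if (pos < 0) {
            return new String[] { line, "" };
        }
        return new String[] { line.substring(0, pos), line.substring(pos + 1) };
    }

    public static void main(String[] args) {
        // A line with a tab is split into key and value.
        System.out.println(Arrays.toString(splitOnFirstTab("id42\tsome record")));
        // A line without a tab ends up entirely in the key.
        System.out.println(Arrays.toString(splitOnFirstTab("a record without a tab")));
    }
}
```

With input like the second line, a mapper that only reads the value would see nothing to hash, which matches the symptom described in the question.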
From your code, I think you are better off using the TextInputFormat class as your input format. It produces the line offset as the key and the full line as the value. This should fix your problem.
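As a rough picture of what "line offset as key, full line as value" means, the following plain-Java sketch computes the byte offset at which each line starts. It is only an illustration under simple assumptions (UTF-8 text, "\n" line endings, a single unsplit input); LineOffsetDemo and lineOffsets are hypothetical names, not part of Hadoop.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of byte offsets per line.
// Assumes UTF-8 text with '\n' line endings; not a Hadoop API.
class LineOffsetDemo {

    // Returns the byte offset at which each line of the content starts.
    static List<Long> lineOffsets(String content) {
        List<Long> offsets = new ArrayList<>();
        long offset = 0;
        for (String line : content.split("\n")) {
            offsets.add(offset);
            // Advance past the line's bytes plus one byte for the newline.
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return offsets;
    }

    public static void main(String[] args) {
        String content = "abc\nde\nf\n";
        String[] lines = content.split("\n");
        List<Long> offsets = lineOffsets(content);
        for (int i = 0; i < lines.length; i++) {
            System.out.println(offsets.get(i) + " -> " + lines[i]);
        }
    }
}
```

The point is that the key is just a position, so the mapper can ignore it and work with the value, which always holds the complete line.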
EDIT: I ran your code with the following changes and it seems to work fine:
- Changed the input format to TextInputFormat and changed the Mapper declaration accordingly.
- Added the correct setMapOutputKeyClass and setMapOutputValueClass calls on job. They are optional, but leaving them out often causes problems when the job starts.
- Removed your key.set("single") and added a private output key to the Mapper.
- Since you didn't provide the details of the hash64 method, I used String.toUpperCase for testing.
If the problem persists, then I'm pretty sure your hash method is failing on null.
Complete code:
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class HashByMapReduce {

    public static class LineMapper extends
            Mapper<LongWritable, Text, Text, Text> {
        private Text word = new Text();
        // Every record is emitted under the same key, so all of them reach one reduce call.
        private Text outKey = new Text("single");

        public void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // With TextInputFormat the value holds the full line; the offset key is ignored.
            String line = value.toString();
            word.set(line);
            context.write(outKey, word);
        }
    }

    public static class LineReducer extends Reducer<Text, Text, Text, Text> {
        private Text result = new Text();

        public void reduce(Text key, Iterable<Text> values, Context context)
                throws IOException, InterruptedException {
            String translations = "";
            for (Text val : values) {
                // toUpperCase stands in for hash64 here (the original point of error).
                translations = val.toString() + ","
                        + val.toString().toUpperCase();
                result.set(translations);
                context.write(key, result);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = new Job(conf, "Hashing");
        job.setJarByClass(HashByMapReduce.class);
        job.setMapperClass(LineMapper.class);
        job.setReducerClass(LineReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);
        job.setInputFormatClass(TextInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
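Since the question never shows hash64, the following is only a guess at what a null-safe version could look like, so you can rule out the hash method itself. It swaps in 64-bit FNV-1a purely as an example algorithm; the class name Hash64Sketch, the appendHash helper, and the choice to return 0 for null are all assumptions, not your actual implementation.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the question's hash64, using 64-bit FNV-1a.
// The real hash64 is not shown in the question; this is only a sketch.
class Hash64Sketch {

    private static final long FNV_OFFSET_BASIS = 0xcbf29ce484222325L;
    private static final long FNV_PRIME = 0x100000001b3L;

    // Null-safe: returns 0 for null input (an arbitrary choice for this sketch).
    static long hash64(String s) {
        if (s == null) {
            return 0L;
        }
        long h = FNV_OFFSET_BASIS;
        for (byte b : s.getBytes(StandardCharsets.UTF_8)) {
            h ^= (b & 0xff);   // mix in the next byte
            h *= FNV_PRIME;    // multiply by the FNV prime (wraps on overflow)
        }
        return h;
    }

    // Mirrors the reducer's output format: the record, a comma, then its hash.
    static String appendHash(String record) {
        return record + "," + String.valueOf(hash64(record));
    }

    public static void main(String[] args) {
        System.out.println(appendHash("some record"));
    }
}
```

If something along these lines works when called directly but the job still fails, the input to the reducer is the more likely culprit than the hash function.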