How do I set up multiple Avro schemas using AvroParquetOutputFormat?

In my MapReduce job, I am using AvroParquetOutputFormat to write to Parquet files using Avro schema.

The application logic requires the creation of several types of Reducer-generated files, and each file has its own Avro schema.

The AvroParquetOutputFormat class has a static setSchema() method for setting the Avro output schema. Looking at the code, AvroParquetOutputFormat delegates to AvroWriteSupport.setSchema(), which is also a static implementation.
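For reference, the standard single-schema setup looks roughly like this (the record schema here is just an illustrative placeholder):

```java
import org.apache.avro.Schema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class SingleSchemaDriver {
    public static void main(String[] args) throws Exception {
        // Hypothetical record schema, only for illustration
        Schema schema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Event\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}");

        Job job = Job.getInstance(new Configuration(), "single-schema-job");
        job.setOutputFormatClass(AvroParquetOutputFormat.class);

        // One static schema for the entire job -- this is exactly the
        // limitation described above: there is no per-output variant
        AvroParquetOutputFormat.setSchema(job, schema);
    }
}
```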

Without extending AvroWriteSupport and hacking the logic, is there an easier way to write multiple Avro schemas from AvroParquetOutputFormat in a single MR job?

Any pointers / inputs are much appreciated.

Thanks and Regards




1 answer

It may be quite late to answer, but I also ran into this problem and came up with a solution.

First, parquet-mr has no built-in MultipleAvroParquetOutputFormat. But to achieve similar behavior, I used MultipleOutputs.

For a map-only job, set up your mapper as follows:

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord> {

    protected KafkaAvroDecoder deserializer;
    protected String outputPath = "";

    // Using MultipleOutputs to write custom named files
    protected MultipleOutputs<Void, GenericRecord> mos;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        outputPath = conf.get(FileOutputFormat.OUTDIR);
        mos = new MultipleOutputs<Void, GenericRecord>(context);
    }

    @Override
    public void map(LongWritable ln, BytesWritable value, Context context) {
        try {
            GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
            // Switch the write schema before MultipleOutputs creates a
            // RecordWriter for this output path
            AvroWriteSupport.setSchema(context.getConfiguration(), record.getSchema());
            Schema schema = record.getSchema();
            String mergeEventsPath = outputPath + "/" + schema.getName(); // Adding '/' will do no harm
            mos.write((Void) null, record, mergeEventsPath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close(); // Flush and close all RecordWriters opened by MultipleOutputs
    }
}


This will create a new RecordWriter for each schema and produce a separate Parquet file named after the schema, for example schema1-r-0000.parquet.

This will also create the default part-r-0000x.parquet files based on the schema set in the driver. To avoid this, use LazyOutputFormat:


LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);
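Putting it together, the driver side might look something like the sketch below. The class names (EventDriver, EventMapper) and the fallback schema are assumptions for illustration; a job-level schema is still set because AvroParquetOutputFormat requires one, and the mapper then swaps it per output path before each RecordWriter is created.

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
import org.apache.parquet.avro.AvroParquetOutputFormat;

public class EventDriver {
    public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "multi-schema-parquet");
        job.setJarByClass(EventDriver.class);

        job.setMapperClass(EventMapper.class);
        job.setNumReduceTasks(0);                 // map-only job
        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(GenericRecord.class);

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // LazyOutputFormat suppresses the empty default part-r-* files;
        // only the MultipleOutputs paths written in the mapper are created
        LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);

        // Hypothetical fallback schema; the mapper overrides it per record
        Schema defaultSchema = new Schema.Parser().parse(
                "{\"type\":\"record\",\"name\":\"Default\",\"fields\":"
                + "[{\"name\":\"id\",\"type\":\"string\"}]}");
        AvroParquetOutputFormat.setSchema(job, defaultSchema);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
```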


Hope it helps.


