How do I set up multiple Avro schemas using AvroParquetOutputFormat?

In my MapReduce job, I am using AvroParquetOutputFormat to write Parquet files using an Avro schema.

The application logic requires the Reducer to generate several types of files, and each file has its own Avro schema.

The AvroParquetOutputFormat class has a static setSchema() method for setting the Avro output schema. Looking at the code, AvroParquetOutputFormat uses AvroWriteSupport.setSchema(), which is again static.
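For context, the driver-side setup for a single schema looks roughly like this (conf and schema are placeholders for my actual configuration and Avro schema):

Job job = Job.getInstance(conf, "my-parquet-job");
job.setOutputFormatClass(AvroParquetOutputFormat.class);
// setSchema() is static configuration -- it applies to the whole job,
// so there is no obvious hook for a second schema
AvroParquetOutputFormat.setSchema(job, schema);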

Without extending AvroWriteSupport and hacking the logic, is there an easier way to get multiple Avro schemas out of AvroParquetOutputFormat in a single MR job?

Any pointers / inputs are much appreciated.

Thanks and Regards

MK


1 answer


This may be quite a late answer, but I ran into the same problem and came up with a solution.

First of all, there is no built-in support in parquet-mr for something like a MultipleAvroParquetOutputFormat. But to achieve similar behavior, I used MultipleOutputs.

For a map-only job, structure your mapper as follows:

import java.io.IOException;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;

import io.confluent.kafka.serializers.KafkaAvroDecoder; // from Confluent's kafka-avro-serializer

import parquet.avro.AvroWriteSupport; // org.apache.parquet.avro in newer parquet-mr releases

public class EventMapper extends Mapper<LongWritable, BytesWritable, Void, GenericRecord> {

    protected KafkaAvroDecoder deserializer;
    protected String outputPath = "";

    // Using MultipleOutputs to write custom-named files
    protected MultipleOutputs<Void, GenericRecord> mos;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        super.setup(context);
        Configuration conf = context.getConfiguration();
        outputPath = conf.get(FileOutputFormat.OUTDIR);
        mos = new MultipleOutputs<Void, GenericRecord>(context);
        // deserializer initialization is environment-specific and omitted here
    }

    @Override
    public void map(LongWritable ln, BytesWritable value, Context context) {
        try {
            GenericRecord record = (GenericRecord) deserializer.fromBytes(value.getBytes());
            Schema schema = record.getSchema();
            // Point AvroWriteSupport at this record's schema before writing
            AvroWriteSupport.setSchema(context.getConfiguration(), schema);
            String mergeEventsPath = outputPath + "/" + schema.getName(); // adding '/' will do no harm
            mos.write((Void) null, record, mergeEventsPath);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    @Override
    public void cleanup(Context context) throws IOException, InterruptedException {
        mos.close();
    }

}

This will create a new RecordWriter for each schema and write a separate Parquet file named after the schema, for example schema1-r-0000.parquet.

This will also create the default part-r-0000x.parquet files based on the schema set in the driver. To avoid this, use LazyOutputFormat like:

LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);
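For completeness, a driver wired this way might look like the sketch below. This is not from the original setup: MultiSchemaDriver, the SequenceFileInputFormat input, and the default-event.avsc resource are assumptions made to keep the example self-contained.

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;

import parquet.avro.AvroParquetOutputFormat; // org.apache.parquet.avro in newer parquet-mr releases

public class MultiSchemaDriver {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "multi-schema-parquet");
        job.setJarByClass(MultiSchemaDriver.class);

        job.setMapperClass(EventMapper.class);
        job.setNumReduceTasks(0); // map-only: records go straight from EventMapper to Parquet

        job.setOutputKeyClass(Void.class);
        job.setOutputValueClass(GenericRecord.class);

        // Assumption: input is a SequenceFile of raw Kafka message bytes
        job.setInputFormatClass(SequenceFileInputFormat.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        // Register the output format lazily so the empty default part files are skipped
        LazyOutputFormat.setOutputFormatClass(job, AvroParquetOutputFormat.class);

        // AvroWriteSupport still needs a schema at configuration time;
        // the mapper overrides it per record ("default-event.avsc" is a hypothetical resource)
        Schema defaultSchema = new Schema.Parser()
                .parse(MultiSchemaDriver.class.getResourceAsStream("/default-event.avsc"));
        AvroParquetOutputFormat.setSchema(job, defaultSchema);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

The default schema here only exists to satisfy AvroWriteSupport at setup time; the per-record setSchema() call in the mapper takes over for the actual writes.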


Hope it helps.
