Pig Java UDF: Create a dynamic tuple schema based on input parameters

EDIT: I'll try to explain what I want to do.

1 line of input looks like this: field1, field2, textbox

The textbox is a single string with a fixed number of characters. I want to parse this string by extracting substrings at fixed positions. For example, I want to take the first 10 characters as one field, the next 15 characters as another field, and so on.
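To make the fixed-width slicing concrete, here is a minimal sketch in plain Java. The class name `FixedWidthSplitter` and the sample string are made up for illustration and are not part of the UDF in this post. It also clamps the indexes, on the assumption that a short line should give short or empty fields rather than an exception.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration only; not part of the UDF in this post.
class FixedWidthSplitter {

    // Cuts `text` into consecutive substrings whose lengths are given by `widths`.
    static List<String> split(String text, int[] widths) {
        List<String> parts = new ArrayList<>();
        int start = 0;
        for (int width : widths) {
            // Clamp to the text length so a short line yields short or empty
            // fields instead of a StringIndexOutOfBoundsException.
            int begin = Math.min(start, text.length());
            int end = Math.min(start + width, text.length());
            parts.add(text.substring(begin, end));
            start += width;
        }
        return parts;
    }

    public static void main(String[] args) {
        // 10 characters for the first field, 15 for the second.
        List<String> parts = split("ABCDEFGHIJ123456789012345", new int[] {10, 15});
        System.out.println(parts);
    }
}
```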

This part is fine. The problem is that not every line has the same textbox rules. field1 and field2 together form the key that determines how to parse the text field. In a map, this key points to an array of all the field names I extract (for use in the output), as well as the length of each field in the string (so I can parse it).
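The lookup described above could be sketched roughly as follows. `LayoutRegistry` and `Layout` are hypothetical names. The sketch assumes the key is the trimmed concatenation of field1 and field2, and it stores names and widths in one object so the two arrays cannot drift apart.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical names throughout; only the key "array1" comes from the post.
class LayoutRegistry {

    // Field names and field widths for one kind of textbox, kept together.
    static final class Layout {
        final String[] fieldNames;
        final int[] widths;

        Layout(String[] fieldNames, int[] widths) {
            if (fieldNames.length != widths.length) {
                throw new IllegalArgumentException("names and widths must have the same length");
            }
            this.fieldNames = fieldNames;
            this.widths = widths;
        }
    }

    private final Map<String, Layout> layouts = new HashMap<>();

    void register(String key, String[] fieldNames, int[] widths) {
        layouts.put(key, new Layout(fieldNames, widths));
    }

    // Assumption: the key is field1 and field2 concatenated after trimming.
    Layout lookup(String field1, String field2) {
        String key = field1.trim() + field2.trim();
        Layout layout = layouts.get(key);
        if (layout == null) {
            throw new IllegalArgumentException("No layout registered for key: " + key);
        }
        return layout;
    }

    public static void main(String[] args) {
        LayoutRegistry registry = new LayoutRegistry();
        registry.register("array1", new String[] {"alias1", "alias2"}, new int[] {6, 9});
        Layout layout = registry.lookup(" array", "1 ");
        System.out.println(layout.fieldNames.length + " fields");
    }
}
```

Failing fast on an unknown key avoids the NullPointerException that a bare `map.get(key)` followed by `.length` would produce.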

The problem I am having is that each of my outputs is a bag with different fields and values in it. Some lines might have a bag with 10 tuples, others with 5 or 6. I need a way to define my schema in the exec function.

Edit:

My main goal is to use the inputs to my UDF with a lookup map to produce my output schema. I'm leaving the rest of the post here for reference, but what I really want to find out is whether the parameters can be used to access the map and get the array describing my fields.

I am trying to return a bag of tuples, each with an alias or key. I have a map that contains arrays of strings that I need to use as aliases for the tuples. Based on user input to the UDF, I will select the correct array containing my schema.

This works fine in my exec function because I have the user input there. But I cannot figure out how to use this input for my output schema.

For the UDF input, the first parameter combined with the second parameter is the key used to retrieve the array. The third parameter is a large text field that I need to parse at specific character intervals, which are different for each array.

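import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.pig.EvalFunc;
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.logicalLayer.schema.Schema;

public class BagTupleExampleUDF extends EvalFunc<DataBag> {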

TupleFactory mTupleFactory = TupleFactory.getInstance();
BagFactory mBagFactory = BagFactory.getInstance();

/* Set up the number of fields for each loop/segment type */
HashMap<String, String[]> FieldsMap = new HashMap<String, String[]>();
Map<String,int[]> FieldsNumChar = new HashMap<String, int[]>();


@Override
public DataBag exec(Tuple tuple) throws IOException {
    setUpMaps();


    // expect three parameters: param1, param2 and the text to parse
    if (tuple == null || tuple.size() != 3) {
        throw new IllegalArgumentException("BagTupleExampleUDF: requires 3 input parameters.");
    }
    try {

         String param1 = (String)tuple.get(0);
         String param2 = (String)tuple.get(1);
         String textArea = (String)tuple.get(2);

         String processingText = textArea;



         // param1 + param2 form the lookup key into both maps
         String paramsCombined = param1.trim() + param2.trim();
         String[] fieldsArray = FieldsMap.get(paramsCombined);

         int[] endFieldsIndex = FieldsNumChar.get(paramsCombined);
         DataBag output = mBagFactory.newDefaultBag();

         Tuple outputTuple = mTupleFactory.newTuple();
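         for (int i = 0; i < fieldsArray.length; i++) {
             // take the next endFieldsIndex[i] characters as this field's value
             String temp = processingText.substring(0, endFieldsIndex[i]);
             // drop the consumed characters before the next iteration
             processingText = processingText.substring(endFieldsIndex[i]);
             outputTuple.append(temp);
         }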




        output.add(outputTuple);


        return output;
    }
    catch (Exception e) {
        throw new IOException("BagTupleExampleUDF: caught exception processing input.", e);
    }
}

      

Here I need to somehow access the array below based on user input, to determine which schema to populate. Right now it is hard-coded. In my for loop I need the correct array.length, and when I set the field schema I will use array[i]. But I cannot access the array in this function:

public Schema outputSchema(Schema input) {
        setUpMaps();
        // Function returns a bag holding one tuple of chararray fields.
        // The field aliases come from FieldsMap; the key is hard-coded to "array1" for now.
        try{

            Schema tupleSchema = new Schema();
            String[] test = FieldsMap.get("array1");
            for(int i = 0; i<test.length; i++){
                tupleSchema.add(new Schema.FieldSchema(test[i], DataType.CHARARRAY));

            }


            return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass().getName().toLowerCase(), input), tupleSchema, DataType.BAG));
            }
        catch (Exception e){
            throw new RuntimeException(e);
        }
    }



        public void setUpMaps(){
            FieldsMap.put("array1", new String[]{"alias1","alias2","alias3","alias4","alias5","alias6","alias7","alias8","alias9"});

            FieldsNumChar.put("array1",new int[] {6,9,4,4,30,2,5,4,11});

        }
    }

      

In reality I will have 10+ arrays and the input will determine which array and which schema I am using. I've been stuck on this for a while and no matter what I try, I can't figure it out.
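One direction that might be worth trying, offered as a sketch under assumptions and not a confirmed solution: `outputSchema(Schema input)` receives only the input schema and no row values, so the layout key has to be known before any data is read. Pig lets a UDF take String constructor arguments through `DEFINE`, for example `DEFINE ParseArray1 BagTupleExampleUDF('array1');`, which would make the key available in both `exec` and `outputSchema`. The class below is a plain-Java stand-in for that shape. The name `LayoutBoundParser` is hypothetical, and it deliberately does not extend `EvalFunc` so that it compiles without the Pig jar.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in (hypothetical class); a real UDF would extend EvalFunc<DataBag>.
class LayoutBoundParser {

    private static final Map<String, String[]> FIELD_NAMES = new HashMap<>();
    private static final Map<String, int[]> FIELD_WIDTHS = new HashMap<>();

    static {
        // Same values as setUpMaps() in the post.
        FIELD_NAMES.put("array1", new String[] {"alias1", "alias2", "alias3", "alias4",
                "alias5", "alias6", "alias7", "alias8", "alias9"});
        FIELD_WIDTHS.put("array1", new int[] {6, 9, 4, 4, 30, 2, 5, 4, 11});
    }

    private final String[] names;
    private final int[] widths;

    // In Pig, this would be the String constructor invoked through DEFINE.
    LayoutBoundParser(String layoutKey) {
        names = FIELD_NAMES.get(layoutKey);
        widths = FIELD_WIDTHS.get(layoutKey);
        if (names == null || widths == null) {
            throw new IllegalArgumentException("Unknown layout: " + layoutKey);
        }
    }

    // Counterpart of outputSchema(): needs no row data, only the constructor key.
    List<String> schemaFieldNames() {
        return Arrays.asList(names);
    }

    // Counterpart of exec(): slices one text value using the same layout.
    List<String> parse(String text) {
        List<String> values = new ArrayList<>();
        int start = 0;
        for (int width : widths) {
            int begin = Math.min(start, text.length());
            int end = Math.min(start + width, text.length());
            values.add(text.substring(begin, end));
            start += width;
        }
        return values;
    }
}
```

This does not pick a layout per row. With 10+ layouts, the script would presumably need one `DEFINE` per key and would have to split the relation by field1 and field2 before applying the matching definition.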
