How to define multiple custom record delimiters for file input in Spark?

By default, Spark splits an input file into records at the newline character (\n). You can set a custom record delimiter with the textinputformat.record.delimiter property.

But is it possible to specify multiple delimiters for the same file?

Let's assume the file has the following content:

COMMENT,A,B,C
COMMENT,D,E,
F
LIKE,I,H,G
COMMENT,J,K,
L
COMMENT,M,N,O

      

I want the file to be split into records at COMMENT and LIKE instead of at the newline character.
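To make the goal concrete, here is a minimal plain-Scala sketch (no Spark involved) of the records I would expect from the sample above. It holds the content in memory and splits it with a regex alternation; the names `DesiredRecords` and `records` are made up for this illustration.

```scala
object DesiredRecords {
  // Sample file content from the question, held in memory for illustration.
  val content: String =
    "COMMENT,A,B,C\nCOMMENT,D,E,\nF\nLIKE,I,H,G\nCOMMENT,J,K,\nL\nCOMMENT,M,N,O\n"

  // Split on either keyword and drop the empty piece before the first delimiter.
  def records(text: String): Array[String] =
    text.split("COMMENT|LIKE").filter(_.nonEmpty)

  def main(args: Array[String]): Unit =
    // Show embedded newlines as a literal \n so each record prints on one line.
    records(content).foreach(r => println(r.trim.replace("\n", "\\n")))
}
```

For the sample this gives five records, and lines such as `,D,E,` and `F` stay together in one record because the newline is no longer a boundary.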

I have come up with a workaround in case Spark does not allow multiple delimiters:

val ss = SparkSession.builder().appName("SentimentAnalysis").master("local[*]").getOrCreate()
val sc = ss.sparkContext
sc.hadoopConfiguration.set("textinputformat.record.delimiter", "COMMENT")
val rdd = sc.textFile("<filepath>")
val finalRdd = rdd.flatMap(f => f.split("LIKE"))

      

Still, I think proper support for multiple custom delimiters would be better. Is this possible in Spark, or do I need to use the workaround above?

1 answer


I solved this by creating a custom TextInputFormat class whose record reader splits on either of the two delimiter strings. The post pointed out by @puhlen in the comments was a big help. Here is the code I used:

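import java.io.IOException

import scala.collection.mutable.ArrayBuffer

import org.apache.hadoop.fs.FSDataInputStream
import org.apache.hadoop.io.{DataOutputBuffer, LongWritable, Text}
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.{FileSplit, TextInputFormat}

// Input format whose records end at either "COMMENT" or "LIKE" instead of a newline.
class CustomInputFormat extends TextInputFormat {
  override def createRecordReader(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): RecordReader[LongWritable, Text] =
    new ParagraphRecordReader()
}

class ParagraphRecordReader extends RecordReader[LongWritable, Text] {
  private var end: Long = 0L       // first byte offset past this split
  private var stillInChunk = true  // false once this split (or the file) is exhausted

  private var key = new LongWritable()
  private var value = new Text()

  private var fsin: FSDataInputStream = null
  private val buffer = new DataOutputBuffer()  // bytes of the record being built
  private val pending = ArrayBuffer[Byte]()    // trailing bytes that may still turn out to be a delimiter

  private val endTag1 = "COMMENT".getBytes()
  private val endTag2 = "LIKE".getBytes()

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def initialize(inputSplit: InputSplit, taskAttemptContext: TaskAttemptContext): Unit = {
    val split = inputSplit.asInstanceOf[FileSplit]
    val conf = taskAttemptContext.getConfiguration()
    val path = split.getPath()
    val fs = path.getFileSystem(conf)

    fsin = fs.open(path)
    val start = split.getStart()
    end = start + split.getLength()
    fsin.seek(start)

    // A split that starts mid-file skips ahead to its first delimiter; the partial
    // record before it is read by the previous split. If no delimiter starts inside
    // this split, it owns no records at all.
    if (start != 0) {
      stillInChunk = readUntilMatch(endTag1, endTag2, false)
    }
  }

  @throws(classOf[IOException])
  override def nextKeyValue(): Boolean = {
    if (!stillInChunk) return false

    // false means the file ended or the next delimiter belongs to a later split,
    // so the record just read is the last one for this reader.
    // Note: a file that starts with a delimiter yields an empty first record.
    val status = readUntilMatch(endTag1, endTag2, true)

    value = new Text()
    value.set(buffer.getData(), 0, buffer.getLength())
    key = new LongWritable(fsin.getPos())
    buffer.reset()

    if (!status) stillInChunk = false
    true
  }

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getCurrentKey(): LongWritable = key

  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getCurrentValue(): Text = value

  // Progress is not tracked.
  @throws(classOf[IOException])
  @throws(classOf[InterruptedException])
  override def getProgress(): Float = 0f

  @throws(classOf[IOException])
  override def close(): Unit = {
    fsin.close()
  }

  // Consumes bytes up to and including the next delimiter (or EOF). Bytes before the
  // delimiter are appended to `buffer` when withinBlock is true. Returns true only if
  // a delimiter was found and it starts inside this split.
  @throws(classOf[IOException])
  def readUntilMatch(match1: Array[Byte], match2: Array[Byte], withinBlock: Boolean): Boolean = {
    val maxLen = math.max(match1.length, match2.length)
    pending.clear()
    while (true) {
      val b = fsin.read()
      if (b == -1) {
        flush(pending.length, withinBlock) // end of file: the tail is ordinary data
        return false
      }
      pending += b.toByte
      val matched =
        if (endsWith(match1)) match1.length
        else if (endsWith(match2)) match2.length
        else 0
      if (matched > 0) {
        flush(pending.length - matched, withinBlock)
        pending.clear()
        return fsin.getPos() - matched < end
      }
      // Once the window is full, its oldest byte can no longer start a delimiter.
      if (pending.length >= maxLen) flush(1, withinBlock)
    }
    false
  }

  // True if the pending bytes end with the given delimiter.
  private def endsWith(tag: Array[Byte]): Boolean =
    pending.length >= tag.length &&
      tag.indices.forall(k => pending(pending.length - tag.length + k) == tag(k))

  // Moves the first n pending bytes into the record buffer, or drops them while skipping.
  private def flush(n: Int, keep: Boolean): Unit = {
    if (keep) pending.take(n).foreach(x => buffer.write(x.toInt))
    pending.remove(0, n)
  }
}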

      



Use this input format when reading the file from the filesystem, and the records will be split on both delimiters as needed. :)

val rdd = sc.newAPIHadoopFile("<filepath>", classOf[CustomInputFormat], classOf[LongWritable], classOf[Text], sc.hadoopConfiguration)
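The scanning idea behind the record reader can be tried without Hadoop or Spark. Below is a minimal sketch, assuming an in-memory `InputStream` in place of the HDFS stream and ignoring split boundaries entirely. `TwoDelimiterScan` and `split` are hypothetical names for this illustration, not part of any library. It keeps a small window of the most recent bytes and cuts a record whenever that window ends with one of the delimiters.

```scala
import java.io.{ByteArrayInputStream, InputStream}
import scala.collection.mutable.ArrayBuffer

object TwoDelimiterScan {
  // Splits the stream on whichever delimiter completes first; delimiters are not kept.
  def split(in: InputStream, delims: Seq[Array[Byte]]): Vector[String] = {
    val maxLen = delims.map(_.length).max
    val window = ArrayBuffer[Byte]() // recent bytes that might still belong to a delimiter
    val record = ArrayBuffer[Byte]() // bytes confirmed to be record data
    val out = Vector.newBuilder[String]

    def endsWith(d: Array[Byte]): Boolean =
      window.length >= d.length &&
        d.indices.forall(k => window(window.length - d.length + k) == d(k))

    var b = in.read()
    while (b != -1) {
      window += b.toByte
      delims.find(endsWith) match {
        case Some(d) =>
          // Everything in the window before the delimiter is data; emit the record.
          record ++= window.take(window.length - d.length)
          out += new String(record.toArray, "UTF-8")
          record.clear()
          window.clear()
        case None =>
          // A full window without a match: its oldest byte is plain data.
          if (window.length >= maxLen) record += window.remove(0)
      }
      b = in.read()
    }
    // End of input: whatever is left is the final record.
    record ++= window
    out += new String(record.toArray, "UTF-8")
    out.result()
  }

  def main(args: Array[String]): Unit = {
    val data = "COMMENT,A,B,C\nCOMMENT,D,E,\nF\nLIKE,I,H,G\n".getBytes("UTF-8")
    val delims = Seq("COMMENT".getBytes("UTF-8"), "LIKE".getBytes("UTF-8"))
    split(new ByteArrayInputStream(data), delims).foreach(r => println(s"[${r.trim}]"))
  }
}
```

Because the sample starts with a delimiter, the first record is empty; with an RDD, something like a `filter` on non-empty strings after converting each `Text` value would drop it.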

      
