How do I create a DataFrame from a CSV in Spark (using Scala) when the first line is the schema?

I am new to Spark and I am coding in Scala. I want to read a file from HDFS or S3 and convert it to a Spark DataFrame. The first line of the CSV file is the schema, but how can I create a DataFrame when the columns are not known in advance? I used the following piece of code to create a DataFrame for a known schema.

def loadData(path: String): DataFrame = {

  val rdd = sc.textFile(path)

  // The first line holds the column names; every column is treated as a nullable String
  val firstLine = rdd.first()
  val schema = StructType(firstLine.split(',').map(fieldName => StructField(fieldName, StringType, nullable = true)))

  // Drop the header line (the first line of the first partition)
  val noHeader = rdd.mapPartitionsWithIndex(
    (i, iterator) =>
      if (i == 0 && iterator.hasNext) {
        iterator.next()
        iterator
      } else iterator)

  // This only works for exactly six columns, because the Row is built by hand
  val rowRDD = noHeader.map(_.split(",")).map(p => Row(p(0), p(1), p(2), p(3), p(4), p(5)))

  sqlContext.createDataFrame(rowRDD, schema)
}
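For the unknown-column case, one option is to build each Row with Row.fromSeq so the column count comes from the header rather than being hard-coded. This is only a minimal sketch: loadDataDynamic is an illustrative name, sc and sqlContext are assumed to be in scope exactly as in the snippet above, and the header is dropped with a simple filter instead of mapPartitionsWithIndex.

// Minimal sketch, assuming sc: SparkContext and sqlContext: SQLContext are in scope as above.
// loadDataDynamic is an illustrative name; all columns are read as nullable Strings.
def loadDataDynamic(path: String): DataFrame = {
  val rdd = sc.textFile(path)
  val header = rdd.first()
  val schema = StructType(header.split(',').map(name => StructField(name, StringType, nullable = true)))

  val rows = rdd
    .filter(_ != header)                                  // drop the header line
    .map(line => Row.fromSeq(line.split(",", -1).toSeq))  // -1 keeps trailing empty fields

  sqlContext.createDataFrame(rows, schema)
}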


1 answer


You can try the following code, Hamed:

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.types._

object CsvLoader {

  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setMaster("local").setAppName("test"))
    val sqlcon = new SQLContext(sc)

    // Comma-separated list of columnName:type
    val schemaString = "Id:int,FirstName:text,LastName:text,Email:string,Country:text"

    // Build the StructType from the columnName:type pairs
    val schema = StructType(
      schemaString.split(",").map { field =>
        val Array(name, ftype) = field.split(":")
        StructField(name, getFieldTypeInSchema(ftype), nullable = true)
      })

    val rdd = sc.textFile("/users.csv")

    // Drop the header line (the first line of the first partition)
    val noHeader = rdd.mapPartitionsWithIndex(
      (i, iterator) =>
        if (i == 0 && iterator.hasNext) {
          iterator.next()
          iterator
        } else iterator)

    // Cast every token to the type declared for its column and build a Row per line
    val rowRDDx = noHeader.map { line =>
      val values = line.split(",").zipWithIndex.map { case (value, index) =>
        schema.fields(index).dataType match {
          case IntegerType   => value.toInt
          case DoubleType    => value.toDouble
          case LongType      => value.toLong
          case FloatType     => value.toFloat
          case ByteType      => value.toByte
          case TimestampType => java.sql.Timestamp.valueOf(value) // expects "yyyy-mm-dd hh:mm:ss"
          case _             => value
        }
      }
      Row.fromSeq(values)
    }

    // createDataFrame (the replacement for the deprecated applySchema) attaches the schema to the Rows
    val df = sqlcon.createDataFrame(rowRDDx, schema)
  }

  // Map a type name from the schema string to a Spark SQL DataType
  def getFieldTypeInSchema(ftype: String): DataType = ftype match {
    case "int" | "counter" | "varint"                      => IntegerType
    case "bigint" | "long"                                  => LongType
    case "double" | "decimal"                               => DoubleType
    case "float"                                            => FloatType
    case "byte"                                             => ByteType
    case "boolean"                                          => BooleanType
    case "date" | "timestamp"                               => TimestampType
    case "string" | "text" | "ascii" | "varchar" | "uuid"   => StringType
    case _                                                  => StringType
  }
}
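Once the DataFrame is built, a quick check inside main (nothing beyond standard DataFrame calls) would be:

df.printSchema()     // Id should show up as integer, the remaining columns as string
df.show(5)           // print the first five rows
println(df.count())  // number of data rows, header excluded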

      



Hope this helps you. :)
