Spark-SQL: how to read a TSV or CSV file into a DataFrame and apply a custom schema?

I am using Spark 2.0 while working with Tab-Separated Values (TSV) and Comma-Separated Values (CSV) files. I want to load the data into Spark-SQL DataFrames, where I would like full control over the schema when reading the files. I don't want Spark to guess the schema from the data in the file.

How do I load TSV or CSV files into Spark SQL DataFrames and apply a schema to them?

+3




1 answer


Below is a complete Spark 2.0 example of loading a tab-delimited (TSV) file and applying a schema.

I am using the Iris dataset in TSV format from UAH.edu as an example. Here are the first few lines from this file:

Type    PW      PL      SW      SL
0       2       14      33      50
1       24      56      31      67
1       23      51      31      69
0       2       10      36      46
1       20      52      30      65

      

To enforce a schema, you can programmatically build it using one of two methods:

a. Create a schema with StructType:

import org.apache.spark.sql.types._

val irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)
    ))

      

b. Alternatively, create a schema with a case class and Encoders (this approach is less verbose):



import org.apache.spark.sql.Encoders

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int, 
                      SepalWidth: Int, SepalLength: Int)

val irisSchema = Encoders.product[IrisSchema].schema
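
Both methods produce the same column names and types, with one subtle difference worth knowing: the encoder marks primitive Int fields as non-nullable, whereas the StructType example above declared every field nullable. A quick self-contained comparison:

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types._

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

val fromCaseClass  = Encoders.product[IrisSchema].schema
val fromStructType = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)))

// Same field names and types...
fromCaseClass.fieldNames.sameElements(fromStructType.fieldNames)  // true

// ...but nullability differs: primitive Int fields from a case class
// come out non-nullable.
fromCaseClass("Type").nullable   // false
fromStructType("Type").nullable  // true
```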

      

After creating your schema, you can use spark.read to read the TSV file. Note that you can also read comma-delimited (CSV) files, or indeed files with any delimiter, as long as you set the option("delimiter", d) parameter correctly. Also, if your data file has a header line, be sure to set option("header", "true").
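
For example, reading a comma-delimited file only changes the delimiter option. The sketch below writes a tiny sample CSV to a temp file so it is self-contained; the local SparkSession is just for illustration (in spark-shell, spark already exists):

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

// Local session for this sketch only; in spark-shell, `spark` is predefined.
val spark = SparkSession.builder().master("local[1]").getOrCreate()

// Write a tiny comma-delimited sample so the sketch is self-contained.
val csvPath = Files.createTempFile("iris", ".csv")
Files.write(csvPath, "Type,PW,PL,SW,SL\n0,2,14,33,50\n1,24,56,31,67\n".getBytes)

val irisSchema = StructType(Array(
    StructField("Type",         IntegerType, true),
    StructField("PetalWidth",   IntegerType, true),
    StructField("PetalLength",  IntegerType, true),
    StructField("SepalWidth",   IntegerType, true),
    StructField("SepalLength",  IntegerType, true)))

val csvDf = spark.read.format("csv").
              option("header", "true").
              option("delimiter", ",").  // comma instead of "\t"
              schema(irisSchema).
              load(csvPath.toString)

csvDf.show()
```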

Below is the complete final code:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders

val spark = SparkSession.builder().getOrCreate()

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

val irisSchema = Encoders.product[IrisSchema].schema

val irisDf = spark.read.format("csv").     // Use "csv" regardless of TSV or CSV.
                option("header", "true").  // Does the file have a header line?
                option("delimiter", "\t"). // Set delimiter to tab or comma.
                schema(irisSchema).        // Schema that was built above.
                load("iris.tsv")

irisDf.show(5)

      

And here's the output:

scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
|   0|         2|         14|        33|         50|
|   1|        24|         56|        31|         67|
|   1|        23|         51|        31|         69|
|   0|         2|         10|        36|         46|
|   1|        20|         52|        30|         65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows
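
Since the case class already exists, the same read can also produce a strongly typed Dataset[IrisSchema] instead of a plain DataFrame, via .as[IrisSchema]. A self-contained sketch (the temp file stands in for iris.tsv, and the local SparkSession is for illustration only):

```scala
import java.nio.file.Files
import org.apache.spark.sql.{Encoders, SparkSession}

val spark = SparkSession.builder().master("local[1]").getOrCreate()
import spark.implicits._  // brings the Encoder needed by .as[...] into scope

case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

// Tiny tab-delimited sample so the sketch is self-contained.
val tsvPath = Files.createTempFile("iris", ".tsv")
Files.write(tsvPath, "Type\tPW\tPL\tSW\tSL\n0\t2\t14\t33\t50\n".getBytes)

val irisDs = spark.read.format("csv").
               option("header", "true").
               option("delimiter", "\t").
               schema(Encoders.product[IrisSchema].schema).
               load(tsvPath.toString).
               as[IrisSchema]   // Dataset[IrisSchema] rather than DataFrame

val first = irisDs.head()  // a typed IrisSchema value, not a generic Row
```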

      

+8








