Spark-SQL: how to read TSV or CSV file into dataframe and apply custom schema?
I am using Spark 2.0 to work with Tab Separated Values (TSV) and Comma Separated Values (CSV) files. I want to load the data into Spark SQL DataFrames, and I would like full control over the schema when reading the files. I don't want Spark to infer the schema from the data in the file.
How do I load TSV or CSV files into Spark SQL DataFrames and apply a schema to them?
Below is a complete Spark 2.0 example of loading a tab-delimited (TSV) file and applying a schema.
I am using the Iris dataset in TSV format from UAH.edu as an example. Here are the first few lines of the file:
Type PW PL SW SL
0 2 14 33 50
1 24 56 31 67
1 23 51 31 69
0 2 10 36 46
1 20 52 30 65
To enforce a schema, you can build it programmatically in one of two ways:
A. Create a schema with StructType:
import org.apache.spark.sql.types._
var irisSchema = StructType(Array(
StructField("Type", IntegerType, true),
StructField("PetalWidth", IntegerType, true),
StructField("PetalLength", IntegerType, true),
StructField("SepalWidth", IntegerType, true),
StructField("SepalLength", IntegerType, true)
))
B. Alternatively, create a schema with a case class and Encoders (this approach is less verbose):
import org.apache.spark.sql.Encoders
case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
SepalWidth: Int, SepalLength: Int)
var irisSchema = Encoders.product[IrisSchema].schema
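As a sanity check, here is a plain-Scala sketch (no Spark required) that hand-parses the sample rows above into the same case class. This is purely illustrative of what the schema describes; once the schema is supplied, spark.read does this conversion for you at load time:

```scala
// Hand-parse two of the sample TSV rows into the case class.
// (Illustration only -- spark.read does this automatically.)
case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
                      SepalWidth: Int, SepalLength: Int)

val sample = Seq("0\t2\t14\t33\t50", "1\t24\t56\t31\t67")

val rows = sample.map { line =>
  val f = line.split("\t").map(_.trim.toInt) // tab-delimited, all Int columns
  IrisSchema(f(0), f(1), f(2), f(3), f(4))
}

rows.foreach(println)
```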
After creating your schema, you can use spark.read to read the TSV file. Note that you can also read comma-delimited (CSV) files, or files with any other delimiter, as long as you set option("delimiter", d) correctly. Also, if your data file has a header line, be sure to set option("header", "true").
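To see why the same reader handles both formats, note that a TSV row and a CSV row carry identical field values and differ only in the separator. A plain-Scala sketch of the idea (the helper fields is hypothetical, not part of the Spark API):

```scala
// The same record encoded as TSV and as CSV -- only the delimiter differs,
// which is why spark.read covers both via option("delimiter", ...).
val tsvLine = "0\t2\t14\t33\t50"
val csvLine = "0,2,14,33,50"

// Hypothetical helper: split a line on a literal delimiter string.
def fields(line: String, delimiter: String): Array[String] =
  line.split(java.util.regex.Pattern.quote(delimiter))

println(fields(tsvLine, "\t").mkString(" | "))
println(fields(csvLine, ",").mkString(" | "))
```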
Below is the complete final code:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Encoders
val spark = SparkSession.builder().getOrCreate()
case class IrisSchema(Type: Int, PetalWidth: Int, PetalLength: Int,
SepalWidth: Int, SepalLength: Int)
var irisSchema = Encoders.product[IrisSchema].schema
var irisDf = spark.read.format("csv"). // Use "csv" regardless of TSV or CSV.
option("header", "true"). // Does the file have a header line?
option("delimiter", "\t"). // Set delimiter to tab or comma.
schema(irisSchema). // Schema that was built above.
load("iris.tsv")
irisDf.show(5)
And here is the output:
scala> irisDf.show(5)
+----+----------+-----------+----------+-----------+
|Type|PetalWidth|PetalLength|SepalWidth|SepalLength|
+----+----------+-----------+----------+-----------+
| 0| 2| 14| 33| 50|
| 1| 24| 56| 31| 67|
| 1| 23| 51| 31| 69|
| 0| 2| 10| 36| 46|
| 1| 20| 52| 30| 65|
+----+----------+-----------+----------+-----------+
only showing top 5 rows