Spark Streaming (Scala): combining JSON with different structures into a DataFrame
I am trying to process JSON strings from Kinesis. The JSON strings can come in a couple of different forms. From Kinesis, I create a DStream:
val kinesisStream = KinesisUtils.createStream(
  ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
  "region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

val lines = kinesisStream.map(x => new String(x))

lines.foreachRDD((rdd, time) => {
  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn
  if (rdd.count() > 0) {
    // Process jsons here
    // Json strings here would have either one of the formats below
  }
})
Each string in the lines RDD will be in one of the two JSON formats below. Some are a collection:
[
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    },
    "host": "host1"
  },
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    },
    "host": "host1"
  }
]
and some JSON strings are a single object:
{
  "ApplicationVersion": "1.0.3 (65)",
  "ProjectId": 30026,
  "TargetId": "4140",
  "Timestamp": 0
}
For the first JSON type, I want to extract the object under the data key; I then want to combine those objects with objects of the second JSON type and form an RDD / DataFrame. How can I achieve this?
Ultimately I would like my dataframe to be something like this:
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+
Sorry, I'm new to Scala and Spark. I've looked at existing examples but haven't found a solution yet.
Thank you very much in advance.
This example uses json4s:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val format = DefaultFormats
case class jsonschema ( ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int )
val string1 = """
[ {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30024,
"TargetId" : "4138",
"Timestamp" : 0
},
"host" : "host1"
}, {
"data" : {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4139",
"Timestamp" : 0
},
"host" : "host1"
} ]
"""
val string2 = """
[ {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4140",
"Timestamp" : 0
}, {
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4141",
"Timestamp" : 0
} ]
"""
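// For the wrapped format, pull out each "data" object before extracting
val json1 = (parse(string1) \ "data").extract[List[jsonschema]]
val json2 = parse(string2).extract[List[jsonschema]]
// Both results are plain Scala Lists (not RDDs), so ++ concatenates them
val records = json1 ++ json2
val df = sqlContext.createDataFrame(records)
df.show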
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30025| 4140| 0|
| 1.0.3 (65)| 30025| 4141| 0|
+------------------+---------+--------+---------+
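The example above parses two strings whose shapes are known ahead of time. Inside foreachRDD the shape of each string is not known, so a per-string normalizer may be more practical. The following is only a sketch under some assumptions: the names Record, JsonNormalizer and toRecords are hypothetical, ProjectId and Timestamp are typed as Long by choice, and strings that fail to parse or extract are silently dropped.

```scala
import org.json4s._
import org.json4s.jackson.JsonMethods._

// Hypothetical record type mirroring the fields shown in the question
case class Record(ApplicationVersion: String, ProjectId: Long, TargetId: String, Timestamp: Long)

object JsonNormalizer {
  implicit val formats: Formats = DefaultFormats

  // If the object wraps its payload under "data", return the payload;
  // otherwise assume the object is already the flat form
  private def unwrap(obj: JValue): JValue = obj \ "data" match {
    case data: JObject => data
    case _             => obj
  }

  // Accepts either a JSON array of objects or a single object.
  // Unparseable input and objects that do not fit Record yield no rows.
  def toRecords(json: String): List[Record] = parseOpt(json) match {
    case Some(JArray(items)) => items.map(unwrap).flatMap(_.extractOpt[Record])
    case Some(obj: JObject)  => unwrap(obj).extractOpt[Record].toList
    case _                   => Nil
  }
}
```

Inside foreachRDD, something like rdd.flatMap(JsonNormalizer.toRecords) should then give an RDD[Record] that sqlContext.createDataFrame can turn into the desired DataFrame.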
You can use a union after selecting the data.* columns from the first DataFrame:
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext

// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))

import spark.implicits._

df1.select($"data.*") // Select only the data columns from first Dataframe
  .union(df2) // Union the two Dataframes as they have the same structure
  .show()
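The union above assumes the two shapes arrive as separate strings. When they are mixed in one batch, a possible alternative is to read everything with an explicit schema that allows both layouts and then take each field from whichever place it was found. This is a rough sketch, assuming Spark 2.2 or later (for the json(Dataset[String]) overload); MixedJson and normalize are hypothetical names. As far as I know, Spark's JSON reader expands a top-level array into one row per element, which is what this relies on.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{coalesce, col}
import org.apache.spark.sql.types._

object MixedJson {
  // The four payload fields from the question
  val inner: StructType = StructType(Seq(
    StructField("ApplicationVersion", StringType),
    StructField("ProjectId", LongType),
    StructField("TargetId", StringType),
    StructField("Timestamp", LongType)))

  // Allow the fields either at the top level or nested under "data"
  val schema: StructType = StructType(inner.fields :+ StructField("data", inner))

  def normalize(spark: SparkSession, jsons: Seq[String]): DataFrame = {
    import spark.implicits._
    val raw = spark.read.schema(schema).json(jsons.toDS())
    // Prefer the nested value; fall back to the top-level one
    val cols = inner.fieldNames.map(n => coalesce(col(s"data.$n"), col(n)).as(n))
    raw.select(cols: _*)
  }
}
```

Supplying the schema up front avoids an analysis error in batches where only one of the two shapes happens to be present.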
EDIT [Links to additional solutions]
After your edit to the question, I understand that you need some kind of fallback mechanism when parsing the JSON. There are several ways to do this with any JSON parsing library; there's a nice solution here using Play, which I think already explains how you can solve this problem in an elegant way.
Once you have an RDD[Data], where Data is your "options", you can simply convert it to a DataFrame with rdd.toDF().
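For illustration, the toDF() step could look like the sketch below. The Data case class and the ToDfExample object are hypothetical stand-ins whose fields are taken from the question, and the sample rows are built by hand in place of parsed input.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical record type; field names follow the JSON in the question
case class Data(ApplicationVersion: String, ProjectId: Long, TargetId: String, Timestamp: Long)

object ToDfExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("to-df").getOrCreate()
    import spark.implicits._ // brings toDF() into scope for RDDs of case classes

    val rdd = spark.sparkContext.parallelize(Seq(
      Data("1.0.3 (65)", 30024L, "4138", 0L),
      Data("1.0.3 (65)", 30026L, "4140", 0L)))

    // Column names are derived from the case class fields
    val df = rdd.toDF()
    df.show()

    spark.stop()
  }
}
```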
Hope it helps.