Spark Streaming (Scala): combining JSON strings of different structures into a DataFrame

I am trying to process JSON strings from Kinesis. The JSON strings can come in a couple of different forms. From Kinesis, I create a DStream:

val kinesisStream = KinesisUtils.createStream(
  ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
  "ap-southeast-1", InitialPositionInStream.LATEST, kinesisCheckpointInterval,
  StorageLevel.MEMORY_AND_DISK_2)

// Kinesis delivers each record as Array[Byte]; decode it to a JSON string
val lines = kinesisStream.map(x => new String(x))

lines.foreachRDD((rdd, time) => {

  val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
  import sqlContext.implicits.StringToColumn

  // isEmpty avoids materializing a full count just to test for data
  if (!rdd.isEmpty()) {
    // Process the JSON strings here.
    // Each string has one of the two formats shown below.
  }
})


Each record in the lines RDD will hold one of two JSON forms. The first is a collection:

[
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    },
    "host": "host1"
  },
  {
    "data": {
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    },
    "host": "host1"
  }
]


and some JSON strings are a single object:

{
  "ApplicationVersion": "1.0.3 (65)",
  "ProjectId": 30026,
  "TargetId": "4140",
  "Timestamp": 0
}


When a string is of the first type, I want to extract the objects under the data key, combine them with the strings of the second type, and build a single RDD / DataFrame out of all of them. How can I achieve this?

Ultimately I would like my DataFrame to look like this:

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30026|    4140|        0|
+------------------+---------+--------+---------+


Sorry, I'm new to Scala and Spark. I've looked at existing examples but haven't found a solution yet.

Thank you very much in advance.

2 answers


This example uses json4s:

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val formats: DefaultFormats.type = DefaultFormats

case class JsonSchema(ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp: Int)

val string1 = """
[ {
  "data" : {
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30024,
    "TargetId" : "4138",
    "Timestamp" : 0
  },
  "host" : "host1"
}, {
  "data" : {
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30025,
    "TargetId" : "4139",
    "Timestamp" : 0
  },
  "host" : "host1"
} ]

"""

val string2 = """
[ {
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4140",
  "Timestamp" : 0
}, {
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4141",
  "Timestamp" : 0
} ]
"""

val json1 = (parse(string1) \ "data").extract[List[JsonSchema]]

val json2 = parse(string2).extract[List[JsonSchema]]

// Both values are plain Scala Lists, not RDDs; createDataFrame accepts
// a Seq of case class instances directly
val combined = json1 ++ json2

val df = sqlContext.createDataFrame(combined)

df.show


+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30025|    4140|        0|
|        1.0.3 (65)|    30025|    4141|        0|
+------------------+---------+--------+---------+
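
To apply the same parsing inside the streaming loop, where each incoming record may have either shape, you can pattern match on the parsed JValue. The following is a minimal sketch under that assumption; the parseRecord helper is my own illustration, not part of the original code (it reuses the JsonSchema case class and the implicit formats defined above):

import org.json4s._
import org.json4s.jackson.JsonMethods._

// Decide per record which shape arrived and extract accordingly
def parseRecord(json: String): List[JsonSchema] = parse(json) match {
  case arr: JArray  => (arr \ "data").extract[List[JsonSchema]] // wrapped collection
  case obj: JObject => List(obj.extract[JsonSchema])            // single bare object
  case _            => Nil                                      // unrecognized shape: skip
}

Inside foreachRDD you could then flatten every record into rows and build the DataFrame with sqlContext.createDataFrame(rdd.flatMap(parseRecord)).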


Another approach: you can union the two DataFrames after selecting the data.* columns from the first one:

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
import spark.implicits._

// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))

df1.select($"data.*") // Select only the data columns from the first DataFrame
  .union(df2)         // Union the two DataFrames, as they now share the same structure
  .show()
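
One caveat, not from the original answer but worth adding: DataFrame.union matches columns by position, not by name. It works here because spark.read.json infers fields in alphabetical order for both shapes, but selecting the columns explicitly keeps the union safe if the schemas ever drift:

import org.apache.spark.sql.functions.col

// Pin the column order so union cannot mispair columns
val cols = Seq("ApplicationVersion", "ProjectId", "TargetId", "Timestamp").map(col)

df1.select($"data.*").select(cols: _*)
  .union(df2.select(cols: _*))
  .show()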


EDIT

After the edit to your question, I understand that you need some kind of fallback mechanism when parsing the JSON. There are several ways to do this with any JSON parsing library; there is a nice solution with Play JSON that already explains how to solve this problem in an elegant way.
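
A minimal sketch of such a fallback with Play JSON, assuming the two shapes from the question (the Data case class and the parseWithFallback helper are my own illustration): if the string parses as an array, pull each element's data object; if it parses as a bare object, read it directly.

import play.api.libs.json._

case class Data(ApplicationVersion: String, ProjectId: Long, TargetId: String, Timestamp: Long)
implicit val dataReads: Reads[Data] = Json.reads[Data]

// Try the wrapped-collection shape first, fall back to the single-object shape
def parseWithFallback(s: String): Seq[Data] = Json.parse(s) match {
  case JsArray(items) => items.toSeq.flatMap(item => (item \ "data").validate[Data].asOpt)
  case obj: JsObject  => obj.validate[Data].asOpt.toSeq
  case _              => Seq.empty // unrecognized shape: drop the record
}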

Once you have an RDD[Data], where Data is the case class modeling your records, you can simply convert it to a DataFrame with rdd.toDF().
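
For example, continuing with the hypothetical parseWithFallback helper from the sketch above:

import spark.implicits._

lines.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // Flatten every batch record into Data rows, whatever shape it arrived in
    val df = rdd.flatMap(parseWithFallback).toDF()
    df.show()
  }
}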

Hope it helps.
