Parsing epoch milliseconds from JSON with Spark 2
Has anyone parsed millisecond timestamps using from_json in Spark 2+? How is it done?
Spark 2 changed TimestampType to parse numeric epoch values as seconds, not milliseconds.
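A minimal repro of that behavior (a sketch, assuming a Spark shell where spark and its implicits are in scope; demoSchema is my own name):

import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{StructField, StructType, TimestampType}

val demoSchema = StructType(StructField("timestamp", TimestampType) :: Nil)
Seq("""{"timestamp": 1494790299549}""").toDF("json")
  .select(from_json($"json", demoSchema).getField("timestamp") as "ts")
  .show(false)
// prints 49338-01-08 00:39:09.0 — the numeric value is taken as seconds, not milliseconds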
My input is a Hive table that has a JSON-formatted string in a column, which I am trying to parse this way:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{BooleanType, DataTypes, LongType,
  StringType, StructField, StructType, TimestampType}

val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  …
  StructField("timestamp", TimestampType) ::
  StructField("version", StringType) :: Nil
)

val item_parsed =
  spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
      from_json('internalitem, schema) as 'internalitem,
      'version, 'createdat, 'modifiedat)

val item_flattened = item_parsed
  .select('itemid, 'locale,
    $"internalitem.*",
    'version as 'outer_version, 'createdat, 'modifiedat)
This can parse a row with a column containing:
{"timestamp": 1494790299549, "cleared": false, "version": "V1", "dataVersion": 2, "categoryId": 2641, "details": [], ...}
That gives me a timestamp field like 49338-01-08 00:39:09.0 from the value 1494790299549, which I would prefer to read as 2017-05-14 19:31:39.549.
Now I could set the schema to read the timestamp as a long, then divide the value by 1000 and cast it to a timestamp, but then I would have 2017-05-14 19:31:39.000, not 2017-05-14 19:31:39.549. I was having trouble figuring out how I could either:
- tell from_json to parse the timestamp as milliseconds (perhaps by substituting TimestampType in the schema in some way), or
- use LongType in the schema and convert it to a timestamp that keeps the milliseconds.
"Now I could set the schema to read the timestamp as a long, then divide the value by 1000"
Actually, this is exactly what you want; you just have to keep the types straight. Let's say you have only a Long timestamp field:
import org.apache.spark.sql.functions.lit

val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint]
If you divide by 1000:
val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]
you will get the timestamp in seconds as a double (note that this is SQL division behavior, not Scala's).
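Plain Scala Long division would instead truncate the fraction away, which is exactly the trap the question worries about:

1494790299549L / 1000 // Scala integer division: 1494790299, the .549 is lost
// Spark SQL's `/` widens integral operands to double, so the fraction survives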
All that's left is to cast it:
inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
// +-----------------------+
// |timestamp_seconds      |
// +-----------------------+
// |2017-05-14 21:31:39.549|
// +-----------------------+
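Applied to the question's pipeline, the same idea would look something like this sketch: declare the timestamp field as LongType in the from_json schema, then divide and cast during flattening (column names taken from the question):

// in the schema: StructField("timestamp", LongType) instead of TimestampType
val item_flattened = item_parsed
  .select('itemid, 'locale,
    ($"internalitem.timestamp" / 1000).cast("timestamp") as "timestamp",
    'version as 'outer_version, 'createdat, 'modifiedat)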
I found that doing the division in the select and then casting didn't look clean to me, even though it is a perfectly valid method. I opted for a UDF instead, using java.sql.Timestamp, whose constructor does take epoch milliseconds.
import java.sql.Timestamp
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, udf}
import org.apache.spark.sql.types.{BooleanType, DataTypes, IntegerType, LongType,
  StringType, StructField, StructType, TimestampType}

// java.sql.Timestamp's constructor takes milliseconds since the epoch
val tsmillis = udf { t: Long => new Timestamp(t) }
val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  …
  StructField("timestamp", LongType) :: // LongType here instead of TimestampType
  StructField("version", StringType) :: Nil
)

val item_parsed =
  spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
      from_json('internalitem, schema) as 'internalitem,
      'version, 'createdat, 'modifiedat)

val item_flattened = item_parsed
  .select('itemid, 'locale,
    $"internalitem.categoryId", $"internalitem.cleared",
    $"internalitem.dataVersion", $"internalitem.details",
    tsmillis($"internalitem.timestamp") as 'timestamp,
    $"internalitem.version",
    'version as 'outer_version, 'createdat, 'modifiedat)
Notice how the conversion happens right in the select. I think it would be worthwhile to run a performance test to see whether the withColumn division and cast is faster than the udf.
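For comparison, the non-UDF variant being measured against might look like this sketch, reusing the division-and-cast idea from the other answer:

val item_flattened = item_parsed
  .select('itemid, 'locale,
    $"internalitem.categoryId", $"internalitem.cleared",
    $"internalitem.dataVersion", $"internalitem.details",
    ($"internalitem.timestamp" / 1000).cast("timestamp") as "timestamp",
    $"internalitem.version",
    'version as 'outer_version, 'createdat, 'modifiedat)

The built-in arithmetic and cast stay inside Catalyst's generated code, while a Scala UDF adds per-row serialization overhead, so the cast version would usually be at least as fast.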