Parsing millisecond epoch timestamps from JSON with Spark 2

Has anyone parsed a millisecond epoch timestamp using from_json in Spark 2+? How is it done?

So in version 2, Spark changed TimestampType to parse numeric epoch values as seconds rather than milliseconds.
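A minimal sketch of that behaviour (my own illustration, assuming a running SparkSession named spark as in the code below): a numeric value cast to a timestamp is read as seconds, so an epoch-millisecond value ends up tens of thousands of years in the future.

import org.apache.spark.sql.functions.lit

// Cast treats the number as seconds since the epoch, so the millisecond
// value 1494790299549 comes out as a date around the year 49338, not 2017.
spark.range(1)
  .select(lit(1494790299549L).cast("timestamp").as("ts"))
  .show(false)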

My input is a Hive table that has a JSON-formatted string in a column, which I am trying to parse like this:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.from_json
import org.apache.spark.sql.types.{BooleanType, DataTypes, LongType,
  StringType, StructField, StructType, TimestampType}

val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._

// Schema for the JSON string held in the internalitem column
val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  StructField("timestamp", TimestampType) ::
  StructField("version", StringType) :: Nil
)
val item_parsed =
    spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
            from_json('internalitem, schema)
                as 'internalitem,
            'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
    .select('itemid, 'locale,
            $"internalitem.*",
            'version as 'outer_version, 'createdat, 'modifiedat)

This can parse a row with a column containing:

{"timestamp": 1494790299549, "cleared": false, "version": "V1", "dataVersion": 2, "categoryId": 2641, "details": [], ...}

That gives me a timestamp field like 49338-01-08 00:39:09.0 from the value 1494790299549, which I would prefer to read as 2017-05-14 19:31:39.549.

Now I could set the schema field for the timestamp to LongType, then divide the value by 1000 and overwrite the column with a cast to timestamp, but then I would have 2017-05-14 19:31:39.000, not 2017-05-14 19:31:39.549. I was having trouble figuring out how I could:

  • Tell from_json to parse the millisecond timestamp (perhaps by substituting TimestampType in some way for use in the schema)
  • Use LongType in the schema and convert it to a timestamp that keeps the milliseconds.




2 answers


Now I could set the schema field for the timestamp to LongType and then divide the value by 1000

Actually, that is exactly what you want; just keep the types straight. Let's say you have only a Long field called timestamp:

import org.apache.spark.sql.functions.lit

val df = spark.range(0, 1).select(lit(1494790299549L).alias("timestamp"))
// df: org.apache.spark.sql.DataFrame = [timestamp: bigint]

      

If you divide by 1000:



val inSeconds = df.withColumn("timestamp_seconds", $"timestamp" / 1000)
// org.apache.spark.sql.DataFrame = [timestamp: bigint, timestamp_seconds: double]

      

you get the timestamp in seconds as a double (note that this is SQL division behavior, not Scala's).
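For contrast, a small illustration of my own (not from the original answer) of what plain Scala division would do with the same value:

// Long / Int in Scala is integer division, so the milliseconds are simply lost:
val truncated = 1494790299549L / 1000
// truncated: Long = 1494790299

// The SQL division above instead produces a DoubleType column holding
// 1494790299.549, so the fractional second survives the cast that follows.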

All that's left is a cast:

inSeconds.select($"timestamp_seconds".cast("timestamp")).show(false)
// +-----------------------+
// |timestamp_seconds      |
// +-----------------------+
// |2017-05-14 21:31:39.549|
// +-----------------------+
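Tying this back to the from_json case from the question, here is a sketch of how the same divide-and-cast could be used while flattening. This assumes the question's item_parsed DataFrame with the schema's timestamp field switched to LongType, and reuses the question's column names:

// Requires the schema to declare "timestamp" as LongType so from_json
// keeps the raw epoch milliseconds.
val item_flattened = item_parsed
  .select('itemid, 'locale,
          $"internalitem.categoryId", $"internalitem.cleared",
          $"internalitem.dataVersion", $"internalitem.details",
          ($"internalitem.timestamp" / 1000).cast("timestamp") as 'timestamp,
          $"internalitem.version",
          'version as 'outer_version, 'createdat, 'modifiedat)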

      





I found that doing the division in the select and then casting didn't look clean to me, even though it is a perfectly valid method. I opted for a UDF using java.sql.Timestamp, whose constructor actually takes epoch milliseconds.

import java.sql.Timestamp

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{explode, from_json, udf}
import org.apache.spark.sql.types.{BooleanType, DataTypes, IntegerType, LongType,
  StringType, StructField, StructType, TimestampType}

// java.sql.Timestamp's constructor takes epoch milliseconds directly
val tsmillis = udf { t: Long => new Timestamp(t) }

val spark = SparkSession
  .builder
  .appName("Problematic Timestamps")
  .enableHiveSupport()
  .getOrCreate()
import spark.implicits._
val schema = StructType(
  StructField("categoryId", LongType) ::
  StructField("cleared", BooleanType) ::
  StructField("dataVersion", LongType) ::
  StructField("details", DataTypes.createArrayType(StringType)) ::
  StructField("timestamp", LongType) ::
  StructField("version", StringType) :: Nil
)
val item_parsed =
    spark.sql("select * FROM source.jsonStrInOrc")
    .select('itemid, 'locale,
            from_json('internalitem, schema)
                as 'internalitem,
            'version, 'createdat, 'modifiedat)
val item_flattened = item_parsed
    .select('itemid, 'locale,
            $"internalitem.categoryId", $"internalitem.cleared",
            $"internalitem.dataVersion", $"internalitem.details",
            tsmillis($"internalitem.timestamp"),
            $"internalitem.version",
            'version as'outer_version, 'createdat, 'modifiedat)

      



Note how this is done in the select. I think it would be wise to run a performance test to see whether the withColumn division and cast is faster than the udf.
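A rough sketch of how such a comparison might be set up (my own illustration, not from the answer; the sample size, the column name ts_millis, and the timing helper are arbitrary, and results will depend on the data and cluster):

import org.apache.spark.sql.functions.lit

// Synthetic epoch-millisecond values to compare the two approaches on.
val sample = spark.range(10000000L)
  .select(($"id" + lit(1494790299549L)).as("ts_millis"))

// Crude wall-clock timer for a full pass over the data.
def time[T](label: String)(block: => T): T = {
  val start = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - start) / 1e9} s")
  result
}

// Divide-and-cast, as in the first answer.
time("divide + cast") {
  sample.select(($"ts_millis" / 1000).cast("timestamp")).count()
}

// UDF from this answer.
time("udf") {
  sample.select(tsmillis($"ts_millis")).count()
}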









