How to expire dropDuplicates state in structured streaming to avoid OOM?

I want to count unique accesses for each day using Spark Structured Streaming, so I use the following code:

.dropDuplicates("uuid")

The next day, the state maintained for today should be dropped so that I get the correct unique access count for the new day and avoid OOM. The official documentation suggests using dropDuplicates together with a watermark, for example:

.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")


But the watermark column must be listed in dropDuplicates. In that case, uuid and timestamp together form a composite deduplication key, so two events with the same uuid but different timestamps are treated as distinct, which is not what I want.
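To make the problem concrete, here is a minimal sketch of the unwanted behavior. The `events` DataFrame is an assumption (a streaming DataFrame with `uuid: String` and `timestamp: Timestamp` columns; its source is not shown):

```scala
// `events` is an assumed streaming DataFrame with columns
// `uuid: String` and `timestamp: Timestamp`.
val deduped = events
  .withWatermark("timestamp", "1 day")
  // The dedup key is (uuid, timestamp), so these two rows BOTH survive:
  //   ("user-1", 2023-05-01 09:00:00)
  //   ("user-1", 2023-05-01 10:00:00)
  // and "user-1" is counted twice for 2023-05-01.
  .dropDuplicates("uuid", "timestamp")
```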

So, is there a perfect solution?



1 answer


After a few days of effort, I finally figured it out.

By reading the source code for watermarking and dropDuplicates, I found that besides an eventTime column, the watermark can also be set on a window column, so we can use the following code:



.select(
  window($"timestamp", "1 day"),
  $"timestamp",
  $"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")

Since all events on the same day fall into the same one-day window, this produces the same result as deduplicating on uuid alone, while the watermark lets Spark evict the previous day's state. Hope this helps someone.
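An end-to-end sketch of the query under stated assumptions: the JSON file source, input path, sink, and column names are placeholders, and any streaming source that yields `uuid` and `timestamp` columns would work:

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, window}

val spark = SparkSession.builder.appName("daily-unique-access").getOrCreate()

// Assumed input: JSON files with `uuid` and `timestamp` fields.
// Replace with your real source (Kafka, files, ...).
val events = spark.readStream
  .schema("uuid STRING, timestamp TIMESTAMP")
  .json("/path/to/input")

val dailyUniques = events
  .select(window(col("timestamp"), "1 day"), col("timestamp"), col("uuid"))
  // Watermarking on the window column lets Spark drop the dedup state
  // for a day once that day's window has expired.
  .withWatermark("window", "1 day")
  .dropDuplicates("uuid", "window")
  // After dedup, each (uuid, window) pair appears exactly once, so a
  // plain count per window is the number of unique visitors that day.
  .groupBy(col("window"))
  .count()

dailyUniques.writeStream
  .outputMode("update")
  .format("console")
  .start()
  .awaitTermination()
```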
