How to expire dropDuplicates state in structured streaming to avoid OOM?
I want to count unique accesses for each day using Spark Structured Streaming, so I use the following code:
.dropDuplicates("uuid")
and the state maintained for today should be dropped the next day, so that I get a correct unique-access count for the new day and avoid OOM. The official documentation suggests using dropDuplicates with a watermark, for example:
.withWatermark("timestamp", "1 day")
.dropDuplicates("uuid", "timestamp")
but the watermark column must be listed in dropDuplicates. In that case, uuid and timestamp are used as a combined deduplication key, so only events with the same uuid and the same timestamp are deduplicated, which is not what I want.
So, is there a perfect solution?
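For context, here is a minimal sketch of the documented approach and why it does not do what I need. `events` is assumed to be a streaming DataFrame with `timestamp` and `uuid` columns; the names are mine, not from any official example:

```scala
import org.apache.spark.sql.functions._

// `events` is a hypothetical streaming DataFrame with columns
// `timestamp: Timestamp` and `uuid: String`.
val deduped = events
  .withWatermark("timestamp", "1 day")
  .dropDuplicates("uuid", "timestamp")
// Problem: ("uuid1", 10:00) and ("uuid1", 11:00) are BOTH kept,
// because the dedup key is (uuid, timestamp), not uuid alone.
```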
After a few days of effort, I finally figured it out.
Digging into the source code of watermark and dropDuplicates, I found that besides an eventTime column, the watermark also works with a window column, so we can use the following code:
.select(
window($"timestamp", "1 day"),
$"timestamp",
$"uuid"
)
.withWatermark("window", "1 day")
.dropDuplicates("uuid", "window")
Since all events on the same day fall into the same one-day window, this produces the same result as deduplicating on uuid alone, while the watermark lets Spark drop each day's state once the day has passed. Hope this helps someone.
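Putting the snippet above into a full (sketched) query, under the same assumptions as before (a streaming DataFrame `events` with `timestamp` and `uuid` columns; sink and output mode are illustrative only):

```scala
import org.apache.spark.sql.functions._

// `events` is a hypothetical streaming DataFrame with columns
// `timestamp: Timestamp` and `uuid: String`.
val dailyUnique = events
  .select(
    window($"timestamp", "1 day").as("window"), // same value for all events of a day
    $"uuid"
  )
  .withWatermark("window", "1 day")   // state for a day is dropped once the day expires
  .dropDuplicates("uuid", "window")   // same uuid within the same day -> one row

// Illustrative sink: each emitted row is one unique access for its day;
// counting per day can then be done downstream.
val query = dailyUnique.writeStream
  .outputMode("append")
  .format("console")
  .start()
```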