Apache Beam: why is the timestamp of the aggregated value in the global window 9223371950454775?

We have migrated from Google Dataflow 1.9 to Apache Beam 0.6. We noticed a change in the behavior of timestamps after applying a GlobalWindow. In Google Dataflow 1.9, we got the expected timestamps in a DoFn after the windowing/combine function. Now we get a huge value for the timestamp, e.g. 9223371950454775. Has the default behavior for the global window changed in the Apache Beam version?

input.apply(name(id, "Assign To Shard"), ParDo.of(new AssignToTest()))
      .apply(name(id, "Window"), Window
          .<KV<Long, ObjectNode>>into(new GlobalWindows())
          .triggering(Repeatedly.forever(
              AfterProcessingTime
                  .pastFirstElementInPane()
                  .plusDelayOf(Duration.standardMinutes(1))))
          .discardingFiredPanes())
      .apply(name(id, "Group By Shard"), GroupByKey.create())
      .apply(.....)

      

1 answer


TL;DR: When you are aggregating a bunch of timestamped values, you need to choose a timestamp for the result of the aggregation. There are multiple good choices for this output timestamp. In Dataflow 1.x, the default was the minimum of the input timestamps. Based on experience with 1.x, in Beam the default was changed to the end of the window. You can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform. (The value you are seeing, 9223371950454775, is the maximum timestamp of the global window: BoundedWindow.TIMESTAMP_MAX_VALUE of 9223372036854775 ms minus one day.)
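A minimal sketch of that change applied to the Window transform from the question (assuming a Beam version in which withTimestampCombiner is available):

    input.apply(name(id, "Window"), Window
        .<KV<Long, ObjectNode>>into(new GlobalWindows())
        .triggering(Repeatedly.forever(
            AfterProcessingTime
                .pastFirstElementInPane()
                .plusDelayOf(Duration.standardMinutes(1))))
        .discardingFiredPanes()
        // Restore the Dataflow 1.x default: output at min(t1, t2, ...).
        .withTimestampCombiner(TimestampCombiner.EARLIEST))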


Let me unpack this. I'll use the @ sign to pair a value with its timestamp. Focusing on just one key, you have the timestamped values v1@t1, v2@t2, etc. I will stick with your example of a raw GroupByKey, though this also applies to other ways of combining the values. So the output iterable of values is [v1, v2, ...] in no particular order, and the question is which timestamp it should carry.

Here are some possibilities for the output timestamp:

  • min(t1, t2, ...)
  • max(t1, t2, ...)
  • the end of the window that the elements are in (ignoring input timestamps)

All of these are correct. They are all available via OutputTimeFn in Dataflow 1.x and TimestampCombiner in Apache Beam.
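For illustration, here is a sketch of observing the chosen timestamp in a hypothetical DoFn downstream of the GroupByKey; under the new end-of-window default with GlobalWindows, it prints the value from the question:

    .apply(ParDo.of(new DoFn<KV<Long, Iterable<ObjectNode>>, Void>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        // c.timestamp() is the timestamp the TimestampCombiner chose for the
        // grouped element; with END_OF_WINDOW in the global window this is
        // the window's maximum timestamp, i.e. 9223371950454775 ms.
        System.out.println(c.timestamp().getMillis());
      }
    }))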



Timestamps have different interpretations and they are useful for different things. The output timestamp of the aggregated value governs the downstream watermark, so choosing earlier timestamps holds the downstream watermark back, while later timestamps allow it to move ahead.

  • min(t1, t2, ...) allows you to unpack the iterable and re-output each v1@t1 (see the sketch after this list)
  • max(t1, t2, ...) accurately models the logical time at which the aggregated value was fully available. Max tends to be the most expensive, for reasons to do with implementation details.
  • end of window:
    • models the fact that the aggregation represents all of the data for the window
    • is very easy to understand
    • allows downstream watermarks to advance as fast as possible
    • is extremely efficient
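To make the first point concrete, here is a hedged sketch that assumes each value was paired with its timestamp (as a TimestampedValue) before the GroupByKey, since the grouped iterable does not retain per-element timestamps:

    .apply(ParDo.of(new DoFn<KV<Long, Iterable<TimestampedValue<ObjectNode>>>, ObjectNode>() {
      @ProcessElement
      public void processElement(ProcessContext c) {
        for (TimestampedValue<ObjectNode> tv : c.element().getValue()) {
          // Legal with TimestampCombiner.EARLIEST: the grouped element's
          // timestamp is min(t1, t2, ...), so every tv.getTimestamp() is at
          // or after it. Emitting at a timestamp earlier than the current
          // element's timestamp would throw.
          c.outputWithTimestamp(tv.getValue(), tv.getTimestamp());
        }
      }
    }))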

For all these reasons, we switched the default from min to the end of the window.

In Beam, you can restore the prior behavior by adding .withTimestampCombiner(TimestampCombiner.EARLIEST) to your Window transform. In Dataflow 1.x, you can opt in to the new default ahead of time by adding .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow()).
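As a sketch, the corresponding Dataflow 1.x configuration, where OutputTimeFns plays the role that TimestampCombiner plays in Beam:

    // Dataflow 1.x: opt in to the end-of-window output timestamp explicitly.
    Window.<KV<Long, ObjectNode>>into(new GlobalWindows())
        .withOutputTimeFn(OutputTimeFns.outputAtEndOfWindow())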

One additional technical caveat: the ability to provide a custom OutputTimeFn has been removed and replaced by the TimestampCombiner enum (EARLIEST, LATEST, END_OF_WINDOW), so there are only these three choices, not a whole API for writing your own.
