How to determine the starting position of a dataset in Apache Flink?

I am trying to implement some kind of window function in Apache Flink. For example, I want to take items 1 - 5 and do something with them, after which I want to take items 6 - 10, etc.

I currently have a dataset that is read from a CSV file:

DataSet<Tuple2<Double, Double>> csvInput = env
        .readCsvFile(csvpath)
        .includeFields(usedFields)
        .types(Double.class, Double.class);


Now I want a subset with the first 5 elements of this dataset. I could get it with the first(n) function:

DataSet<Tuple2<Double, Double>> subset1 = csvInput.first(5);


But how do I get the next 5 items? Is there a function like startAt that I could use? For example, something like this:

DataSet<Tuple2<Double, Double>> subset2 = csvInput.first(5).startAt(6);


I didn't find anything like this in the Apache Flink Java API. What's the best way to achieve this?

1 answer


Matthias Sachs has given good pointers to the windowing features of the streaming API. If your application follows the streaming analytics model, the streaming API is definitely the right way to go.

Here are some more resources on streaming windows: https://ci.apache.org/projects/flink/flink-docs-master/apis/streaming_guide.html#window-operators

Windows in Batch API

You can also apply some form of windowing manually in the Batch API. Keep the following in mind when doing so:

  • Most operations are parallel. When you window n elements together, this usually happens independently within each parallel partition.

  • There is no implicit order among the elements. Even when reading a file in parallel, later parts of the file may be consumed by a faster parallel reader, so records from those later parts can arrive earlier. Windowing "n items in order of arrival" therefore gives you just some n items, not a deterministic set.

Window in the order of the file (non-parallel)

To window in the order of the file, you can force the input to be non-parallel (attach setParallelism(1) to the source) and then use mapPartition() to slide the window over the elements.
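The chunking logic such a mapPartition() function would run can be sketched in plain Java, independent of Flink; the record type (a double pair), the class and method names, and the window size of 5 are assumptions for illustration, and the Flink DataSet wiring is omitted:

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingWindow {
    // Splits the input into consecutive, non-overlapping windows of the given
    // size; a final, smaller window holds any leftover elements. This mirrors
    // what a MapPartitionFunction could do over a single (non-parallel) partition.
    static List<List<double[]>> windows(List<double[]> input, int size) {
        List<List<double[]>> result = new ArrayList<>();
        for (int i = 0; i < input.size(); i += size) {
            result.add(new ArrayList<>(input.subList(i, Math.min(i + size, input.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> data = new ArrayList<>();
        for (int i = 1; i <= 12; i++) {
            data.add(new double[] {i, i * 2.0});
        }
        List<List<double[]>> w = windows(data, 5);
        System.out.println(w.size());        // 3 windows: 5 + 5 + 2 elements
        System.out.println(w.get(0).size()); // 5
        System.out.println(w.get(2).size()); // 2
    }
}
```

Note that this only yields a meaningful "items 1-5, items 6-10, ..." result because the single partition preserves the file order.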

Window ordered by some value (e.g. a timestamp)

You can window over one big group (no key) by sorting the partition (sortPartition().mapPartition()), or window over groups using groupBy(...).sortGroup(...).reduceGroup(...). The functions sort the data with respect to the value you want to window over and then slide the window through the sorted data.
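Again as a plain-Java sketch of that idea, not the actual Flink operators: sort the records by the windowing value, then cut the sorted sequence into windows. The record layout (first field acts as the timestamp), the names, and the window size are assumptions for illustration:

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SortedWindow {
    // Sorts records by their first field (e.g. a timestamp) and then emits
    // consecutive windows of the given size -- the same two steps that a
    // sortPartition(...).mapPartition(...) chain performs within one partition.
    static List<List<double[]>> sortedWindows(List<double[]> input, int size) {
        List<double[]> sorted = new ArrayList<>(input);
        sorted.sort(Comparator.comparingDouble(r -> r[0]));
        List<List<double[]>> result = new ArrayList<>();
        for (int i = 0; i < sorted.size(); i += size) {
            result.add(new ArrayList<>(sorted.subList(i, Math.min(i + size, sorted.size()))));
        }
        return result;
    }

    public static void main(String[] args) {
        List<double[]> data = new ArrayList<>();
        // Records arrive out of order: timestamps 9, 8, ..., 0
        for (int t = 9; t >= 0; t--) data.add(new double[] {t, t * 1.5});
        List<List<double[]>> w = sortedWindows(data, 5);
        System.out.println(w.get(0).get(0)[0]); // 0.0 -- smallest timestamp first
        System.out.println(w.get(1).get(0)[0]); // 5.0 -- second window starts at t=5
    }
}
```

Sorting first is what makes the windows well-defined even though the input order is arbitrary.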

Some parallel windows (no well-defined semantics)

You can always read in parallel and slide a window over the data with mapPartition(). However, as described above, parallel execution and the undefined order of elements give you some windowed result, but not a predictable one.
