Join output in Apache Flink

In Apache Flink, if I join two datasets on a primary key, I get a dataset of 2-tuples, each holding the matching record from each of the two input datasets.

The problem is that applying map() to the resulting dataset of 2-tuples does not look nice, especially if the records of both datasets have many fields.

Using tuples in both input datasets gives me code like this:

var in1: DataSet[(Int, Int, Int, Int, Int)] = /* */
var in2: DataSet[(Int, Int, Int, Int)] = /* */

val out = in1.join(in2).where(0, 1, 2).equalTo(0, 1, 2)
  .map(join => (join._1._1, join._1._2, join._1._3,
                    join._1._4, join._1._5, join._2._4))

      

I wouldn't mind using POJOs or case classes, but I don't see how it gets better.

Question 1: Is there a good way to flatten this 2-tuple? For example, by using a different operator.

Question 2: How do I handle joining three datasets on the same key? This would make the example code even messier.

Thanks for the help.



1 answer


You can apply a join function directly to each pair of joined elements, for example:

val leftData: DataSet[(String, Int, Int)] = ...
val rightData: DataSet[(String, Int)] = ...
val joined: DataSet[(String, Int, Int)] = leftData
      .join(rightData).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3 + r._2) }
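Since the question mentions case classes, here is a minimal sketch of the same idea with named fields instead of tuple positions. The case classes `Order`, `Payment` and `Settled`, their fields, and the sample data are hypothetical, and the sketch assumes a Flink version that still ships the Scala DataSet API.

```scala
import org.apache.flink.api.scala._

// Hypothetical record types; case classes must be defined at the top level
// so that Flink can derive their type information.
case class Order(id: Int, customer: Int, amount: Int)
case class Payment(id: Int, paid: Int)
case class Settled(id: Int, customer: Int, amount: Int, paid: Int)

object CaseClassJoinSketch {
  def run(): Seq[Settled] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Made-up sample data for illustration only.
    val orders: DataSet[Order] = env.fromElements(Order(1, 7, 50), Order(2, 8, 30))
    val payments: DataSet[Payment] = env.fromElements(Payment(1, 50), Payment(2, 10))

    // The join function builds the flat result record directly,
    // so no nested 2-tuple ever reaches a later map().
    val settled: DataSet[Settled] = orders
      .join(payments).where("id").equalTo("id") { (o, p) =>
        Settled(o.id, o.customer, o.amount, p.paid)
      }

    settled.collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

The join keys are selected by field name here, and the join function reads `o.amount` rather than `join._1._3`, which is usually easier to follow once the records get wide.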

      



To answer your second question: Flink only supports binary joins, so a three-way join has to be written as two chained joins. However, Flink's optimizer can avoid unnecessary shuffles if you give it a hint about the behavior of your function. Forwarded fields annotations tell the optimizer that certain fields (such as the join key) are not modified by your join function, which lets it reuse existing partitioning and sort orders.
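As a rough sketch of what that could look like for three datasets: two binary joins are chained, each with a join function that emits a flat tuple, and each annotated so the optimizer knows the key is passed through unchanged. The object name, dataset names and sample values are made up for illustration, and the sketch assumes a Flink version that still ships the Scala DataSet API.

```scala
import org.apache.flink.api.scala._

object ThreeWayJoinSketch {
  def run(): Seq[(String, Int, Int, Int)] = {
    val env = ExecutionEnvironment.getExecutionEnvironment

    // Hypothetical inputs; the first field is the shared join key.
    val a: DataSet[(String, Int)] = env.fromElements(("k1", 1), ("k2", 2))
    val b: DataSet[(String, Int)] = env.fromElements(("k1", 10), ("k2", 20))
    val c: DataSet[(String, Int)] = env.fromElements(("k1", 100), ("k2", 200))

    // First binary join: emit a flat 3-tuple instead of a nested 2-tuple.
    // The annotation declares that field _1 of the first (left) input
    // is copied unchanged to field _1 of the output.
    val ab: DataSet[(String, Int, Int)] = a
      .join(b).where(0).equalTo(0) { (l, r) => (l._1, l._2, r._2) }
      .withForwardedFieldsFirst("_1")

    // Second binary join on the same key, again flattening the result.
    val abc: DataSet[(String, Int, Int, Int)] = ab
      .join(c).where(0).equalTo(0) { (l, r) => (l._1, l._2, l._3, r._2) }
      .withForwardedFieldsFirst("_1")

    abc.collect()
  }

  def main(args: Array[String]): Unit = run().foreach(println)
}
```

Whether the second join actually reuses the partitioning of the first depends on the plan the optimizer picks, so treat the annotation as a hint rather than a guarantee. A forwarded fields annotation must be correct: declaring a field as forwarded when the function changes it can lead to wrong results.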
