Combining micro-batches in Spark Streaming

(I have some knowledge of Spark, but not of Spark Streaming)

Problem

I have a Kafka topic Kafka[(A,B)->X]

where (A,B) is the key (A and B are simple numeric types) and X is the message type, relatively large (a couple of Mb). Setting aside the problem of input failures, the data form a grid: for each a in A there will be messages (a,b) for all b in B. Moreover, the b's are ordered, and I think we can assume that all messages for a given a will arrive in b-order (I know the topic is filled in that order).

Then I need to process the messages like this:

  • a function is applied to each message (a,b)->x, outputting (a,b)->y
  • a second function is then applied to aB->Seq[y], where aB = {(a,b) for all b in B}

(and later there is a pass where the messages have to be "transposed" to be processed across all a's, but that is not the question here)
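To make the two steps concrete, here is a rough sketch with made-up types and function names (A, B, X, Y, f and g are placeholders for illustration, not actual code I have):

type A = Long
type B = Long
type X = Array[Byte]   // large payload, a couple of Mb
type Y = Array[Byte]

// step 1: applied independently to every message (a, b) -> x
def f(key: (A, B), x: X): Y = ???

// step 2: applied once per a, only after all b's for that a have arrived,
// to the whole row aB -> Seq[y] (ordered by b)
def g(a: A, row: Seq[(B, Y)]) = ???   // output type not specified here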

Question

How can I achieve this kind of message merge from step 1 to step 2?

It looks like a groupByKey on a, but as far as I know the groupBy would be applied within each micro-batch, whereas I need, for each a, to wait until all the b's have been received (assume a simple counting scheme would work). Once again, set aside missing b's and input errors.
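For illustration only, a rough sketch of the "count until all b's have arrived" idea using Spark Streaming's per-key state (updateStateByKey); expectedB and the element types are assumptions:

// step1Output: DStream[((Long, Long), Array[Byte])] produced by step 1 (assumed)
// expectedB: the number of b's per a, assumed known up front
import org.apache.spark.streaming.dstream.DStream

def accumulateRows(step1Output: DStream[((Long, Long), Array[Byte])],
                   expectedB: Int): DStream[(Long, Seq[(Long, Array[Byte])])] = {
  // re-key by a so that all (b, y) pairs of one a share the same state entry
  val byA = step1Output.map { case ((a, b), y) => (a, (b, y)) }

  // keep appending the newly arrived (b, y) pairs to the per-a state
  val state = byA.updateStateByKey[Seq[(Long, Array[Byte])]] {
    (newPairs: Seq[(Long, Array[Byte])], current: Option[Seq[(Long, Array[Byte])]]) =>
      Some(current.getOrElse(Seq.empty) ++ newPairs)
  }

  // release only complete rows, sorted by b, to step 2
  state.filter { case (_, row) => row.size == expectedB }
       .mapValues(_.sortBy(_._1))
}

Note that updateStateByKey requires ssc.checkpoint(...) to be set, and a completed a would keep being re-emitted unless its state is eventually dropped (e.g. by returning None once the row has been handed off).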

An idea

Naively, I would try to see whether such a merge could be achieved by appending to HDFS files, one per a, and running a second stream on those files when they are complete, i.e. when a file contains all the b's, moving it to the input directory of step 2. But:

  • I don't know whether such appending can be implemented on HDFS
  • the two SparkStreamingContexts would have to run in parallel, one for each step, and that looks like a problem (?)
  • I figure that going through HDFS would break the "exactly once" property of Spark (Streaming)


1 answer


You can create a master RDD and merge the micro-batch RDDs generated by the stream into the master with RDD.union. Something like:

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD  // guessing on the RDD type

myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    // union returns a new RDD, so the accumulator must be reassigned
    masterRDD = masterRDD.union(rdd)

    masterRDD.groupBy(...)  // ... then group/process the accumulated data
  }
}

      



You should take some time to read up on checkpointing, which is:

Data checkpointing - Saving of the generated RDDs to reliable storage. This is necessary in some stateful transformations that combine data across multiple batches. In such transformations, the generated RDDs depend on RDDs of previous batches, which causes the length of the dependency chain to keep increasing with time. To avoid such unbounded increases in recovery time (proportional to the dependency chain), intermediate RDDs of stateful transformations are periodically checkpointed to reliable storage (e.g. HDFS) to cut off the dependency chains.
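As a rough illustration (the checkpoint directory and the 10-batch interval are arbitrary; sc, ssc and myStream are as above), checkpointing the growing master RDD from time to time cuts its lineage:

// sc, ssc, myStream as above; directory and interval are arbitrary choices
ssc.checkpoint("hdfs:///tmp/streaming-checkpoints")

var masterRDD: RDD[((Long, Long), String)] = sc.emptyRDD
var batchCount = 0L

myStream.foreachRDD { rdd =>
  if (!rdd.isEmpty()) {
    masterRDD = masterRDD.union(rdd)
    batchCount += 1

    // without this, masterRDD's lineage grows by one union per batch
    if (batchCount % 10 == 0) {
      masterRDD.cache()
      masterRDD.checkpoint()   // mark for checkpointing (truncates the dependency chain)
      masterRDD.count()        // force a job so the checkpoint is actually written
    }
  }
}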
