Is it possible to unit test a Spark UDAF?

Spark UDAFs require you to implement several methods, notably:

def update(buffer: MutableAggregationBuffer, input: Row): Unit
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit

Suppose my test has a UDAF X, 4 rows (r0, r1, r2, r3), and two aggregation buffers A and B. I want to see that this code produces the expected output:

X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)


and that it produces the same result as calling X.update for each of the 4 rows on a single buffer:

X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)


This would verify the correctness of both methods. However, I don't know how to write a test like this: it doesn't seem like user code can create an implementation of MutableAggregationBuffer.

If I just build a DataFrame of 4 rows and run it through groupBy().agg(...) to call my UDAF, Spark won't even exercise merge that way: since the number of rows is so small, it doesn't need to.
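For concreteness, the end-to-end attempt looks roughly like this (a sketch; X and the "value" column are stand-ins for my actual UDAF and schema):

// assuming an existing SparkSession `spark`
import spark.implicits._

val x = new X  // the UDAF under test

val df = Seq(1L, 2L, 3L, 4L).toDF("value")
// With only 4 rows in a single partition, Spark computes the aggregate
// with update alone and never needs to call merge.
df.groupBy().agg(x(df("value"))).show()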



1 answer


MutableAggregationBuffer is just an abstract class. You can easily create your own implementation, for example like this:

import org.apache.spark.sql.expressions._

class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
  val values: Array[Any] = init
  // Mutate a slot in place, as Spark does between update calls.
  def update(i: Int, value: Any): Unit = values(i) = value
  def get(i: Int): Any = values(i)
  def length: Int = values.length
  // Clone the backing array so the copy is independent of this buffer.
  def copy(): DummyBuffer = new DummyBuffer(values.clone())
}




It will not replace the "real thing", but should be sufficient for simple test scenarios.
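Building on that, here is a minimal sketch of the equivalence test described in the question. SumX is a hypothetical UDAF that sums a single long column, used only so the example is self-contained; substitute your own function and buffer layout:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical UDAF: sums a single long column.
class SumX extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    buffer(0) = buffer.getLong(0) + input.getLong(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}

val x = new SumX

def newBuffer(): DummyBuffer = {
  val buf = new DummyBuffer(new Array[Any](1))  // one slot, matching bufferSchema
  x.initialize(buf)
  buf
}

val Seq(r0, r1, r2, r3) = Seq(1L, 2L, 3L, 4L).map(v => Row(v))

// Split path: two buffers updated independently, then merged.
val a = newBuffer()
val b = newBuffer()
x.update(a, r0)
x.update(a, r1)
x.update(b, r2)
x.update(b, r3)
x.merge(a, b)

// Single-buffer path: all four rows through one buffer.
val c = newBuffer()
Seq(r0, r1, r2, r3).foreach(x.update(c, _))

// Both paths should agree (and equal 10L for this toy UDAF).
assert(x.evaluate(a) == x.evaluate(c))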
