Is it possible to unit test a Spark UDAF?
Spark UDAFs require you to implement several methods, notably
def update(buffer: MutableAggregationBuffer, input: Row): Unit
and
def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit
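For concreteness, here is a minimal sketch of such a UDAF for the examples below to run against: a plain sum over a long column. The object name LongSum and the field names "value" and "sum" are just illustrative, not part of any API.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Hypothetical stand-in for the UDAF X below: sums a single LongType column.
object LongSum extends UserDefinedAggregateFunction {
  def inputSchema: StructType = StructType(StructField("value", LongType) :: Nil)
  def bufferSchema: StructType = StructType(StructField("sum", LongType) :: Nil)
  def dataType: DataType = LongType
  def deterministic: Boolean = true
  def initialize(buffer: MutableAggregationBuffer): Unit = buffer(0) = 0L
  def update(buffer: MutableAggregationBuffer, input: Row): Unit =
    if (!input.isNullAt(0)) buffer(0) = buffer.getLong(0) + input.getLong(0)
  def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit =
    buffer1(0) = buffer1.getLong(0) + buffer2.getLong(0)
  def evaluate(buffer: Row): Any = buffer.getLong(0)
}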
Suppose my test has a UDAF X, four rows (r0, r1, r2, r3), and two aggregation buffers A and B. I want to check that this code produces the expected output:
X.update(A, r0)
X.update(A, r1)
X.update(B, r2)
X.update(B, r3)
X.merge(A, B)
X.evaluate(A)
That is, the same output as calling X.update for each of the four rows with a single buffer:
X.update(A, r0)
X.update(A, r1)
X.update(A, r2)
X.update(A, r3)
X.evaluate(A)
This would verify the correctness of both methods. However, I don't know how to write such a test: it doesn't seem possible to construct an implementation of MutableAggregationBuffer from user code.
If I just build a DataFrame of 4 rows and aggregate it with groupBy().agg(...) to invoke my UDAF, Spark won't necessarily call merge at all: with so few rows in a single partition, there are no partial aggregates to combine.
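For what it's worth, merge can be forced in an end-to-end run by spreading the rows over several partitions, so that partial aggregates from different partitions have to be combined. A sketch, assuming a SparkSession named spark and the LongSum stand-in from above:

import org.apache.spark.sql.functions.col

val df = spark.range(4).toDF("value").repartition(4)  // one row per partition
df.agg(LongSum(col("value"))).show()                  // 0+1+2+3 = 6; combining partials exercises merge

But that is an integration test against the whole execution engine, not the isolated unit test described above.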
MutableAggregationBuffer is just an abstract class. You can easily create your own implementation, for example like this:
import org.apache.spark.sql.expressions._

// Simple array-backed buffer, good enough for driving update/merge in tests.
class DummyBuffer(init: Array[Any]) extends MutableAggregationBuffer {
  val values: Array[Any] = init
  def update(i: Int, value: Any): Unit = values(i) = value
  def get(i: Int): Any = values(i)
  def length: Int = values.length
  // Clone the backing array so the copy doesn't share mutable state.
  def copy(): DummyBuffer = new DummyBuffer(values.clone())
}
It will not replace the "real thing", but should be sufficient for simple test scenarios.
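For instance, a test of the scenario from the question might look like this sketch (using the hypothetical LongSum from above and a bare assert; substitute your test framework's assertions):

import org.apache.spark.sql.Row

def freshBuffer(): DummyBuffer = {
  val buf = new DummyBuffer(new Array[Any](1))  // one slot, matching bufferSchema
  LongSum.initialize(buf)
  buf
}

val rows = Seq(Row(1L), Row(2L), Row(3L), Row(4L))

// Path 1: update two buffers separately, then merge them.
val a = freshBuffer()
val b = freshBuffer()
LongSum.update(a, rows(0)); LongSum.update(a, rows(1))
LongSum.update(b, rows(2)); LongSum.update(b, rows(3))
LongSum.merge(a, b)

// Path 2: update a single buffer with all four rows.
val c = freshBuffer()
rows.foreach(r => LongSum.update(c, r))

// Both paths must agree (here: 10L).
assert(LongSum.evaluate(a) == LongSum.evaluate(c))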