Checking for RDD Equality
If the expected output is small enough, your best bet is to collect the RDD data and compare it locally (exactly as you wrote).
When you need larger datasets in your tests, there are a few other options:
Disclaimer: I'm not familiar enough with the Spark Java API, so my example code is in Scala. I hope this isn't a problem, since it can be rewritten in Java or turned into a couple of utility functions called from Java code.
Method 1: Zip the RDDs together and compare item by item
This method can only be used if the order of the elements in the RDDs is well defined (for example, the RDDs are sorted).
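// Pair elements by position and keep only the pairs that differ
val diff = expResult
  .zip(result)
  .collect { case (a, b) if a != b => a -> b }
  .take(100) // bring at most 100 mismatched pairs to the driver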
The array diff will contain up to 100 mismatched pairs. If the RDDs are large and you want to retrieve all of the mismatches locally, you can use toLocalIterator instead of take. It's better not to use collect() here, as you can run out of memory (OOM).
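This method is probably the fastest, as it does not require a shuffle, but it can only be used if the order of the partitions in the RDDs and the order of the elements within each partition match. Note that zip also requires both RDDs to have the same number of partitions and the same number of elements in each partition.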
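The logic of Method 1 can be illustrated without a cluster. The following is a minimal plain-Java sketch on local lists, not Spark code: the names ZipDiff, zipDiff and the limit parameter are hypothetical, and the length check stands in for zip's requirement that both sides line up element for element.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Objects;

// Hypothetical, Spark-free analogue of Method 1: pair elements by position
// and keep the (expected, actual) pairs that differ, up to a limit.
final class ZipDiff {
    static <T> List<Map.Entry<T, T>> zipDiff(List<T> expected, List<T> actual, int limit) {
        // Like RDD.zip, positional pairing only makes sense for equal-length inputs
        if (expected.size() != actual.size()) {
            throw new IllegalArgumentException("inputs must have the same length");
        }
        List<Map.Entry<T, T>> diffs = new ArrayList<>();
        for (int i = 0; i < expected.size() && diffs.size() < limit; i++) {
            T a = expected.get(i);
            T b = actual.get(i);
            if (!Objects.equals(a, b)) {
                diffs.add(new SimpleEntry<>(a, b)); // keep the mismatched pair
            }
        }
        return diffs;
    }

    public static void main(String[] args) {
        List<Integer> expected = Arrays.asList(1, 2, 3, 4);
        List<Integer> actual = Arrays.asList(1, 5, 3, 6);
        System.out.println(zipDiff(expected, actual, 100)); // prints [2=5, 4=6]
    }
}
```

As in the Scala version, the limit keeps the report small; the loop stops as soon as it has collected that many mismatches.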
Method 2: Cogroup the RDDs
This method can be used to check whether the result RDD contains the expected (possibly non-unique) values, in any order:
// Count each value as 1, group both RDDs by value, and keep values whose counts differ
val diff = expResult.map(_ -> 1)
  .cogroup(result.map(_ -> 1))
  .collect { case (a, (i1, i2)) if i1.sum != i2.sum => a -> (i1.sum - i2.sum) }
  .take(100)
The array diff will contain the differing values along with the difference between their counts.
For example:
- If expResult contains one instance of some value but result does not contain that value, the number will be +1;
- If result contains 3 instances of another value but expResult contains only 1, the number will be -2.
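Method 2 boils down to a multiset difference: each value counts +1 on the expected side and -1 on the result side, and only non-zero totals are kept. Here is a minimal plain-Java sketch of that counting on local lists (no Spark involved; CountDiff and countDiff are hypothetical names). With the inputs in main, x maps to 1 and y to -2, matching the two examples in the list.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, Spark-free analogue of Method 2: for every distinct value,
// compute (count in expected) - (count in actual) and keep the non-zero ones.
final class CountDiff {
    static <T> Map<T, Integer> countDiff(List<T> expected, List<T> actual) {
        Map<T, Integer> diff = new HashMap<>();
        for (T x : expected) {
            diff.merge(x, 1, Integer::sum);  // +1 per occurrence in expected
        }
        for (T x : actual) {
            diff.merge(x, -1, Integer::sum); // -1 per occurrence in actual
        }
        diff.values().removeIf(n -> n == 0); // drop values whose counts match
        return diff;
    }

    public static void main(String[] args) {
        List<String> expected = Arrays.asList("x", "y");
        List<String> actual = Arrays.asList("y", "y", "y");
        System.out.println(countDiff(expected, actual));
    }
}
```

Because only counts are compared, element order is irrelevant, which is exactly why this approach works when Method 1's ordering requirement cannot be met.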
This method will be faster than the alternatives (e.g., subtracting the RDDs from each other), since it only requires one shuffle.
In my case, using Spark from Java, simply collecting both RDDs didn't work: assertEquals always failed.
So this test:
assertEquals(
this.fooExpectedRddTest.collect(),
this.fooServiceTest.getRdd().collect()
);
produced this result:
FooServiceTest.getRdd:143 expected:
scala.collection.convert.Wrappers$SeqWrapper<[
{"a":1,"b":2,"c":1496405614},
{"a":1,"b":3,"c":1496243614},
{"a":2,"b":2,"c":1496416414}
]>
but was:
scala.collection.convert.Wrappers$SeqWrapper<[
{"a":1,"b":2,"c":1496405614},
{"a":1,"b":3,"c":1496243614},
{"a":2,"b":2,"c":1496416414}
]>
My way of handling this was to convert them to something easier to compare.
import scala.Tuple3;

// Map each Foo to a scala.Tuple3, whose equals() compares field values, then collect
List<Tuple3<Integer, Integer, Double>> expectedList = this.fooExpectedRddTest
    .map((Foo foo) -> new Tuple3<Integer, Integer, Double>(foo.getA(), foo.getB(), foo.getC()))
    .collect();
List<Tuple3<Integer, Integer, Double>> receivedList = this.fooServiceTest.getRdd()
    .map((Foo foo) -> new Tuple3<Integer, Integer, Double>(foo.getA(), foo.getB(), foo.getC()))
    .collect();

assertEquals(expectedList, receivedList);
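A likely explanation for the failing assertion (an assumption, since Foo's source isn't shown) is that Foo does not override equals(), so two lists of equal-looking Foo objects are compared by reference. Tuple3 compares its fields by value, which is why mapping to it helps. The plain-Java sketch below reproduces the effect with a hypothetical FooNoEquals class and maps each object to a List rather than a Tuple3, to stay free of Scala and Spark dependencies.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical stand-in for Foo: a plain class that does NOT override equals()
final class FooNoEquals {
    final int a;
    final int b;
    final double c;

    FooNoEquals(int a, int b, double c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }
}

final class EqualityDemo {
    // Map each object to a value with field-based equals(), like the Tuple3 mapping
    static List<List<Object>> toComparable(List<FooNoEquals> foos) {
        return foos.stream()
                .map(f -> Arrays.<Object>asList(f.a, f.b, f.c))
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<FooNoEquals> expected = Arrays.asList(new FooNoEquals(1, 2, 1496405614));
        List<FooNoEquals> received = Arrays.asList(new FooNoEquals(1, 2, 1496405614));
        // false: Object.equals compares references, so equal-looking lists still differ
        System.out.println(expected.equals(received));
        // true: List.equals compares element by element using the elements' equals()
        System.out.println(toComparable(expected).equals(toComparable(received)));
    }
}
```

Overriding equals() and hashCode() in Foo itself would be another way to make the direct comparison of the collected lists work.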