Checking for RDD Equality

I am doing some tests in JUnit and I need to check for equality of two Spark RDDs.

I thought it would be something like this:

JavaRDD<SomeClass> expResult = ...;
JavaRDD<SomeClass> result = ...;

assertEquals(expResult.collect(), result.collect());

      

Is there a better way than this?


2 answers


If the expected output is small enough, your best bet is to collect the RDD data and compare it locally (exactly as you wrote).

When the tests need datasets that are too large to collect comfortably, there are a few other options.

Disclaimer: I'm not familiar enough with the Spark Java API, so I'll write the example code in Scala. I hope this won't be a problem, as it can be rewritten in Java or wrapped in a couple of utility functions called from Java code.

Method 1: Zip RDD together and compare item by item

This method can only be used if the order of the elements in both RDDs is well defined (that is, the RDDs are sorted).

val diff = expResult
  .zip(result)
  .collect { case (a, b) if a != b => a -> b }
  .take(100)

      

The array diff will contain up to 100 differing pairs. If the RDDs are large and you want all of the differing pairs locally, you can use the toLocalIterator method in place of take. It is better not to use the collect method, as you can run out of memory.

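This method is probably the fastest, as it does not require a shuffle, but it can only be used if the order of the partitions in the two RDDs and the order of the elements within the partitions match. zip also expects both RDDs to have the same number of partitions and the same number of elements in each partition.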
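For Java tests, a similar item-by-item comparison can be done on the driver once both RDDs are exposed as iterators (for example through JavaRDD.toLocalIterator()). The following is a minimal sketch in plain Java with no Spark dependency. Unlike zip, it runs entirely on the driver. The names PairwiseDiff and firstMismatches are hypothetical, not part of any Spark API, and the sketch assumes both iterators yield elements in the same, well-defined order.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Objects;

// Hypothetical helper (not a Spark API): walks two iterators in lockstep
// and keeps at most `limit` mismatching pairs.
class PairwiseDiff {

    // Each returned entry is a two-element list [expectedItem, actualItem].
    // When one iterator runs out first, the missing side is reported as null.
    static <T> List<List<T>> firstMismatches(Iterator<T> expected, Iterator<T> actual, int limit) {
        List<List<T>> mismatches = new ArrayList<>();
        while (mismatches.size() < limit && (expected.hasNext() || actual.hasNext())) {
            T a = expected.hasNext() ? expected.next() : null;
            T b = actual.hasNext() ? actual.next() : null;
            if (!Objects.equals(a, b)) {
                mismatches.add(Arrays.asList(a, b));
            }
        }
        return mismatches;
    }

    public static void main(String[] args) {
        // Local stand-ins for expResult.toLocalIterator() and result.toLocalIterator().
        Iterator<Integer> expected = Arrays.asList(1, 2, 3).iterator();
        Iterator<Integer> actual = Arrays.asList(1, 5, 3, 4).iterator();
        System.out.println(firstMismatches(expected, actual, 100));
    }
}
```

In a test, an empty result means the two sequences matched element by element; a non-empty result can go straight into the assertion message.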



Method 2: Grouped RDDs

This method can be used to check whether the result RDD contains the specified (possibly non-unique) values, in no particular order.

  val diff = expResult.map(_ -> 1)
    .cogroup(result.map(_ -> 1))
    .collect { case (a, (i1, i2)) if i1.sum != i2.sum => a -> (i1.sum - i2.sum) }
    .take(100)

      

The array diff will contain the differing values along with the difference between their counts.

For example:

  • If expResult contains one instance of some value, but result does not contain this value, the number will be +1;
  • If result contains 3 instances of another value, but expResult only 1, the number will be -2.

This method will be faster than the other options (such as subtracting the RDDs from each other), since it requires only one shuffle.
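To make the sign convention concrete, here is the same counting logic applied to small local lists in plain Java. It only illustrates what the cogroup above computes and is not a replacement for it on large data; CountDiff and countDiff are hypothetical names.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical local illustration of the count difference computed by Method 2.
class CountDiff {

    // Maps each value to (occurrences in expected) - (occurrences in actual),
    // leaving out values whose counts agree.
    static <T> Map<T, Integer> countDiff(List<T> expected, List<T> actual) {
        Map<T, Integer> diff = new HashMap<>();
        for (T e : expected) {
            diff.merge(e, 1, Integer::sum);
        }
        for (T a : actual) {
            diff.merge(a, -1, Integer::sum);
        }
        diff.values().removeIf(count -> count == 0);
        return diff;
    }

    public static void main(String[] args) {
        // "x" appears once in expected and never in actual: difference +1.
        // "y" appears once in expected and three times in actual: difference -2.
        List<String> expected = Arrays.asList("x", "y");
        List<String> actual = Arrays.asList("y", "y", "y");
        System.out.println(countDiff(expected, actual));
    }
}
```

An empty map means both collections hold the same values with the same multiplicities, regardless of order.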



In my case, using Spark from Java, simply comparing the results of collect didn't work: assertEquals always failed.

So this test:

assertEquals( 
  this.fooExpectedRddTest.collect(),
  this.fooServiceTest.getRdd().collect()
);

      

produces this result:



FooServiceTest.getRdd:143 expected: 

scala.collection.convert.Wrappers$SeqWrapper<[
    {"a":1,"b":2,"c":1496405614}, 
    {"a":1,"b":3,"c":1496243614}, 
    {"a":2,"b":2,"c":1496416414}
]> 

but was: 

scala.collection.convert.Wrappers$SeqWrapper<[
    {"a":1,"b":2,"c":1496405614}, 
    {"a":1,"b":3,"c":1496243614}, 
    {"a":2,"b":2,"c":1496416414}
]>

      

My way of handling this was to map the elements to something easier to compare, in this case tuples.

List<Tuple3<Integer,Integer,Double>> expectedList = this.fooExpectedRddTest.map(
        (Foo foo) -> {
            return new Tuple3<Integer,Integer,Double>(
                    foo.getA(),
                    foo.getB(),
                    foo.getC()
            );
        }
).collect();

List<Tuple3<Integer,Integer,Double>> receivedList = this.fooServiceTest.getRdd().map(
        (Foo foo) -> {
            return new Tuple3<Integer,Integer,Double>(
                    foo.getA(),
                    foo.getB(),
                    foo.getC()
            );
        }
).collect();

assertEquals(
    expectedList,
    receivedList
);
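The answer does not say why the first assertEquals failed even though both lists print identically. One plausible cause, stated here only as an assumption, is that Foo does not override equals and hashCode, so the list comparison falls back to reference equality. If that is the case and the class can be changed, a sketch like the following would let the collected lists be compared directly. FooValue is a hypothetical stand-in for Foo, with field types guessed from the getters used above.

```java
import java.io.Serializable;
import java.util.Arrays;
import java.util.List;
import java.util.Objects;

// Hypothetical stand-in for Foo. Serializable because Spark ships RDD elements between JVMs.
final class FooValue implements Serializable {
    private final int a;
    private final int b;
    private final double c;

    FooValue(int a, int b, double c) {
        this.a = a;
        this.b = b;
        this.c = c;
    }

    // Value-based equality: two instances are equal when all fields match.
    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof FooValue)) return false;
        FooValue other = (FooValue) o;
        return a == other.a && b == other.b && Double.compare(c, other.c) == 0;
    }

    // Must agree with equals so hash-based collections behave consistently.
    @Override
    public int hashCode() {
        return Objects.hash(a, b, c);
    }

    public static void main(String[] args) {
        // Local lists standing in for the two collect() results.
        List<FooValue> expected = Arrays.asList(new FooValue(1, 2, 1496405614.0));
        List<FooValue> received = Arrays.asList(new FooValue(1, 2, 1496405614.0));
        System.out.println(expected.equals(received)); // prints true
    }
}
```

With equality defined on the element class, assertEquals on the two collected lists compares field values, and the intermediate Tuple3 mapping is no longer needed.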

      
