Is recursive computation possible with Apache Spark?

I am developing a chess engine using Scala and Apache Spark (and I need to emphasize that my sanity is not the subject of this question). My problem is that the Negamax algorithm is recursive in nature and when I try the naive approach:

class NegaMaxSparc(@transient val sc: SparkContext) extends Serializable  {
  val movesOrdering = new Ordering[Tuple2[Move, Double]]() {
    override def compare(x: (Move, Double), y: (Move, Double)): Int =
      Ordering[Double].compare(x._2, y._2)
  }

  def negaMaxSparkHelper(game: Game, color: PieceColor, depth: Int, previousMovesPar: RDD[Move]): (Move, Double) = {
    val board = game.board

    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val moves = board.possibleMovesForColor(color)
      val movesPar = previousMovesPar.context.parallelize(moves)

      val moveMappingFunc = (m: Move) => { negaMaxSparkHelper(new Game(board.boardByMakingMove(m), color.oppositeColor, null), color.oppositeColor, depth - 1, movesPar) }
      val movesWithScorePar = movesPar.map(moveMappingFunc)
      val move = movesWithScorePar.min()(movesOrdering)

      (move._1, -move._2)
    }
  }

  def negaMaxSpark(game: Game, color: PieceColor, depth: Int): (Move, Double) = {
    if (depth == 0) {
      (null, NegaMax.evaluateDefault(game, color))
    } else {
      val movesPar = sc.parallelize(new Array[Move](0))

      negaMaxSparkHelper(game, color, depth, movesPar)
    }
  }
}

class NegaMaxSparkBot(val maxDepth: Int, sc: SparkContext) extends Bot {
  def nextMove(game: Game): Move = {
    val nms = new NegaMaxSparc(sc)
    nms.negaMaxSpark(game, game.colorToMove, maxDepth)._1
  }
}

      

I get:

org.apache.spark.SparkException: RDD transformations and actions can only be invoked by the driver, not inside of other transformations; for example, rdd1.map(x => rdd2.values.count() * x) is invalid because the values transformation and count action cannot be performed inside of the rdd1.map transformation. For more information, see SPARK-5063.

      

Question: can this algorithm be implemented recursively using Spark? If not, what is the correct Spark way to solve this problem?

+3


source to share


2 answers


This is a limitation that makes sense from an implementation standpoint, but can be painful to work with.

Can you try to bring recursion to the top level, just in the "driver" code that builds and works with the RDD? Something like:



def step(rdd: Rdd[Move], limit: Int) =
  if(0 == limit) rdd
  else {
    val newRdd = rdd.flatMap(...)
    step(newRdd, limit - 1)
  }

      

Alternatively, it is always possible to convert recursion to iteration by manipulating the "stack" explicitly by hand (although this can lead to more cumbersome code).

+2


source


Only the driver can run calculations on the RDD. The reason is that even if the RDD "feels" like regular collections of data, behind the scenes they are still distributed by collections, so launching on them requires coordinated execution of tasks on all remote slaves, which sparks hide from us most of the time.

Thus, the return from the slaves, i.e. launching new distributed tasks dynamically directly from slaves is impossible: only the disk can take care of this coordination.

Here is a possible alternative to simplify your problem (if I understood correctly). The idea is to sequentially create instances Moves

, each one representing a complete sequence Move

from the original state.

Each instance Moves

is capable of transforming itself into a set Moves

, each matching the same sequence Move

plus one possible next Move

.



From there, the driver just has to execute flatMap Moves

as deep as possible, and as a result, RDD [Moves] will do all the operations in parallel for us.

The disadvantage of this approach is that the entire depth level is kept synchronously, i.e. we must calculate all the moves in the level n

(i.e. RDD[Moves]

for the level n

) before moving on to the next.

The code below is untested, probably has flaws and won't even compile, but hopefully it gives an idea of ​​how to approach the problem.

/* one modification to the board */
case class Move(from: String, to: String)

case class PieceColor(color: String)

/* state of the game */ 
case class Board {

    // TODO
    def possibleMovesForColor(color: PieceColor): Seq[Move] = 
        Move("here", "there") :: Move("there", "over there") :: Move("there", "here") :: Nil

    // TODO: compute a new instance of board here, based on current + this move
    def update(move: Move): Board = new Board
}


/** Solution, i.e. a sequence of moves*/ 
case class Moves(moves: Seq[Move], game: Board, color: PieceColor) {    
    lazy val score = NegaMax.evaluateDefault(game, color)

    /** @return all valid next Moves  */
    def nextPossibleMoves: Seq[Moves] = 
        board.possibleMovesForColor(color).map { 
            nextMove => 
              play.copy(moves = nextMove :: play.moves, 
                        game = play.game.update(nextMove)
        } 

}

/** Driver code: negaMax: looks for the best next move from a give game state */
def negaMax(sc: SparkContext, game: Board, color: PieceColor, maxDepth: Int):Moves = {

    val initialSolution = Moves(Seq[moves].empty, game, color)

    val allPlays: rdd[Moves] = 
        (1 to maxDepth).foldLeft (sc.parallelize(Seq(initialSolution))) {
        rdd => rdd.flatMap(_.nextPossibleMoves)
    }

    allPlays.reduce { case (m1, m2) => if (m1.score < m2.score) m1 else m2}

}

      

+2


source







All Articles