Differences in Spark caching between 2.0.2 and 2.1.1

I am a bit confused by Spark caching. I want to compute a dependent dataset (b), cache it, and then unpersist the original dataset (a) while keeping b cached. Here's my code:

import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().appName("test").master("local[4]").getOrCreate()
import spark.implicits._
val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
a.createTempView("a")
a.cache
println(s"Is a cached: ${spark.catalog.isCached("a")}")
val b = a.filter(x => x._2 < 3)
b.createTempView("b")
// calling action
b.cache.first
println(s"Is b cached: ${spark.catalog.isCached("b")}")

spark.catalog.uncacheTable("a")
println(s"Is b cached after a was unpersisted: ${spark.catalog.isCached("b")}")

      

With Spark 2.0.2, it works as expected:

Is a cached: true
Is b cached: true
Is b cached after a was unpersisted: true

      

But on 2.1.1:

Is a cached: true
Is b cached: true
Is b cached after a was unpersisted: false

      

How can I achieve the same behavior in 2.1.1?

Thanks.



1 answer


I'm not sure what the intended behavior is. Judging by Spark's own tests, the 2.1.1 behavior is expected, but a few comments in those tests express some doubt. You could open a JIRA issue in the Spark project to clarify the situation.

CachedTableSuite.scala

test("uncaching temp table") {
  testData.select('key).createOrReplaceTempView("tempTable1")
  testData.select('key).createOrReplaceTempView("tempTable2")
  spark.catalog.cacheTable("tempTable1")

  assertCached(sql("SELECT COUNT(*) FROM tempTable1"))
  assertCached(sql("SELECT COUNT(*) FROM tempTable2"))

  // Is this valid?
  spark.catalog.uncacheTable("tempTable2")

  // Should this be cached?
  assertCached(sql("SELECT COUNT(*) FROM tempTable1"), 0)
}
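To see the same effect outside the test harness, here is a minimal standalone sketch using only the public Catalog API. It assumes a small hypothetical stand-in for the suite's testData fixture; SharedPlanCacheDemo and run are illustrative names, and the sketch has not been checked against 2.0.2 or 2.1.1 specifically. The point it illustrates is that cache lookup compares logical plans, not view names, so two views over an identical plan share one cache entry.

```scala
import org.apache.spark.sql.SparkSession

object SharedPlanCacheDemo {
  /** Returns (tempTable2 reported cached after caching tempTable1,
    *          tempTable1 still reported cached after uncaching tempTable2). */
  def run(spark: SparkSession): (Boolean, Boolean) = {
    import spark.implicits._
    // Hypothetical stand-in for the suite's testData fixture
    val testData = Seq((1, "1"), (2, "2"), (3, "3")).toDF("key", "value")

    // Two temp views defined by the same logical plan, as in the suite
    testData.select("key").createOrReplaceTempView("tempTable1")
    testData.select("key").createOrReplaceTempView("tempTable2")

    spark.catalog.cacheTable("tempTable1")
    // The lookup matches on plan equivalence, so the second view hits the same entry
    val secondViewCached = spark.catalog.isCached("tempTable2")

    // Uncaching via the second view removes the shared entry used by the first view
    spark.catalog.uncacheTable("tempTable2")
    val firstViewStillCached = spark.catalog.isCached("tempTable1")

    (secondViewCached, firstViewStillCached)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("shared-plan-cache").master("local[2]").getOrCreate()
    val (second, first) = run(spark)
    println(s"tempTable2 cached after caching tempTable1: $second")
    println(s"tempTable1 cached after uncaching tempTable2: $first")
    spark.stop()
  }
}
```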

      



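The helper assertCached checks that the number of cached relations (InMemoryRelation nodes) in the query's plan equals its second argument, numCachedTables, which defaults to 1. Since both temp views in the test are built from the same plan, testData.select('key), uncaching tempTable2 removes the cache entry that tempTable1 also uses, and the test expects 0 cached relations for tempTable1 afterwards.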

QueryTest.scala

/**
 * Asserts that a given [[Dataset]] will be executed using the given number of cached results.
 */
def assertCached(query: Dataset[_], numCachedTables: Int = 1): Unit = {
  val planWithCaching = query.queryExecution.withCachedData
  val cachedData = planWithCaching collect {
    case cached: InMemoryRelation => cached
  }

  assert(
    cachedData.size == numCachedTables,
    s"Expected query to contain $numCachedTables, but it actually had ${cachedData.size}\n" +
    planWithCaching)
}
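If you want the same diagnostic in your own code (for example, to see whether b's plan still reads from a cache after a is uncached), a minimal sketch adapted from assertCached might look like this. CacheInspection and numCachedRelations are hypothetical names. The sketch assumes that queryExecution.withCachedData, a developer-facing and unstable API, is reachable from user code in your Spark version; it matches on the node name "InMemoryRelation" instead of importing the class, because that class is an internal detail. It is untested against 2.0.2 and 2.1.1 specifically.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

object CacheInspection {
  /** Counts the cached fragments (InMemoryRelation nodes) Spark would substitute
    * into `ds`'s plan; the same quantity QueryTest.assertCached compares. */
  def numCachedRelations(ds: Dataset[_]): Int = {
    // withCachedData is a lazy val on QueryExecution, so wrap `ds` in a trivial
    // projection to get a fresh QueryExecution that reflects the current cache state.
    val fresh = ds.select("*")
    // Match by node name rather than importing the internal InMemoryRelation class.
    fresh.queryExecution.withCachedData.collect {
      case node if node.nodeName == "InMemoryRelation" => node
    }.size
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("cache-inspection").master("local[2]").getOrCreate()
    import spark.implicits._

    // Same setup as in the question
    val a = spark.createDataset(Seq(("a", 1), ("b", 2), ("c", 3)))
    a.createOrReplaceTempView("a")
    a.cache()
    val b = a.filter(x => x._2 < 3)
    b.cache().first()

    println(s"Cached relations in b's plan: ${numCachedRelations(b)}")
    spark.catalog.uncacheTable("a")
    println(s"Cached relations in b's plan after uncaching a: ${numCachedRelations(b)}")
    spark.stop()
  }
}
```

Wrapping the Dataset in a fresh projection matters when you call the helper more than once on the same Dataset: a plan computed before uncacheTable would otherwise be reused and report a stale cache state.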

      
