Memory error while reading large file in Spark 2.1.0

I want to use spark to read a large (51GB) XML file (on an external hard drive) into a dataframe (using the spark-xml plugin ), do a simple mapping / filtering, reorder it, then write it back to disk as a CSV file ...

But I always get it java.lang.OutOfMemoryError: Java heap space

no matter how I set it up.

I want to understand why not increasing the number of partitions stops the OOM error

Shouldn't the task be split up into more parts so that each individual part is smaller and doesn't cause memory problems?

(Spark can't try to hammer everything into memory and crash if it doesn't fit, right?)

Things I've tried:

  • reallocation / merging (5000 and 10000 partitions) of a data frame when reading and writing (initial value is 1.604)
  • using fewer performers (6, 4, even with 2 performers, I get an OOM error!)
  • reduce the size of shared files (by default it looks like 33MB)
  • give a ton of RAM (all I have)
  • increase spark.memory.fraction

    to 0.8 (default 0.6)
  • decrease spark.memory.storageFraction

    to 0.2 (default 0.5)
  • set spark.default.parallelism

    to 30 and 40 (default for me is 8)
  • set spark.files.maxPartitionBytes

    to 64M (default 128M)

All my code is here (note that I am not caching anything):

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema) // defined previously
  .option("rowTag", "row")
  .load(s"$pathToInputXML")

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
// prints 1604

// i pass `numPartitions` as cli arguments
val df2 = df.coalesce(numPartitions)

// filter and select only the cols i'm interested in
val dsout = df2
  .where( df2.col("_TypeId") === "1" )
  .select(
    df("_Id").as("id"),
    df("_Title").as("title"),
    df("_Body").as("body"),
  ).as[Post]

// regexes to clean the text
val tagPat = "<[^>]+>".r
val angularBracketsPat = "><|>|<"
val whitespacePat = """\s+""".r


// more mapping
dsout
 .map{
  case Post(id,title,body,tags) =>

    val body1 = tagPat.replaceAllIn(body,"")
    val body2 = whitespacePat.replaceAllIn(body1," ")

    Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))

}
.orderBy(rand(SEED)) // random sort
.write // write it back to disk
.option("quoteAll", true)
.mode(SaveMode.Overwrite)
.csv(output)

      

NOTES

  • the split of the input is really small (only 33MB), so why can't I have 8 threads each processing one section? it really shouldn't blow my memory (i se

UPDATE I wrote a shorter version of the code that just reads a file and then forEachPartition (println).

I am getting the same OOM error:

val df: DataFrame = spark.sqlContext.read
  .option("mode", "DROPMALFORMED")
  .format("com.databricks.spark.xml")
  .schema(customSchema)
  .option("rowTag", "row")
  .load(s"$pathToInputXML")
  .repartition(numPartitions)

println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")

df
  .where(df.col("_PostTypeId") === "1")
  .select(
   df("_Id").as("id"),
   df("_Title").as("title"),
   df("_Body").as("body"),
   df("_Tags").as("tags")
  ).as[Post]
  .map {
    case Post(id, title, body, tags) =>
      Post(id, title.toLowerCase, body.toLowerCase, tags.toLowerCase))
  }
  .foreachPartition { rdd =>
    if (rdd.nonEmpty) {
      println(s"HI! I'm an RDD and I have ${rdd.size} elements!")
    }
  }

      

PS: I am using spark v 2.1.0. My machine has 8 cores and 16GB of RAM.

+3


source to share


2 answers


Because you are saving your RDD twice Your logic should be like this or filter with SparkSql



 val df: DataFrame = SparkFactory.spark.read
      .option("mode", "DROPMALFORMED")
      .format("com.databricks.spark.xml")
      .schema(customSchema) // defined previously
      .option("rowTag", "row")
      .load(s"$pathToInputXML")
      .coalesce(numPartitions)

    println(s"\n\nNUM PARTITIONS: ${df.rdd.getNumPartitions}\n\n")
    // prints 1604


    // regexes to clean the text
    val tagPat = "<[^>]+>".r
    val angularBracketsPat = "><|>|<"
    val whitespacePat = """\s+""".r

    // filter and select only the cols i'm interested in
     df
      .where( df.col("_TypeId") === "1" )
      .select(
        df("_Id").as("id"),
        df("_Title").as("title"),
        df("_Body").as("body"),
      ).as[Post]
      .map{
        case Post(id,title,body,tags) =>

          val body1 = tagPat.replaceAllIn(body,"")
          val body2 = whitespacePat.replaceAllIn(body1," ")

          Post(id,title.toLowerCase,body2.toLowerCase, tags.split(angularBracketsPat).mkString(","))

      }
      .orderBy(rand(SEED)) // random sort
      .write // write it back to disk
      .option("quoteAll", true)
      .mode(SaveMode.Overwrite)
      .csv(output)

      

0


source


You can change the heap size by adding the following to your environment variable:



  • Environment variable name: _JAVA_OPTIONS
  • Environment variable value: -Xmx512M -Xms512M
-2


source







All Articles