Apache Spark MLlib - Running KMeans with TF-IDF Vectors - Java Heap Space

I am trying to run KMeans from MLlib on a (large) collection of text documents represented as TF-IDF vectors. The documents are passed through a Lucene English analyzer, and the sparse vectors are generated with HashingTF.transform(). Regardless of the degree of parallelism I use (via the coalesce function), KMeans.train always throws the OutOfMemoryError below. Any thoughts on how to fix this?

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:138)
at scala.reflect.ManifestFactory$$anon$12.newArray(Manifest.scala:136)
at breeze.linalg.Vector$class.toArray(Vector.scala:80)
at breeze.linalg.SparseVector.toArray(SparseVector.scala:48)
at breeze.linalg.Vector$class.toDenseVector(Vector.scala:75)
at breeze.linalg.SparseVector.toDenseVector(SparseVector.scala:48)
at breeze.linalg.Vector$class.toDenseVector$mcD$sp(Vector.scala:74)
at breeze.linalg.SparseVector.toDenseVector$mcD$sp(SparseVector.scala:48)
at org.apache.spark.mllib.clustering.BreezeVectorWithNorm.toDense(KMeans.scala:422)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.apply(KMeans.scala:285)
at org.apache.spark.mllib.clustering.KMeans$$anonfun$initKMeansParallel$1.apply(KMeans.scala:284)
at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:108)
at org.apache.spark.mllib.clustering.KMeans.initKMeansParallel(KMeans.scala:284)
at org.apache.spark.mllib.clustering.KMeans.runBreeze(KMeans.scala:143)
at org.apache.spark.mllib.clustering.KMeans.run(KMeans.scala:126)
at org.apache.spark.mllib.clustering.KMeans$.train(KMeans.scala:338)
at org.apache.spark.mllib.clustering.KMeans$.train(KMeans.scala:348)

      



1 answer


After some investigation, it turns out that this issue is related to the new HashingTF().transform(v) method. Creating sparse vectors with the hashing trick is really useful (especially when the number of features is not known in advance), but the vectors must remain sparse. The default size of a HashingTF vector is 2^20. With 64-bit double precision, each vector theoretically requires 8 MB (2^20 x 8 bytes) once converted to a dense vector, regardless of any dimensionality reduction we might apply.
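The 8 MB figure can be checked with a little arithmetic. The sketch below is only an illustration: the object and method names are made up for this example, the count of 1,000 vectors is an arbitrary illustrative figure, and JVM array headers and other overhead are ignored.

```scala
object DenseVectorMemory {
  // Bytes needed to store the values of one dense vector of doubles
  // (8 bytes per double; JVM object/array overhead is ignored).
  def denseBytes(numFeatures: Int): Long = numFeatures.toLong * 8L

  def main(args: Array[String]): Unit = {
    val defaultFeatures = 1 << 20 // HashingTF default dimension, 2^20
    val perVector = denseBytes(defaultFeatures)
    val vectorCount = 1000        // illustrative number of dense vectors

    val mib = perVector.toDouble / (1024 * 1024)
    val gib = (perVector * vectorCount).toDouble / (1024L * 1024 * 1024)

    println(f"one dense vector: $mib%.1f MiB")
    println(f"$vectorCount dense vectors: $gib%.2f GiB")
  }
}
```

A thousand such dense vectors already come to several gigabytes, which is more than a typical default driver heap.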

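Unfortunately, KMeans calls toDense (at least for the cluster centers), as the stack trace in the question shows for initKMeansParallel. With k = 1000, that is roughly 1000 x 8 MB = 8 GB for the centers alone, hence the OutOfMemoryError. The random initializer in the Spark source does the same: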



  private def initRandom(data: RDD[BreezeVectorWithNorm]) : Array[Array[BreezeVectorWithNorm]] = {
    val sample = data.takeSample(true, runs * k, new XORShiftRandom().nextInt()).toSeq
    Array.tabulate(runs)(r => sample.slice(r * k, (r + 1) * k).map { v =>
      new BreezeVectorWithNorm(v.vector.toDenseVector, v.norm)
    }.toArray)
  }
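One possible way to act on this, which the answer itself does not spell out, is to give HashingTF a smaller dimension so that each densified center stays small. The following is a minimal sketch under stated assumptions: the toy documents, the choice of 2^14 features, k = 2, the 10 iterations, and the local master are all placeholders for illustration, and a smaller dimension increases hash collisions, so the value would need tuning on real data.

```scala
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.feature.{HashingTF, IDF}
import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.rdd.RDD

object SmallHashingTfKMeans {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("small-hashing-tf-kmeans")
      .setMaster("local[*]") // assumption: local run for illustration only
    val sc = new SparkContext(conf)
    try {
      // Hypothetical, already-tokenized documents standing in for the analyzer output.
      val docs: RDD[Seq[String]] = sc.parallelize(Seq(
        Seq("spark", "kmeans", "heap"),
        Seq("spark", "mllib", "vector"),
        Seq("lucene", "analyzer", "token"),
        Seq("lucene", "token", "stream")
      ))

      // Assumed value: 2^14 hash buckets instead of the default 2^20,
      // so one dense center needs about 128 KB instead of 8 MB.
      val numFeatures = 1 << 14
      val tf: RDD[Vector] = new HashingTF(numFeatures).transform(docs)
      tf.cache() // tf is traversed twice: once to fit IDF, once to transform

      val tfidf: RDD[Vector] = new IDF().fit(tf).transform(tf)

      // k = 2 and 10 iterations are placeholders for this toy input.
      val model = KMeans.train(tfidf, 2, 10)
      model.clusterCenters.foreach(c => println(s"center dimension: ${c.size}"))
    } finally {
      sc.stop()
    }
  }
}
```

The trade-off is between memory per center and collision rate, so the dimension is best chosen relative to the vocabulary size and the number of clusters.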

      
