How to use a Spark QuantileDiscretizer on multiple columns

All,

I have an ML pipeline set up as shown below:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names and build one discretizer per column
val continuous = df.dtypes.filter(_._2 == "DoubleType").map(_._1)
val discretizers = continuous.map(c =>
  new QuantileDiscretizer()
    .setInputCol(c)
    .setOutputCol(s"${c}_disc")
    .setNumBuckets(3)
    .fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)

When I run this, Spark seems to set up each discretizer as a separate job. Is there a way to run all of them as a single job, with or without a Pipeline? Thanks for the help.

+3




2 answers


Support for this feature was added in Spark 2.3.0. See the release notes:

  • Multiple column support for several feature transformers:
    • [SPARK-13030]: OneHotEncoderEstimator (Scala / Java / Python)
    • [SPARK-22397]: QuantileDiscretizer (Scala / Java)
    • [SPARK-20542]: Bucketizer (Scala / Java / Python)

You can now use setInputCols and setOutputCols to specify multiple columns, although this does not yet seem to be reflected in the official docs. Compared to running a separate job per column, this gives a significant performance improvement.



Your example could be adapted like this:

import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}    
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random

val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()

// Get the continuous feature names; a single discretizer now handles all of them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map(_._1)

val discretizer = new QuantileDiscretizer()
  .setInputCols(continuous)
  .setOutputCols(continuous.map(c => s"${c}_disc"))
  .setNumBuckets(3)

val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
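
If you want to check that every column was handled by a single fitted stage, you can inspect the resulting model. This is a minimal sketch, assuming Spark 2.3+, where the multi-column QuantileDiscretizer fits to one Bucketizer carrying a splits array per column:

import org.apache.spark.ml.feature.Bucketizer

// The pipeline should contain a single stage: one Bucketizer covering all input columns.
val bucketizer = model.stages(0).asInstanceOf[Bucketizer]
println(bucketizer.getInputCols.length)                 // number of discretized columns
println(bucketizer.getSplitsArray.head.mkString(", "))  // splits learned for the first column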

      

0




import org.apache.spark.ml.feature.QuantileDiscretizer

val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")

val discretizer = new QuantileDiscretizer()
  .setInputCol("hour")
  .setOutputCol("result")
  .setNumBuckets(3)

val result = discretizer.fit(df).transform(df)
result.show()

      

Taken from the QuantileDiscretizer documentation.

It runs as one job for one column; the code below also runs as one job, but for multiple columns:

def discretizerFun(col: String, bucketNo: Int): QuantileDiscretizer = {
  new QuantileDiscretizer()
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)
}


val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8))

val df = spark.createDataFrame(data).toDF("id", "hour", "temp")

df.show

// Discretize "hour" first, then discretize "temp" on the result,
// so each discretizer is fitted only once.
val hourDiscretized = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDiscretized).transform(hourDiscretized)

res.show

The best way would be to convert this function into a UDF, but that might be a problem with the org.apache.spark.ml.feature.QuantileDiscretizer type; if it can be done, you would have a nice and clean way to do lazy conversions.
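
As an alternative sketch (not from the original answer), the per-column discretizers returned by discretizerFun can also be collected into a single Pipeline, which keeps the chaining lazy until fit is called and avoids nesting fit/transform calls by hand. Note that on Spark versions before 2.3 this still computes quantiles column by column:

import org.apache.spark.ml.{Pipeline, PipelineStage}

// One estimator per (column, bucket count) pair, fitted together in one Pipeline.
val stages: Array[PipelineStage] = Array(discretizerFun("hour", 2), discretizerFun("temp", 4))

val pipelineModel = new Pipeline().setStages(stages).fit(df)
pipelineModel.transform(df).show()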

0

