How to use a Spark QuantileDiscretizer on multiple columns
All,
I have an ML pipeline set up as shown below:
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
// Get the continuous feature names and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizers = continuous.map(c => new QuantileDiscretizer().setInputCol(c).setOutputCol(s"${c}_disc").setNumBuckets(3).fit(df))
val pipeline = new Pipeline().setStages(discretizers)
val model = pipeline.fit(df)
When I run this, Spark seems to set up each discretizer as a separate job. Is there a way to run all of them as one job, with or without a Pipeline? Thanks for the help.
Support for this was added in Spark 2.3.0. From the release notes:
- Multiple column support for several feature transformers:
- [SPARK-13030]: OneHotEncoderEstimator (Scala / Java / Python)
- [SPARK-22397]: QuantileDiscretizer (Scala / Java)
- [SPARK-20542]: Bucketizer (Scala / Java / Python)
You can now use setInputCols and setOutputCols to specify multiple columns, although this does not yet seem to be reflected in the official docs. Performance is significantly better with this patch than when running each column as its own job.
Your example could be adapted like this:
import org.apache.spark.ml.feature.QuantileDiscretizer
import org.apache.spark.sql.types.{StructType,StructField,DoubleType}
import org.apache.spark.ml.Pipeline
import org.apache.spark.rdd.RDD
import org.apache.spark.sql._
import scala.util.Random
val nRows = 10000
val nCols = 1000
val data = sc.parallelize(0 to nRows-1).map { _ => Row.fromSeq(Seq.fill(nCols)(Random.nextDouble)) }
val schema = StructType((0 to nCols-1).map { i => StructField("C" + i, DoubleType, true) } )
val df = spark.createDataFrame(data, schema)
df.cache()
// Get the continuous feature names and discretize them
val continuous = df.dtypes.filter(_._2 == "DoubleType").map (_._1)
val discretizer = new QuantileDiscretizer()
.setInputCols(continuous)
.setOutputCols(continuous.map(c => s"${c}_disc"))
.setNumBuckets(3)
val pipeline = new Pipeline().setStages(Array(discretizer))
val model = pipeline.fit(df)
model.transform(df)
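For reference, a minimal sketch of how you could inspect the result (assuming Spark 2.3.0+, where the single pipeline stage fits to one multi-column Bucketizer):
// sketch only: the fitted stage is a multi-column Bucketizer whose splitsArray
// holds the learned quantile boundaries for each input column
import org.apache.spark.ml.feature.Bucketizer
val bucketizer = model.stages(0).asInstanceOf[Bucketizer]
println(bucketizer.getSplitsArray(0).mkString(", "))  // splits learned for column C0
model.transform(df).select("C0", "C0_disc").show(5)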
import org.apache.spark.ml.feature.QuantileDiscretizer
val data = Array((0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2))
val df = spark.createDataFrame(data).toDF("id", "hour")
val discretizer = new QuantileDiscretizer()
.setInputCol("hour")
.setOutputCol("result")
.setNumBuckets(3)
val result = discretizer.fit(df).transform(df)
result.show()
This is taken from the QuantileDiscretizer documentation example.
It runs as one job for one column; the snippet below works the same way, but for multiple columns:
def discretizerFun(col: String, bucketNo: Int): QuantileDiscretizer = {
  new QuantileDiscretizer()
    .setInputCol(col)
    .setOutputCol(s"${col}_result")
    .setNumBuckets(bucketNo)
}
val data = Array((0, 18.0, 2.1), (1, 19.0, 14.1), (2, 8.0, 63.7), (3, 5.0, 88.3), (4, 2.2, 0.8))
val df = spark.createDataFrame(data).toDF("id", "hour", "temp")
val hourDiscretized = discretizerFun("hour", 2).fit(df).transform(df)
val res = discretizerFun("temp", 4).fit(hourDiscretized).transform(hourDiscretized)
The best approach would be to wrap this function up as a udf, but that is a problem because of the org.apache.spark.ml.feature.QuantileDiscretizer type: it is an Estimator that must be fit, not a plain function over a column. If it could be done, you would have a nice and clean way to do lazy transformations.
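Short of that, here is a hedged sketch of one way to apply the same helper across several columns without writing the fit/transform chain out by hand (the column names and bucket counts are just illustrative):
// sketch only: fold the single-column helper over a list of (column, buckets) pairs
val bucketSpecs = Seq("hour" -> 2, "temp" -> 4)
val foldedRes = bucketSpecs.foldLeft(df) { case (cur, (c, n)) =>
  discretizerFun(c, n).fit(cur).transform(cur)
}
Note that this still triggers one quantile computation per column; the multi-column support in Spark 2.3.0 described above is what collapses that work into a single pass.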