Why is it slow to apply a Spark pipeline to a dataset with many columns but few rows?

I am testing Spark pipelines using a simple dataset with 312 (mostly numeric) columns, but only 421 rows. It is small, but it takes 3 minutes to apply my ML pipeline to it on a 24-core server with 60 GB of memory. That seems like a lot for such a tiny dataset. Similar pipelines run quickly on datasets that have fewer columns and more rows. It is something about the number of columns that causes the slow performance.

Here is a list of the stages in my pipeline:

000_strIdx_5708525b2b6c      048_bucketizer_888b0055c1ad  096_bucketizer_e677659ca253
001_strIdx_ec2296082913      049_bucketizer_974e0a1433a6  097_bucketizer_396e35548c72
002_bucketizer_3cbc8811877b  050_bucketizer_e848c0937cb9  098_bucketizer_78a6410d7a84
003_bucketizer_5a01d5d78436  051_bucketizer_95611095a4ac  099_bucketizer_e3ae6e54bca1
004_bucketizer_bf290d11364d  052_bucketizer_660a6031acd9  100_bucketizer_9fed5923fe8a
005_bucketizer_c3296dfe94b2  053_bucketizer_aaffe5a3140d  101_bucketizer_8925ba4c3ee2
006_bucketizer_7071ca50eb85  054_bucketizer_8dc569be285f  102_bucketizer_95750b6942b8
007_bucketizer_27738213c2a1  055_bucketizer_83d1bffa07bc  103_bucketizer_6e8b50a1918b
008_bucketizer_bd728fd89ba1  056_bucketizer_0c6180ba75e6  104_bucketizer_36cfcc13d4ba
009_bucketizer_e1e716f51796  057_bucketizer_452f265a000d  105_bucketizer_2716d0455512
010_bucketizer_38be665993ba  058_bucketizer_38e02ddfb447  106_bucketizer_9bcf2891652f
011_bucketizer_5a0e41e5e94f  059_bucketizer_6fa4ad5d3ebd  107_bucketizer_8c3d352915f7
012_bucketizer_b5a3d5743aaa  060_bucketizer_91044ee766ce  108_bucketizer_0786c17d5ef9
013_bucketizer_4420f98ff7ff  061_bucketizer_9a9ef04a173d  109_bucketizer_f22df23ef56f
014_bucketizer_777cc4fe6d12  062_bucketizer_3d98eb15f206  110_bucketizer_bad04578bd20
015_bucketizer_f0f3a3e5530e  063_bucketizer_c4915bb4d4ed  111_bucketizer_35cfbde7e28f
016_bucketizer_218ecca3b5c1  064_bucketizer_8ca2b6550c38  112_bucketizer_cf89177a528b
017_bucketizer_0b083439a192  065_bucketizer_417ee9b760bc  113_bucketizer_183a0d393ef0
018_bucketizer_4520203aec27  066_bucketizer_67f3556bebe8  114_bucketizer_467c78156a67
019_bucketizer_462c2c346079  067_bucketizer_0556deb652c6  115_bucketizer_380345e651ab
020_bucketizer_47435822e04c  068_bucketizer_067b4b3d234c  116_bucketizer_0f39f6de1625
021_bucketizer_eb9dccb5e6e8  069_bucketizer_30ba55321538  117_bucketizer_d8500b2c0c2f
022_bucketizer_b5f63dd7451d  070_bucketizer_ad826cc5d746  118_bucketizer_dc5f1fd09ff1
023_bucketizer_e0fd5041c841  071_bucketizer_77676a898055  119_bucketizer_eeaf9e6cdaef
024_bucketizer_ffb3b9737100  072_bucketizer_05c37a38ce30  120_bucketizer_5614cd4533d7
025_bucketizer_e06c0d29273c  073_bucketizer_6d9ae54163ed  121_bucketizer_2f1230e2871e
026_bucketizer_36ee535a425f  074_bucketizer_8cd668b2855d  122_bucketizer_f8bf9d47e57e
027_bucketizer_ee3a330269f1  075_bucketizer_d50ea1732021  123_bucketizer_2df774393575
028_bucketizer_094b58ea01c0  076_bucketizer_c68f467c9559  124_bucketizer_259320b7fc86
029_bucketizer_e93ea86c08e2  077_bucketizer_ee1dfc840db1  125_bucketizer_e334afc63030
030_bucketizer_4728a718bc4b  078_bucketizer_83ec06a32519  126_bucketizer_f17d4d6b4d94
031_bucketizer_08f6189c7fcc  079_bucketizer_741d08c1b69e  127_bucketizer_da7834230ecd
032_bucketizer_11feb74901e6  080_bucketizer_b7402e4829c7  128_bucketizer_8dbb503f658e
033_bucketizer_ab4add4966c7  081_bucketizer_8adc590dc447  129_bucketizer_e09e2eb2b181
034_bucketizer_4474f7f1b8ce  082_bucketizer_673be99bdace  130_bucketizer_faa04fa16f3c
035_bucketizer_90cfa5918d71  083_bucketizer_77693b45f94c  131_bucketizer_d0bd348a5613
036_bucketizer_1a9ff5e4eccb  084_bucketizer_53529c6b1ac4  132_bucketizer_de6da796e294
037_bucketizer_38085415a4f4  085_bucketizer_6a3ca776a81e  133_bucketizer_0395526346ce
038_bucketizer_9b5e5a8d12eb  086_bucketizer_6679d9588ac1  134_bucketizer_ea3b5eb6058f
039_bucketizer_082bb650ecc3  087_bucketizer_6c73af456f65  135_bucketizer_ad83472038f7
040_bucketizer_57e1e363c483  088_bucketizer_2291b2c5ab51  136_bucketizer_4a17c440fd16
041_bucketizer_337583fbfd65  089_bucketizer_cb3d0fe669d8  137_bucketizer_d468637d4b86
042_bucketizer_73e8f6673262  090_bucketizer_e71f913c1512  138_bucketizer_4fc473a72f1d
043_bucketizer_0f9394ed30b8  091_bucketizer_156528f65ce7  139_vecAssembler_bd87cd105650
044_bucketizer_8530f3570019  092_bucketizer_f3ec5dae079b  140_nb_f134e0890a0d
045_bucketizer_c53614f1e507  093_bucketizer_809fab77eee1  141_sql_a8590b83c826
046_bucketizer_8fd99e6ec27b  094_bucketizer_6925831511e6
047_bucketizer_6a8610496d8a  095_bucketizer_c5d853b95707

      

There are 2 string columns which are converted to int using StringIndexerModel. Then there are the bucketizers, which bin all the numeric columns into 2 or 3 bins each. Is there a way to bucketize multiple columns at once with a single stage? I didn't see a way. Then there is a VectorAssembler to combine all the columns into one for the NaiveBayes classifier. Finally, there is a simple SQLTransformer that casts the prediction column to int.
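For reference, a pipeline with this shape could be assembled roughly as follows. This is a minimal PySpark sketch reconstructed from the stage metadata in this question, not my exact code: the helper names `output_name` and `build_pipeline` are made up for illustration, and leaving the label column out of the assembled features is an assumption.

```python
from typing import Dict, List

# Column names taken from the NaiveBayes metadata in this question.
LABEL_COL = "DAYPOP_BINNED__"
FEATURES_COL = "_features_column__"
PREDICTION_COL = "_prediction_column_"


def output_name(input_col: str, suffix: str) -> str:
    """Hypothetical helper: 'HH_02_CLEANED__' -> 'HH_02_BINNED__'."""
    return input_col.replace("_CLEANED__", suffix)


def build_pipeline(string_cols: List[str],
                   splits_by_col: Dict[str, List[float]]):
    """Build one stage per input column, then assemble, classify and cast."""
    # Imported lazily; creating these stages needs an active SparkSession.
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import NaiveBayes
    from pyspark.ml.feature import (Bucketizer, SQLTransformer,
                                    StringIndexer, VectorAssembler)

    stages = []
    for col in string_cols:
        stages.append(StringIndexer(inputCol=col,
                                    outputCol=output_name(col, "_IDX__"),
                                    handleInvalid="skip"))
    for col, splits in splits_by_col.items():
        stages.append(Bucketizer(splits=splits,
                                 inputCol=col,
                                 outputCol=output_name(col, "_BINNED__"),
                                 handleInvalid="keep"))

    # Assumption: the binned label column is not used as a feature.
    feature_cols = [s.getOutputCol() for s in stages
                    if s.getOutputCol() != LABEL_COL]
    stages.append(VectorAssembler(inputCols=feature_cols,
                                  outputCol=FEATURES_COL))
    stages.append(NaiveBayes(modelType="multinomial", smoothing=1.0,
                             featuresCol=FEATURES_COL, labelCol=LABEL_COL,
                             predictionCol=PREDICTION_COL))
    stages.append(SQLTransformer(
        statement="SELECT *, CAST(_prediction_column_ AS INT) "
                  "AS `_*_prediction_label_column_*__` FROM __THIS__"))
    return Pipeline(stages=stages)
```

Each input column gets its own stage, which is how the pipeline ends up with well over a hundred stages for a few hundred columns.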

This is what the metadata for the two stringIndexers looks like:

{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551461778,"sparkVersion":"2.1.1","uid":"strIdx_5708525b2b6c","paramMap":{"outputCol":"ADI_IDX__","handleInvalid":"skip","inputCol":"ADI_CLEANED__"}}
{"class":"org.apache.spark.ml.feature.StringIndexerModel","timestamp":1492551462004,"sparkVersion":"2.1.1","uid":"strIdx_ec2296082913","paramMap":{"outputCol":"State_IDX__","inputCol":"State_CLEANED__","handleInvalid":"skip"}}

      

The bucketizers all look very similar. Here is what the metadata looks like for several of them:

{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462636,"sparkVersion":"2.1.1","uid":"bucketizer_bd728fd89ba1","paramMap":{"outputCol":"HH_02_BINNED__","inputCol":"HH_02_CLEANED__","handleInvalid":"keep","splits":["-Inf",7521.0,12809.5,20299.0,"Inf"]}}
{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462711,"sparkVersion":"2.1.1","uid":"bucketizer_e1e716f51796","paramMap":{"splits":["-Inf",6698.0,13690.5,"Inf"],"handleInvalid":"keep","outputCol":"HH_97_BINNED__","inputCol":"HH_97_CLEANED__"}}
{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462784,"sparkVersion":"2.1.1","uid":"bucketizer_38be665993ba","paramMap":{"splits":["-Inf",4664.0,7242.5,11770.0,14947.0,"Inf"],"outputCol":"HH_90_BINNED__","handleInvalid":"keep","inputCol":"HH_90_CLEANED__"}}
{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462858,"sparkVersion":"2.1.1","uid":"bucketizer_5a0e41e5e94f","paramMap":{"splits":["-Inf",6107.5,10728.5,"Inf"],"outputCol":"HH_80_BINNED__","inputCol":"HH_80_CLEANED__","handleInvalid":"keep"}}
{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551462931,"sparkVersion":"2.1.1","uid":"bucketizer_b5a3d5743aaa","paramMap":{"outputCol":"HHPG9702_BINNED__","splits":["-Inf",8.895000457763672,"Inf"],"handleInvalid":"keep","inputCol":"HHPG9702_CLEANED__"}}
{"class":"org.apache.spark.ml.feature.Bucketizer","timestamp":1492551463004,"sparkVersion":"2.1.1","uid":"bucketizer_4420f98ff7ff","paramMap":{"splits":["-Inf",54980.5,"Inf"],"outputCol":"MEDHI97_BINNED__","handleInvalid":"keep","inputCol":"MEDHI97_CLEANED__"}}

      

Here is the metadata for the NaiveBayes model:

{"class":"org.apache.spark.ml.classification.NaiveBayesModel","timestamp":1492551472568,"sparkVersion":"2.1.1","uid":"nb_f134e0890a0d","paramMap":{"modelType":"multinomial","probabilityCol":"_class_probability_column__","smoothing":1.0,"predictionCol":"_prediction_column_","rawPredictionCol":"rawPrediction","featuresCol":"_features_column__","labelCol":"DAYPOP_BINNED__"}}

      

And for the final SQLTransformer:

{"class":"org.apache.spark.ml.feature.SQLTransformer","timestamp":1492551472804,"sparkVersion":"2.1.1","uid":"sql_a8590b83c826","paramMap":{"statement":"SELECT *, CAST(_prediction_column_ AS INT) AS `_*_prediction_label_column_*__` FROM __THIS__"}}

      

Why is it so slow when there are a few hundred columns (and only a few rows), while it works fine with millions of rows (and fewer columns)? In addition to being slow to apply, this pipeline is also slow to build: the fitting and evaluation steps take a few minutes each. Is there something I can do to make it faster?

I get similar results using 2.1.1RC, 2.1.2 (tip) and 2.2.0 (tip). Spark 2.1.0 fails with a Janino 64 KB limit error when trying to build this pipeline (see https://issues.apache.org/jira/browse/SPARK-16845 ).
