Spark Java: problem creating a Row with java.util.Map type
I am using Spark 2.1.
I have created a Dataset with a MapType column:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("features", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
Map<String, Integer> abc = new HashMap<String, Integer>();
abc.put("abc", 1);
Row r = RowFactory.create(0, "Hi these are words ", 1, abc);
List<Row> data = Arrays.asList(r);
Dataset<Row> wordDataFrame = spark.createDataFrame(data, schema);
wordDataFrame.show();
The code above works fine.
But when I call map on this Dataset (to replace the map column with a new HashMap), I get the error below.
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        Map<String, Integer> newMap = new HashMap<String, Integer>();
        newMap.put("Transformed string", 1);
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), newMap);
    }
}, encoder);
return output;
Error stack:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.HashMap is not a valid external type for schema of map<string,int>
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:410)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
What am I missing here? Why am I getting "java.util.HashMap is not a valid external type for schema of map<string,int>"?
Edit:
I also tried a java.util.List with an ArrayType column:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createArrayType(DataTypes.StringType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        List<String> xyz = Arrays.asList("Hi", "how", "now");
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
    }
}, encoder);
I get a similar error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: java.util.Arrays$ArrayList is not a valid external type for schema of array<string>
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(generated.java:221)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:377)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:231)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:225)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$25.apply(RDD.scala:826)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:323)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:287)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
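The class names in the two error messages are simply the runtime classes of the values passed to RowFactory.create. A minimal plain-Java sketch (no Spark involved; the class and method names are made up for illustration) shows where java.util.HashMap and java.util.Arrays$ArrayList come from:

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper class, not part of the original post or of Spark.
public class ExternalTypeNames {

    // Returns the runtime class name of a value, which is the name
    // the Spark error message reports as the "external type".
    static String runtimeClassName(Object value) {
        return value.getClass().getName();
    }

    public static void main(String[] args) {
        Map<String, Integer> newMap = new HashMap<String, Integer>();
        List<String> xyz = Arrays.asList("Hi", "how", "now");

        System.out.println(runtimeClassName(newMap)); // java.util.HashMap
        System.out.println(runtimeClassName(xyz));    // java.util.Arrays$ArrayList
    }
}
```

Both are Java collection classes, so the encoder's type check rejects them for the map and array columns.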
A java.lang.String works fine:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.StringType, false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        String xyz = Arrays.asList("Please", "work", "now").toString();
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), xyz);
    }
}, encoder);
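Note that this variant stores the list's toString() form, so the column holds one flat string rather than a structured array. A small plain-Java sketch (the class and method names are hypothetical) shows the value that ends up in featuresNew:

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration class, not part of the original post.
public class ListToStringSketch {

    // Mirrors the conversion used in the snippet above:
    // the list is flattened into its toString() representation.
    static String flatten(List<String> words) {
        return words.toString();
    }

    public static void main(String[] args) {
        String xyz = flatten(Arrays.asList("Please", "work", "now"));
        System.out.println(xyz); // [Please, work, now]
    }
}
```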
It looks like simple data types such as String and Integer work fine!
Solution: this worked for me.
I converted the Java HashMap to a Scala Map and changed the code as follows:
StructType schema = new StructType(new StructField[]{
    new StructField("id", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("words", DataTypes.StringType, false, Metadata.empty()),
    new StructField("label", DataTypes.IntegerType, false, Metadata.empty()),
    new StructField("featuresNew", DataTypes.createMapType(DataTypes.StringType, DataTypes.IntegerType), false, Metadata.empty())
});
ExpressionEncoder<Row> encoder = RowEncoder.apply(schema);
Dataset<Row> output = input.map(new MapFunction<Row, Row>() {
    @Override
    public Row call(Row row) throws Exception {
        HashMap<String, Integer> newMap = new HashMap<String, Integer>();
        newMap.put("Transformed string", 1);
        // Convert the Java map to a Scala map before putting it into the Row
        return RowFactory.create(row.getInt(0), row.getString(1), row.getInt(2), ToScalaExample.toScalaMap(newMap));
    }
}, encoder);
return output;
I think that for simple data types Spark implicitly converts the Java values to their Scala counterparts. For other types, such as Map and List, we need to convert them explicitly.
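The ToScalaExample.toScalaMap helper is not shown in the post. A minimal sketch of what such a helper might look like, assuming the Scala standard library that ships with Spark is on the classpath and using scala.collection.JavaConverters, is given here. The class name ScalaCollectionSketch and the toScalaSeq method for the List case are hypothetical additions, not the author's code:

```java
import java.util.List;
import java.util.Map;

import scala.collection.JavaConverters;

// Hypothetical stand-in for the ToScalaExample helper used above.
public class ScalaCollectionSketch {

    // Wraps a java.util.Map as a scala.collection.Map, the external
    // type expected for a MapType column in a Row.
    public static <K, V> scala.collection.Map<K, V> toScalaMap(Map<K, V> javaMap) {
        return JavaConverters.mapAsScalaMapConverter(javaMap).asScala();
    }

    // Wraps a java.util.List as a scala.collection.Seq, for the
    // ArrayType case tried in the edit above.
    public static <T> scala.collection.Seq<T> toScalaSeq(List<T> javaList) {
        return JavaConverters.asScalaBufferConverter(javaList).asScala();
    }
}
```

With this sketch, the ArrayType attempt would presumably pass ScalaCollectionSketch.toScalaSeq(xyz) to RowFactory.create instead of xyz. Both methods return views over the original Java collections rather than copies.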