Pyspark RDD for DataFrame with forced schema: value error

I am working with pyspark with a schema commensurate with the one shown at the end of this post (note the nested lists, unordered fields) originally imported from Parquet as a DataFrame. Basically the problem I am having is the inability to process this data as RDD and then convert back to DataFrame. (I've looked at several related posts, but I still can't tell where I'm going wrong.)

Trivially, the following code works fine (as you would expect):

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
tripDFNew = sqlContext.createDataFrame(tripRDD, schema)
tripDFNew.take(1)

      

Things don't work when I need to match the RDD (as it would for adding a field, for example).

schema = deepcopy(tripDF.schema)
tripRDD = tripDF.rdd
def trivial_map(row):
    rowDict = row.asDict()
    return pyspark.Row(**rowDict)
tripRDDNew = tripRDD.map(lambda row: trivial_map(row))
tripDFNew = sqlContext.createDataFrame(tripRDDNew, schema)
tripDFNew.take(1)

      

The above code gives the following exception, where XXX is the stand-in for an integer that varies from run to run (for example, I've seen 1, 16, 23, etc.):

File "/opt/cloudera/parcels/CDH-5.8.3-
1.cdh5.8.3.p1967.2057/lib/spark/python/pyspark/sql/types.py", line 546, in 
toInternal
raise ValueError("Unexpected tuple %r with StructType" % obj)
ValueError: Unexpected tuple XXX with StructType`

      

Given this information, is there a clear error in the second block of code ? (I note that tripRDD is of the rdd.RDD class, and tripRDDNew is of the rdd.PipelinedRDD class, but I don't think this should be a problem.) (I also note that tripRDD schema is not sorted by field name, whereas for tripRDDNew is sorted by field name. Again, I don't understand why that would be a problem.)

Scheme:

root
 |-- foo: struct (nullable = true)
 |    |-- bar_1: integer (nullable = true)
 |    |-- bar_2: integer (nullable = true)
 |    |-- bar_3: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |    |-- bar_4: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- baz_1: integer (nullable = true)
 |    |    |    |-- baz_2: string (nullable = true)
 |    |    |    |-- baz_3: double (nullable = true)
 |-- qux: integer (nullable = true)
 |-- corge: integer (nullable = true)
 |-- uier: integer (nullable = true)`

      

+3


source to share


1 answer


As noted in the post, the original schema has fields that are not alphabetically ordered. This is the problem. Using .asDict () in a mapping function orders the fields of the resulting RDD. The tripRDDNew field order conflicts with the schema when calling createDataFrame. ValueError is the result of trying to parse one of the integer fields (such as qux, corge, or uier in the example) as a StructType.

(As an aside: it's a bit surprising that createDataFrame requires schema fields to be in the same order as RDD fields. You either need to require consistent field names or consistent field ordering, but requires both to appear overkill.)



(As a second aside: the existence of non-abgraphic fields in a DataFrame is somewhat abnormal. For example, sc.parallelize () will automatically arrange the fields alphabetically when distributing the data structure. It looks like the fields need to be ordered when importing a DataFrame from a parquet file. Maybe interesting to investigate why this is not the case.)

+2


source







All Articles