Memory allocation issue when writing a Spark DataFrame to a Hive table

I am trying to save a Spark DataFrame to a Hive table (parquet) with .saveAsTable() in pySpark, but I keep running into memory errors like the one below:

org.apache.hadoop.hive.ql.metadata.HiveException: parquet.hadoop.MemoryManager$1:
New Memory allocation 1034931 bytes is smaller than the minimum allocation size of 1048576 bytes.

The first number (1034931) usually changes between runs. I can see that the second number (1048576) is 1024^2, but I have no idea what it means here.
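
For what it's worth, the second number works out to exactly 1 MiB; a quick check of the arithmetic:

# The minimum allocation size from the error is exactly 1 MiB
min_allocation = 1024 ** 2     # 1048576 bytes
failed_allocation = 1034931    # the first number from the error; varies between runs
print(min_allocation - failed_allocation)  # how far below the 1 MiB floor the request fell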

I've used the same technique for several of my other projects (with much larger DataFrames) and it worked without issue. Here I have essentially copied the process and configuration, but I run into a memory issue! It must be something trivial that I am missing.

The Spark DataFrame (let's call it sdf) has the following structure (~10 columns and ~300k rows, but possibly more if everything executed correctly):

+----------+----------+----------+---------------+---------------+
| col_a_str| col_b_num| col_c_num|partition_d_str|partition_e_str|
+----------+----------+----------+---------------+---------------+
|val_a1_str|val_b1_num|val_c1_num|     val_d1_str|     val_e1_str|
|val_a2_str|val_b2_num|val_c2_num|     val_d2_str|     val_e2_str|
|       ...|       ...|       ...|            ...|            ...|
+----------+----------+----------+---------------+---------------+

The Hive table was created as follows:

sqlContext.sql("""
                    CREATE TABLE IF NOT EXISTS my_hive_table (
                        col_a_str string,
                        col_b_num double,
                        col_c_num double
                    ) 
                    PARTITIONED BY (partition_d_str string,
                                    partition_e_str string)
                    STORED AS PARQUETFILE
               """)

      

Attempting to insert data into this table with the following command:

sdf.write \
   .mode('append') \
   .partitionBy('partition_d_str', 'partition_e_str') \
   .saveAsTable('my_hive_table')
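
As an aside (not part of the failing code), the table's partition columns can be sanity-checked after creation, e.g.:

# Confirm that partition_d_str / partition_e_str are registered as
# partition columns of the new table
sqlContext.sql("DESCRIBE FORMATTED my_hive_table").show(100, truncate=False)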

      

Spark / Hive config looks like this:

spark_conf = pyspark.SparkConf()
spark_conf.setAppName('my_project')

spark_conf.set('spark.executor.memory', '16g')
spark_conf.set('spark.python.worker.memory', '8g')
spark_conf.set('spark.yarn.executor.memoryOverhead', '15000')
spark_conf.set('spark.dynamicAllocation.maxExecutors', '64')
spark_conf.set('spark.executor.cores', '4')

sc = pyspark.SparkContext(conf=spark_conf)

sqlContext = pyspark.sql.HiveContext(sc)
sqlContext.setConf('hive.exec.dynamic.partition', 'true')
sqlContext.setConf('hive.exec.max.dynamic.partitions', '5000')
sqlContext.setConf('hive.exec.dynamic.partition.mode', 'nonstrict')
sqlContext.setConf('hive.exec.compress.output', 'true')
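
In case it matters, the Hive settings can be read back to confirm they actually took effect on this context (getConf mirrors setConf):

# Verify the dynamic-partition settings were applied
print(sqlContext.getConf('hive.exec.dynamic.partition.mode', 'not set'))  # expect 'nonstrict'
print(sqlContext.getConf('hive.exec.max.dynamic.partitions', 'not set'))  # expect '5000'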

      

I tried changing .partitionBy('partition_d_str', 'partition_e_str') to .partitionBy(['partition_d_str', 'partition_e_str']), increasing memory, splitting the DataFrame into smaller chunks, and re-creating the table and the DataFrame, but nothing worked. I also could not find any solutions online. What could be causing the memory error (I'm not quite sure where it comes from), and how can I change my code so it writes to the Hive table? Thanks.


1 answer


It turns out I was partitioning on a nullable field, and that was tripping up .saveAsTable(). When I converted the RDD to a Spark DataFrame, the schema I provided was defined like this:

from pyspark.sql.types import *

# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), True)])  # nullable=True on a partition column (the culprit)

# Convert RDD to Spark DataFrame
sdf = sqlContext.createDataFrame(my_rdd, schema=my_schema)
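
The nullability is easy to spot with printSchema() before writing (the output below is a sketch of what to expect, not a capture from my run):

# Inspect which columns the DataFrame treats as nullable
sdf.printSchema()
# root
#  |-- col_a_str: string (nullable = false)
#  |-- col_b_num: double (nullable = true)
#  |-- col_c_num: double (nullable = true)
#  |-- partition_d_str: string (nullable = false)
#  |-- partition_e_str: string (nullable = true)   <- the problematic partition column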

      

Since partition_e_str was declared with nullable=True (the third argument to StructField), it caused problems when writing to the Hive table, because it was being used as one of the partitioning fields. I changed it to:



# Define schema
my_schema = StructType(
                    [StructField('col_a_str', StringType(), False),
                     StructField('col_b_num', DoubleType(), True),
                     StructField('col_c_num', DoubleType(), True),
                     StructField('partition_d_str', StringType(), False),
                     StructField('partition_e_str', StringType(), False)])  # partition column now non-nullable

and everything was fine again!

Lesson: Make sure the partitioning fields are not nullable!
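
If the DataFrame has already been built with the nullable schema, one way to apply the fix without regenerating the data (a sketch, assuming the corrected my_schema above and that the partition columns contain no actual NULLs) is to rebuild it from the underlying RDD:

# Re-apply the corrected, non-nullable schema to the existing DataFrame's rows
sdf_fixed = sqlContext.createDataFrame(sdf.rdd, schema=my_schema)

sdf_fixed.write \
         .mode('append') \
         .partitionBy('partition_d_str', 'partition_e_str') \
         .saveAsTable('my_hive_table')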
