PySpark 2.1: Importing a module with UDFs breaks Hive support

I am currently working with Spark 2.1 and have a main script that calls a helper module containing all of my transform methods. The project is laid out like this:

main.py
helper.py

At the top of my helper.py file, I have a few custom UDFs that I have defined as follows:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def reformat(s):
    return reformat_logic(s)
reformat_udf = udf(reformat, StringType())

Before I broke all the UDFs out into a helper file, I was able to connect to my Hive metastore through my SparkSession object using spark.sql('sql statement'). However, after I moved the UDFs to a submodule and imported that module at the top of my main script, the SparkSession object could no longer connect to Hive and fell back to the default Derby database. I also got errors when trying to query my Hive tables, such as Hive support is required to insert into the following tables...

I was able to solve my problem by moving my UDFs into a completely separate file and only executing the import statements for that module inside the functions that need them (not sure if this is good practice, but it works; see the sketch below). Anyway, does anyone understand why I see this kind of peculiar behavior when it comes to Spark and UDFs? And does anyone know a good way to share UDFs across applications?
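A minimal sketch of that deferred-import workaround, with hypothetical names (udf_defs.py and apply_transforms are placeholders, not my actual code):

# main.py builds the Hive-enabled SparkSession first, then calls this:
def apply_transforms(df):
    # Deferred import: the module-level udf() call in udf_defs.py only
    # runs now, after the real SparkSession already exists.
    from udf_defs import reformat_udf
    return df.withColumn('reformatted', reformat_udf(df['raw']))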

1 answer


Prior to Spark 2.2.0, UserDefinedFunction eagerly creates a UserDefinedPythonFunction object, which represents the Python UDF on the JVM. This process requires access to a SparkContext and a SparkSession. If there are no active instances when UserDefinedFunction.__init__ is called, Spark will automatically initialize the contexts for you.
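You can observe the eager initialization via the internal _active_spark_context attribute (an implementation detail, not a public API; shown here only as a rough check in a fresh interpreter):

import pyspark

# Fresh interpreter: no context exists yet.
assert pyspark.SparkContext._active_spark_context is None

from helper import reformat_udf  # module-level udf() call runs here

# The import alone has spun up a default, non-Hive SparkContext.
assert pyspark.SparkContext._active_spark_context is not None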

When you call SparkSession.Builder.getOrCreate after importing UserDefinedFunction, it returns the existing SparkSession instance, and only some configuration changes can be applied (enableHiveSupport is not among them).
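A quick sketch of that behavior, assuming a plain session was already created implicitly (for example, by the UDF import):

from pyspark.sql import SparkSession

plain = SparkSession.builder.getOrCreate()
hive = SparkSession.builder.enableHiveSupport().getOrCreate()

# The same instance comes back; the Hive catalog setting is not applied.
assert plain is hive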

To fix this problem, you must initialize the SparkSession before importing the UDFs:

from pyspark.sql.session import SparkSession

spark = SparkSession.builder.enableHiveSupport().getOrCreate()
from helper import reformat_udf

This behavior is described in SPARK-19163 and fixed in Spark 2.2.0. Additional API improvements include decorator syntax (SPARK-19160) and improved docstring handling (SPARK-19161); the decorator form is sketched below.
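For reference, a short sketch of the decorator syntax available from 2.2.0 onward (the function body here is just a placeholder):

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def reformat(s):
    # placeholder transform standing in for the real logic
    return s.strip().lower()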
