PySpark 2.1: Importing a module with UDFs breaks Hive support
I am currently working with Spark 2.1 and have a main script that calls a helper module containing all of my transform methods. In other words:
main.py
helper.py
At the top of my helper.py file, I have a few custom UDFs that I have defined as follows:
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def reformat(s):
    return reformat_logic(s)

reformat_udf = udf(reformat, StringType())  # runs as soon as helper is imported
Before I broke the UDFs out into a helper file, I was able to connect to my Hive metastore through my SparkSession object using spark.sql('sql statement'). However, after I moved the UDFs into a sub-file and imported that file at the top of my main script, the SparkSession object could no longer connect to Hive and fell back to the default Derby database. I also get errors when trying to query my Hive tables, such as Hive support is required to insert into the following tables...
I was able to solve my problem by moving my UDFs into a completely separate file and only executing the import statements for that module inside the functions that need them (not sure if this is good practice, but it works; see the sketch below). Anyway, does anyone understand why Spark and UDFs behave in this peculiar way? And does anyone know a good way to share UDFs across applications?
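For illustration, here is roughly what that workaround looks like (transforms.py, udfs.py, and apply_reformat are placeholder names, not my real code):

# transforms.py -- safe to import at the top of main.py,
# because it contains no module-level udf() calls
def apply_reformat(df):
    # Deferred import: udfs.py (which calls udf() at module level) is only
    # loaded here, after main.py has already created its SparkSession.
    from udfs import reformat_udf
    return df.withColumn('value', reformat_udf(df['value']))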
Prior to Spark 2.2.0, UserDefinedFunction eagerly creates a UserDefinedPythonFunction object on the JVM, which represents the Python UDF. This process requires access to a SparkContext and SparkSession. If there are no active instances when UserDefinedFunction.__init__ is called, Spark will automatically initialize the contexts for you.
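A minimal sketch of that side effect (_active_spark_context is an internal PySpark attribute, used here purely for illustration):

from pyspark import SparkContext

print(SparkContext._active_spark_context)  # None -- nothing started yet

# In Spark < 2.2.0, the module-level udf(...) call in helper forces
# Spark to create a SparkContext/SparkSession as an import side effect.
import helper

print(SparkContext._active_spark_context)  # now a live SparkContext, built without Hive support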
When you call SparkSession.builder.getOrCreate after importing the UserDefinedFunction object, it returns the existing SparkSession, and only some configuration changes can be applied (enableHiveSupport is not among them).
To fix this problem, initialize the SparkSession before importing the UDF:
from pyspark.sql.session import SparkSession

# Create the Hive-enabled session first...
spark = SparkSession.builder.enableHiveSupport().getOrCreate()

# ...and only then import anything that defines UDFs at module level
from helper import reformat_udf
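As a variation on the same idea (my sketch, not part of the original answer), you can make helper.py order-independent by deferring the udf() call until first use:

# helper.py
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def reformat(s):
    return reformat_logic(s)

def make_reformat_udf():
    # Nothing touches the JVM until this is called, so importing
    # helper can no longer hijack SparkSession initialization.
    return udf(reformat, StringType())

Callers then use make_reformat_udf() instead of a module-level reformat_udf object.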
This behavior is documented in SPARK-19163 and fixed in Spark 2.2.0. Other API improvements include decorator syntax (SPARK-19160) and improved docstring handling (SPARK-19161).
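For reference, the decorator syntax added by SPARK-19160 lets the UDF from the question be written as:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

@udf(StringType())
def reformat(s):
    return reformat_logic(s)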