Spark Streaming and py4j.Py4JException: method __getnewargs__([]) does not exist

I am trying to implement a Spark Streaming app, but I keep getting an exception: py4j.Py4JException: method __getnewargs__([]) does not exist

I don't understand the source of this exception. I read here that I cannot use the SparkSession instance outside of the driver, but I don't know whether I am doing that. I don't understand how to determine whether a given piece of code runs on the driver or on an executor - I understand the difference between transformations and actions (I think), but when it comes to threads and foreachRDD, I am at a loss.
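
To show where my understanding breaks down, this is how I currently picture it (a minimal sketch with placeholder names like some_dstream and per_batch, not part of my actual app):

def per_batch(rdd):
    # As I understand it, the function given to foreachRDD runs on the driver
    # once per batch interval, so driver-side objects (the SparkSession, the
    # logger, ...) should be usable here...
    print('driver side, rdd id = %d' % rdd.id())

    # ...whereas functions passed into RDD transformations/actions are
    # serialized and run on the executors, so they must not capture
    # driver-only objects such as the SparkSession.
    rdd.foreach(lambda record: None)  # runs on the executors

some_dstream.foreachRDD(per_batch)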

The app is a Spark Streaming app running on AWS EMR, reading data from AWS Kinesis. We submit the app via spark-submit with --deploy-mode cluster. Each entry in the stream is a JSON object of the form:

{"type":"some string","state":"an escaped JSON string"}

      

e.g.:

{"type":"type1","state":"{\"some_property\":\"some value\"}"}

      

Here is my application in its current state:

import json
import logging

from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kinesis import KinesisUtils, InitialPositionInStream

# Each handler subclasses from BaseHandler and has the method
# def process(self, df, df_writer, base_writer_path)
# Each handler's process method performs additional transformations.
# df_writer is a function which writes a DataFrame to some S3 location.

HANDLER_MAP = {
    'type1': Type1Handler(),
    'type2': Type2Handler(),
    'type3': Type3Handler()
}

FORMAT = 'MyProject %(asctime)s %(levelname)s %(name)s: %(message)s'
logging.basicConfig(level=logging.INFO, format=FORMAT)
logger = logging.getLogger(__name__)

# Use a closure and lambda to create streaming context
create = lambda: create_streaming_context(
    spark_app_name=spark_app_name,
    kinesis_stream_name=kinesis_stream_name,
    kinesis_endpoint=kinesis_endpoint,
    kinesis_region=kinesis_region,
    initial_position=InitialPositionInStream.LATEST,
    checkpoint_interval=checkpoint_interval,
    checkpoint_s3_path=checkpoint_s3_path,
    data_s3_path=data_s3_path)

streaming_context = StreamingContext.getOrCreate(checkpoint_s3_path, create)

streaming_context.start()
streaming_context.awaitTermination()
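
For reference, the handlers themselves are simple. A simplified, illustrative sketch of one (the real ones only differ in the schema they return and the DataFrame transformations they apply; the BaseHandler base class and the exact df_writer signature are omitted/simplified here):

from pyspark.sql.types import StructType, StructField, StringType

class Type1Handler(object):
    def get_schema(self):
        # Schema matching the escaped JSON carried in the "state" property.
        return StructType([StructField('some_property', StringType(), True)])

    def process(self, df, df_writer, base_writer_path):
        # Additional DataFrame transformations would go here; df_writer is then
        # used to write the resulting DataFrame out under base_writer_path.
        df_writer(df, base_writer_path)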

      

Streaming context creation function:

def create_streaming_context(
    spark_app_name, kinesis_stream_name, kinesis_endpoint,
    kinesis_region, initial_position, checkpoint_interval,
    data_s3_path, checkpoint_s3_path):
    """Create a new streaming context or reuse a checkpointed one."""

    # Spark configuration
    spark_conf = SparkConf()
    spark_conf.set('spark.streaming.blockInterval', 37500)
    spark_conf.setAppName(spark_app_name)

    # Spark context
    spark_context = SparkContext(conf=spark_conf)

    # Spark streaming context
    streaming_context = StreamingContext(spark_context, batchDuration=300)
    streaming_context.checkpoint(checkpoint_s3_path)

    # Spark session
    spark_session = get_spark_session_instance(spark_conf)

    # Set up stream processing
    stream = KinesisUtils.createStream(
        streaming_context, spark_app_name, kinesis_stream_name,
        kinesis_endpoint, kinesis_region, initial_position,
        checkpoint_interval)

    # Each record in the stream is a JSON object in the form:
    # {"type": "some string", "state": "an escaped JSON string"}
    json_stream = stream.map(json.loads)

    for state_type in HANDLER_MAP.iterkeys():
        filter_stream(json_stream, spark_session, state_type, data_s3_path)

    return streaming_context

      

The get_spark_session_instance function returns a global SparkSession instance (copied from here):

def get_spark_session_instance(spark_conf):
    """Lazily instantiated global instance of SparkSession"""

    logger.info('Obtaining global SparkSession instance...')
    if 'sparkSessionSingletonInstance' not in globals():
        logger.info('Global SparkSession instance does not exist, creating it...')

        globals()['sparkSessionSingletonInstance'] = SparkSession\
            .builder\
            .config(conf=spark_conf)\
            .getOrCreate()

    return globals()['sparkSessionSingletonInstance']

      

The filter_stream function filters the stream by the type property of the JSON object. The goal is to turn the stream into one where each entry is the escaped JSON string from the "state" property of the original JSON:

def filter_stream(json_stream, spark_session, state_type, data_s3_path):
    """Filter stream by type and process the stream."""

    state_type_stream = json_stream\
        .filter(lambda jsonObj: jsonObj['type'] == state_type)\
        .map(lambda jsonObj: jsonObj['state'])

    state_type_stream.foreachRDD(lambda rdd: process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path))
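    # Note: this lambda closes over spark_session, state_type and data_s3_path
    # from the enclosing scope; df_writer is not a parameter of filter_stream,
    # so it is resolved at module level.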

      

The process_rdd function loads the JSON into a DataFrame using the correct schema for the type of the original JSON object. The handler instance returns a valid Spark schema and has a process method that performs further transformations on the DataFrame (after which df_writer is called and the DataFrame is written to S3):

def process_rdd(spark_session, rdd, state_type, df_writer, data_s3_path):
    """Process an RDD by state type."""

    if rdd.isEmpty():
        logger.info('RDD is empty, returning early.')
        return

    handler = HANDLER_MAP[state_type]
    df = spark_session.read.json(rdd, handler.get_schema())
    handler.process(df, df_writer, data_s3_path)

      

Mostly I'm confused about the source of the exception. Does it have to do with how I am using spark_session.read.json? If so, how is this related? If not, is there something else in my code that is wrong?

Update: everything seemed to work correctly when I simply replaced the call to StreamingContext.getOrCreate with the body of the create_streaming_context function. I was wrong about this - I get the same exception either way. I think the checkpointing aspect is a red herring; I am obviously doing something else wrong.

I would really appreciate any help with this issue and I am happy to clarify anything or add more information!
