Spark flow request from spark sheath (pyspark)
I follow this example in the console pyspark
and everything works fine.
After that, I wrote it as a pyspark app like this:
# -*- coding: utf-8 -*-
import sys
import click
import logging
from pyspark.sql import SparkSession
from pyspark.sql.types import *
@click.command()
@click.option('--master')
def most_idiotic_bi_query(master):
spark = SparkSession \
.builder \
.master(master)\
.appName("stream-test")\
.getOrCreate()
spark.sparkContext.setLogLevel('ERROR')
some_schema = .... # Schema removed
some_stream = spark\
.readStream\
.option("sep", ",")\
.schema(some_schema)\
.option("maxFilesPerTrigger", 1)\
.csv("/data/some_stream", header=True)
streaming_counts = (
linkage_stream.groupBy(some_stream.field_1).count()
)
query = streaming_counts.writeStream\
.format("memory")\
.queryName("counts")\
.outputMode("complete")\
.start()
query.awaitTermination()
if __name__ == "__main__":
logging.getLogger("py4j").setLevel(logging.ERROR)
most_idiotic_bi_query()
The application runs like:
spark-submit test_stream.py --master spark://master:7077
Now if I open up a new spark driver in another terminal:
pyspark --master spark://master:7077
And try running:
spark.sql("select * from counts")
Failure:
During handling of the above exception, another exception occurred:
AnalysisExceptionTraceback (most recent call last)
<ipython-input-3-732b22f02ef6> in <module>()
----> 1 spark.sql("select * from id_counts").show()
/usr/spark-2.0.2/python/pyspark/sql/session.py in sql(self, sqlQuery)
541 [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
542 """
--> 543 return DataFrame(self._jsparkSession.sql(sqlQuery), self._wrapped)
544
545 @since(2.0)
/usr/local/lib/python3.4/dist-packages/py4j-0.10.4-py3.4.egg/py4j/java_gateway.py in __call__(self, *args)
1131 answer = self.gateway_client.send_command(command)
1132 return_value = get_return_value(
-> 1133 answer, self.gateway_client, self.target_id, self.name)
1134
1135 for temp_arg in temp_args:
/usr/spark-2.0.2/python/pyspark/sql/utils.py in deco(*a, **kw)
67 e.java_exception.getStackTrace()))
68 if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):
71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
AnalysisException: 'Table or view not found: counts; line 1 pos 14'
I do not understand what is going on
source to share
This is expected behavior. If you check the documentation for the memory receiver:
The output is stored in memory as a table in memory. Both modes are supported: "Add" and "Full Exit". This should be used for debugging purposes with small amounts of data, since all output is collected and stored in driver memory . Hence, use it with care.
As you can see, the memory sink does not create a persistent table or a global temporary view, but a driver-limited local structure. Hence, it cannot be requested from another Spark application.
Therefore, output from memory must be requested from the driver in which it is written. For example, you could simulate the mode console
shown below.
Fictitious writer:
import pandas as pd
import numpy as np
import tempfile
import shutil
def producer(path):
temp_path = tempfile.mkdtemp()
def producer(i):
df = pd.DataFrame({
"group": np.random.randint(10, size=1000)
})
df["val"] = (
np.random.randn(1000) +
np.random.random(1000) * df["group"] +
np.random.random(1000) * i % 7
)
f = tempfile.mktemp(dir=temp_path)
df.to_csv(f, index=False)
shutil.move(f, path)
return producer
Spark app:
from pyspark.sql.types import IntegerType, DoubleType, StructType, StructField
schema = StructType([
StructField("group", IntegerType()),
StructField("val", DoubleType())
])
path = tempfile.mkdtemp()
query_name = "foo"
stream = (spark.readStream
.schema(schema)
.format("csv")
.option("header", "true")
.load(path))
query = (stream
.groupBy("group")
.avg("val")
.writeStream
.format("memory")
.queryName(query_name)
.outputMode("complete")
.start())
And some events:
from rx import Observable
timer = Observable.timer(5000, 5000)
timer.subscribe(producer(path))
timer.skip(1).subscribe(lambda *_: spark.table(query_name).show())
query.awaitTermination()
source to share