Spark DataFrames: where is partitionBy?

Our general Spark processing flow looks something like this:


rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id, (some stuff)))
rdd = rdd.filter(...)
rdd = rdd.partitionBy(rdd.getNumPartitions())


All processing is keyed by id (hence the partitionBy).




However, Spark 1.3 changed sqlContext.parquetFile to return a DataFrame instead of an RDD, and DataFrame no longer has the partitionBy, getNumPartitions, and reduceByKey methods.


What do we do now in place of partitionBy?


We can replace the loading code with something like this:

rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id, (some stuff)))
rdd = rdd.filter(...)
rdd = rdd.partitionBy(rdd.getNumPartitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)


and use groupBy instead of reduceByKey.

Is that correct?
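For reference, that substitution in isolation might look like this (a hedged sketch; the value column and the sum aggregation are invented for illustration):

from pyspark.sql import functions as F

# RDD style: reduce (key, value) pairs by key
sums_rdd = rdd.reduceByKey(lambda a, b: a + b)

# DataFrame style: group by the key column and aggregate
sums_df = df.groupBy("id").agg(F.sum("value"))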

P.S. Yes, I understand that partitionBy is not required for groupBy, etc. However, without a preceding partitionBy, every join, groupBy, and so on may shuffle data across nodes. I am looking for a way to guarantee that all operations that group by my key are performed locally.
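To illustrate the co-location idea (a hedged sketch, assuming a pyspark shell where sc is already defined; the RDD names and data are invented):

n = 8  # arbitrary partition count; both sides must use the same value

# Two pair RDDs keyed by id and hash-partitioned identically.
events = sc.parallelize([(i % 100, "event") for i in range(1000)]).partitionBy(n)
profiles = sc.parallelize([(i, "profile") for i in range(100)]).partitionBy(n)

# Because both RDDs share the same partitioner, this join can match keys
# partition-by-partition instead of shuffling both sides across nodes.
joined = events.join(profiles)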



2 answers

It looks like, as of version 1.6, repartition(self, numPartitions, *cols) does what I need:

.. versionchanged:: 1.6

Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified.
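For example, a minimal sketch (assuming a pyspark shell, PySpark 1.6+, and the id key column from the question):

df = sqlContext.read.parquet("mydata/")

# Partition the DataFrame by the id column; since 1.6, numPartitions
# may be omitted when partitioning columns are given.
df = df.repartition(100, "id")   # 100 is an arbitrary choice

print(df.rdd.getNumPartitions())  # -> 100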



Since DataFrame gives us a Table/Column abstraction on top of RDD, the most convenient way to manipulate a DataFrame is to work within that abstraction, using the table-manipulation methods that DataFrame provides.

In a DataFrame, we can:

  • transform the table schema with select() \ udf() (sketched below)

  • slice out rows with filter()

  • perform aggregation with groupBy()

  • run other analytic jobs with sample() \ join()

  • save the result with saveAsTable() \ saveAsParquet()


See the Spark SQL and DataFrame Guide for details.
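To make the select() \ udf() item concrete, here is a hedged PySpark sketch; the people DataFrame, its name column, and the uppercasing logic are invented for illustration:

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

# Wrap a plain Python function as a column expression for use in select().
to_upper = udf(lambda s: s.upper() if s is not None else None, StringType())

people = sqlContext.read.parquet("people.parquet")  # hypothetical path
upper_names = people.select(to_upper(people["name"]).alias("name_upper"))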

So, the overall job looks something like this:

// avg and max come from org.apache.spark.sql.functions
import org.apache.spark.sql.functions.{avg, max}

val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))


And for your specific requirements, it might look like this:

val t = sqlContext.parquetFile()
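In PySpark terms, one hypothetical end-to-end version could be (the id column, the aggregation, and the output path are assumptions for illustration):

from pyspark.sql import functions as F

t = sqlContext.read.parquet("mydata/")

# Repartition by the key once, then keep the key-based work on the
# DataFrame so the partitioning can be reused.
t = t.repartition("id")                        # PySpark 1.6+

result = t.groupBy("id").agg(F.count("*").alias("n"))
result.write.parquet("mydata_counts/")         # hypothetical output path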



