Spark DataFrames: where did partitionBy go?

Our general Spark processing flow looks something like this:

Loading:

rdd = sqlContext.parquetFile("mydata/")
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())

Processing by id (this is why we do partitionBy above!):

rdd.reduceByKey(....)
rdd.join(...)

However, Spark 1.3 changed sqlContext.parquetFile to return a DataFrame instead of an RDD, and a DataFrame no longer has the partitionBy, getNumPartitions and reduceByKey methods.

What do we do about partitionBy now?

We can replace the loading code with something like

rdd = sqlContext.parquetFile("mydata/").rdd
rdd = rdd.map(lambda row: (row.id,(some stuff)))
rdd = rdd.filter(....)
rdd = rdd.partitionBy(rdd.getNumPartitions())
df = rdd.map(lambda ...: Row(...)).toDF(???)

and use groupBy instead of reduceByKey.

Is it correct?

PS. Yes, I understand that partitionBy is not required for groupBy etc. However, without a preceding partitionBy, each join, groupBy etc. can perform cross-node operations. I am looking for a way to ensure that all operations that require grouping by my key will be done locally.
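To make the intent concrete, here is a rough PySpark sketch of the pattern I have in mind (the second parquet path, the partition count and the per-key count are just placeholders):

num_parts = 64  # placeholder partition count

# Key both datasets by id and hash-partition them the same way.
left = (sqlContext.parquetFile("mydata/").rdd
        .map(lambda row: (row.id, row))
        .partitionBy(num_parts))

right = (sqlContext.parquetFile("otherdata/").rdd  # hypothetical second dataset
         .map(lambda row: (row.id, row))
         .partitionBy(num_parts))

# Both sides share the same partitioner, so the join should not need to
# re-shuffle either input.
joined = left.join(right)

# reduceByKey on an already partitioned RDD can likewise combine values
# without another shuffle.
counts = left.mapValues(lambda _: 1).reduceByKey(lambda a, b: a + b)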



2 answers


It looks like as of version 1.6, repartition(self, numPartitions, *cols) does what I need:

.. versionchanged:: 1.6
   Added optional arguments to specify the partitioning columns. Also made numPartitions optional if partitioning columns are specified.
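A minimal sketch of how I might use it (Spark 1.6+; the partition count, the id key column and the second parquet path below are placeholders):

df = sqlContext.read.parquet("mydata/")

# Hash-partition by the key column; in 1.6+ the explicit partition
# count is optional when columns are given.
df = df.repartition(64, "id")

# Aggregations keyed on id now operate on rows that are already
# co-located within a partition.
agg = df.groupBy("id").count()

# Same idea before a join: repartition both sides by the join key.
other = sqlContext.read.parquet("otherdata/").repartition(64, "id")
joined = df.join(other, "id")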



Since DataFrame provides us with a Table and Column abstraction on top of the RDD, the most convenient way to manipulate a DataFrame is to use that abstraction together with the specific table-manipulation methods that DataFrame offers.

In a DataFrame we can:

  • convert the table schema with select() / udf() / as()

  • filter rows with filter() or where()

  • run aggregations through groupBy() and agg()

  • do other analytical tasks using sample() / join() / union()

  • save the result with saveAsTable() / saveAsParquet() / insertIntoJDBC()

See the Spark SQL and DataFrame Guide for details.



So the overall job looks something like this:

val people = sqlContext.parquetFile("...")
val department = sqlContext.parquetFile("...")

people.filter("age > 30")
  .join(department, people("deptId") === department("id"))
  .groupBy(department("name"), "gender")
  .agg(avg(people("salary")), max(people("age")))

And for your specific requirements, it might look like this:

val t = sqlContext.parquetFile()
t.filter().select().groupBy().agg()
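
In PySpark, with hypothetical column names (age, salary) standing in for the real schema, that chain might read:

t = sqlContext.parquetFile("mydata/")
result = (t.filter(t.age > 30)                      # hypothetical filter condition
           .select("id", "age", "salary")           # project the needed columns
           .groupBy("id")                           # group by the key column
           .agg({"salary": "avg", "age": "max"}))   # aggregate within each group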

      
