How to aggregate on one column and take the maximum of others in PySpark?

I have columns X (string), Y (string), and Z (float).

I want to:

  • group by column X
  • take the maximum of column Z
  • report ALL values for columns X, Y, and Z

If there are multiple values of column Y that correspond to the maximum of column Z, then take the maximum of those values in column Y.

For example, my table table1 looks like:

col X col Y col Z
A     1     5
A     2     10
A     3     10
B     5     15


The expected result is:

A     3     10
B     5     15


If I were using SQL, I would do it like this (the inner query has to select X as well, so the join can match rows within each group):

select table1.X, table1.Y, table1.Z
from table1
join (select X, max(Z) as max_Z from table1 group by X) table2
  on table1.X = table2.X and table1.Z = table2.max_Z


However, how do I do this when 1) column Z is a float, and 2) I am using PySpark SQL?
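
In PySpark you can run that SQL directly with spark.sql. Here is a minimal sketch, assuming a SparkSession named spark, with the Y tie-break added as an outer group-by; the float type of Z is not a problem for the equality join, because max_Z is taken verbatim from the same Z column, so the values match exactly:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# sample data from the question, with Z as float
data = [('A', 1, 5.0), ('A', 2, 10.0), ('A', 3, 10.0), ('B', 5, 15.0)]
spark.createDataFrame(data, schema=['X', 'Y', 'Z']).createOrReplaceTempView('table1')

# per-group max of Z, then max of Y among the rows that tie on Z
result = spark.sql("""
    SELECT t1.X, MAX(t1.Y) AS Y, t1.Z
    FROM table1 t1
    JOIN (SELECT X, MAX(Z) AS max_Z FROM table1 GROUP BY X) t2
      ON t1.X = t2.X AND t1.Z = t2.max_Z
    GROUP BY t1.X, t1.Z
""")
result.show()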



3 answers


The following two solutions are in Scala but, honestly, I couldn't resist posting them to promote my favorite window aggregate functions. Sorry.

The only question is which structured query is more efficient/effective?

Window aggregation function: rank

val df = Seq(
  ("A",1,5),
  ("A",2,10),
  ("A",3,10),
  ("B",5,15)
).toDF("x", "y", "z")

scala> df.show
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  A|  1|  5|
|  A|  2| 10|
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+

// describe window specification
import org.apache.spark.sql.expressions.Window
val byX = Window.partitionBy("x").orderBy($"z".desc, $"y".desc)

// use rank to pick the best row per x
scala> df.withColumn("rank", rank over byX)
  .where($"rank" === 1) // <-- take the top-ranked row per partition
  .select("x", "y", "z")
  .orderBy("x")
  .show
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+




Window aggregation function: first and dropDuplicates

I've always thought about alternatives to the rank function, and first usually came to mind.

// use first and dropDuplicates
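// with an ordered window, first returns the top row's value for every row
// in the partition, so y and z become identical across each partition and
// dropDuplicates then keeps exactly one row per x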
scala> df.
  withColumn("y", first("y") over byX).
  withColumn("z", first("z") over byX).
  dropDuplicates.
  orderBy("x").
  show
+---+---+---+
|  x|  y|  z|
+---+---+---+
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+




You can use a Window function. My approach here is to create a window specification that first partitions the dataframe by X, then orders the rows by the values of Z and Y in descending order.

  • We can simply pick the rows where rank == 1 to get the rows we are interested in.
  • Or we can use first together with drop_duplicates to achieve the same task.

PS. Thanks to Jacek Laskowski for the comments and the Scala solution that led to this one.

Create a toy sample dataset

from pyspark.sql.window import Window
import pyspark.sql.functions as func

data=[('A',1,5),
      ('A',2,10),
      ('A',3,10),
      ('B',5,15)]
df = spark.createDataFrame(data,schema=['X','Y','Z'])


Window aggregation function: rank

Apply the window function using rank:

w = Window.partitionBy(df['X']).orderBy([func.col('Z').desc(), func.col('Y').desc()])
df_max = df.select('X', 'Y', 'Z', func.rank().over(w).alias("rank"))
df_final = df_max.where(func.col('rank') == 1).select('X', 'Y', 'Z').orderBy('X')
df_final.show()




Output

+---+---+---+
|  X|  Y|  Z|
+---+---+---+
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+


Window aggregation function: first and drop_duplicates

This task can also be achieved using first and drop_duplicates as follows:

df_final = df.select('X', func.first('Y').over(w).alias('Y'), func.first('Z').over(w).alias('Z'))\
    .drop_duplicates()\
    .orderBy('X')
df_final.show()


Output

+---+---+---+
|  X|  Y|  Z|
+---+---+---+
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+
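
As for which of the two variants is more efficient (the question raised in the previous answer), one quick check is to compare their physical plans with explain(). A minimal sketch, reusing the df and w defined above:

# rank variant: one window, then a filter on the rank column
df.select('X', 'Y', 'Z', func.rank().over(w).alias('rank'))\
    .where(func.col('rank') == 1)\
    .select('X', 'Y', 'Z')\
    .explain()

# first/drop_duplicates variant: one window, then a distinct
# (drop_duplicates aggregates over all columns, which typically adds
# an extra shuffle on top of the window exchange)
df.select('X', func.first('Y').over(w).alias('Y'), func.first('Z').over(w).alias('Z'))\
    .drop_duplicates()\
    .explain()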




Let's create a dataframe from your sample data:

from pyspark.sql.functions import col  # used in the filter below

data = [('A', 1, 5),
        ('A', 2, 10),
        ('A', 3, 10),
        ('B', 5, 15)]

df = spark.createDataFrame(data, schema=['X', 'Y', 'Z'])
df.show()


output:

+---+---+---+
|  X|  Y|  Z|
+---+---+---+
|  A|  1|  5|
|  A|  2| 10|
|  A|  3| 10|
|  B|  5| 15|
+---+---+---+



# create an intermediate dataframe that finds the max of Z per X
df1 = df.groupby('X').max('Z').toDF('X2', 'max_Z')



# create a 2nd intermediate dataframe that finds the max of Y where Z == max of Z
df2 = df.join(df1, df.X == df1.X2)\
        .where(col('Z') == col('max_Z'))\
        .groupBy('X')\
        .max('Y').toDF('X', 'max_Y')



# join the two dataframes above to form the final result

result = df1.join(df2,df1.X2==df2.X)\
            .select('X','max_Y','max_Z')\
            .orderBy('X')

result.show()


output:

+---+-----+-----+
|  X|max_Y|max_Z|
+---+-----+-----+
|  A|    3|   10|
|  B|    5|   15|
+---+-----+-----+








