How to use the first and last function in pyspark?

I used the first and last functions to get the first and last values of a column, but neither function works as I expected. I have read @zero323's answer but I am still confused by both. My code is:

from pyspark.sql import functions as F
from pyspark.sql.window import Window

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)
]).toDF(["k", "v"])
w = Window().partitionBy("k").orderBy('k', 'v')

df.select(F.col("k"), F.last("v", True).over(w).alias('v')).show()

result:

+---+----+
|  k|   v|
+---+----+
|  b|   1|
|  b|   3|
|  a|null|
|  a|  -1|
|  a|   1|
+---+----+

      

I assumed it should be like:

+---+----+
|  k|   v|
+---+----+
|  b|   3|
|  b|   3|
|  a|   1|
|  a|   1|
|  a|   1|
+---+----+

      

because this is what df looks like when ordered by 'k' and 'v':

df.orderBy('k','v').show()

+---+----+
|  k|   v|
+---+----+
|  a|null|
|  a|  -1|
|  a|   1|
|  b|   1|
|  b|   3|
+---+----+

I also tried another approach to check this behaviour:

df.orderBy('k','v').groupBy('k').agg(F.first('v')).show()

I found that its results can differ from one execution to the next. Has anyone had the same experience? I want to use both functions in my project, but so far these results are inconclusive.

+5




2 answers


Try inverting the sort order with .desc(); then first() will give the desired result.

w2 = Window().partitionBy("k").orderBy(df.v.desc())
df.select(F.col("k"), F.first("v", True).over(w2).alias('v')).show()

Outputs:

+---+---+
|  k|  v|
+---+---+
|  b|  3|
|  b|  3|
|  a|  1|
|  a|  1|
|  a|  1|
+---+---+

      

You also have to be careful about partitionBy and orderBy. Since you are partitioning by 'k', all k values in any given window are the same. Sorting by 'k' does nothing.
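
For illustration, here is a minimal sketch (assuming the same df and the F / Window imports from the question; the window names are just placeholders) showing that dropping 'k' from orderBy changes nothing, because 'k' is constant within each partition:

w_kv = Window().partitionBy("k").orderBy('k', 'v')  # 'k' in orderBy is redundant
w_v = Window().partitionBy("k").orderBy('v')        # equivalent within each partition

df.select(
    'k', 'v',
    F.last("v", True).over(w_kv).alias('last_kv'),
    F.last("v", True).over(w_v).alias('last_v')
).show()
# last_kv and last_v are identical in every row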



The last function is not simply the opposite of first in terms of which element of the window it returns. It returns the last non-null value it has seen so far as it moves through the ordered rows, because the default frame only runs up to the current row.

To compare their effects, here is a data frame with the combinations of function and ordering. Note that in the "last_w2" column, the null value has been replaced with -1.

df = spark.sparkContext.parallelize([
    ("a", None), ("a", 1), ("a", -1), ("b", 3), ("b", 1)]).toDF(["k", "v"])

#create two windows for comparison.
w = Window().partitionBy("k").orderBy('v')
w2 = Window().partitionBy("k").orderBy(df.v.desc())

df.select('k','v',
   F.first("v",True).over(w).alias('first_w1'),
   F.last("v",True).over(w).alias('last_w1'),
   F.first("v",True).over(w2).alias('first_w2'),
   F.last("v",True).over(w2).alias('last_w2')
).show()

      

Output:

+---+----+--------+-------+--------+-------+
|  k|   v|first_w1|last_w1|first_w2|last_w2|
+---+----+--------+-------+--------+-------+
|  b|   1|       1|      1|       3|      1|
|  b|   3|       1|      3|       3|      3|
|  a|null|    null|   null|       1|     -1|
|  a|  -1|      -1|     -1|       1|     -1|
|  a|   1|      -1|      1|       1|      1|
+---+----+--------+-------+--------+-------+

      

+8




See question 47130030. The problem is not with the last() function but with the window frame, which by default includes only the rows up to and including the current one. Using

w = Window().partitionBy("k").orderBy('k', 'v').rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

will give the expected results for both first() and last().
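
As a minimal sketch (assuming the same df and imports as in the question; the names w_full, first_v and last_v are just illustrative), the unbounded frame makes both functions return the per-group extremes:

w_full = Window().partitionBy("k").orderBy('v') \
    .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

df.select(
    'k', 'v',
    F.first("v", True).over(w_full).alias('first_v'),
    F.last("v", True).over(w_full).alias('last_v')
).show()
# every row in group 'a' gets first_v = -1 and last_v = 1,
# every row in group 'b' gets first_v = 1 and last_v = 3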

0








