Using Python reduce() to join multiple PySpark DataFrames

Does anyone know why using Python 3's functools.reduce() leads to worse performance when joining multiple PySpark DataFrames than iteratively joining the same DataFrames with a for loop? Specifically, this gives a massive slowdown followed by an out-of-memory error:

import functools

def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns),
    list_of_dataframes,
)

      

whereas this one does not:

joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)

      

Any ideas would be much appreciated. Thanks!

2 answers


One reason is that a reduce or fold is usually functionally pure: the result of each accumulation step is not written to the same part of memory, but to a new block of memory.

In principle, the garbage collector could free the previous block after each accumulation, but if it doesn't, you allocate memory for every updated version of the accumulator.



Assuming you use CPython (other implementations can, but realistically shouldn't, behave significantly differently in this particular case): if you look at the reduce implementation, you will see that it is just a for loop with minimal exception handling.

The core is exactly equivalent to the loop you use

for element in it:
    value = function(value, element)

      

and there is no evidence to support claims of any special behavior.
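To make the equivalence concrete, here is a minimal pure-Python sketch of what reduce does. The name simple_reduce and the _MISSING sentinel are made up for illustration; CPython's real reduce is written in C, so treat this as an approximation of its behavior rather than its actual source.

```python
from functools import reduce

_MISSING = object()  # hypothetical sentinel meaning "no initial value was given"

def simple_reduce(function, iterable, initial=_MISSING):
    """Simplified stand-in for functools.reduce (illustrative only)."""
    it = iter(iterable)
    if initial is _MISSING:
        try:
            value = next(it)  # the first element seeds the accumulator
        except StopIteration:
            raise TypeError(
                "simple_reduce() of empty iterable with no initial value"
            ) from None
    else:
        value = initial
    # This loop is the whole algorithm: one call per remaining element.
    for element in it:
        value = function(value, element)
    return value

words = ["a", "b", "c", "d"]
pair = lambda left, right: "({} {})".format(left, right)

print(simple_reduce(pair, words))  # (((a b) c) d)
print(reduce(pair, words))         # (((a b) c) d)
```

Both calls nest the elements left to right, which is the same shape a hand-written loop over dfs[1:] produces.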

Furthermore, consider a simple test with a number of frames near the practical limit of Spark joins (joins are among the most expensive operations in Spark):

dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value",  "{} AS loop ".format(i)
    )
    for i in range(200)
]

      

This test shows no significant difference in timing between a for loop

def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1

%timeit -n3 f(dfs)                 
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

      

and a reduce call

from functools import reduce

def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs) 

%timeit -n3 g(dfs)
## 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)

      

Similarly, the overall JVM behavior patterns are comparable between the for loop

[Figure: for loop CPU and memory usage, VisualVM]

and reduce

[Figure: reduce CPU and memory usage, VisualVM]

Finally, both generate identical execution plans:

g(dfs)._jdf.queryExecution().optimizedPlan().equals( 
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True

      

which indicates no difference at the point where plans are evaluated and OOMs are likely to occur.
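The reason the plans match can be illustrated without Spark at all. The sketch below uses a hypothetical FakeFrame class (not part of PySpark) that records each join as a nested tuple instead of executing it, and assumes a single join column named "id" as in the test above.

```python
from functools import reduce

class FakeFrame:
    """Hypothetical stand-in for a DataFrame; records joins instead of running them."""

    def __init__(self, plan):
        self.plan = plan

    def join(self, other, on):
        # Build a nested tuple describing the join, like a tiny logical plan.
        return FakeFrame(("join", tuple(on), self.plan, other.plan))

def with_loop(frames):
    result = frames[0]
    for right in frames[1:]:
        result = result.join(right, ["id"])
    return result

def with_reduce(frames):
    return reduce(lambda left, right: left.join(right, ["id"]), frames)

frames = [FakeFrame("df{}".format(i)) for i in range(4)]
print(with_loop(frames).plan == with_reduce(frames).plan)  # True
```

Both functions issue the same sequence of join calls in the same order, so whatever sits behind join receives identical input either way.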

In other words, your correlation does not imply causation, and the performance issues you observe are unlikely to be related to the method you use to combine DataFrames.
