Using Python reduce() to combine multiple PySpark DataFrames
Does anyone know why combining multiple PySpark DataFrames with Python 3's functools.reduce() performs so much worse than combining the same DataFrames iteratively with a for loop? In particular, this results in a massive slowdown followed by a memory-related error:
def join_dataframes(list_of_join_columns, left_df, right_df):
    return left_df.join(right_df, on=list_of_join_columns)

joined_df = functools.reduce(
    functools.partial(join_dataframes, list_of_join_columns), list_of_dataframes,
)
whereas this one does not:
joined_df = list_of_dataframes[0]
joined_df.cache()
for right_df in list_of_dataframes[1:]:
    joined_df = joined_df.join(right_df, on=list_of_join_columns)
Any ideas would be much appreciated. Thanks!
One reason is that a reduce (or fold) is usually functionally pure: the result of each accumulation step is written to a new block of memory rather than over the previous one. In principle the garbage collector could free the previous block after each accumulation, but if it doesn't, you end up allocating memory for every intermediate version of the accumulator.
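As a plain-Python illustration of that allocation pattern (a hypothetical example using list concatenation to stand in for the DataFrame joins; `chunks` is an invented name):

```python
from functools import reduce

# Hypothetical stand-in for the list of DataFrames.
chunks = [[i] * 3 for i in range(4)]

# Functionally pure accumulation: list.__add__ returns a brand-new list
# at every step, so each intermediate accumulator is a fresh allocation.
combined = reduce(lambda acc, chunk: acc + chunk, chunks, [])

# Imperative accumulation: extend() mutates one accumulator in place,
# so no intermediate copies are created.
acc = []
for chunk in chunks:
    acc.extend(chunk)

assert combined == acc  # same result, different allocation pattern
```

Both produce the same value; the difference is purely in how many intermediate objects exist before the garbage collector runs.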
As long as you use CPython (different implementations can, but really shouldn't, exhibit significantly different behavior in this particular case), you can look at the reduce implementation and see that it is just a for loop with minimal exception handling. Its core is exactly equivalent to the loop you are using:
for element in it:
    value = function(value, element)
so there is no evidence to support claims of any special behavior on reduce's part.
Furthermore, simple tests with a number of frames approaching the practical limit for Spark joins (joins are among the most expensive operations in Spark):
dfs = [
    spark.range(10000).selectExpr(
        "rand({}) AS id".format(i), "id AS value", "{} AS loop".format(i)
    )
    for i in range(200)
]
show no significant timing difference between the direct for loop
def f(dfs):
    df1 = dfs[0]
    for df2 in dfs[1:]:
        df1 = df1.join(df2, ["id"])
    return df1
%timeit -n3 f(dfs)
## 6.25 s ± 257 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
and the reduce call
from functools import reduce
def g(dfs):
    return reduce(lambda x, y: x.join(y, ["id"]), dfs)
%timeit -n3 g(dfs)
## 6.47 s ± 455 ms per loop (mean ± std. dev. of 7 runs, 3 loops each)
Likewise, overall JVM behavior is comparable between the for loop
[image: for loop CPU and memory usage - VisualVM]
and reduce
[image: reduce CPU and memory usage - VisualVM]
Finally, both generate identical execution plans:
g(dfs)._jdf.queryExecution().optimizedPlan().equals(
    f(dfs)._jdf.queryExecution().optimizedPlan()
)
## True
which indicates no difference in how the plans are evaluated or in the likelihood of OOM errors.
In other words, correlation does not imply causation here, and the performance issues you observe are unlikely to be caused by the method you use to combine the DataFrames.