How do you quickly save a DataFrame / RDD from PySpark to disk as a CSV / Parquet file?

I have a Google Dataproc cluster running, and I send it a PySpark job that reads a file from Google Cloud Storage (a 945 MB CSV file with 4 million rows — takes only 48 seconds to read) into a PySpark DataFrame and applies a function to that frame:

parsed_dataframe = raw_dataframe.rdd.map(parse_user_agents).toDF()

(takes about 4 or 5 seconds).

Then I need to save these modified results back to Google Cloud Storage as a gzipped CSV or Parquet file. I could also save them locally and then copy them to the GCS bucket.

I repartition the dataframe via parsed_dataframe = parsed_dataframe.repartition(15)

and then try to save the result with

parsed_dataframe.write.parquet("gs://somefolder/proto.parquet")

parsed_dataframe.write.format("com.databricks.spark.csv").save("gs://somefolder/", header="true")

parsed_dataframe.write.format("com.databricks.spark.csv").options(codec="org.apache.hadoop.io.compress.GzipCodec").save("gs://nyt_regi_usage/2017/max_0722/regi_usage/", header="true")

Each of these methods (and their variations with lower / higher partition counts, and saving locally versus to GCS) takes over 60 minutes for 4 million rows (945 MB), which is far too long.

How can I optimize this / make the save faster?

It should be noted that the Dataproc cluster and the GCS bucket are in the same region / zone, and that the cluster has an n1-highmem-8 (8 vCPUs, 52 GB memory) master node with 15 worker nodes (the only variables I am still testing).



3 answers


There are a few red flags here.

1) Starting from a DataFrame, converting to an RDD for processing, and converting back to a DataFrame is very inefficient in itself. You lose the Catalyst and Tungsten optimizations by dropping back to an RDD. Try changing your function to work within the DataFrame API.

2) Repartitioning causes a shuffle, but more importantly it means the computation will now be limited to the executors that hold those 15 partitions. If your executors are large (7 cores, ~40 GB RAM) this is probably not a problem.



What happens if you write the output before repartitioning?

Please provide more code and, ideally, Spark UI output showing how long each stage of the job takes.



Try this — it should only take a few minutes:

your_dataframe.write.csv("export_location", mode="overwrite", header=True, sep="|")




Make sure you add mode="overwrite" if you want to overwrite an old version.



Are you calling an action on parsed_dataframe?

As you wrote it above, Spark will not evaluate your function until you call write. If you aren't calling an action, see how long parsed_dataframe.cache().count() takes. I suspect it will take an hour, and that parsed_dataframe.write(...) will then be much faster.







