Using sparkDF.write.saveAsTable() inside a loop causes latency between jobs to grow exponentially

I need to execute a set of different Hive queries inside a for loop.

hc = HiveContext(sc)
queryList = [set of queries]
for i in range(0, X):
    sparkDF = hc.sql(queryList[i])
    sparkDF.write.saveAsTable('hiveTable', mode='append')

While this code works like a charm for smaller values of X, it causes problems for X > 100. The delay between consecutive saveAsTable jobs grows exponentially, even though each job itself takes roughly 5 seconds.

What I have tried, with no luck:

  • Calling gc.collect() inside the for loop every 100 iterations (i % 100 == 0), but this breaks the for loop.
  • Closing the current Spark and Hive contexts every 100 iterations (i % 100 == 0) and creating new ones; this still doesn't solve the problem.
  • Using yarn-cluster mode instead of yarn-client mode; no luck.

Is there a way to create a connection to Hive and close it every time I call saveAsTable? Or some way to clean up the driver?



1 answer


This is because the for loop runs in the Spark driver: the work is not distributed to the cluster's worker nodes, so it does not exploit Spark's parallelism. Try creating an RDD with parallelize and an appropriate number of partitions, which will spread the jobs across the worker nodes.
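As a stand-in illustration of that point (plain Python, not Spark, since hc.sql itself can only run on the driver): partitioning a task list and fanning it out to workers, here modeled with concurrent.futures, is the pattern sc.parallelize(..., numSlices=N) applies at cluster scale. run_query and query_list are hypothetical placeholders.

```python
# Stand-in for distributing work across partitions: sequential driver-side
# execution vs. fanning the same task list out to a pool of workers.
from concurrent.futures import ThreadPoolExecutor

def run_query(q):
    # placeholder for real work such as hc.sql(q); here we just echo
    return f"ran: {q}"

query_list = [f"SELECT {i}" for i in range(8)]

# sequential, driver-style execution
sequential = [run_query(q) for q in query_list]

# partitioned, worker-style execution (analogous to sc.parallelize(..., numSlices=4))
with ThreadPoolExecutor(max_workers=4) as pool:
    parallel = list(pool.map(run_query, query_list))

# pool.map preserves input order, so both approaches agree
assert sequential == parallel
```

Note that in real Spark code you cannot call hc.sql from inside an executor task; the sketch only illustrates the shape of partitioned execution.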

Alternatively, if you just want to reuse the HiveContext, create a single global one, e.g. val hiveCtx = new HiveContext(sc), and reuse it inside the loop.
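A minimal sketch of that reuse pattern, with a hypothetical FakeHiveContext standing in for HiveContext(sc) so the construction count is observable: the expensive context is built once, outside the loop, and every iteration reuses it.

```python
# Hypothetical sketch: create one expensive context up front and reuse it,
# instead of constructing a new one on every loop iteration.
class FakeHiveContext:
    """Stand-in for HiveContext(sc); counts how often it is constructed."""
    instances = 0

    def __init__(self):
        FakeHiveContext.instances += 1

    def sql(self, query):
        return f"result of {query}"

hive_ctx = FakeHiveContext()  # created once, like a global hiveCtx

# the loop reuses the same context for every query
results = [hive_ctx.sql(q) for q in ("q1", "q2", "q3")]

assert FakeHiveContext.instances == 1
```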



You can also tune the number of workers allocated to the job on the cluster to improve performance.
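For example, executor count, cores, and memory can be set at submission time. This is an assumed spark-submit invocation; the script name and the specific numbers are illustrative only and should be sized to your cluster.

```shell
# Illustrative spark-submit tuning; adjust values for your cluster.
spark-submit \
  --master yarn \
  --deploy-mode cluster \
  --num-executors 8 \
  --executor-cores 4 \
  --executor-memory 4g \
  my_job.py
```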
