Spark join runs increasingly slowly
I am trying to join two Spark RDDs. I have a transaction log whose rows are associated with categories, and I formatted the transactions RDD to use the category id as the key.
transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']),
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']),
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]
categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]
The transaction log is about 20 GB (350 million rows). The list of categories is less than 1 KB.
When I ran
transactions_cat.join(categories).count()
the job starts out very slowly. The stage has 643 tasks. The first 10 tasks took about 1 minute; after that, each task gets slower and slower (roughly 15 minutes per task by around task 60). I'm not sure what's happening.
Please see the screenshots of the stage summary to get a better idea.
I am running Spark 1.1.0 with 4 workers, using the Python shell, with 50 GB of memory in total. Counting the transactions RDD by itself is pretty fast (30 minutes).
What's probably happening is that Spark doesn't notice you have an easy case of the join problem: when one of the two RDDs you are joining is this small, you are better off not making it an RDD at all. You can then roll your own hash join implementation, which is actually much simpler than it sounds. Basically, you need to:
- Pull your list of categories out of the RDD with collect(); the resulting communication will easily pay for itself (or, if possible, don't make it an RDD in the first place).
- Turn it into a hash table with one entry per key, containing all the values for that key (in case your keys are not unique).
- For each pair in your large RDD, look the key up in the hash table and emit one pair for each value in the list (if the key is not found, that particular pair produces no results).
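The steps above can be sketched in plain Python. The two helpers below are the core of the map-side hash join; the function and variable names are my own, not from the original post:

```python
from collections import defaultdict

def build_hash_table(small_pairs):
    """Collect the small side into a dict mapping each key to all its values."""
    table = defaultdict(list)
    for key, value in small_pairs:
        table[key].append(value)
    return dict(table)

def hash_join_partition(big_pairs, table):
    """Stream over the big side, emitting one joined pair per matching value."""
    for key, value in big_pairs:
        # A key missing from the table simply produces no output,
        # matching inner-join semantics.
        for small_value in table.get(key, []):
            yield key, (value, small_value)
```

In PySpark you would presumably broadcast the table once and apply the generator per partition, along the lines of `bt = sc.broadcast(build_hash_table(categories.collect()))` followed by `transactions_cat.mapPartitions(lambda part: hash_join_partition(part, bt.value))`, which avoids the shuffle that `join()` triggers.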
I have an implementation of this in Scala; feel free to ask questions about translating it, but it should be pretty easy.
Another interesting possibility is to try Spark SQL. I'm fairly sure the project's long-term ambitions include doing this optimization for you automatically, but I don't know whether they have achieved it yet.