Spark join exponentially slow

I am trying to join two Spark RDDs. I have a transaction log that is associated with categories, and I keyed the transactions RDD by category id.

transactions_cat.take(3)
[(u'707', [u'86246', u'205', u'7', u'707', u'1078778070', u'12564', u'2012-03-02 00:00:00', u'12', u'OZ', u'1', u'7.59']), 
(u'6319', [u'86246', u'205', u'63', u'6319', u'107654575', u'17876', u'2012-03-02 00:00:00', u'64', u'OZ', u'1', u'1.59']), 
(u'9753', [u'86246', u'205', u'97', u'9753', u'1022027929', u'0', u'2012-03-02 00:00:00', u'1', u'CT', u'1', u'5.99'])]

categories.take(3)
[(u'2202', 0), (u'3203', 0), (u'1726', 0)]


The transaction log is about 20 GB (350 million rows). The list of categories is less than 1 KB.

When I run

transactions_cat.join(categories).count()

Spark starts off very slowly. There is a stage with 643 tasks. The first 10 tasks took about 1 minute, and then each task got slower and slower (roughly 15 minutes per task around task 60). I'm not sure what is happening.

Please see the screenshots of the Spark UI summary to get a better idea.

[three screenshots of the Spark web UI]

I am running Spark 1.1.0 with 4 workers through the Python shell, with a total of 50 GB of memory. Just counting the transactions RDD is quite fast (30 minutes), but the join is far slower.

+3




1 answer


The problem is probably that Spark doesn't notice that you have an easy case of a join problem. When one of the two RDDs you are joining is this small, you are better off with it not being an RDD at all. Then you can roll your own hash join implementation, which is actually much simpler than it sounds. Basically, you need to do the following (a sketch in Python follows the list):

  • Pull your category list out of the RDD with collect(); the resulting communication will easily pay for itself (or, if possible, avoid making it an RDD in the first place)
  • Turn it into a hash table with one entry per key, holding all the values for that key (in case your keys are not unique)
  • For each pair in your large RDD, look the key up in the hash table and produce one output pair for each value found there (if the key is not found, that particular pair produces no results)
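
Here is a minimal PySpark sketch of those three steps, assuming the RDD layouts shown in the question; the hash_join helper and the lookup/joined names are mine for illustration, not code from the question.

from collections import defaultdict

# 1. Pull the small category list out of its RDD and onto the driver.
category_list = categories.collect()

# 2. Build a hash table mapping each key to all of its values,
#    in case keys are not unique.
lookup = defaultdict(list)
for key, value in category_list:
    lookup[key].append(value)

# 3. For each pair in the large RDD, look the key up in the hash table and
#    emit one output pair per matching value; unmatched keys emit nothing.
def hash_join(pair):
    key, left_value = pair
    return [(key, (left_value, right_value))
            for right_value in lookup.get(key, [])]

joined = transactions_cat.flatMap(hash_join)
print(joined.count())

The lookup table is simply captured in the task closure here; wrapping it in sc.broadcast() would make the shipping explicit, but for a table under 1 KB either approach is cheap.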


I have an implementation of this in Scala; feel free to ask questions about translating it, but it should be quite easy.

Another interesting possibility is to try Spark SQL. I'm pretty sure the project's long-term ambitions include doing this for you automatically, but I don't know whether they have achieved it yet.
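
For reference, a hedged sketch of what that could look like from the Python shell, using the 1.x API names (inferSchema, registerTempTable); the table and column names here are made up for illustration.

from pyspark.sql import SQLContext, Row

sqlContext = SQLContext(sc)

# Wrap each record in a Row so Spark SQL can infer a schema; only the join key
# is kept for the count. The column names category_id and flag are invented.
trans_rows = transactions_cat.map(lambda kv: Row(category_id=kv[0]))
cat_rows = categories.map(lambda kv: Row(category_id=kv[0], flag=kv[1]))

sqlContext.inferSchema(trans_rows).registerTempTable("transactions")
sqlContext.inferSchema(cat_rows).registerTempTable("categories")

# Let the SQL planner decide how to execute the join.
result = sqlContext.sql(
    "SELECT COUNT(*) FROM transactions t JOIN categories c "
    "ON t.category_id = c.category_id")
print(result.collect())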

+7








