Python MongoDB (PyMongo) Multiprocessing cursor
I'm trying to write a MongoDB multiprocessing utility. It works, but I think I have a performance problem: even with 20 workers it doesn't process more than 2,800 documents per second, and I think it could be 5x faster. This is my code. It doesn't do anything fancy; it just prints the time remaining until the end of the cursor.
There is probably a better way to do multiprocessing over a MongoDB cursor. I need to run some operations on every document of a 17.4M-record collection, so performance and total run time matter.
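```python
import logging
import multiprocessing
import sys
import time

from pymongo import MongoClient

# Not defined in the original snippet; assumed setup.
MONGO = MongoClient()
CONFIG_POOL_SIZE = 20

START = time.time()


def remaining_time(a, b):
    # a = total documents, b = documents processed so far
    if START:
        y = time.time() - START                        # seconds elapsed
        z = ((a * y) / b) - y                          # linear estimate of seconds left
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)                               # documents per second
        progress("{0}/{1} | Time remaining {2} ({3} p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) // t                                # integer percentage
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '#' * (pc // 5), pc))
    sys.stdout.flush()


def dowork(queue):
    # Each worker consumes jobs until it receives the 'STOP' sentinel.
    for p, i, pcount in iter(queue.get, 'STOP'):
        remaining_time(pcount, i)


def populate_jobs(queue):
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = MONGO.mydb.items.count_documents(mongo_query)  # Cursor.count() was removed in PyMongo 4
    i = 1
    print("Processing %s products..." % pcount)
    try:
        for p in products:
            try:
                queue.put((p, i, pcount))
                i += 1
            except Exception:
                logging.exception("could not queue document")  # stands in for the original utils.log
    finally:
        products.close()  # cursors opened with no_cursor_timeout should be closed explicitly
    # One sentinel per worker; with a single 'STOP' the other workers block forever and join() hangs.
    for _ in range(CONFIG_POOL_SIZE):
        queue.put('STOP')


def main():
    queue = multiprocessing.Queue()
    procs = [multiprocessing.Process(target=dowork, args=(queue,)) for _ in range(CONFIG_POOL_SIZE)]
    for p in procs:
        p.start()
    populate_jobs(queue)
    for p in procs:
        p.join()


if __name__ == '__main__':
    main()
```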
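The estimate in remaining_time is a linear extrapolation: if b items took y seconds, all a items should take a*y/b seconds, so what is left is a*y/b - y = y*(a - b)/b. A minimal stand-alone version of that formula (eta_seconds is a hypothetical name, not from the question), which also guards the b = 0 case that a zero-based counter would hit:

```python
def eta_seconds(total, done, elapsed):
    """Hypothetical helper: seconds left, assuming the rate seen so far stays constant."""
    if done <= 0:
        return None  # no rate yet; remaining_time() would divide by zero here
    # elapsed * total / done - elapsed, rewritten as elapsed * (items left) / done
    return elapsed * (total - done) / done


print(eta_seconds(100, 25, 10.0))  # 30.0
```

At the question's 2,800 docs/s, the whole 17.4M collection takes about 17,400,000 / 2,800 ≈ 6,214 s, roughly 1 h 44 min.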
I also noticed that roughly every 2,500 documents the script pauses for about 0.5 to 1 s, which is clearly a problem. This looks like a MongoDB issue: if I run the exact same loop over range(0, 1000000) instead, the script never pauses and runs at 57,000 iterations per second, finishing in about 20 seconds. That is a huge difference from 2,800 MongoDB documents per second.
This is the code that runs a 1,000,000-iteration loop instead of iterating over the documents:
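```python
def populate_jobs(queue):
    mongo_query = {}
    # The cursor is opened but never iterated in this benchmark.
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = 1000000
    i = 1
    print("Processing %s products..." % pcount)
    for p in range(0, 1000000):
        queue.put((p, i, pcount))
        i += 1
    products.close()
    for _ in range(CONFIG_POOL_SIZE):  # one 'STOP' sentinel per worker
        queue.put('STOP')
```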
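One way to check whether the pauses line up with the cursor fetching its next batch from the server (an assumption, not something the question confirms) is to time the gap before each document arrives, with no queue involved. A minimal sketch; find_stalls and the 0.25 s threshold are hypothetical choices:

```python
import time


def find_stalls(iterable, threshold=0.25):
    """Hypothetical helper: (index, seconds) for each item that took longer than threshold to arrive."""
    stalls = []
    last = time.perf_counter()
    for idx, _ in enumerate(iterable):
        now = time.perf_counter()
        if now - last > threshold:
            stalls.append((idx, now - last))
        last = now
    return stalls
```

It accepts any iterable, so it can be pointed at something like MONGO.mydb.items.find({}). If the reported indices are evenly spaced, that suggests the pauses are round trips for new batches, and the batch size is a reasonable first thing to experiment with.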
UPDATE
As far as I can tell, the problem is not the multiprocessing itself but the cursor filling the Queue. That part doesn't run in parallel: a single process (the populate_jobs method) fills the Queue. Perhaps if I could make the cursor multithreaded or multiprocess and fill the Queue in parallel, it would fill faster, and the multiprocessing dowork method would run faster too. I think the bottleneck is that I can only put about 2,800 items per second into the Queue, while the dowork processes could consume many more, but I don't know how to parallelize a MongoDB cursor.
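One way to get several cursors instead of one (a swapped-in technique, not something the question or the answers use) is to give each worker its own slice of the collection, sorted on _id and cut with skip/limit, so no single process has to feed a queue. A minimal sketch that runs without a server: FAKE_COLLECTION, process_slice, skip_limit_slices and run_parallel are hypothetical stand-ins, and the real PyMongo query appears only in a comment. Each worker should open its own MongoClient, since PyMongo clients are not fork-safe.

```python
import math
from multiprocessing import Pool


def skip_limit_slices(total, n_workers):
    """Hypothetical helper: split total documents into at most n_workers (skip, limit) slices."""
    if total <= 0 or n_workers <= 0:
        return []
    size = math.ceil(total / n_workers)
    return [(start, min(size, total - start)) for start in range(0, total, size)]


# Hypothetical stand-in for the collection so the sketch runs without MongoDB.
# With PyMongo, each worker would create its own client and iterate something like:
#   coll.find(query).sort("_id", 1).skip(skip).limit(limit)
FAKE_COLLECTION = [{"_id": i} for i in range(17)]


def process_slice(slice_):
    skip, limit = slice_
    count = 0
    for doc in FAKE_COLLECTION[skip:skip + limit]:
        count += 1  # the real per-document work would go here
    return count


def run_parallel(total, n_workers):
    # Call this under an `if __name__ == "__main__":` guard.
    with Pool(n_workers) as pool:
        return sum(pool.map(process_slice, skip_limit_slices(total, n_workers)))
```

skip() still makes the server walk past the skipped documents, so for large offsets range queries such as {'_id': {'$gte': lo, '$lt': hi}} are usually cheaper; that variant needs the boundary _id values up front.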
Perhaps the problem is the latency between my computer and the MongoDB server: the delay between asking the cursor for the next document and MongoDB returning it could cut my throughput by roughly 20x (from 61,000 rows/s to 2,800 docs/s). Nope: I tried MongoDB on localhost and the performance is exactly the same, which drives me crazy.
Here's how you can use a Pool to feed the child processes:
```python
import multiprocessing
import sys
import time

# MONGO and CONFIG_POOL_SIZE are defined as in the question.
START = time.time()


def remaining_time(a, b):
    if START:
        y = (time.time() - START)
        z = ((a * y) / b) - y
        d = time.strftime('%H:%M:%S', time.gmtime(z))
        e = round(b / y)
        progress("{0}/{1} | Time remaining {2} ({3} p/s)".format(b, a, d, e), b, a)


def progress(p, c, t):
    pc = (c * 100) // t
    sys.stdout.write("%s [%-20s] %d%%\r" % (p, '#' * (pc // 5), pc))
    sys.stdout.flush()


def dowork(args):
    # Pool passes each job as a single tuple.
    p, i, pcount = args
    remaining_time(pcount, i)


def main():
    pool = multiprocessing.Pool(CONFIG_POOL_SIZE)
    mongo_query = {}
    products = MONGO.mydb.items.find(mongo_query, no_cursor_timeout=True)
    pcount = MONGO.mydb.items.count_documents(mongo_query)
    # Count from 1: remaining_time() divides by the index, so 0 would raise ZeroDivisionError.
    pool.map(dowork, ((p, idx, pcount) for idx, p in enumerate(products, 1)))
    pool.close()
    pool.join()


if __name__ == '__main__':
    main()
```
Note that pool.map loads everything from the cursor into memory at once, which can be a problem given how large the collection is. You can use imap to avoid consuming it all at once, but you should specify a chunksize to minimize the IPC overhead:
```python
# Calculate chunksize using the same algorithm pool.map uses internally
chunksize, extra = divmod(pcount, CONFIG_POOL_SIZE * 4)
if extra:
    chunksize += 1
jobs = ((p, idx, pcount) for idx, p in enumerate(products, 1))
# Iterate over imap's results so that exceptions raised in the workers surface here.
for _ in pool.imap(dowork, jobs, chunksize=chunksize):
    pass
pool.close()
pool.join()
```
With 20 workers (CONFIG_POOL_SIZE = 20), 1,000,000 items give a chunksize of 12,500. You can try larger and smaller sizes and see how they affect performance.
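The same arithmetic as a small function (default_chunksize is a hypothetical name; the divmod rule is the one the snippet above copies from pool.map), applied to the 1,000,000-item test and to the 17.4M-document collection, both with 20 workers:

```python
def default_chunksize(n_items, n_workers):
    """Hypothetical helper: the chunk size pool.map picks when none is given (about 4 chunks per worker)."""
    chunksize, extra = divmod(n_items, n_workers * 4)
    if extra:
        chunksize += 1  # round up so no items are left over
    return chunksize


print(default_chunksize(1_000_000, 20))   # 12500
print(default_chunksize(17_400_000, 20))  # 217500
```

For the full collection that is 217,500 documents per task, so it may be worth trying a noticeably smaller explicit chunksize there.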
I'm not sure this will help much if the bottleneck is simply pulling data out of MongoDB.
Why are you using multiprocessing? You don't seem to be doing any real work in the worker processes that the queue feeds. Python has a global interpreter lock (GIL) that makes multithreaded code less efficient than you'd expect. This program is probably slower, not faster.
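Following that reasoning, a quick check is to time a plain single-process loop over the cursor that does the per-document work inline. If it already reaches about 2,800 docs/s, adding processes can't help, because the cursor is the limit. A minimal sketch; measure_rate and its work callback are hypothetical:

```python
import time


def measure_rate(iterable, work=lambda doc: None):
    """Hypothetical helper: run work(doc) for each item in one process; return (count, items per second)."""
    count = 0
    start = time.perf_counter()
    for doc in iterable:
        work(doc)
        count += 1
    elapsed = time.perf_counter() - start
    return count, (count / elapsed if elapsed > 0 else float("inf"))
```

For example, measure_rate(MONGO.mydb.items.find({}).limit(100000)) times the first 100,000 documents without any queue or worker processes.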
Some performance tips:
- Try setting batch_size in the call to find() to a large number (e.g. 20000). This is the maximum number of documents returned per batch before the client has to request more; by default the server returns 101 documents in the first batch.
- Try setting cursor_type to pymongo.cursor.CursorType.EXHAUST, which can reduce the latency you see.
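Both tips are keyword arguments of Collection.find(). A minimal sketch that collects them in one helper (open_cursor is a hypothetical name, and 20,000 is the example value from the tip, not a tuned number). pymongo is only imported when an exhaust cursor is requested, so the helper can be exercised against a fake collection:

```python
def open_cursor(collection, query=None, batch_size=20000, exhaust=False):
    """Hypothetical helper: open a find() cursor with a large batch size and, optionally, exhaust mode."""
    kwargs = {"batch_size": batch_size}
    if exhaust:
        from pymongo.cursor import CursorType  # real PyMongo enum
        kwargs["cursor_type"] = CursorType.EXHAUST
    return collection.find(query or {}, **kwargs)
```

Exhaust cursors come with restrictions, so check the CursorType documentation for your PyMongo and server versions before relying on them.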