Multiprocessing: passing an array of dicts through shared memory
The following code works, but it is very slow because of the amount of data transferred to each process. In my actual implementation, the time it takes to create a process and send it the data is roughly the same as the computation time, so by the time the second process has been created the first one is almost done computing, which makes the parallelism pointless.
The code is the same as in this question, Multiprocessing has a truncation at 992 integers being joined as a result, with the change proposed there implemented and working below. However, I ran into what I assume is a common problem: passing large data to the processes takes a long time.
I have seen answers that use multiprocessing.Array to pass a shared-memory array. I have a list of 4000 entries, but each entry is a dictionary with 200 key/value pairs. Each process only reads the data, does some calculation, and then returns a matrix (4000x3) that contains no dicts.
Answers like Is the readonly shared data copied across processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to send the list of dicts to each process, for example by wrapping the dicts in some kind of manager and then putting that inside a multiprocessing.Array?
import multiprocessing

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,200):
            data[str(i)] = i
    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure we dont go past the size of the data
        if new_end > end:
            new_end = end
        #Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'
    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result
    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    results = []
    temp = []
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    result_q.put(results)
    return

if __name__== '__main__':
    main()
Solved
By simply putting the list of dictionaries into a Manager, the problem was solved.
manager = Manager()
d = manager.list(myData)
It seems that the Manager holding the list also manages the dicts contained in that list. Startup is a bit slow, so the data does still appear to be copied, but that copy happens once at the beginning and then each process slices the data internally.
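To confirm that the copy really is paid only once, the one-off cost of manager.list() can be timed directly. This is just a sanity-check sketch; the use of time.time() and the helper name build_shared_list are my additions, not part of the original code.

import time
from multiprocessing import Manager

def build_shared_list(myData):
    manager = Manager()
    t0 = time.time()
    d = manager.list(myData)  # the single up-front copy into the manager process
    print('manager.list() took %.2f seconds' % (time.time() - t0))
    return manager, d         # keep the manager alive as long as the proxy is in use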
import multiprocessing
import multiprocessing.sharedctypes as mt
from multiprocessing import Process, Lock, Manager
from ctypes import Structure, c_double

def main():
    data = {}
    total = []
    for j in range(0,3000):
        total.append(data)
        for i in range(0,100):
            data[str(i)] = i
    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])
    manager = Manager()
    d = manager.list(myData)
    #Multi processing
    #Set the number of processes to use.
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes
    interval = (end-start)/nprocs
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure we dont go past the size of the data
        if new_end > end:
            new_end = end
        #Generate a new process and pass it the arguments
        data = myData[new_start:new_end]
        #Create the processes and pass the manager proxy and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'
    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)
    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    results = []
    temp = []
    data = data[start:end]
    for i in range(0,22):
        results.append(temp)
        for j in range(0,3):
            temp.append(j)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
You may see some improvement by using multiprocessing.Manager to keep your list in a manager server process, and having each child access items from that one shared list instead of copying a slice of the dicts to every child process:
def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])
    manager = Manager()
    d = manager.list(myData)
    nprocs = 3
    result_q = multiprocessing.Queue()
    procs = []
    interval = (end-start)/nprocs
    new_start = start
    for i in range(nprocs):
        new_end = new_start + interval
        if new_end > end:
            new_end = end
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Increment our next start to the current end
        new_start = new_end+1
    print 'finished starting'
    for i in range(nprocs):
        result = result_q.get()
        print len(result)
    #Join the processes to wait for all data/processes to be finished
    for p in procs:
        p.join()
This copies your entire data list into the Manager process prior to creating any of your workers. The Manager returns a Proxy object that lets you share the list. You then simply hand the Proxy off to the workers, which means their startup time is significantly reduced, since there is no longer any need to copy slices of the data list. The downside is that access to the list will be slower in the children, because every access has to go to the manager process via IPC. Whether or not this actually hurts performance depends heavily on what kind of work you do on the list in your worker processes, but it is worth trying since it requires very few code changes.
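If the per-access IPC overhead does become a problem, a common middle ground (shown here only as a sketch; the worker body below is a placeholder, not code from the question or answer) is to have each worker pull its whole slice out of the proxy once, paying a single round-trip, and then operate on the resulting plain local list:

def multiProcess(d, start, end, result_q, proc_num):
    # Slicing a ListProxy returns an ordinary list, so this is one IPC round-trip
    # that materializes this worker's share of the data locally.
    local_data = d[start:end]
    results = []
    for item in local_data:
        # placeholder computation; real code would read the dict's values here
        results.append([proc_num, start, len(item)])
    result_q.put(results)

This is essentially what the updated code in the question does with data = data[start:end]: the startup copy is avoided, and the IPC cost is paid once per worker instead of once per element.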
Looking at your question, I am assuming the following:
- For each element in myData, you want to return a result (a matrix of some kind)
- You created a JoinableQueue (tasks), probably to hold the input, but did not know how to use it
Code
import logging
import multiprocessing

def create_logger(logger_name):
    ''' Create a logger that logs to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)
    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0,100):
        data.append({str(i):i})
    CalcManager(data,start=0,end=50)
    logger.info('Main ended')

def CalcManager(myData,start,end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()
    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])
    # Create processes to do work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker,args=(tasks,results))
        p.daemon = True
        p.start()
    # Wait for tasks completion, i.e. tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')
    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result
    logger.info('CalcManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()   # one row of input
            task['done'] = True  # simulate work being done
            results.put(task)    # Save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()

if __name__== '__main__':
    main()
Discussion
- For a multiprocessing situation, I recommend using the logging module, as it offers several advantages:
  - It is thread- and process-safe, so you will not end up with output from one process mixed into another's.
  - You can configure logging to show the process name and function name, which is very handy for debugging.
- CalcManager is essentially a task manager that does the following:
  - Creates three processes
  - Fills the input queue, tasks
  - Waits for task completion
  - Prints the results
- Note that when creating the processes, I mark them as daemon processes, which means they will be killed when the main program exits, so you do not have to worry about killing them yourself.
- worker is where the work is done (see the sketch after this list):
  - Each one runs forever (a while True loop)
  - Each time through the loop it gets one unit of input, does some processing, and puts the result into the output queue
  - After finishing a task, it calls task_done() so that the main process knows when all the tasks are done. I put task_done in a finally clause to make sure it runs even if an error occurred during processing.
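To tie this back to the original data, each task here could be one of the 200-key dicts, and the worker could put one 3-value row into the results queue per dict. The reduction below (min/max/sum) is only a placeholder for whatever calculation actually produces the 4000x3 matrix; it is an assumption of mine, not part of this answer:

def worker(tasks, results):
    while True:
        try:
            row_dict = tasks.get()             # one dict of key/value pairs
            values = list(row_dict.values())
            # placeholder for the real per-row calculation
            results.put([min(values), max(values), sum(values)])
        finally:
            # JoinableQueue: every get() needs a matching task_done()
            tasks.task_done()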