Multiprocessing: passing an array of dicts through shared memory

The following code works, but it is very slow because of the transfer of the large dataset. In the actual implementation, the time it takes to create a process and send it the data is about the same as the computation time, so by the time the second process is created the first one is almost finished with its computation, which makes the parallelism pointless.

The code is essentially the same as in this question, Multiprocessing has a cutoff at 992 integers being joined as a result, with the change proposed there working and implemented below. However, like others I suppose, I ran into the usual problem of large data taking a long time to transfer.

I have seen answers that use multiprocessing.Array to pass a shared-memory array. I have a list of 4000 elements, but each element is a dictionary with 200 key/value pairs. The data is only read by each process, some calculation is done, and then a matrix (4000x3) is returned (with no dicts in it).
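For reference, multiprocessing.Array only holds a flat array of a single ctypes type, so the dicts cannot go into it directly; at best the numeric values could be flattened into a buffer with an agreed key-to-position mapping. A rough sketch (the layout here is only an illustration, assuming keys '0'..'199' as in the example code below):

import multiprocessing

#A shared flat buffer of doubles: 4000 records x 200 values each.
#Dicts cannot be stored in it directly; only the numeric values can,
#and the key -> position mapping has to be agreed on separately.
shared = multiprocessing.Array('d', 4000 * 200)

def write_record(index, record):
    #'record' is assumed to be a dict whose keys are the strings '0'..'199'
    for key, value in record.items():
        shared[index * 200 + int(key)] = value

if __name__ == '__main__':
    write_record(0, dict((str(i), i) for i in range(200)))
    print shared[0], shared[199]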

Answers like Is the readonly shared data copied across processes for Python multiprocessing? use map. Is it possible to keep the structure below and still use shared memory? Is there an efficient way to hand the data to each process when it is a list of dicts, such as wrapping the dicts in some kind of manager and then putting that inside a multiprocessing.Array?

import multiprocessing

def main():
    total = []
    for j in range(0,3000):
        #Give every entry its own dict of 200 key/value pairs
        data = {}
        for i in range(0,200):
            data[str(i)] = i
        total.append(data)

    CalcManager(total,start=0,end=3000)

def CalcManager(myData,start,end):
    print 'in calc manager'
    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        print 'starting processes'
        new_end = new_start + interval
        #Make sure the last chunk reaches the end and we don't go past the data
        if i == nprocs-1 or new_end > end:
            new_end = end
        #Generate a new process and pass it the arguments 
        data = myData[new_start:new_end]
        #Create the processes and pass the data and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(data,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end
        new_start = new_end
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print result

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    print 'started process'
    #Placeholder work: build a 22x3 result matrix
    results = []
    for i in range(0,22):
        temp = []
        for j in range(0,3):
            temp.append(j)
        results.append(temp)
    result_q.put(results)
    return

if __name__== '__main__':   
    main()

      


Solved

By simply putting the list of dictionaries in a Manager, the problem was solved:

manager=Manager()
d=manager.list(myData)

      

It seems that the manager holding the list also manages the dicts contained in that list. Startup is a bit slow, so the data does still appear to be copied, but that happens once at the beginning; after that each process slices the data it needs out of the shared list.

import multiprocessing
from multiprocessing import Manager

def main():
    total = []
    for j in range(0,3000):
        #Give every entry its own dict of 100 key/value pairs
        data = {}
        for i in range(0,100):
            data[str(i)] = i
        total.append(data)

    CalcManager(total,start=0,end=500)

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    #Multi processing
    #Set the number of processes to use.  
    nprocs = 3
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    result_q = multiprocessing.Queue()
    #Setup an empty array to store our processes
    procs = []
    #Divide up the data for the set number of processes 
    interval = (end-start)/nprocs 
    new_start = start
    #Create all the processes while dividing the work appropriately
    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure the last chunk reaches the end and we don't go past the data
        if i == nprocs-1 or new_end > end:
            new_end = end
        #Create the process and pass it the shared list proxy and the result queue
        p = multiprocessing.Process(target=multiProcess,args=(d,new_start,new_end,result_q,i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end
        new_start = new_end
    print 'finished starting'    

    #Print out the results
    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

#MultiProcess Handling
def multiProcess(data,start,end,result_q,proc_num):
    #print 'started process'
    #Pull this worker's slice out of the shared list in a single call
    data = data[start:end]
    #Placeholder work: build a 22x3 result matrix
    results = []
    for i in range(0,22):
        temp = []
        for j in range(0,3):
            temp.append(j)
        results.append(temp)
    print len(data)
    result_q.put(results)
    return

if __name__ == '__main__':
    main()
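A quick way to confirm where that remaining startup cost goes is to time the manager.list() call and a single slice separately. A rough sketch (the call in __main__ just rebuilds a list shaped like total):

import time
from multiprocessing import Manager

def time_manager_copy(myData):
    #One-time cost: copying the list of dicts into the manager process
    t0 = time.time()
    manager = Manager()
    d = manager.list(myData)
    print 'manager.list() took %.2fs' % (time.time() - t0)

    #Per-worker cost: pulling one slice back out of the shared list
    t0 = time.time()
    chunk = d[0:1000]
    print 'slicing %d items took %.2fs' % (len(chunk), time.time() - t0)
    return d

if __name__ == '__main__':
    time_manager_copy([dict((str(i), i) for i in range(100)) for j in range(3000)])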

      


2 answers


You can see some improvement by using a multiprocessing.Manager to keep your list in the manager's server process, and by having each child process pull the items it needs out of that one shared list, instead of copying a slice to each child process:

def CalcManager(myData,start,end):
    print 'in calc manager'
    print type(myData[0])

    manager = Manager()
    d = manager.list(myData)

    nprocs = 3 
    result_q = multiprocessing.Queue()
    procs = []

    interval = (end-start)/nprocs 
    new_start = start

    for i in range(nprocs):
        new_end = new_start + interval
        #Make sure the last chunk reaches the end and we don't go past the data
        if i == nprocs-1 or new_end > end:
            new_end = end
        p = multiprocessing.Process(target=multiProcess,
                                    args=(d, new_start, new_end, result_q, i))
        procs.append(p)
        p.start()
        #Advance the next start to the current end
        new_start = new_end
    print 'finished starting'        

    for i in range(nprocs):
        result = result_q.get()
        print len(result)

    #Joint the process to wait for all data/process to be finished
    for p in procs:
        p.join()

      



This copies your entire data list into the Manager process before any of your workers are created. The Manager returns a Proxy object that lets you share the list. You then just hand the Proxy to the workers, which means their startup time will be significantly reduced, since there is no longer any need to copy slices of the data list to them. The downside is that accessing the list will be slower in the children, because every access has to go to the manager process over IPC. Whether or not that actually hurts performance depends very much on what kind of work you are doing on the list in your worker processes, but it is worth trying, since it requires very few code changes.
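As a rough sketch of what that means on the worker side (mirroring the question's multiProcess, with placeholder work): slicing the proxy once is a single round trip to the manager, while indexing it item by item would be one round trip per item.

def multiProcess(data, start, end, result_q, proc_num):
    #'data' is the manager's ListProxy handed over by CalcManager

    #One round trip to the manager: copies the whole chunk into this process
    chunk = data[start:end]

    #Many round trips (one per item) -- noticeably slower for large chunks:
    #chunk = [data[i] for i in range(start, end)]

    #Placeholder work standing in for the real calculation
    results = [[proc_num, len(chunk), 0] for _ in range(22)]
    result_q.put(results)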


Looking at your question, I am assuming the following:

  • For each element in myData you want to get back a result (some kind of matrix).
  • You created a JoinableQueue (tasks), probably to hold the input, but didn't know how to use it.

Code



import logging
import multiprocessing


def create_logger(logger_name):
    ''' Create a logger that logs to the console '''
    logger = logging.getLogger(logger_name)
    logger.setLevel(logging.DEBUG)

    # create console handler and set appropriate level
    ch = logging.StreamHandler()
    formatter = logging.Formatter("%(processName)s %(funcName)s() %(levelname)s: %(message)s")
    ch.setFormatter(formatter)
    logger.addHandler(ch)
    return logger

def main():
    global logger
    logger = create_logger(__name__)
    logger.info('Main started')
    data = []
    for i in range(0,100):
        data.append({str(i):i})

    CalcManager(data,start=0,end=50)
    logger.info('Main ended')

def CalcManager(myData,start,end):
    logger.info('CalcManager started')
    #Initialize the multiprocessing queue so we can get the values returned to us
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Add tasks
    for i in range(start, end):
        tasks.put(myData[i])

    # Create processes to do work
    nprocs = 3
    for i in range(nprocs):
        logger.info('starting processes')
        p = multiprocessing.Process(target=worker,args=(tasks,results))
        p.daemon = True
        p.start()

    # Wait for tasks completion, i.e. tasks queue is empty
    try:
        tasks.join()
    except KeyboardInterrupt:
        logger.info('Cancel tasks')

    # Print out the results
    print 'RESULTS'
    while not results.empty():
        result = results.get()
        print result

    logger.info('CalManager ended')

def worker(tasks, results):
    while True:
        try:
            task = tasks.get()  # one row of input
            task['done'] = True # simulate work being done
            results.put(task)   # Save the result to the output queue
        finally:
            # JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()


if __name__== '__main__':   
    main()

      

Discussion

  • For a multiprocessing situation I recommend using the logging module, as it offers several advantages:
    • It is thread- and process-safe, so you will not end up with output from one process mixed into another's.
    • You can configure it to show the process name and function name - very handy for debugging.
  • CalcManager is essentially a task manager which does the following:
    • Creates three worker processes
    • Fills the input queue, tasks
    • Waits for the tasks to complete
    • Prints out the results
  • Note that when creating the processes I mark them as daemon processes, which means they will be killed when the main program exits, so you don't have to worry about killing them yourself.
  • worker is where the work is done (a sketch adapting it to the question's matrix output follows this list):
    • Each one runs forever (a while True loop).
    • Each time through the loop it gets one unit of input, does some processing, and puts the result on the output queue.
    • After processing it calls task_done() so that the main process knows when all the tasks are done. I put task_done in a finally clause to make sure it runs even if an error occurred during processing.
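To tie this back to the question's goal of a (4000x3) matrix, here is a rough sketch (an assumption on my part, not code from the question) in which CalcManager would put (index, record) tuples on the tasks queue instead of bare dicts, so the rows can be reassembled in order afterwards:

def worker(tasks, results):
    while True:
        index, record = tasks.get()   # record is one dict from myData
        try:
            #Placeholder for the real per-record calculation: one row of three numbers
            row = [index, len(record), sum(record.values())]
            results.put((index, row))
        finally:
            #JoinableQueue: for every get(), we need a task_done()
            tasks.task_done()

def collect(results, n):
    #Reassemble the rows in index order once tasks.join() has returned
    matrix = [None] * n
    while not results.empty():
        index, row = results.get()
        matrix[index] = row
    return matrix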