MPI4PY Python Error 11 Creating Too Many Threads

I am working with some code in Python and MPI4PY that is throwing a strange error. When I try to run the code below, I throw the following:

ERROR; return code from pthread_create() is 11
    Error detail: Resource temporarily unavailable
sh: fork: retry: Resource temporarily unavailable
/home/sfortney/anaconda/lib/python2.7/site-packages/numexpr/cpuinfo.py:40: UserWarning: [Errno 11] Resource temporarily unavailable 
warnings.warn(str(e), UserWarning, stacklevel=stacklevel)

      

I extended this code from a simpler, working MPI4PY script that I posted and also below. From my research into this error, it seems that I am creating too many threads. It seems strange to me, since I am not calling on any threads, but on several processors. (My basic understanding is that threads are a kernel that would not be affected if I were just calling multiple kernels and doing one on each. Sorry if that's not the case.).

I can't figure out why the code below works fine, but the code immediately below which uses the same structure doesn't work. Why does the code below work with thread restrictions? Where in the code does it even call multiple threads?

I've posted all of the code below for reproducibility of the error. If it's relevant, I am running this on a 32-bit LinuxBox.

#to run this call "mpiexec -n 10 python par_implement_wavefront.py" in terminal

from __future__ import division
import pandas as pd
import numpy as np
import itertools
import os
from itertools import chain, combinations 
from operator import add
from collections import Counter

home="/home/sfortney" 
np.set_printoptions(precision=2, suppress=True)
#choose dimensionality and granularity
dim=2
gran=5

from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
command_buffer = np.zeros(3) # first entry is boolean, second is tuple with objective function inputs, third is array index
result_buffer=np.zeros(3) # first position is node, 



if rank==0:
    #defining all of our functions we will need on the root node first 


    #makes ax1 into the axes of an n dim array
    def axis_fitter(arr, dim, gran, start=1, stop=101):
        ax1=np.linspace(start,stop, num=gran)
        for i in range(dim):
                indexlist=[0]*dim
                indexlist[i]= slice(None)
                arr[indexlist]=ax1
        return arr


   #this is used to make the inital queues
    #fix me to work with nan's!
    def queue_init(arr):
        queue=[]
        queueposs=[]
        queuedone=np.argwhere(arr >0).tolist()
        return queue,queueposs,queuedone  



   #this is used in the queue updating function
    def queue_sorter(queue):
        queue.sort(key=lambda x: np.linalg.norm(np.array(x)))
    #    using the L1 norm
    #    queue.sort(key=lambda x: sum(x))
        return queue





    #this finds all the indicies to the "back" of our box
    def back_index(dim):
        standardbasis=[]
        for i in range(dim):
            vec=[0]*dim
            vec[i]=vec[i]+1
            standardbasis.append(vec)
        powerset=[]
        for z in chain.from_iterable(combinations(standardbasis,r) for r in range(len(standardbasis)+1)):
            powerset.append(z)
        powersetnew=[]
        for i in range(len(powerset)):
            powersetnew.append([sum(x) for x in zip(*list(powerset[i]))])
        powersetnew.remove([])
        powersetnew=[[i*(-1) for i in x] for x in powersetnew]
        return powersetnew




    #this takes a completed index and updates our queue of possible values
    #as well as our done queue 
    def queue_update(queue,queueposs,queuedone, arr,dim,comp_idx=[0,0]):
        queuedone.append(comp_idx)
        if comp_idx==[0,0]:
            init_index=[1]*dim
            queue.append(init_index)
            for i in range(dim):
                poss_index=[1]*dim
                poss_index[i]=2
                queueposs.append(poss_index)
            return queue,queueposs,queuedone

        else:
            queuedone.append(comp_idx)
            try:
                queueposs.remove(comp_idx)
            except:
                pass

            for i in range(dim):
                new_idx=comp_idx[:]
                new_idx[i]=new_idx[i]+1

                back_list=back_index(dim)
                back_list2=[]
                for x in back_list:
                    back_list2.append(list(np.add(np.asarray(new_idx),np.asarray(x))))

                if set(tuple(x) for x in back_list2).issubset(set(tuple(x) for x in queuedone)):
                    queueposs.append(new_idx)



            queueposs=list(set(tuple(x) for x in queueposs)-set(tuple(x) for x in queuedone))
            queueposs=[list(x) for x in queueposs]
            queueposs=queue_sorter(queueposs)

            try:
                for x in range(len(queueposs)):
                    queueappender=(queueposs).pop(x)
                    queue.append(queueappender)
            except:
                print "queueposs empty"

            queue=queue_sorter(queue)

            return queue,queueposs,queuedone



    #this function makes it so we dont have to pass the whole array through MPI but only the pertinent  information 
    def objectivefuncprimer(arr, queue_elem, dim):
        inputs=back_index(dim) 
        inputs2=[]
        for x in inputs:
            inputs2.append(list(np.add(np.asarray(queue_elem),np.asarray(x))))
        inputs3=[]
        for x in range(len(inputs2)):
            inputs3.append(arr[tuple(inputs2[x])])

        return inputs3

    #this function takes a value and an index and assigns the array that value at the index
    def arrupdater(val,idx):
        arr[tuple(idx)]=val
        return arr, idx

    #########Initializing 

    all_finished=False
    #make our empty array
    sizer=tuple([gran]*dim)
    arr=np.zeros(shape=sizer)
    nodes_avail=range(1, size) # 0 is not a worker

    #assumes axes all start at same place
    ax1=np.linspace(20,30, num=gran)
    arr=axis_fitter(arr, dim, gran)

    #fitting axes and initializing queues
    arr=axis_fitter(arr, dim, gran, start=20, stop=30)
    queue,queueposs,queuedone =queue_init(arr)

    #running first updater
    queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim)

    def sender(queue):
        send_num=min(len(queue),len(nodes_avail))
        for k in range(send_num):
            node=nodes_avail.pop()
            queue_elem=queue.pop(k)
            command_buffer[0]=int(all_finished)          
            command_buffer[1]=queue_elem
            command_buffer[2]=objectivefuncprimer(arr,queue_elem,dim)

            comm.Send(command_buffer, dest=node)


    while all_finished==False:
        sender(queue)
        comm.Recv(result_buffer,source=MPI.ANY_SOURCE)

        arr,comp_idx=arrupdater(result_buffer[1],result_buffer[2])
        queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim,comp_idx)

        nodes_avail.append(result_buffer[0])

        if len(queuedone)==gran**2:
            for n in range(1, size):
                comm.Send(np.array([True,0,0]), dest=n)
            all_finished=True
            print arr 


if rank>0:
    all_finished_worker=False

    #this test function will only work in 2d   
    def objectivefunc2d_2(inputs):
        #this will be important for more complicated functions later
        #backnum=(2**dim)-1
        val=sum(inputs)        
        return val  

    while all_finished_worker==False:
        comm.Recv(command_buffer, source=0)
        all_finished_worker=bool(command_buffer[0])
        if all_finished_worker==False:
            result=objectivefunc2d_2(command_buffer[2])
#            print str(result) +" from "+str(rank)
            result_buffer=np.array([rank,result,command_buffer[1]])
            comm.Send(result_buffer, dest=0)

      

This code works and has the basic structure I used above, but with a much simpler example.

from __future__ import division
import numpy as np
import os
from itertools import chain, combinations 
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE

comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
command_buffer = np.zeros(2) # first entry is boolean, rest is data
result_buffer=np.zeros(2) # first position is node, rest is data


if rank==0:
    all_finished=False
    nodes_avail=range(1, size) # 0 is not a worker
    arr=[]
    q=range(20)

    def primer(q):
        return int(all_finished),q

    def sender(q):
        send_num=min(len(q),len(nodes_avail))
        for k in range(send_num):
            node=nodes_avail.pop()
            queue_init=q.pop()
            command_buffer[0]=primer(queue_init)[0]          
            command_buffer[1]=primer(queue_init)[1]

            comm.Send(command_buffer, dest=node)

    while all_finished==False: 
        sender(q)

        # update q
        comm.Recv(result_buffer,source=MPI.ANY_SOURCE)
        arr.append(result_buffer[1])
        nodes_avail.append(result_buffer[0])
        if len(arr)==20:
            for n in range(1, size):
                comm.Send(np.array([True,0]), dest=n)
            all_finished=True
            print arr 


if rank>0:
    all_finished_worker=False
    while all_finished_worker==False:
        comm.Recv(command_buffer, source=0)
        all_finished_worker=bool(command_buffer[0])
        if all_finished_worker==False:
            result=command_buffer[1]*2
#            print str(result) +" from "+str(rank)
            result_buffer=np.array([rank,result])
            comm.Send(result_buffer, dest=0)

      

+3


source to share





All Articles