MPI4PY Python Error 11 Creating Too Many Threads
I am working with some Python code that uses mpi4py, and it throws a strange error. When I run the code below, I get the following:
ERROR; return code from pthread_create() is 11
Error detail: Resource temporarily unavailable
sh: fork: retry: Resource temporarily unavailable
/home/sfortney/anaconda/lib/python2.7/site-packages/numexpr/cpuinfo.py:40: UserWarning: [Errno 11] Resource temporarily unavailable
warnings.warn(str(e), UserWarning, stacklevel=stacklevel)
I extended this code from a simpler, working mpi4py script, which I have also posted below. From my research into this error, it seems that I am creating too many threads. That seems strange to me, since I am not explicitly creating any threads; I am only running on several processors. (My basic understanding is that threads live inside a single process, so they should not be affected if I just launch multiple processes and run one on each core. Sorry if that's not the case.)
I can't figure out why the simpler script at the end works fine, while the code immediately below, which uses the same structure, doesn't. Why does the simpler script work under the same thread restrictions? Where in the code are multiple threads even created?
I've posted all of the code below so the error can be reproduced. If it's relevant, I am running this on a 32-bit Linux box.
#to run this call "mpiexec -n 10 python par_implement_wavefront.py" in terminal
from __future__ import division
import pandas as pd
import numpy as np
import itertools
import os
from itertools import chain, combinations
from operator import add
from collections import Counter
home="/home/sfortney"
np.set_printoptions(precision=2, suppress=True)
#choose dimensionality and granularity
dim=2
gran=5
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
command_buffer = np.zeros(3) # first entry is boolean, second is tuple with objective function inputs, third is array index
result_buffer=np.zeros(3) # first position is node,
if rank==0:
#defining all of our functions we will need on the root node first
#makes ax1 into the axes of an n dim array
def axis_fitter(arr, dim, gran, start=1, stop=101):
ax1=np.linspace(start,stop, num=gran)
for i in range(dim):
indexlist=[0]*dim
indexlist[i]= slice(None)
arr[indexlist]=ax1
return arr
#this is used to make the inital queues
#fix me to work with nan's!
def queue_init(arr):
queue=[]
queueposs=[]
queuedone=np.argwhere(arr >0).tolist()
return queue,queueposs,queuedone
#this is used in the queue updating function
def queue_sorter(queue):
queue.sort(key=lambda x: np.linalg.norm(np.array(x)))
# using the L1 norm
# queue.sort(key=lambda x: sum(x))
return queue
#this finds all the indicies to the "back" of our box
def back_index(dim):
standardbasis=[]
for i in range(dim):
vec=[0]*dim
vec[i]=vec[i]+1
standardbasis.append(vec)
powerset=[]
for z in chain.from_iterable(combinations(standardbasis,r) for r in range(len(standardbasis)+1)):
powerset.append(z)
powersetnew=[]
for i in range(len(powerset)):
powersetnew.append([sum(x) for x in zip(*list(powerset[i]))])
powersetnew.remove([])
powersetnew=[[i*(-1) for i in x] for x in powersetnew]
return powersetnew
#this takes a completed index and updates our queue of possible values
#as well as our done queue
def queue_update(queue,queueposs,queuedone, arr,dim,comp_idx=[0,0]):
queuedone.append(comp_idx)
if comp_idx==[0,0]:
init_index=[1]*dim
queue.append(init_index)
for i in range(dim):
poss_index=[1]*dim
poss_index[i]=2
queueposs.append(poss_index)
return queue,queueposs,queuedone
else:
queuedone.append(comp_idx)
try:
queueposs.remove(comp_idx)
except:
pass
for i in range(dim):
new_idx=comp_idx[:]
new_idx[i]=new_idx[i]+1
back_list=back_index(dim)
back_list2=[]
for x in back_list:
back_list2.append(list(np.add(np.asarray(new_idx),np.asarray(x))))
if set(tuple(x) for x in back_list2).issubset(set(tuple(x) for x in queuedone)):
queueposs.append(new_idx)
queueposs=list(set(tuple(x) for x in queueposs)-set(tuple(x) for x in queuedone))
queueposs=[list(x) for x in queueposs]
queueposs=queue_sorter(queueposs)
try:
for x in range(len(queueposs)):
queueappender=(queueposs).pop(x)
queue.append(queueappender)
except:
print "queueposs empty"
queue=queue_sorter(queue)
return queue,queueposs,queuedone
#this function makes it so we dont have to pass the whole array through MPI but only the pertinent information
def objectivefuncprimer(arr, queue_elem, dim):
inputs=back_index(dim)
inputs2=[]
for x in inputs:
inputs2.append(list(np.add(np.asarray(queue_elem),np.asarray(x))))
inputs3=[]
for x in range(len(inputs2)):
inputs3.append(arr[tuple(inputs2[x])])
return inputs3
#this function takes a value and an index and assigns the array that value at the index
def arrupdater(val,idx):
arr[tuple(idx)]=val
return arr, idx
#########Initializing
all_finished=False
#make our empty array
sizer=tuple([gran]*dim)
arr=np.zeros(shape=sizer)
nodes_avail=range(1, size) # 0 is not a worker
#assumes axes all start at same place
ax1=np.linspace(20,30, num=gran)
arr=axis_fitter(arr, dim, gran)
#fitting axes and initializing queues
arr=axis_fitter(arr, dim, gran, start=20, stop=30)
queue,queueposs,queuedone =queue_init(arr)
#running first updater
queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim)
def sender(queue):
send_num=min(len(queue),len(nodes_avail))
for k in range(send_num):
node=nodes_avail.pop()
queue_elem=queue.pop(k)
command_buffer[0]=int(all_finished)
command_buffer[1]=queue_elem
command_buffer[2]=objectivefuncprimer(arr,queue_elem,dim)
comm.Send(command_buffer, dest=node)
while all_finished==False:
sender(queue)
comm.Recv(result_buffer,source=MPI.ANY_SOURCE)
arr,comp_idx=arrupdater(result_buffer[1],result_buffer[2])
queue,queueposs,queuedone=queue_update(queue,queueposs,queuedone,arr,dim,comp_idx)
nodes_avail.append(result_buffer[0])
if len(queuedone)==gran**2:
for n in range(1, size):
comm.Send(np.array([True,0,0]), dest=n)
all_finished=True
print arr
if rank>0:
all_finished_worker=False
#this test function will only work in 2d
def objectivefunc2d_2(inputs):
#this will be important for more complicated functions later
#backnum=(2**dim)-1
val=sum(inputs)
return val
while all_finished_worker==False:
comm.Recv(command_buffer, source=0)
all_finished_worker=bool(command_buffer[0])
if all_finished_worker==False:
result=objectivefunc2d_2(command_buffer[2])
# print str(result) +" from "+str(rank)
result_buffer=np.array([rank,result,command_buffer[1]])
comm.Send(result_buffer, dest=0)
The following simpler script works and has the same basic structure as the code above, but with a much simpler computation.
from __future__ import division
import numpy as np
import os
from itertools import chain, combinations
from mpi4py import MPI
from mpi4py.MPI import ANY_SOURCE
comm = MPI.COMM_WORLD
rank = comm.Get_rank()
size = comm.Get_size()
command_buffer = np.zeros(2) # first entry is boolean, rest is data
result_buffer=np.zeros(2) # first position is node, rest is data
if rank==0:
all_finished=False
nodes_avail=range(1, size) # 0 is not a worker
arr=[]
q=range(20)
def primer(q):
return int(all_finished),q
def sender(q):
send_num=min(len(q),len(nodes_avail))
for k in range(send_num):
node=nodes_avail.pop()
queue_init=q.pop()
command_buffer[0]=primer(queue_init)[0]
command_buffer[1]=primer(queue_init)[1]
comm.Send(command_buffer, dest=node)
while all_finished==False:
sender(q)
# update q
comm.Recv(result_buffer,source=MPI.ANY_SOURCE)
arr.append(result_buffer[1])
nodes_avail.append(result_buffer[0])
if len(arr)==20:
for n in range(1, size):
comm.Send(np.array([True,0]), dest=n)
all_finished=True
print arr
if rank>0:
all_finished_worker=False
while all_finished_worker==False:
comm.Recv(command_buffer, source=0)
all_finished_worker=bool(command_buffer[0])
if all_finished_worker==False:
result=command_buffer[1]*2
# print str(result) +" from "+str(rank)
result_buffer=np.array([rank,result])
comm.Send(result_buffer, dest=0)
source to share
No one has answered this question yet
Check out similar questions: