Python - multiprocessing unexpected results
I have code containing an iterator that works well:

import time

m = [0,1,2,3]

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        time.sleep(1)
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)

def gen(t):
    return t.next()

print gen(tt)
print gen(tt)
print gen(tt)
OUT:
0
1
2
But when I try to run it in parallel processes, I don't get the expected results:
import time
import multiprocessing

m = [0,1,2,3]

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        time.sleep(1)
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)

def gen(t):
    return t.next()

job1 = multiprocessing.Process(target=gen, args=(tt,))
print job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt,))
print job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt,))
print job3.start()
OUT:
None
None
None
I cannot figure out how to use this iterator with multiprocessing. Can anyone help me? Thanks!
UPDATE:
Following @Anand S Kumar's very helpful answer, I updated my code and it works, except that the output is nondeterministic. I am trying to figure out what is wrong; maybe this will be a topic for another question, or maybe Anand can help me :))
from threading import Thread, Lock
import time

m = [0,1,2,3]
starter = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        time.sleep(1)
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)

def f(t):
    global starter
    lock = Lock()
    lock.acquire()
    try:
        starter = t.next()
    finally:
        lock.release()

t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()
t3 = Thread(target=f,args=(tt,))
t3.start()

t1.join()
print starter
t2.join()
print starter
t3.join()
print starter
Different runs of the same code produce different outputs:

0 1 2
2 2 2
0 2 2
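The nondeterminism comes from two things: `f` creates a brand-new Lock on every call, so each thread locks its own private lock and nothing is actually serialised, and all three threads overwrite the single global `starter`, which the main thread may print before the later threads have finished. A minimal corrected sketch (written in Python 3 syntax, so `next()` becomes `__next__()`, and collecting per-call results instead of one shared global; these changes are mine, not from the original post):

```python
import threading

m = [0, 1, 2, 3]

class Gener:
    """Same iterator as above, Python 3 style (next -> __next__)."""
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def __next__(self):
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = Gener(m)
lock = threading.Lock()   # ONE lock, created once and shared by all threads
results = []              # collect one value per call instead of overwriting a global

def f(t):
    with lock:            # serialises the iteration and the append together
        results.append(next(t))

threads = [threading.Thread(target=f, args=(tt,)) for _ in range(3)]
for th in threads:
    th.start()
for th in threads:
    th.join()

print(results)  # [0, 1, 2] -- each value appears exactly once
```

Because the single shared lock makes read-increment-append atomic, every thread sees a distinct value, which is what the per-thread `print starter` in the code above cannot guarantee.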
You are trying to print the return value of job.start(), which returns nothing, so it prints None.

Instead of printing the return value of job.start(), you could move the print statement into the function gen(t), something like -

def gen(t):
    print t.next()

And then run the program without printing the result of job.start().
If you want to get the return value back from a function running in another process, you can use Pool from the multiprocessing module (see the multiprocessing documentation). An example from the documentation -
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    pool = Pool(processes=4)              # start 4 worker processes
    result = pool.apply_async(f, [10])    # evaluate "f(10)" asynchronously
    print result.get(timeout=1)           # prints "100" unless your computer is *very* slow
    print pool.map(f, range(10))
But please note that you are actually creating multiple processes, not threads, and they will not share global variables. I believe what you want is threads; maybe an example like the one below will help you get started -
from threading import Thread, Lock

m = [0,1,2,3]
starter = 0

class gener(object):
    def __init__(self, m):
        self.m = m
        self.c = 0

    def __iter__(self):
        return self

    def next(self):
        ret = self.m[self.c]
        self.c += 1
        return ret

tt = gener(m)

def f(t):
    global starter
    lock = Lock()
    lock.acquire()
    try:
        starter = t.next()
    finally:
        lock.release()

t1 = Thread(target=f,args=(tt,))
t1.start()
t2 = Thread(target=f,args=(tt,))
t2.start()

t1.join()
t2.join()
Two problems:

1) The start() function does not return a value, so you get None printed.

2) You are passing the iterator object to each process, thereby copying the original gener object (declared in the main process) three times, once into each forked process. Thus, even if you change your function to:

def gen(t):
    print t.next()

all you do is call next() for the first and only time on each process's private copy of the gener object, printing:

0
0
0
To get the desired effect, you need to iterate in the master process, passing each result to a separate process:

job1 = multiprocessing.Process(target=gen, args=(tt.next(),))
job1.start()
job2 = multiprocessing.Process(target=gen, args=(tt.next(),))
job2.start()
job3 = multiprocessing.Process(target=gen, args=(tt.next(),))
job3.start()

Then change gen to print the value it receives:

def gen(t):
    print t

And you get:

0
1
2