Is it possible to execute a function every x seconds in Python while pool.map is running?
I am running pool.map on a large dataset and want to print a report to the console every minute. Is that possible? As I understand it, Python is a synchronous language, so it cannot do this the way Node.js does.
Perhaps this can be done with threading, or some other way?
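    from multiprocessing.pool import ThreadPool
    from time import sleep

    finished = 0

    def make_job(item):  # pool.map passes one element of data per call
        global finished
        sleep(1)
        finished += 1

    # I want to call this function every minute
    def display_status():
        print('finished: %d' % finished)  # 'finished: ' + finished would raise TypeError

    def main():
        data = [...]
        pool = ThreadPool(45)
        results = pool.map(make_job, data)
        pool.close()
        pool.join()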
You can use a repeating threading timer, like the one from this question: Python threading.timer - repeat a function every n seconds
    from threading import Timer, Event

    class PerpetualTimer(object):
        # give it a cycle time (t) and a callback (hFunction)
        def __init__(self, t, hFunction):
            self.t = t
            self.stop = Event()
            self.hFunction = hFunction
            self.thread = Timer(self.t, self.handle_function)

        def handle_function(self):
            self.hFunction()
            # a Timer fires only once, so create a fresh one for the next cycle
            self.thread = Timer(self.t, self.handle_function)
            if not self.stop.is_set():
                self.thread.start()

        def start(self):
            self.stop.clear()
            self.thread.start()

        def cancel(self):
            self.stop.set()
            self.thread.cancel()
Basically, it is just a wrapper around a Timer object that creates a new Timer each time your function is called. Don't expect millisecond precision (or anything close to it) from it, but it should be fine for your purposes.
With this, your example would become:
    from multiprocessing.pool import ThreadPool
    from time import sleep

    finished = 0

    def make_job(item):  # pool.map passes one element of data per call
        global finished
        sleep(1)
        finished += 1

    def display_status():
        print('finished: %d' % finished)  # 'finished: ' + finished would raise TypeError

    def main():
        data = [...]
        pool = ThreadPool(45)
        # set up the monitor to run the function every minute
        monitor = PerpetualTimer(60, display_status)
        monitor.start()
        results = pool.map(make_job, data)
        pool.close()
        pool.join()
        monitor.cancel()
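One caveat about the counter in the example above: with 45 threads, `finished += 1` is a read-modify-write that is not guaranteed to be atomic, so an update could in principle be lost. A minimal sketch of one way around that, assuming a small helper class that guards the count with a `threading.Lock` (the names `Counter` and `run_demo` are hypothetical and not part of the original answer):

```python
import threading
from multiprocessing.pool import ThreadPool


class Counter(object):
    """Hypothetical helper: a counter that is safe to update from many threads."""

    def __init__(self):
        self._lock = threading.Lock()
        self._value = 0

    def increment(self):
        # Only one thread at a time may perform the read-modify-write.
        with self._lock:
            self._value += 1

    def value(self):
        with self._lock:
            return self._value


finished = Counter()


def make_job(item):
    # Stand-in for the real work; pool.map passes one item per call.
    finished.increment()
    return item


def display_status():
    print('finished: %d' % finished.value())


def run_demo(n_items=1000, n_threads=8):
    """Run the jobs on a thread pool and return the final count."""
    pool = ThreadPool(n_threads)
    try:
        pool.map(make_job, range(n_items))
    finally:
        pool.close()
        pool.join()
    return finished.value()
```

`display_status` can be passed to the timer exactly as before; only the way the count is stored changes.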
EDIT
A cleaner solution could be (thanks to the comments below):
    from threading import Event, Thread

    class RepeatTimer(Thread):
        def __init__(self, t, callback, event):
            Thread.__init__(self)
            self.stop = event
            self.wait_time = t
            self.callback = callback
            self.daemon = True

        def run(self):
            # wait() returns False on timeout and True once the event is set
            while not self.stop.wait(self.wait_time):
                self.callback()
Then in your code:
    def main():
        data = [...]
        pool = ThreadPool(45)
        stop_flag = Event()
        RepeatTimer(60, display_status, stop_flag).start()
        results = pool.map(make_job, data)
        pool.close()
        pool.join()
        stop_flag.set()
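To try the Event-based timer end to end, here is a self-contained sketch with a short interval so it finishes quickly. The function `run_with_reports`, its parameters, and the `done` list used to track progress are assumptions made for illustration; the timer class follows the one above with slightly more descriptive attribute names.

```python
import time
from threading import Event, Thread
from multiprocessing.pool import ThreadPool


class RepeatTimer(Thread):
    """Call `callback` every `interval` seconds until `stop_event` is set."""

    def __init__(self, interval, callback, stop_event):
        Thread.__init__(self)
        self.interval = interval
        self.callback = callback
        self.stop_event = stop_event
        self.daemon = True

    def run(self):
        # Event.wait returns False on timeout and True once the event is set.
        while not self.stop_event.wait(self.interval):
            self.callback()


def run_with_reports(data, interval=0.05, job_time=0.02, n_threads=4):
    """Hypothetical demo: map a slow job over data while reporting progress."""
    done = []      # one entry per finished job
    reports = []   # the count seen at each report

    def job(item):
        time.sleep(job_time)   # stand-in for real work
        done.append(item)
        return item * 2

    def report():
        reports.append(len(done))
        print('finished: %d' % len(done))

    stop_flag = Event()
    timer = RepeatTimer(interval, report, stop_flag)
    timer.start()
    pool = ThreadPool(n_threads)
    try:
        results = pool.map(job, data)
    finally:
        pool.close()
        pool.join()
        stop_flag.set()   # stop the timer even if a job raised
        timer.join()
    return results, reports
```

Setting the flag in a `finally` block is a design choice of this sketch: it makes sure the reporting thread stops even when one of the jobs fails.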
One way to do this is to use the main thread as a control thread. Something like the following should work:
    def main():
        data = [...]
        results = []
        step = 0
        pool = ThreadPool(16)
        # map_async returns immediately; the callback receives the full result list
        pool.map_async(make_job, data, callback=results.extend)
        pool.close()
        while True:
            if results:
                break
            step += 1
            sleep(1)
            if step % 60 == 0:
                print("status update" + ...)
I used .map_async() instead of .map() because the latter is synchronous: it blocks until all the work is done. Also, you will probably want to replace results.extend with something more efficient. Finally, because of the GIL, the speed improvement may be much smaller than expected.
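As one possible replacement for the `results.extend` callback, the object returned by `map_async` can be polled directly: its `wait(timeout)` method returns after the timeout or as soon as the work is done, and `ready()` tells the two cases apart. A minimal sketch, assuming a hypothetical wrapper named `map_with_status` and a `report` callback that receives the elapsed time:

```python
import time
from multiprocessing.pool import ThreadPool


def map_with_status(func, data, interval=60.0, n_threads=16, report=None):
    """Hypothetical helper: like pool.map, but calls report() every interval."""
    if report is None:
        def report(elapsed):
            print('still running after %.1f s' % elapsed)

    pool = ThreadPool(n_threads)
    start = time.time()
    try:
        async_result = pool.map_async(func, data)
        pool.close()
        while not async_result.ready():
            # Blocks for at most `interval` seconds, less if the work finishes.
            async_result.wait(interval)
            if not async_result.ready():
                report(time.time() - start)
        # get() returns the results in order, or re-raises a worker's exception.
        return async_result.get()
    finally:
        pool.terminate()
        pool.join()
```

Compared with sleeping one second at a time, this returns as soon as the last job finishes and also works when `data` is empty.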
By the way, it's a little funny that you wrote that Python is synchronous in a question about ThreadPool ;).
Consider using the time module. The function time.time() returns the current UNIX time. For example, a call to time.time() right now returns 1410384038.967499. One second later, it will return 1410384039.967499.
The way I would do it is to use a while loop instead of results = pool(...) and run a check like this on each iteration:
    import time

    last_time = time.time()
    while (...):
        new_time = time.time()
        # has at least a minute passed since the last status update?
        if new_time > last_time + 60:
            print("status update" + ...)
            last_time = new_time
        # (your computation here)
That is, on each iteration it checks whether at least a minute has passed since the last status update, so it should print a status update approximately every sixty seconds.
Sorry this is an incomplete answer, but I hope this helps or gives you helpful ideas.
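One way the time check above might be combined with a thread pool is to iterate over `imap_unordered`, which yields each result as soon as it is ready, and run the check inside that loop. This is only a sketch under that assumption; the function name `process_with_status` and the `report` callback are hypothetical.

```python
import time
from multiprocessing.pool import ThreadPool


def process_with_status(func, data, interval=60.0, n_threads=16, report=None):
    """Hypothetical helper: collect results and report the count periodically."""
    if report is None:
        def report(count):
            print('finished: %d' % count)

    results = []
    pool = ThreadPool(n_threads)
    last_time = time.time()
    try:
        # Results arrive in completion order, not in input order.
        for result in pool.imap_unordered(func, data):
            results.append(result)
            new_time = time.time()
            if new_time > last_time + interval:
                report(len(results))
                last_time = new_time
    finally:
        pool.close()
        pool.join()
    return results
```

Note that the check only runs when a result arrives, so if a single job takes longer than the interval, the report is delayed until the next job completes.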