Is it possible to execute a function every x seconds in python when it executes pool.map?

I am running pool.map on a large dataset and I want to print a report to the console every minute. Is it possible? As I understand python is a synchronous language, it cannot do it like nodejs.

Perhaps this can be done with threading .. or what?

finished = 0

def make_job():
   sleep(1)
   global finished
   finished += 1

# I want to call this function every minute
def display_status():
   print 'finished: ' + finished

def main():
    data = [...]
    pool = ThreadPool(45)
    results = pool.map(make_job, data)
    pool.close()
    pool.join()

      

+3


source to share


3 answers


You can use a constant threading timer like from this question: Python threading.timer - repeat a function every n seconds

from threading import Timer,Event 

class perpetualTimer(object):

   # give it a cycle time (t) and a callback (hFunction) 
   def __init__(self,t,hFunction):
      self.t=t
      self.stop = Event()
      self.hFunction = hFunction
      self.thread = Timer(self.t,self.handle_function)

   def handle_function(self):
      self.hFunction()
      self.thread = Timer(self.t,self.handle_function)
      if not self.stop.is_set():
          self.thread.start()

   def start(self):
      self.stop.clear()
      self.thread.start()

   def cancel(self):
      self.stop.set()
      self.thread.cancel()

      

Basically it is just a wrapper for an object Timer

that creates a new object Timer

every time you call the function you want. Don't expect millisecond precision (or even close) from them, but it should be perfect for your purposes.

With this, your example would become:

finished = 0

def make_job():
   sleep(1)
   global finished
   finished += 1

def display_status():
   print 'finished: ' + finished

def main():
    data = [...]
    pool = ThreadPool(45)

    # set up the monitor to make run the function every minute
    monitor = PerpetualTimer(60,display_status)
    monitor.start()
    results = pool.map(make_job, data)
    pool.close()
    pool.join()
    monitor.cancel()

      



EDIT

A cleaner solution could be (thanks to the comments below):

from threading import Event,Thread 

class RepeatTimer(Thread):
    def __init__(self, t, callback, event):
        Thread.__init__(self)
        self.stop = event
        self.wait_time = t
        self.callback = callback
        self.daemon = True

    def run(self):
        while not self.stop.wait(self.wait_time):
            self.callback()

      

Then in your code:

def main():
    data = [...]
    pool = ThreadPool(45)
    stop_flag = Event()
    RepeatTimer(60,display_status,stop_flag).start()
    results = pool.map(make_job, data)
    pool.close()
    pool.join()
    stop_flag.set()

      

+4


source


One way to do this is to use the main thread as a control thread. Something like below should work:

def main():
   data = [...]
   results = []
   step = 0
   pool = ThreadPool(16)
   pool.map_async(make_job, data, callback=results.extend)
   pool.close()
   while True:
      if results:
          break
      step += 1
      sleep(1)
      if step % 60 == 0:
          print "status update" + ...

      



I used .map()

instead .map_async()

as the first one is synchronous. Also, you probably need to replace results.extend

it with something more efficient. Finally, thanks to the GIL, the speed improvement may be much less than expected.

By the way, a little funny that you wrote that Python is synchronous in the question that asks about ThreadPool;).

+2


source


Let's consider using the module time

. The function time.time()

returns the current UNIX time .

For example, the call time.time()

now returns 1410384038.967499

. He'll be back in a second 1410384039.967499

.

The way I did it would use a while loop instead of results = pool(...)

and on each iteration run the check like this:

last_time = time.time()
while (...):
    new_time = time.time()
    if new_time > last_time+60:
        print "status update" + ...
        last_time = new_time
    (your computation here)

      

So, check if (at least) a minute has passed since the last status update. It should print a status update approximately every sixty seconds.

Sorry this is an incomplete answer, but I hope this helps or gives you helpful ideas.

+1


source







All Articles