Wake up a specific thread from sleep from another thread in python

I've been chasing my tail all day now and I'm losing my mind. I am a complete lover and completely new to python, so please excuse my stupidity.

My main thread re-checks the database for a record and then runs threads for every new record it finds in the database.

The threads it starts are basically polling the database for a value, and if it doesn't find that value, it does some action, and then it sleeps for 60 seconds and starts.

Simplified noncode for a running thread

while True:
    stop = _Get_a_Value_From_Database_for_Exit() #..... a call to DBMS 
    If stop = 0:
        Do_stuff()
        time.sleep(60)
    else:
        break

      

There can be many such streams at any given time. I would like the main thread to check a different location in the database for a specific value and then can interrupt sleep in the above example on a specific thread that was started. The goal would be to get out of a particular stream as above without having to wait for the remainder of the sleep duration. All of these streams can be referenced by a common database identifier. I saw links to and tried to figure out how I replace it , but I don't know how I could use it to wake up a specific thread, not all of them. event.wait()

event.set()

time.sleep()

This is where my ignorance shows: is there a way I could do something database based id

for event.wait (both 12345.wait(60)

in the running thread and 12345.set()

in the main thread (all dynamic based on an ever changing database ). id

Thanks for the help!

+3


source to share


2 answers


The project is a bit tricky and here is my version.

  • scan database file /tmp/db.dat

    pre-filled with two words

  • manager: create a stream for each word; default - "whiskey" stream and "syrup" stream

  • if the word ends with _stop

    , for example syrup_stop

    , tell this thread to die by setting its stop event

  • each thread looks at the database file and exits if it sees a word stop

    . It will also exit if a stop event is set.

  • Note that if the Manager thread sets a worker stop_event, the worker will exit immediately. Each thread does a bit of work, but spends most of its time in a call stop_ev.wait()

    . This way, when the event becomes set, it doesn't need to sit, it can exit immediately.

The server will be happy to play! Run it, then send commands to it adding rows to the database. Try one of the following:



$ echo pie >> /tmp/db.dat  # start new thread

$ echo pie_stop >> /tmp/db.dat # stop thread by event

$ echo whiskey_stop >> /tmp/db.dat # stop another thread "

$ echo stop >> /tmp/db.dat # stop all threads

      

Source

import logging, sys, threading, time

STOP_VALUE = 'stop'

logging.basicConfig(
    level=logging.DEBUG,
    format="%(asctime)-4s %(threadName)s %(levelname)s %(message)s", 
    datefmt="%H:%M:%S",
    stream=sys.stderr,
)


class Database(list):
    PATH = '/tmp/db.dat'
    def __init__(self):
        super(Database,self).__init__()
        self._update_lock = threading.Lock()

    def update(self):
        with self._update_lock:
            self[:] = [ line.strip() for line in open(self.PATH) ]

db = Database()

def spawn(events, key):
    events[key] = threading.Event()
    th = threading.Thread(
        target=search_worker,
        kwargs=dict(stop_ev=events[key]),
        name='thread-{}'.format(key),
    )
    th.daemon = True
    th.start()

def search_worker(stop_ev):
    """
    scan database until "stop" found, or our event is set
    """
    logging.info('start')
    while True:
        logging.debug('scan')
        db.update()
        if STOP_VALUE in db:
            logging.info('stopvalue: done')
            return
        if stop_ev.wait(timeout=10):
            logging.info('event: done')
            return 

def manager():
    """
    scan database
    - word: spawn thread if none already
    - word_stop: tell thread to die by setting its stop event
    """
    logging.info('start')
    events = dict()
    while True:
        db.update()
        for key in db:
            if key == STOP_VALUE:
                continue
            if key in events:
                continue
            if key.endswith('_stop'):
                key = key.split('_')[0]
                if key not in events:
                    logging.error('stop: missing key=%s!', key)
                else:
                    # signal thread to stop
                    logging.info('stop: key=%s', key)
                    events[key].set()
                    del events[key]
            else:
                spawn(events, key)
                logging.info('spawn: key=%s', key)
        time.sleep(2)


if __name__=='__main__':

    with open(Database.PATH, 'w') as dbf:
        dbf.write(
        'whiskey\nsyrup\n'
        )

    db.update()
    logging.info('start: db=%s -- %s', db.PATH, db)

    manager_t = threading.Thread(
        target=manager,
        name='manager',
    )
    manager_t.start()
    manager_t.join()

      

+2


source


Rather change your design architecture and go for a distributed messaging process rather than re-listening dbEngine in an infinite loop with repeated dbSeek-s for a dumb value, retest for equality and then try to kill-sleep.

Both or are intelligent, no middlemen, messaging layers are very good in this sense. ZeroMQ

nanomsg

Striving to cross fire and water IMHO doesn't do anything good for the real system.



Smart, scalable, distributed process from process to process.

(Fig. On a simple distributed process-to-process messaging, courtesy imatix / ZeroMQ)

(fig. on simple distributed transmission / coordination of exchange between processes, courtesy imatix / ZeroMQ)

0


source







All Articles