Threading and Conditions: Debugging Acquisition

I have a strange problem in one of my programs: one thread acquires a condition, yet another thread reports that the condition has not been acquired.

I added some debug output to check whether the thread really acquired the condition, and it looks like it did. But the other thread still tells me that the condition is not acquired.

Here is a summary (not very short, sorry about that) of what I did, and the output I get:

import logging
import time
from threading import Condition, Lock, Thread
logging.basicConfig(level=logging.DEBUG)

class MyClass:

    def __init__(self):

        self._is_alive = False
        self._thread_update = None
        self._conditioned_thread = None
        self._thread_condition = Condition(Lock()) # Or RLock() or nothing, same issue

    def start(self):

        self._is_alive = True
        self._thread_update = Thread(target=self._loop_update, name="Updater")
        self._conditioned_thread = Thread(target=self._loop_conditioned, name="Conditioned")
        self._thread_update.start()
        self._conditioned_thread.start()

    def _loop_conditioned(self):
        logging.debug("Starting conditioned thread")

        with self._thread_condition:
            while self._is_alive:
                logging.debug("Awaiting... Is acquired ? %s", self._thread_condition._is_owned())
                self._thread_condition.wait()
                logging.debug("Success !")

    def _loop_update(self):
        time.sleep(1)
        logging.debug("Notifying ! Is acquired ? %s", self._thread_condition._is_owned())
        self._thread_condition.notify(1)
        # Do some stuff

    def stop(self):

        self._is_alive = False
        self._thread_condition.notify()
        self._thread_update.join()
        self._thread_condition.join()

if __name__ == "__main__":
    c = MyClass()
    c.start()
    time.sleep(4)
    c.stop()

      

And here's the output:

DEBUG:root:Starting conditioned thread
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Notifying ! Is acquired ? False
Exception in thread Updater:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File ".../test.py", line 39, in _loop_update
    self._thread_condition.notify(1)
  File "/usr/lib64/python2.6/threading.py", line 274, in notify
    raise RuntimeError("cannot notify on un-acquired lock")
RuntimeError: cannot notify on un-acquired lock

Traceback (most recent call last):
  File ".../test.py", line 53, in <module>
    c.stop()
  File ".../test.py", line 45, in stop
    self._thread_condition.notify()
  File "/usr/lib64/python2.6/threading.py", line 274, in notify
    raise RuntimeError("cannot notify on un-acquired lock")
RuntimeError: cannot notify on un-acquired lock

      

The question is, why do I have a RuntimeError even though I have acquired the condition?

2 answers


I'm not an expert on the topic, but the documentation has this to say:

wait(timeout=None)

Wait until notified or until a timeout occurs. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.

This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.

(Emphasis mine.)

So the wait() call appears to release the lock acquired by the thread _conditioned_thread. Presumably the other thread must then acquire that lock itself (perhaps in a loop that checks whether the lock is available) before it can call notify(1); the first thread then continues, re-acquiring the lock as the last sentence of the quote describes.
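To illustrate the behaviour described in the quote, here is a minimal sketch (not the asker's code). It assumes a plain Condition() and uses a hypothetical helper Event named ready to make the ordering deterministic. It shows that notify() fails when the caller does not hold the lock, and that the notifying thread can acquire the lock while the other thread is blocked in wait().

```python
import threading

cond = threading.Condition()
events = []  # records what happened, in order

# notify() without holding the condition's lock raises RuntimeError.
try:
    cond.notify()
except RuntimeError:
    events.append("unowned notify raised RuntimeError")

# Hypothetical helper: tells the main thread that the waiter holds the lock.
ready = threading.Event()

def waiter():
    with cond:
        ready.set()
        # wait() releases the lock while blocked and re-acquires it
        # before returning.
        cond.wait(timeout=5)
        events.append("waiter woke up")

t = threading.Thread(target=waiter)
t.start()
ready.wait()

# This acquire can only succeed once wait() has released the lock.
with cond:
    cond.notify()
    events.append("notified while holding the lock")

t.join()
```

The important point is that each thread must hold the lock itself at the moment it calls wait() or notify(); the fact that another thread acquired it earlier does not count.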



Corrected code, based on the answer:

import logging
import time
from threading import Condition, Lock, Thread
logging.basicConfig(level=logging.DEBUG)

class MyClass:

    def __init__(self):

        self._is_alive = False
        self._thread_update = None
        self._conditioned_thread = None
        self._thread_condition = Condition()

    def start(self):

        self._is_alive = True
        self._thread_update = Thread(target=self._loop_update, name="Updater")
        self._conditioned_thread = Thread(target=self._loop_conditioned, name="Conditioned")
        self._thread_update.start()
        self._conditioned_thread.start()

    def _loop_conditioned(self):
        logging.debug("Starting conditioned thread")

        with self._thread_condition:
            while self._is_alive:
                logging.debug("Awaiting... Is acquired ? %s", self._thread_condition._is_owned())
                self._thread_condition.wait()
                logging.debug("Success !")

    def _loop_update(self):
        time.sleep(1)

        # notify() requires the condition's lock; "with" acquires and releases it
        with self._thread_condition:
            logging.debug("Notifying ! Is acquired ? %s", self._thread_condition._is_owned())
            self._thread_condition.notify()
        # Do some stuff

    def stop(self):

        self._is_alive = False
        with self._thread_condition:
            self._thread_condition.notify()
        self._thread_update.join()
        self._conditioned_thread.join()

if __name__ == "__main__":
    c = MyClass()
    c.start()
    time.sleep(4)
    c.stop()

      



Output:

DEBUG:root:Starting conditioned thread
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Notifying ! Is acquired ? True
DEBUG:root:Success !
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Success !
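One caveat about the code above: it only works because the updater sleeps for a second, so the conditioned thread is already waiting when notify() is called. A notification sent while nobody is waiting is simply lost. A common way around this is to pair the condition with an explicit flag and to wait in a loop. The following is only a sketch under that assumption; the class Mailbox and its method names are hypothetical and not part of the original program.

```python
import threading

class Mailbox:
    """Hypothetical example: a condition guarding an explicit flag."""

    def __init__(self):
        self._cond = threading.Condition()
        self._has_update = False
        self._alive = True

    def post(self):
        # Change the state and notify while holding the lock.
        with self._cond:
            self._has_update = True
            self._cond.notify()

    def close(self):
        with self._cond:
            self._alive = False
            self._cond.notify_all()

    def take(self):
        """Block until an update or close; True if an update was consumed."""
        with self._cond:
            # Re-check the state after every wake-up, and skip waiting
            # entirely if the update has already been posted.
            while self._alive and not self._has_update:
                self._cond.wait()
            if self._has_update:
                self._has_update = False
                return True
            return False

box = Mailbox()
box.post()                # the notification arrives before anyone waits
got = box.take()          # returns at once: the flag remembers the update
box.close()
after_close = box.take()  # no update pending and the box is closed
```

Because the waiter checks the flag before calling wait(), the order in which the two threads reach the condition no longer matters.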

      
