Threading and Conditions: Debugging Acquisition
I have a strange problem in one of my programs: one thread acquires a condition, yet another thread reports that the condition has not been acquired.
I added some debug output to check whether the first thread really acquired the condition, and it looks like it did. But the other thread tells me the condition is not acquired.
Here is a summary (not very short, sorry about that) of what I did, and the output I get:
import logging
import time
from threading import Condition, Lock, Thread

logging.basicConfig(level=logging.DEBUG)


class MyClass:
    def __init__(self):
        self._is_alive = False
        self._thread_update = None
        self._conditioned_thread = None
        self._thread_condition = Condition(Lock())  # Or RLock() or nothing, same issue

    def start(self):
        self._is_alive = True
        self._thread_update = Thread(target=self._loop_update, name="Updater")
        self._conditioned_thread = Thread(target=self._loop_conditioned, name="Conditioned")
        self._thread_update.start()
        self._conditioned_thread.start()

    def _loop_conditioned(self):
        logging.debug("Starting conditioned thread")
        with self._thread_condition:
            while self._is_alive:
                logging.debug("Awaiting... Is acquired ? %s", self._thread_condition._is_owned())
                self._thread_condition.wait()
                logging.debug("Success !")

    def _loop_update(self):
        time.sleep(1)
        logging.debug("Notifying ! Is acquired ? %s", self._thread_condition._is_owned())
        self._thread_condition.notify(1)
        # Do some stuff

    def stop(self):
        self._is_alive = False
        self._thread_condition.notify()
        self._thread_update.join()
        self._thread_condition.join()


if __name__ == "__main__":
    c = MyClass()
    c.start()
    time.sleep(4)
    c.stop()
And here's the output:
DEBUG:root:Starting conditioned thread
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Notifying ! Is acquired ? False
Exception in thread Updater:
Traceback (most recent call last):
  File "/usr/lib64/python2.6/threading.py", line 532, in __bootstrap_inner
    self.run()
  File "/usr/lib64/python2.6/threading.py", line 484, in run
    self.__target(*self.__args, **self.__kwargs)
  File ".../test.py", line 39, in _loop_update
    self._thread_condition.notify(1)
  File "/usr/lib64/python2.6/threading.py", line 274, in notify
    raise RuntimeError("cannot notify on un-acquired lock")
RuntimeError: cannot notify on un-acquired lock

Traceback (most recent call last):
  File ".../test.py", line 53, in <module>
    c.stop()
  File ".../test.py", line 45, in stop
    self._thread_condition.notify()
  File "/usr/lib64/python2.6/threading.py", line 274, in notify
    raise RuntimeError("cannot notify on un-acquired lock")
RuntimeError: cannot notify on un-acquired lock
The question is, why do I have a RuntimeError even though I have acquired the condition?
I'm not an expert on the topic, but the documentation has this to say:
wait(timeout=None)
Wait until notified or until a timeout occurs. If the calling thread has not acquired the lock when this method is called, a RuntimeError is raised.
This method releases the underlying lock, and then blocks until it is awakened by a notify() or notify_all() call for the same condition variable in another thread, or until the optional timeout occurs. Once awakened or timed out, it re-acquires the lock and returns.
(Emphasis mine.)
So the wait() call appears to release the lock that the _conditioned_thread thread acquired. Presumably the other thread must then acquire that lock itself in some way (perhaps in a loop that checks whether the lock is available); it can then call notify(1) to let the first thread continue (which re-acquires the lock, per the last words of the quote above).
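To make the reasoning above concrete, here is a minimal sketch of the two behaviours involved: notify() fails when the caller does not hold the lock, and wait() releases the lock so that another thread can acquire it and notify. The names `entered`, `state` and `events` are hypothetical helpers added only for this illustration; they are not part of the question's code.

```python
import threading

cond = threading.Condition()
entered = threading.Event()  # hypothetical helper: set once the waiter holds the lock
state = {"go": False}        # predicate the waiter checks around wait()
events = []                  # illustrative record of what happened, in order


def waiter():
    with cond:                    # acquire the underlying lock
        entered.set()
        while not state["go"]:
            cond.wait()           # releases the lock while blocked, re-acquires before returning
        events.append("waiter resumed")


# Calling notify() without holding the lock raises RuntimeError,
# which is the situation shown in the traceback above.
try:
    cond.notify()
except RuntimeError as exc:
    events.append("notify without lock: %s" % exc)

t = threading.Thread(target=waiter)
t.start()
entered.wait()                    # the waiter is now inside its "with cond" block

with cond:                        # can only succeed because wait() released the lock
    events.append("notifier acquired lock")
    state["go"] = True
    cond.notify()

t.join()
print(events)
```

The second `with cond:` block only gets the lock because the waiter gave it up inside wait(); no polling loop is needed, since acquiring the condition simply blocks until the lock is free.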
Corrected code, based on the answer:
import logging
import time
from threading import Condition, Lock, Thread

logging.basicConfig(level=logging.DEBUG)


class MyClass:
    def __init__(self):
        self._is_alive = False
        self._thread_update = None
        self._conditioned_thread = None
        self._thread_condition = Condition()

    def start(self):
        self._is_alive = True
        self._thread_update = Thread(target=self._loop_update, name="Updater")
        self._conditioned_thread = Thread(target=self._loop_conditioned, name="Conditioned")
        self._thread_update.start()
        self._conditioned_thread.start()

    def _loop_conditioned(self):
        logging.debug("Starting conditioned thread")
        with self._thread_condition:
            while self._is_alive:
                logging.debug("Awaiting... Is acquired ? %s", self._thread_condition._is_owned())
                self._thread_condition.wait()
                logging.debug("Success !")

    def _loop_update(self):
        time.sleep(1)
        self._thread_condition.acquire()
        logging.debug("Notifying ! Is acquired ? %s", self._thread_condition._is_owned())
        self._thread_condition.notify()
        self._thread_condition.release()
        # Do some stuff

    def stop(self):
        self._is_alive = False
        with self._thread_condition:
            self._thread_condition.notify()
        self._thread_update.join()
        self._conditioned_thread.join()


if __name__ == "__main__":
    c = MyClass()
    c.start()
    time.sleep(4)
    c.stop()
Output:
DEBUG:root:Starting conditioned thread
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Notifying ! Is acquired ? True
DEBUG:root:Success !
DEBUG:root:Awaiting... Is acquired ? True
DEBUG:root:Success !
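As a possible variation on the corrected code, the acquire()/release() pair can be written as a `with` block, which also releases the lock if an exception is raised in between. The sketch below additionally guards wait() with an explicit counter so that a notification sent before the waiter starts waiting is not lost. The class `Worker` and its attributes (`_pending`, `handled`, `notify_one`) are hypothetical names chosen for this example, not something from the original program.

```python
import threading


class Worker(object):
    """Hypothetical reduced version of MyClass, for illustration only."""

    def __init__(self):
        self._cond = threading.Condition()
        self._alive = True
        self._pending = 0   # notifications sent but not yet handled
        self.handled = 0    # notifications handled so far

    def notify_one(self):
        with self._cond:            # same effect as acquire() ... release()
            self._pending += 1
            self._cond.notify()

    def stop(self):
        with self._cond:            # change the flag and notify under the same lock
            self._alive = False
            self._cond.notify_all()

    def run(self):
        with self._cond:
            while True:
                # Wait only while there is nothing to do and we are still alive.
                while self._alive and self._pending == 0:
                    self._cond.wait()
                if self._pending == 0:
                    break           # stopped, and nothing left to handle
                self._pending -= 1
                self.handled += 1


w = Worker()
t = threading.Thread(target=w.run, name="Conditioned")
t.start()
w.notify_one()
w.notify_one()
w.stop()
t.join()
print(w.handled)
```

Because the waiter re-checks `_pending` and `_alive` every time it wakes up, the result does not depend on which thread happens to run first.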