Why does Python's multiprocessing queue obfuscate dictionaries?
I am trying to create a multiprocessing multithreaded program in python. I have been successful so far, but I ran into a problem that was bugging me.
I have 3 classes. The main class is a manager that creates one or more subprocesses (Subprocess class) and connects to each of them with dedicated multiprocessing. Queue. It then dispatches these subprocess commands through the queue to create socket control threads (Server_Thread class). configuration parameters for Server_Thread are created in the Manager class and passed to the subprocess via a queue as a dictionary.
Code follows
import threading
import multiprocessing
import socket
import time
class Server_Thread(threading.Thread):
def __init__(self, client_config):
threading.Thread.__init__(self)
self.address = client_config['address']
self.port = client_config['port']
def run(self):
self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
print "Binding to: local host, port = ", self.port
self.socket.bind((socket.gethostname(), self.port))
self.socket.listen(1)
self.running = True
while self.running:
client_socket, client_address = self.socket.accept()
# do stuff
def stop(self):
self.running = False
class Subprocess(multiprocessing.Process):
def __init__(self, queue):
multiprocessing.Process.__init__(self)
self.queue = queue
self.server_thread_list = []
def run(self):
self.running = True
while self.running:
command = self.queue.get()
if command[0] == "create_client":
server_thread = Server_Thread(command[1])
server_thread.start()
self.server_thread_list.append(server_thread)
elif command[0] == "terminate":
self.running = False
for server_thread in self.server_thread_list:
server_thread.stop()
server_thread.join()
class Manager:
def __init__(self):
self.client_config = {}
self.client_config['junk'] = range(10000) # actually contains lots of stuff
self.client_config['address'] = 'localhost'
def run(self):
current_bind_port = 40001
self.queue = multiprocessing.Queue()
subprocess = Subprocess(self.queue)
subprocess.start()
for i in range(20):
print "creating socket thread at port =", current_bind_port
self.client_config['port'] = current_bind_port
self.queue.put(("create_client", self.client_config.copy())) # pass a dictionary copy
current_bind_port += 1
time.sleep(10)
self.queue.put(("terminate", None))
subprocess.join()
if __name__ == "__main__":
manager = Manager()
manager.run()
The problem is when I run this sometimes it works fine, but sometimes the config dictionary gets messed up in the queue. I think it has something to do with the queue fill rate and empty rate, and I think it overflows without warning.
Exit with some restructuring (multiple processes are mixed with printing)
>Python temp.py creating socket thread at port = 40001 creating socket thread at port = 40002 creating socket thread at port = 40003 creating socket thread at port = 40004 creating socket thread at port = 40005 creating socket thread at port = 40006 creating socket thread at port = 40007 creating socket thread at port = 40008 creating socket thread at port = 40009 creating socket thread at port = 40010 creating socket thread at port = 40011 creating socket thread at port = 40012 creating socket thread at port = 40013 creating socket thread at port = 40014 creating socket thread at port = 40015 creating socket thread at port = 40016 creating socket thread at port = 40017 creating socket thread at port = 40018 creating socket thread at port = 40019 creating socket thread at port = 40020 << OK Binding to: local host, port = 40001 Binding to: local host, port = 40020 << NOT OK from here Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Binding to: local host, port = 40020 Exception in thread Thread-4: Traceback (most recent call last): File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner self.run() File "Y:\cStation\Python\iReact connection PoC\temp.py", line 18, in run self.socket.bind((socket.gethostname(), self.port)) File "C:\Python27\lib\socket.py", line 224, in meth return getattr(self._sock,name)(*args) error: [Errno 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted .... Get this message several more times ....
The problem seems to become less frequent (but does not go away completely) if I insert the time.sleep (0.1) command after I have queued each create_thread command.
Interestingly, the command tuple "create_thread"
is passed without problems, the problem seems to be a dictionary of values. Is there a way to make sure the queue is ok for writing before inserting non-write values ββinto it time.wait()
? I tried to deliver while not self.queue.empty(): pass
but seems to be stuck forever after several commands ...
source to share
No one has answered this question yet
Check out similar questions: