Why does Python's multiprocessing queue mess up dictionaries?

I am trying to create a multiprocess, multithreaded program in Python. It has worked so far, but I have run into a problem that has been bugging me.

I have 3 classes. The main class is a Manager that creates one or more subprocesses (Subprocess class) and connects to each of them with a dedicated multiprocessing.Queue. It then sends commands to these subprocesses through the queue, telling them to create socket-handling threads (Server_Thread class). The configuration parameters for Server_Thread are created in the Manager class and passed to the subprocess through the queue as a dictionary.

The code follows:

import threading
import multiprocessing
import socket
import time


class Server_Thread(threading.Thread):
    def __init__(self, client_config):
        threading.Thread.__init__(self)
        self.address = client_config['address']
        self.port = client_config['port']

    def run(self):
        self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        print "Binding to: local host, port = ", self.port 
        self.socket.bind((socket.gethostname(), self.port))
        self.socket.listen(1)

        self.running = True
        while self.running:    
            client_socket, client_address = self.socket.accept()
            # do stuff

    def stop(self):
        self.running = False


class Subprocess(multiprocessing.Process):
    def __init__(self, queue):
        multiprocessing.Process.__init__(self)
        self.queue = queue
        self.server_thread_list = []

    def run(self):
        self.running = True
        while self.running:
            command = self.queue.get()
            if command[0] == "create_client":
                server_thread = Server_Thread(command[1])
                server_thread.start()
                self.server_thread_list.append(server_thread)
            elif command[0] == "terminate":
                self.running = False
        for server_thread in self.server_thread_list:
            server_thread.stop()
            server_thread.join()


class Manager:
    def __init__(self):
        self.client_config = {}     
        self.client_config['junk'] = range(10000)    # actually contains lots of stuff
        self.client_config['address'] = 'localhost'

    def run(self):
        current_bind_port = 40001
        self.queue = multiprocessing.Queue()
        subprocess = Subprocess(self.queue)
        subprocess.start()
        for i in range(20):
            print "creating socket thread at port =", current_bind_port
            self.client_config['port'] = current_bind_port
            self.queue.put(("create_client", self.client_config.copy()))    # pass a dictionary copy
            current_bind_port += 1
        time.sleep(10)
        self.queue.put(("terminate", None))
        subprocess.join()


if __name__ == "__main__":
    manager = Manager()
    manager.run()

      

The problem is that when I run this, sometimes it works fine, but sometimes the config dictionary gets messed up in the queue. I suspect it has something to do with the rates at which the queue is filled and emptied, and that it overflows without warning.

Output, slightly reorganized (the printing from the different processes was interleaved):

>python temp.py
creating socket thread at port = 40001
creating socket thread at port = 40002
creating socket thread at port = 40003
creating socket thread at port = 40004
creating socket thread at port = 40005
creating socket thread at port = 40006
creating socket thread at port = 40007
creating socket thread at port = 40008
creating socket thread at port = 40009
creating socket thread at port = 40010
creating socket thread at port = 40011
creating socket thread at port = 40012
creating socket thread at port = 40013
creating socket thread at port = 40014
creating socket thread at port = 40015
creating socket thread at port = 40016
creating socket thread at port = 40017
creating socket thread at port = 40018
creating socket thread at port = 40019
creating socket thread at port = 40020  << OK

Binding to: local host, port =  40001
Binding to: local host, port =  40020  << NOT OK from here
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020
Binding to: local host, port =  40020

Exception in thread Thread-4:
Traceback (most recent call last):
  File "C:\Python27\lib\threading.py", line 810, in __bootstrap_inner
    self.run()
  File "Y:\cStation\Python\iReact connection PoC\temp.py", line 18, in run
    self.socket.bind((socket.gethostname(), self.port))
  File "C:\Python27\lib\socket.py", line 224, in meth
    return getattr(self._sock,name)(*args)
error: [Errno 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

.... This message is repeated several more times ....

      

The problem seems to become less frequent (but does not go away completely) if I insert a time.sleep(0.1) call after queuing each "create_client" command.
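That a delay after each put() helps would be consistent with the queue serializing an item only some time after put() returns (multiprocessing.Queue hands items to a background feeder thread). That is an assumption here, not something the output above proves. Under that assumption, one possible sketch is to serialize the command to bytes at the moment it is queued, so that later changes to the dictionary cannot reach it. The helper names freeze_command and thaw_command are hypothetical, not part of the original code.

```python
import pickle


def freeze_command(name, config):
    """Serialize a (name, config) command to bytes right away (hypothetical helper)."""
    return pickle.dumps((name, config), protocol=2)


def thaw_command(payload):
    """Rebuild the (name, config) tuple on the receiving side (hypothetical helper)."""
    return pickle.loads(payload)


# Demonstration without any processes: changing the dictionary after
# freezing does not alter the payload that was already produced.
config = {'address': 'localhost', 'port': 40001}
payload = freeze_command("create_client", config)
config['port'] = 40002          # the manager moves on to the next port

name, received = thaw_command(payload)
print(received['port'])         # prints 40001
```

In the program above this would mean calling self.queue.put(freeze_command("create_client", self.client_config)) in Manager.run and command = thaw_command(self.queue.get()) in Subprocess.run. The cost is that the data is pickled twice: once by the helper and once more, as bytes, by the queue.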

Interestingly, the "create_client" string in the command tuple is passed without problems; the problem seems to be only with the dictionary of values. Is there a way to make sure the queue is ready for writing before putting further values into it, without resorting to time.sleep()? I tried putting in while not self.queue.empty(): pass, but it seems to get stuck forever after a few commands ...
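As an alternative to polling empty(), a minimal sketch of an explicit handshake, assuming multiprocessing.JoinableQueue is acceptable in place of multiprocessing.Queue: the consumer calls task_done() after handling each command, and the producer calls join() before touching the dictionary again. To keep the sketch short and self-contained, the consumer here is a thread standing in for the Subprocess class; the functions consume and produce are hypothetical stand-ins, not the original methods.

```python
import multiprocessing
import threading


def consume(command_queue, received_ports):
    """Stand-in for Subprocess.run: handle commands until 'terminate'."""
    while True:
        name, config = command_queue.get()
        try:
            if name == "terminate":
                break
            received_ports.append(config['port'])
        finally:
            command_queue.task_done()   # signal that this item has been handled


def produce(command_queue, first_port, count):
    """Stand-in for Manager.run: reuse one dict, but wait after each put."""
    config = {'address': 'localhost'}
    for port in range(first_port, first_port + count):
        config['port'] = port
        command_queue.put(("create_client", config))
        command_queue.join()            # blocks until task_done() was called
    command_queue.put(("terminate", None))
    command_queue.join()


command_queue = multiprocessing.JoinableQueue()
received_ports = []
consumer = threading.Thread(target=consume, args=(command_queue, received_ports))
consumer.start()
produce(command_queue, 40001, 5)
consumer.join()
print(received_ports)   # prints [40001, 40002, 40003, 40004, 40005]
```

The trade-off is that the producer waits for every command to be handled, so commands are no longer queued ahead of the consumer. With a real multiprocessing.Process as the consumer, the top-level code would also need the if __name__ == "__main__": guard used in the program above.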
