Sharing a list between processes in a Python server

I have a simple UDPServer that uses multiprocessing.

I want to create a list containing information about all clients.

I am using a Manager, but I don't understand how to add information to the list - I need to pass the Manager object to the handler process, but how? My approach of setting it as a new class attribute doesn't work.

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    """Datagram handler that records each client's address in a shared list."""

    # ``clients`` is not defined in the class body; it is attached as a class
    # attribute by the ``__main__`` block before the server process starts.

    def handle(self):
        """Append the sender's address to the shared list and print the list."""
        cur_process = multiprocessing.current_process()
        # ``self.request`` is indexed as a (data, socket) pair here.
        data = self.request[0].strip()
        # NOTE: this local shadows the ``socket`` name imported at module level.
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # error here
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    """UDPServer combined with ForkingMixIn; adds no behavior of its own."""
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    # Proxy to a list created through a multiprocessing Manager.
    ChatHandler.clients = multiprocessing.Manager().list()
    # Run the server loop in a separate, non-daemonic process.
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    # NOTE: nothing waits on server_process here, so the main process reaches
    # the end of the script right after start().

      

How can I fix it? Thanks!

Output:

Exception happened during processing of request from ('127.0.0.1', 55679)
Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 724, in _callmethod
    conn = self._tls.connection
AttributeError: 'ForkAwareLocal' object has no attribute 'connection'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 584, in process_request
    self.finish_request(request, client_address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 344, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/socketserver.py", line 665, in __init__
    self.handle()
  File "server.py", line 15, in handle
    ChatHandler.clients.append(self.client_address)
  File "<string>", line 2, in append
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 728, in _callmethod
    self._connect()
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/managers.py", line 715, in _connect
    conn = self._Client(self._token.address, authkey=self._authkey)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 495, in Client
    c = SocketClient(address)
  File "/Library/Frameworks/Python.framework/Versions/3.4/lib/python3.4/multiprocessing/connection.py", line 624, in SocketClient
    s.connect(address)
FileNotFoundError: [Errno 2] No such file or directory

      

+3


source to share


2 answers


The problem is that you are letting the main process exit as soon as it has started the worker process. When the process that created the `multiprocessing.Manager` finishes executing, the `Manager` server is shut down, which means that your shared list object is now useless. This happens because the `Manager` object registers its `shutdown` function as a "finalizer" with the `multiprocessing` module, which means that it will be run just before the process exits. Here's the code that registers it, in `BaseManager.start()`:

    # register a finalizer
    self._state.value = State.STARTED
    self.shutdown = util.Finalize(
        self, type(self)._finalize_manager,
        args=(self._process, self._address, self._authkey,
              self._state, self._Client),
        exitpriority=0
        )

      

Here's the code that actually performs the shutdown:



@staticmethod
def _finalize_manager(process, address, authkey, state, _Client):
    '''
    Shutdown the manager process; will be registered as a finalizer
    '''
    if process.is_alive():
        util.info('sending shutdown message to manager')
        # Ask the manager to shut down over a fresh connection. Any failure
        # is ignored on purpose: the join/terminate fallback below still runs.
        try:
            conn = _Client(address, authkey=authkey)
            try:
                dispatch(conn, None, 'shutdown')
            finally:
                conn.close()
        except Exception:
            pass

        # Give the manager process up to one second to exit on its own.
        process.join(timeout=1.0)
        if process.is_alive():
            util.info('manager still alive')
            # Escalate to terminate() when the process object supports it,
            # then wait briefly and log if it is still running.
            if hasattr(process, 'terminate'):
                util.info('trying to `terminate()` manager process')
                process.terminate()
                process.join(timeout=0.1)
                if process.is_alive():
                    util.info('manager still alive after terminate')

    # Record the shutdown state whether or not the process was alive above.
    state.value = State.SHUTDOWN
    # Drop the entry cached for this address; a missing entry is not an error.
    try:
        del BaseProxy._address_to_local[address]
    except KeyError:
        pass

      

The fix is simple - don't let the main process exit immediately after it starts the process that runs the UDP server; make it wait by calling `server_process.join()`:

import multiprocessing
from socketserver import UDPServer, ForkingMixIn, DatagramRequestHandler
from socket import socket, AF_INET, SOCK_DGRAM
from settings import host, port, number_of_connections

class ChatHandler(DatagramRequestHandler):
    """Datagram handler that records each client's address in a shared list."""

    # ``clients`` is not defined in the class body; it is attached as a class
    # attribute by the ``__main__`` block before the server process starts.

    def handle(self):
        """Append the sender's address to the shared list and print the list."""
        cur_process = multiprocessing.current_process()
        # ``self.request`` is indexed as a (data, socket) pair here.
        data = self.request[0].strip()
        # NOTE: this local shadows the ``socket`` name imported at module level.
        socket = self.request[1]
        ChatHandler.clients.append(self.client_address) # no longer fails while the process that created the Manager is alive
        print(ChatHandler.clients)


class ChatServer(ForkingMixIn, UDPServer):
    """UDPServer combined with ForkingMixIn; adds no behavior of its own."""
    pass


if __name__ == '__main__':
    server = ChatServer((host, port), ChatHandler)
    # Proxy to a list created through a multiprocessing Manager.
    ChatHandler.clients = multiprocessing.Manager().list()
    # Run the server loop in a separate, non-daemonic process.
    server_process = multiprocessing.Process(target=server.serve_forever)
    server_process.daemon = False
    server_process.start()
    # Block until the server process exits, so this process (the one that
    # created the Manager) stays alive while requests are being handled.
    server_process.join() # This fixes the issue.

      

+6


source


Below is an example of a UDP server with a shared list.

  • the parent code creates the Manager-controlled list and passes it to start_server()

  • this function, in turn, actually starts the server, storing the shared list on it so that the server and its handler can access it.

  • when a packet arrives, the handle() method runs. It accesses the server via self.server, and the shared list via self.server.client_list, an attribute of the ChatServer instance.

I tested by starting the server, waiting a second, and then sending a UDP packet containing "beer" using the netcat command. For some reason, netcat sends Xs first, and each line of output is duplicated. This is a bug, but the code should point you in the right direction.

Source



import multiprocessing as mp, signal, sys
from SocketServer import (
    UDPServer, ForkingMixIn, DatagramRequestHandler
)

class ChatHandler(DatagramRequestHandler):
    """Datagram handler that records each sender in the server's shared list."""

    def handle(self):
        """Log the received datagram, then append the sender's address.

        The shared list is reached through ``self.server.client_list``.
        """
        # ``self.request`` is unpacked as a (data, socket) pair.
        data, _socket = self.request
        curproc = mp.current_process()
        # A parenthesized single-argument print behaves the same as the
        # Python 2 print statement and is also valid as a Python 3 call.
        print('{}: {}'.format(
            curproc,
            dict(
                data_len=len(data),
                data=data.strip(),
                client=self.client_address,
            )))
        self.server.client_list.append(
            self.client_address)
        # NOTE: formatting the dict shows the proxy's repr (<ListProxy ...>),
        # not the list contents; copy it with list(...) to see the entries.
        print('{}: {}'.format(
            curproc,
            dict(client_list=self.server.client_list),
        ))

class ChatServer(ForkingMixIn, UDPServer):
    """UDPServer combined with ForkingMixIn that carries a client list."""
    # Placeholder default; start_server() assigns the real list on the instance.
    client_list = None

def start_server(client_list):
    """Create the UDP server, attach *client_list* to it and run its loop.

    The list is stored on the server instance so that handlers can reach it
    as ``self.server.client_list``. The call blocks in ``serve_forever()``.
    """
    # Host '' and port 9876 are hard-coded; the test run sends to port 9876.
    server = ChatServer(('', 9876), ChatHandler)
    server.client_list = client_list
    server.serve_forever()

if __name__ == '__main__':
    # The Manager-backed list is created in the parent and handed to the
    # server process as an argument.
    clist = mp.Manager().list()
    mp.Process(
        target=start_server, args=[clist],
        name='udpserver',
    ).start()

    # Keep the parent around for a while instead of falling off the end of
    # the script right after starting the server process.
    signal.alarm(5)             # die in 5 seconds
    signal.pause()              # wait for control-C or alarm

      

test run

(sleep 1 ; echo beer | nc -vvu localhost 9876 ) &
python ./mshared.py

<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
Connection to localhost 9876 port [udp/*] succeeded!
<Process(udpserver, started)>: {'data': 'X', 'client': ('127.0.0.1', 49399), 'data_len': 1}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}
<Process(udpserver, started)>: {'data': 'beer', 'client': ('127.0.0.1', 49399), 'data_len': 5}
<Process(udpserver, started)>: {'client_list': <ListProxy object, typeid 'list' at 0x1774650>}

      

0


source







All Articles