Non-blocking multiprocessing communication.

I am using multiprocessing.connection.Listener for cross-process communication and it works like a charm for me. Now I would really like my mainloop to do something else between commands from the client. Unfortunately listener.accept() blocks execution until a connection is established with the client process.

Is there an easy way to do a non-blocking check for multiprocessing communication? A timeout? Or should I use a dedicated thread (a rough sketch of that option follows the simplified code below)?

    # Simplified code:

    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=('localhost', 6000), authkey=b'secret')

        while True:
            conn = listener.accept()  # <-- this blocks until a client connects
            msg = conn.recv()
            print('got message: %r' % msg)
            conn.close()
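For reference, here is roughly what the dedicated-thread variant I am considering might look like (a sketch only, reusing the address and authkey from the simplified code above; the accepting thread hands messages to the main loop through a queue.Queue):

    import queue
    import threading
    from multiprocessing.connection import Listener

    def listen(msg_queue):
        listener = Listener(address=('localhost', 6000), authkey=b'secret')
        while True:
            conn = listener.accept()     # blocking is fine inside this thread
            msg_queue.put(conn.recv())   # hand the message to the main loop
            conn.close()

    def mainloop():
        msg_queue = queue.Queue()
        threading.Thread(target=listen, args=(msg_queue,), daemon=True).start()
        while True:
            try:
                msg = msg_queue.get_nowait()   # non-blocking check
                print('got message: %r' % msg)
            except queue.Empty:
                pass                           # free to do other work here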

      

1 answer


I haven't used the Listener object myself - for this task I usually use multiprocessing.Queue; the docs are at the following link:

https://docs.python.org/2/library/queue.html#Queue.Queue

This object can be used to send and receive any picklable object through its put/get API; I think the calls you are most interested in are:

  • in process A
    • .put('some message')

  • in process B
    • .get_nowait() # will raise Queue.Empty if nothing is available - handle that and carry on with your execution



The only limitation with this is that you will need to have control of both Process objects at some point, so that you can hand the same queue to each of them - something like this:

    import time
    from Queue import Empty
    from multiprocessing import Queue, Process


    def receiver(q):
        # poll the queue without blocking, so this loop stays free to do other work
        while 1:
            try:
                message = q.get_nowait()
                print 'receiver got', message
            except Empty:
                print 'nothing to receive, sleeping'
                time.sleep(1)


    def sender(q):
        # push a message onto the shared queue once a second
        while 1:
            message = 'some message'
            q.put(message)
            print 'sender sent', message
            time.sleep(1)


    some_queue = Queue()

    process_a = Process(
        target=receiver,
        args=(some_queue,)
    )

    process_b = Process(
        target=sender,
        args=(some_queue,)
    )

    process_a.start()
    process_b.start()

    print 'ctrl + c to exit'
    try:
        while 1:
            time.sleep(1)
    except KeyboardInterrupt:
        pass

    process_a.terminate()
    process_b.terminate()

    process_a.join()
    process_b.join()

      

Queues are good because you can actually have as many consumers and as many producers for the same Queue object as you like (handy for distributing tasks).
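To make that concrete, here is a rough self-contained sketch of the fan-out idea (the worker function and the three-consumer count are just illustrative choices of mine, nothing prescribed by multiprocessing): whichever consumer calls get first receives the next task.

    import time
    from Queue import Empty
    from multiprocessing import Queue, Process


    def worker(name, q):
        # each worker pulls whatever task happens to be next on the shared queue
        while 1:
            try:
                task = q.get_nowait()
                print name, 'handling', task
            except Empty:
                time.sleep(0.1)


    tasks = Queue()

    # one queue, several consumers
    workers = [Process(target=worker, args=('worker-%d' % i, tasks))
               for i in range(3)]
    for w in workers:
        w.start()

    # one producer (there could just as well be several) feeding the same queue
    for n in range(10):
        tasks.put('task %d' % n)

    time.sleep(2)

    for w in workers:
        w.terminate()   # fine for a throwaway demo; see the note below
        w.join()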

I should point out that simply calling .terminate() on a Process is bad form - you should use your shiny new messaging system to send a shutdown message or something along those lines.
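For example, a minimal sketch of that idea (the 'shutdown' sentinel value is just my own convention, nothing built into multiprocessing): the child keeps consuming until it sees the sentinel and then exits on its own, so the parent only needs join(), not terminate().

    from multiprocessing import Queue, Process

    SHUTDOWN = 'shutdown'   # sentinel value; any unique, picklable object would do


    def worker(q):
        while 1:
            message = q.get()          # blocks until something arrives
            if message == SHUTDOWN:    # the parent asked us to stop
                print 'worker exiting cleanly'
                break
            print 'worker got', message


    q = Queue()
    p = Process(target=worker, args=(q,))
    p.start()

    q.put('some message')
    q.put(SHUTDOWN)   # tell the worker to finish instead of terminating it

    p.join()          # the worker exits by itself; no .terminate() needed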
