Killing child processes created in a class's __init__ in Python

(New to Python and OO - sorry in advance if I'm stupid here)

I'm trying to define a Python 3 class in such a way that when instantiated, two subprocesses are also created. These subprocesses do some background work (sending and listening to UDP packets). Subprocesses also need to communicate with each other and with the instance (update the instance's attributes based on what is received from UDP, among other things).

I am creating my subprocesses using os.fork because I do not understand how to use the subprocess module to send multiple file descriptors to child processes - perhaps this is part of my problem.

The problem I'm running into is how to kill the child processes when the instance is destroyed. My understanding is that I shouldn't rely on destructors in Python, because cleanup and garbage collection are supposed to happen automatically. In any case, the following code leaves the children running after it exits.

What's the correct approach?

import os
from time import sleep

class A:
    def __init__(self):
        sfp, pts = os.pipe() # senderFromParent, parentToSender
        pfs, stp = os.pipe() # parentFromSender, senderToParent
        pfl, ltp = os.pipe() # parentFromListener, listenerToParent
        sfl, lts = os.pipe() # senderFromListener, listenerToSender
        pid = os.fork()
        if pid:
            # parent
            os.close(sfp)
            os.close(stp)
            os.close(lts)
            os.close(ltp)
            os.close(sfl)
            self.pts = os.fdopen(pts, 'w') # allow creator of A inst to
            self.pfs = os.fdopen(pfs, 'r') # send and receive messages
            self.pfl = os.fdopen(pfl, 'r') # to/from sender and
        else:                              # listener processes
            # sender or listener
            os.close(pts)
            os.close(pfs)
            os.close(pfl)
            pid = os.fork()
            if pid:
                # sender
                os.close(ltp)
                os.close(lts)
                sender(self, sfp, stp, sfl)
            else:
                # listener
                os.close(stp)
                os.close(sfp)
                os.close(sfl)
                listener(self, ltp, lts)

def sender(a, sfp, stp, sfl):
    sfp = os.fdopen(sfp, 'r') # receive messages from parent
    stp = os.fdopen(stp, 'w') # send messages to parent
    sfl = os.fdopen(sfl, 'r') # receive messages from listener
    while True:
        # send UDP packets based on messages from parent and process
        # responses from listener (some responses passed back to parent)
        print("Sender alive")
        sleep(1)

def listener(a, ltp, lts):
    ltp = os.fdopen(ltp, 'w') # send messages to parent
    lts = os.fdopen(lts, 'w') # send messages to sender
    while True:
        # listen for and process incoming UDP packets, sending some
        # to sender and some to parent
        print("Listener alive")
        sleep(1)

a = A()

      

Running the above gives:

Sender alive
Listener alive
Sender alive
Listener alive
...

      



3 answers


Actually, you have to use destructors. Python objects have a __del__ method that is called just before the object is reclaimed by the garbage collector (note, though, that __del__ is not guaranteed to run for objects that still exist when the interpreter exits).

In your case, you should define

def __del__(self):
    ...

inside your class A and have it send appropriate kill signals to your child processes. Don't forget to store the child PIDs in the parent process, of course.
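A minimal sketch of that approach (simplified to a single forked child rather than the asker's full pipe setup; the child_pids attribute is my own name, not from the question): the parent records each forked child's PID, and __del__ sends SIGTERM and reaps the child. Bear in mind that CPython does not guarantee __del__ runs for objects still alive at interpreter shutdown, so an explicit cleanup method is more reliable.

```python
import os
import signal
import time

class A:
    def __init__(self):
        self.child_pids = []            # parent-side record of forked children
        pid = os.fork()
        if pid:
            self.child_pids.append(pid) # parent branch: remember the child
        else:
            while True:                 # child branch: background work loop
                time.sleep(1)

    def __del__(self):
        # Kill each child and reap it so no zombies are left behind.
        for pid in self.child_pids:
            try:
                os.kill(pid, signal.SIGTERM)
                os.waitpid(pid, 0)
            except (ProcessLookupError, ChildProcessError):
                pass                    # child already gone

a = A()
del a   # refcount drops to zero, so CPython runs __del__ here
```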



As suggested here, you can create child processes using the multiprocessing module with the daemon=True flag; daemonic children are terminated automatically when the parent process exits.

Example:

from multiprocessing import Process

def f(name):
    print('hello', name)

p = Process(target=f, args=('bob',))
p.daemon = True  # must be set before start(); child dies with the parent
p.start()
      



There is no point in trying to reinvent the wheel. subprocess does everything you need and more, but since these children are just other Python processes, multiprocessing is the better fit, so we will use it.

You can use multiprocessing.Pipe to create connections and send messages back and forth between two processes. You can make the pipe "duplex" so both ends can send and receive, if you need that. You can use multiprocessing.Manager to create a Namespace shared across processes (sharing state between listener, sender and parent). There is a caveat when using a manager's list, dict or Namespace: a mutable object assigned to one of them does not propagate in-place changes to the other processes until it is reassigned to the managed object.

e.g.

namespace.attr = {}
# change below not cascaded to other processes
namespace.attr["key"] = "value"
# force change to other processes
namespace.attr = namespace.attr

      

If you need more than one process writing to the same attribute, you will need synchronization, otherwise one process's modification can clobber changes made by another.

Sample code:

from multiprocessing import Process, Pipe, Manager

class Reader:

    def __init__(self, writer_conn, namespace):
        self.writer_conn = writer_conn
        self.namespace = namespace

    def read(self):
        self.namespace.msgs_recv = 0
        with self.writer_conn:
            try:
                while True:
                    obj = self.writer_conn.recv()
                    self.namespace.msgs_recv += 1
                    print("Reader got:", repr(obj))
            except EOFError:
                print("Reader has no more data to receive")

class Writer:

    def __init__(self, reader_conn, namespace):
        self.reader_conn = reader_conn
        self.namespace = namespace

    def write(self, msgs):
        self.namespace.msgs_sent = 0
        with self.reader_conn:
            for msg in msgs:
                self.reader_conn.send(msg)
                self.namespace.msgs_sent += 1

def create_child_processes(reader, writer, msgs):
    p_write = Process(target=Writer.write, args=(writer, msgs))
    p_write.start()

    # Closing the parent's copy of the write end here is very important,
    # otherwise the reader will hang after the writer has finished. It must
    # come after p_write.start() so the writer child inherits an open copy
    # first. On Unix, child processes inherit the parent's file descriptors,
    # and as long as any process holds an open fd to the write side of a
    # pipe, the read side never sees EOF.
    writer.reader_conn.close()

    p_read = Process(target=Reader.read, args=(reader,))
    p_read.start()

    return p_read, p_write

def run_mp_pipe():

    manager = Manager()
    namespace = manager.Namespace()
    read_conn, write_conn = Pipe()

    reader = Reader(read_conn, namespace)
    writer = Writer(write_conn, namespace)

    p_read, p_write = create_child_processes(reader, writer, 
        msgs=["hello", "world", {"key", "value"}])

    print("starting")

    p_write.join()
    p_read.join()

    print("done")
    print(namespace)
    assert namespace.msgs_sent == namespace.msgs_recv

if __name__ == "__main__":
    run_mp_pipe()

      

Output:

starting
Reader got: 'hello'
Reader got: 'world'
Reader got: {'key', 'value'}
Reader has no more data to receive
done
Namespace(msgs_recv=3, msgs_sent=3)

      
