Killing child processes created in a class's __init__ in Python
(New to Python and OO - sorry in advance if I'm stupid here)
I'm trying to define a Python 3 class in such a way that when instantiated, two subprocesses are also created. These subprocesses do some background work (sending and listening to UDP packets). Subprocesses also need to communicate with each other and with the instance (update the instance's attributes based on what is received from UDP, among other things).
I am creating my subprocesses using os.fork because I do not understand how to use the subprocess module to send multiple file descriptors to child processes - perhaps this is part of my problem.
The problem I'm running into is how to kill the child processes when the instance is destroyed. My understanding is that I shouldn't use destructors in Python because stuff needs to be cleaned up and garbage collected automatically by Python. In any case, the following code leaves the children running after it exits.
What's the correct approach?
import os
from time import sleep

class A:
    def __init__(self):
        sfp, pts = os.pipe() # senderFromParent, parentToSender
        pfs, stp = os.pipe() # parentFromSender, senderToParent
        pfl, ltp = os.pipe() # parentFromListener, listenerToParent
        sfl, lts = os.pipe() # senderFromListener, listenerToSender
        pid = os.fork()
        if pid:
            # parent
            os.close(sfp)
            os.close(stp)
            os.close(lts)
            os.close(ltp)
            os.close(sfl)
            self.pts = os.fdopen(pts, 'w') # allow creator of A inst to
            self.pfs = os.fdopen(pfs, 'r') # send and receive messages
            self.pfl = os.fdopen(pfl, 'r') # to/from sender and
        else:                              # listener processes
            # sender or listener
            os.close(pts)
            os.close(pfs)
            os.close(pfl)
            pid = os.fork()
            if pid:
                # sender
                os.close(ltp)
                os.close(lts)
                sender(self, sfp, stp, sfl)
            else:
                # listener
                os.close(stp)
                os.close(sfp)
                os.close(sfl)
                listener(self, ltp, lts)

def sender(a, sfp, stp, sfl):
    sfp = os.fdopen(sfp, 'r') # receive messages from parent
    stp = os.fdopen(stp, 'w') # send messages to parent
    sfl = os.fdopen(sfl, 'r') # received messages from listener
    while True:
        # send UDP packets based on messages from parent and process
        # responses from listener (some responses passed back to parent)
        print("Sender alive")
        sleep(1)

def listener(a, ltp, lts):
    ltp = os.fdopen(ltp, 'w') # send messages to parent
    lts = os.fdopen(lts, 'w') # send messages to sender
    while True:
        # listen for and process incoming UDP packets, sending some
        # to sender and some to parent
        print("Listener alive")
        sleep(1)

a = A()
Doing the above gives:
Sender alive
Listener alive
Sender alive
Listener alive
...
Actually, you do have to use a destructor. Python objects can define a __del__ method, which is called just before the object is garbage collected.
In your case, you should define
def __del__(self):
    ...
inside your class A, and have it send the appropriate kill signals to your child processes. Don't forget to store the child PIDs in the parent process, of course.
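A minimal sketch of this idea, assuming a Unix system where os.fork is available. The names Worker, close and child_pid are made up for illustration, and the sleep loop only stands in for the real sender/listener work. Cleanup lives in an explicit close() method and __del__ merely calls it as a fallback, because Python does not guarantee that __del__ runs for objects that still exist when the interpreter exits.

```python
import os
import signal
import time


class Worker:
    """Hypothetical class: forks one child in __init__ and keeps its PID."""

    def __init__(self):
        self.child_pid = None
        pid = os.fork()
        if pid == 0:
            # Child: placeholder for the real background loop.
            while True:
                time.sleep(0.1)
        # Parent: remember the PID so the child can be killed later.
        self.child_pid = pid

    def close(self):
        """Terminate and reap the child; safe to call more than once."""
        pid = getattr(self, "child_pid", None)
        if pid is None:
            return
        try:
            os.kill(pid, signal.SIGTERM)
            os.waitpid(pid, 0)  # reap the child so no zombie is left behind
        except (ProcessLookupError, ChildProcessError):
            pass  # the child is already gone
        self.child_pid = None

    def __del__(self):
        # Best-effort fallback; calling close() explicitly is more reliable.
        self.close()
```

Calling close() yourself (or wrapping the object in try/finally) is the dependable path; __del__ only covers the case where the instance is garbage collected while the program keeps running.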
As suggested here, you can create the child processes with the multiprocessing module and set their daemon flag to True, so that they are terminated when the parent process exits.
Example:
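from multiprocessing import Process
def f(name):  # placeholder worker function
    print('hello', name)

if __name__ == '__main__':
    p = Process(target=f, args=('bob',))
    p.daemon = True  # must be set before start()
    p.start()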
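Applied to the question, the daemon flag could be combined with explicit cleanup roughly as follows. This is only a sketch: Service, background_loop and close are hypothetical names, the loop is a stand-in for the UDP work, and the "fork" start method is an assumption chosen to match the question's Unix setup.

```python
import multiprocessing as mp
import time

# Assumption: Unix with the "fork" start method, as in the question.
ctx = mp.get_context("fork")


def background_loop():
    # Placeholder for the sender/listener work.
    while True:
        time.sleep(0.1)


class Service:
    """Hypothetical class that starts two daemon children when instantiated."""

    def __init__(self):
        self.procs = [
            ctx.Process(target=background_loop, daemon=True) for _ in range(2)
        ]
        for p in self.procs:
            p.start()

    def close(self):
        # Stop the children now instead of waiting for the parent to exit.
        for p in self.procs:
            p.terminate()
        for p in self.procs:
            p.join()

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        self.close()
```

Used as `with Service() as s: ...`, the children are stopped when the block ends; the daemon flag is then only a safety net for the case where the parent exits without calling close().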
There is no point in trying to reinvent the wheel. subprocess does everything you need and more, although multiprocessing will simplify the process, so we will use that.
You can use multiprocessing.Pipe to create a connection and send messages back and forth between two processes. You can make the pipe "duplex" (the default for multiprocessing.Pipe()) so that both ends can send and receive, if you need that. You can use multiprocessing.Manager to create a Namespace shared across processes (sharing state between the listener, the sender and the parent). There is a caveat when using a managed list, dict or Namespace (manager.list(), manager.dict(), manager.Namespace()): if you store a mutable object in one of them and then modify that object, other processes will not see the change until the object is reassigned to the managed container.
For example:
namespace.attr = {}
# this change is made to a temporary copy and is not seen by other processes
namespace.attr["key"] = "value"
# to publish a change, modify a local copy and reassign it
attr = namespace.attr
attr["key"] = "value"
namespace.attr = attr
If more than one process needs to write to the same attribute, you will need synchronization (for example a lock) so that a modification made by one process does not overwrite changes made by another.
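A minimal sketch of such synchronization, assuming a manager-provided Lock guarding a shared counter. The names add_many, run and counter are hypothetical, and the "fork" start method is an assumption made to match the question's Unix setup.

```python
import multiprocessing as mp

# Assumption: Unix with the "fork" start method.
ctx = mp.get_context("fork")


def add_many(namespace, lock, n):
    for _ in range(n):
        # "+=" on a Namespace attribute is a read followed by a write,
        # so the lock must cover both steps.
        with lock:
            namespace.counter += 1


def run(n_procs=2, n=100):
    with ctx.Manager() as manager:
        namespace = manager.Namespace()
        namespace.counter = 0
        lock = manager.Lock()
        procs = [
            ctx.Process(target=add_many, args=(namespace, lock, n))
            for _ in range(n_procs)
        ]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        return namespace.counter
```

Without the lock, two processes can read the same old value and each write back old + 1, so some increments would be lost.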
Sample code:
from multiprocessing import Process, Pipe, Manager

class Reader:
    def __init__(self, writer_conn, namespace):
        self.writer_conn = writer_conn
        self.namespace = namespace

    def read(self):
        self.namespace.msgs_recv = 0
        with self.writer_conn:
            try:
                while True:
                    obj = self.writer_conn.recv()
                    self.namespace.msgs_recv += 1
                    print("Reader got:", repr(obj))
            except EOFError:
                print("Reader has no more data to receive")

class Writer:
    def __init__(self, reader_conn, namespace):
        self.reader_conn = reader_conn
        self.namespace = namespace

    def write(self, msgs):
        self.namespace.msgs_sent = 0
        with self.reader_conn:
            for msg in msgs:
                self.reader_conn.send(msg)
                self.namespace.msgs_sent += 1

def create_child_processes(reader, writer, msgs):
    p_write = Process(target=Writer.write, args=(writer, msgs))
    p_write.start()
    # This is very important, otherwise the reader will hang after the writer
    # has finished. The order also matters: this statement must come after
    # p_write.start() but before p_read.start(). Look up file descriptors and
    # how they are inherited by child processes on Unix, and how any open fd
    # to the write side of a pipe keeps the read end from seeing EOF.
    writer.reader_conn.close()
    p_read = Process(target=Reader.read, args=(reader,))
    p_read.start()
    return p_read, p_write

def run_mp_pipe():
    manager = Manager()
    namespace = manager.Namespace()
    read_conn, write_conn = Pipe()
    reader = Reader(read_conn, namespace)
    writer = Writer(write_conn, namespace)
    p_read, p_write = create_child_processes(reader, writer,
        msgs=["hello", "world", {"key", "value"}])
    print("starting")
    p_write.join()
    p_read.join()
    print("done")
    print(namespace)
    assert namespace.msgs_sent == namespace.msgs_recv

if __name__ == "__main__":
    run_mp_pipe()
Output:
starting
Reader got: 'hello'
Reader got: 'world'
Reader got: {'key', 'value'}
Reader has no more data to receive
done
Namespace(msgs_recv=3, msgs_sent=3)
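The sample above only sends in one direction. The duplex case mentioned earlier could look roughly like this sketch, in which the parent sends a message and waits for the child's reply over the same connection. The names echo_upper and run are hypothetical, and the "fork" start method is an assumption. A None sentinel ends the conversation instead of relying on EOF, because a forked child inherits its own copy of the parent's end of the pipe and would therefore never see EOF.

```python
import multiprocessing as mp

# Assumption: Unix with the "fork" start method.
ctx = mp.get_context("fork")


def echo_upper(conn):
    # Child: reply to every message until the None sentinel arrives.
    with conn:
        while True:
            msg = conn.recv()
            if msg is None:
                break
            conn.send(msg.upper())


def run(msgs):
    parent_conn, child_conn = ctx.Pipe(duplex=True)
    p = ctx.Process(target=echo_upper, args=(child_conn,))
    p.start()
    child_conn.close()  # the parent does not use the child's end
    replies = []
    with parent_conn:
        for msg in msgs:
            parent_conn.send(msg)               # parent -> child
            replies.append(parent_conn.recv())  # child -> parent
        parent_conn.send(None)  # tell the child to stop
    p.join()
    return replies
```

For the question's layout, one such duplex connection per child would replace each pair of os.pipe() calls between the parent and that child.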