Twisted Trial: how to test a MultiService with a client in it - Reactor was unclean
I inherited a Twisted MultiService that I am trying to add tests to, but whatever I do, I get a DirtyReactorAggregateError. The service connects to the server with twisted.application.internet.TCPClient. I think the error is caused by the TCPClient not being disconnected, but I'm not sure how I should shut it down. What is the proper way to test a Twisted Service with a client in it?
Here's a test case:
    from labrad.node import *
    from twisted.trial import unittest
    import os
    from socket import gethostname

    class NodeTestCase(unittest.TestCase):

        def setUp(self):
            import labrad
            name = os.environ.get('LABRADNODE', gethostname()) + '_test'
            self.node = Node(name,
                             labrad.constants.MANAGER_HOST,
                             labrad.constants.MANAGER_PORT)
            self.node.startService()
            #self.addCleanup(self.node.stopService)

        def test_nothing(self):
            self.assertEqual(3, 3)

        def tearDown(self):
            return self.node.stopService()
Here's the Node service itself:
    class Node(MultiService):
        """Parent Service that keeps the node running.

        If the manager is stopped or we lose the network connection,
        this service attempts to restart it so that we will come
        back online when the manager is back up.
        """
        reconnectDelay = 10

        def __init__(self, name, host, port):
            MultiService.__init__(self)
            self.name = name
            self.host = host
            self.port = port

        def startService(self):
            MultiService.startService(self)
            self.startConnection()

        def startConnection(self):
            """Attempt to start the node and connect to LabRAD."""
            print 'Connecting to %s:%d...' % (self.host, self.port)
            self.node = NodeServer(self.name, self.host, self.port)
            self.node.onStartup().addErrback(self._error)
            self.node.onShutdown().addCallbacks(self._disconnected, self._error)
            self.cxn = TCPClient(self.host, self.port, self.node)
            self.addService(self.cxn)

        def stopService(self):
            if hasattr(self, 'cxn'):
                d = defer.maybeDeferred(self.cxn.stopService)
                self.removeService(self.cxn)
                del self.cxn
                return defer.gatherResults([MultiService.stopService(self), d])
            else:
                return MultiService.stopService(self)

        def _disconnected(self, data):
            print 'Node disconnected from manager.'
            return self._reconnect()

        def _error(self, failure):
            r = failure.trap(UserError)
            if r == UserError:
                print "UserError found!"
                return None
            print failure.getErrorMessage()
            return self._reconnect()

        def _reconnect(self):
            """Clean up from the last run and reconnect."""
            ## hack: manually clearing the dispatcher...
            dispatcher.connections.clear()
            dispatcher.senders.clear()
            dispatcher._boundMethods.clear()
            ## end hack
            if hasattr(self, 'cxn'):
                self.removeService(self.cxn)
                del self.cxn
            reactor.callLater(self.reconnectDelay, self.startConnection)
            print 'Will try to reconnect in %d seconds...' % self.reconnectDelay
You should refactor your service so that it can use something like MemoryReactor from twisted.test.proto_helpers (the one public module in the twisted.test package, though hopefully it will eventually move out of twisted.test).
The way to use MemoryReactor is to pass it to your code as the reactor to use. If you then want to see what happens when the connection succeeds, look at some of its public attributes: tcpClients for connectTCP, tcpServers for listenTCP, and so on. Your tests can then pull out the Factory instances that were passed to connectTCP / listenTCP etc., call buildProtocol on them, and call makeConnection on the result. To get an ITransport implementation you can use twisted.test.proto_helpers.StringTransportWithDisconnection. You might even take a look at twisted.test.iosim.IOPump to relay traffic between clients and servers (private API! Be careful, it may break without warning! Although it really should be public).
If you really do need whole-system, non-deterministic, real-world testing, with all the complexity and random unrelated failures that implies, here's an article on actually shutting down the client and server.
I had a similar problem trying to test an application instance. I ended up creating a base test class that uses the setUp and tearDown methods to start and stop the application.
    from twisted.application.app import startApplication
    from twisted.application.service import IServiceCollection
    from twisted.internet.defer import inlineCallbacks
    from twisted.trial import unittest

    class MyTest(unittest.TestCase):

        def setUp(self):
            startApplication(self.app, save=False)

        @inlineCallbacks
        def tearDown(self):
            sc = self.app.getComponent(IServiceCollection)
            yield sc.stopService()