How to limit the number of simultaneous connections in Twisted

I have a Twisted server that I created, and I was wondering: what is the best way to limit the number of concurrent connections?

Is returning None from my Factory the best way? When I do this, I get a lot of exceptions like:

exceptions.AttributeError: 'NoneType' object has no attribute 'makeConnection'


I would like clients to just sit in a queue until the current connection count drops back below the limit, but I don't know how to do this asynchronously.

I am currently using my Factory like this:

class HandleClientFactory(Factory):

    def __init__(self):
        self.numConnections = 0

    def buildProtocol(self, addr):
        # limit connection number here
        if self.numConnections >= Max_Clients:
            logging.warning("Reached maximum Client connections")
            return None

        return HandleClient(self)


This works, but it rejects excess connections outright instead of making them wait, and it also produces a lot of unhandled errors.



1 answer


You have to build this yourself. Fortunately, the pieces you need are mostly in place (you could perhaps ask for slightly more convenient pieces, but ...)

First, to avoid the AttributeError (which is in fact what causes the connection to be closed), make sure you return an IProtocol provider from your buildProtocol method.

class DoesNothing(Protocol):
    pass

class YourFactory(Factory):
    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            return Factory.buildProtocol(self, addr)
        protocol = DoesNothing()
        protocol.factory = self
        return protocol


If you use this factory (filling in the missing pieces, for example initializing maxConnections and correctly tracking currentConnections), you will find that clients which connect after the limit has been reached are given the DoesNothing protocol. They can send as much data as they want to this protocol. It will discard it all. It will never send them any data. It will leave the connection open until they close it. In short, it does nothing.

However, you also wanted those clients to actually receive service once the connection count drops below the limit.

To do this, you need a few more parts:

  • You need to keep any data they send buffered so it is available to be read when you are ready to read it.
  • You need to keep track of the connections themselves so you can begin to service them when the time is right.
  • You need to actually begin servicing them at that time.

For the first, you can use the pause feature of most transports:

class PauseTransport(Protocol):
    def makeConnection(self, transport):
        transport.pauseProducing()

class YourFactory(Factory):
    def buildProtocol(self, addr):
        if self.currentConnections < self.maxConnections:
            return Factory.buildProtocol(self, addr)
        protocol = PauseTransport()
        protocol.factory = self
        return protocol


PauseTransport is similar to DoesNothing, with the minor (and useful) difference that as soon as it is connected to a transport, it tells the transport to pause. Thus, no data will ever be read from the connection, and it will all be buffered for whenever you are ready to deal with it.

There are many possible solutions for the next requirement. One of the simplest is to use the factory as storage:



class PauseAndStoreTransport(Protocol):
    def makeConnection(self, transport):
        transport.pauseProducing()
        self.factory.addPausedTransport(transport)

class YourFactory(Factory):
    def buildProtocol(self, addr):
        # As above
        ...

    def addPausedTransport(self, transport):
        self.transports.append(transport)


Again, with appropriate setup (for example, initializing the transports attribute), you now have a list of all the transports corresponding to connections accepted above the concurrency limit which are waiting to be serviced.

For the last requirement, all that is needed is to instantiate and initialize a protocol that is actually capable of serving your clients. Instantiation is easy (it is your protocol; you probably know how it works). Initialization is largely a matter of calling the makeConnection method:

class YourFactory(Factory):
    def buildProtocol(self, addr):
        # As above
        ...
    def addPausedTransport(self, transport):
        # As above
        ...
    def oneConnectionDisconnected(self):
        self.currentConnections -= 1
        if self.currentConnections < self.maxConnections:
            transport = self.transports.pop(0)
            protocol = self.buildProtocol(address)
            protocol.makeConnection(transport)
            transport.resumeProducing()


I've omitted the details of keeping track of the address argument required by buildProtocol (since the transport is carried through this part of the program from its point of origin, it should be clear how to do something similar with the original address value if your program actually wants it).

Apart from that, all that happens here is that the next transport in the queue is taken (you could use a different scheduling algorithm if you want, e.g. LIFO) and hooked up to a protocol of your choosing, just as Twisted would normally do. Finally, you undo the earlier pause operation so data will begin to flow.

Or ... almost. It would be, except that Twisted transports don't actually expose any way to change which protocol they deliver data to. Thus, as written, data from clients will still be delivered to the original PauseAndStoreTransport protocol instance. You can hack around this (and hack is clearly the right word). Store both the transport and the PauseAndStoreTransport instance in the list on the factory, and then:

    def oneConnectionDisconnected(self):
        self.currentConnections -= 1
        if self.currentConnections < self.maxConnections:
            originalProtocol, transport = self.transports.pop(0)
            newProtocol = self.buildProtocol(address)

            originalProtocol.dataReceived = newProtocol.dataReceived
            originalProtocol.connectionLost = newProtocol.connectionLost

            newProtocol.makeConnection(transport)
            transport.resumeProducing()


Now the object the transport wants to call methods on has had its methods replaced with those of the object you actually want the methods called on. Again, this is clearly a hack. You can probably put together something less hacky (for example, a third protocol class that explicitly supports delegating to another protocol). The idea would be the same; it would just be more wear on your keyboard. For what it's worth, I suspect it might be both easier and less typing to do something like this using Tubes, but I'll leave an attempt at a solution based on that library to someone else.
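As a sketch of that less hacky route, here is a hypothetical delegating protocol. The class and method names are mine, not Twisted's; a real version would subclass twisted.internet.protocol.Protocol. It pauses its transport on arrival and, once the factory calls switchTo, forwards every subsequent event to the real serving protocol:

```python
# Hypothetical delegating protocol: pauses the transport when connected,
# then forwards dataReceived / connectionLost to a target protocol once
# the factory switches it over. Names are illustrative.

class SwitchableProtocol:
    def __init__(self):
        self.target = None      # the real serving protocol, once assigned
        self.transport = None

    def makeConnection(self, transport):
        # Same trick as PauseTransport: buffer everything until switched.
        self.transport = transport
        transport.pauseProducing()

    def switchTo(self, newProtocol):
        # Hand the live transport over to the real protocol and resume.
        self.target = newProtocol
        newProtocol.makeConnection(self.transport)
        self.transport.resumeProducing()

    def dataReceived(self, data):
        # Nothing should arrive while paused; forward once switched.
        if self.target is not None:
            self.target.dataReceived(data)

    def connectionLost(self, reason=None):
        if self.target is not None:
            self.target.connectionLost(reason)
```

The factory would store SwitchableProtocol instances instead of bare transports and call switchTo in oneConnectionDisconnected, avoiding the method-swapping hack entirely.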

I've avoided the problem of keeping currentConnections properly up to date. Since you already have numConnections in your question, I'm assuming you know how to manage that part. All I've done in the last step is to suppose that the way you do the decrement is by calling oneConnectionDisconnected on the factory.

I've also avoided addressing the case where a queued connection gets bored and goes away. That will mostly work as written: Twisted won't notice the connection has closed until you call resumeProducing, at which point connectionLost will be called on your application protocol. This should be fine, since your protocol has to handle lost connections anyway.
