Non-blocking reverse proxy with netty

I am trying to write a non-blocking proxy with Netty 4.1. I have a "FrontHandler" that handles incoming connections and a "BackHandler" that handles outgoing connections. I am following HexDumpProxyFrontendHandler ( https://github.com/netty/netty/blob/ed4a89082bb29b9e7d869c5d25d6b9ea8fc9d25b/example/src/main/java/io/netty/example/proxy/HexDumpProxyFrontendHandler.java )

In this code, I found:

@Override
public void channelRead(final ChannelHandlerContext ctx, Object msg) {
    if (outboundChannel.isActive()) {
        outboundChannel.writeAndFlush(msg).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) {
                if (future.isSuccess()) {
                    // flushed to the backend; read the next chunk from the client
                    ctx.channel().read();
                } else {
                    future.channel().close();
                }
            }
        });
    }
}


Meaning that an incoming message is forwarded only if the connection to the outgoing server is already established. That is obviously not ideal in the case of an HTTP proxy, so I am wondering what the best way to handle this would be.

I'm wondering whether disabling automatic reads on the front-end connection (and only manually triggering reads once the outgoing connection is ready) is a good option. I could then enable autoRead on the child socket in the channelActive event of the backend handler. However, I'm not sure how many messages I will receive in the handler for each read() call (using HttpRequestDecoder, I assume I receive the initial HttpRequest, but I would really like to avoid receiving the subsequent HttpContent / LastHttpContent messages until I manually call read() again or enable autoRead on the channel).
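For reference, a minimal configuration sketch of that approach, assuming Netty 4.1 and the structure of the HexDumpProxy example; frontInitializer, BackHandler, and inboundChannel are names assumed here, and the fragments are untested:

```java
// Configuration sketch (Netty 4.1, untested): accepted child channels start
// with autoRead disabled, so nothing is read from the client yet.
ServerBootstrap b = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NioServerSocketChannel.class)
        .childOption(ChannelOption.AUTO_READ, false)
        .childHandler(frontInitializer); // installs FrontHandler, assumed defined elsewhere

// In BackHandler, once the outbound connection is up, start reading
// from the front-end channel:
@Override
public void channelActive(ChannelHandlerContext ctx) {
    inboundChannel.config().setAutoRead(true); // resume reads from the client
    inboundChannel.read();
    ctx.fireChannelActive();
}
```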

Another option is to use a Promise to acquire a channel from the client ChannelPool:

private void setCurrentBackend(HttpRequest request) {
    pool.acquire(request, backendPromise);

    backendPromise.addListener((FutureListener<Channel>) future -> {
        if (!future.isSuccess()) {
            return; // acquisition failed; handled elsewhere
        }
        Channel c = future.getNow();
        if (!currentBackend.compareAndSet(null, c)) {
            // a backend was already set for this front-end connection
            pool.release(c);
            throw new IllegalStateException();
        }
    });
}


and then forward data from the front end to the back end through this promise. For example:

private void handleLastContent(ChannelHandlerContext frontCtx, LastHttpContent lastContent) {
    doInBackend(c -> {
        c.writeAndFlush(lastContent).addListener((ChannelFutureListener) future -> {
            if (future.isSuccess()) {
                // written to the backend; ask it for the response
                future.channel().read();
            } else {
                pool.release(c);
                frontCtx.close();
            }
        });
    });
}

private void doInBackend(Consumer<Channel> action) {
    Channel c = currentBackend.get();
    if (c == null) {
        // backend not acquired yet: defer the action until the promise completes
        backendPromise.addListener((FutureListener<Channel>) future -> {
            if (future.isSuccess()) {
                action.accept(future.getNow());
            }
        });
    } else {
        action.accept(c);
    }
}


but I'm not sure how good it is to keep that promise around forever and perform every write from "front" to "back" by adding listeners to it. I'm also not sure how to create the promise so that its listeners run on the correct thread... right now I'm using:

backendPromise = group.next().<Channel> newPromise(); // bad
// or
backendPromise = frontCtx.channel().eventLoop().newPromise(); // OK?


(where group is the same EventLoopGroup used in the ServerBootstrap).

If the listeners are not run on the correct thread, I guess it would also be problematic to have the "else {}" shortcut in the doInBackend method, which skips the Promise and writes directly to the channel.
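For what it's worth, Netty's usual pattern for thread correctness is to check EventLoop.inEventLoop() and otherwise hand the task to the channel's event loop via execute(). Below is a self-contained model of that pattern, using a single-threaded executor as a stand-in for the event loop; LoopConfinement and its members are hypothetical names, not Netty API:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Model of "always run on the channel's event loop": a single-threaded
// executor plays the role of the event loop; work runs inline if we are
// already on that thread, otherwise it is handed off to it.
class LoopConfinement {
    private final ExecutorService loop = Executors.newSingleThreadExecutor();
    private volatile Thread loopThread;

    LoopConfinement() throws Exception {
        // capture the loop's thread, as EventLoop.inEventLoop() does internally
        loop.submit(() -> { loopThread = Thread.currentThread(); }).get();
    }

    void execute(Runnable task) {
        if (Thread.currentThread() == loopThread) {
            task.run();         // already on the loop: run inline (Netty's fast path)
        } else {
            loop.execute(task); // hop onto the loop thread
        }
    }

    void shutdown() throws Exception {
        loop.shutdown();
        loop.awaitTermination(5, TimeUnit.SECONDS);
    }

    public static void main(String[] args) throws Exception {
        LoopConfinement lc = new LoopConfinement();
        lc.execute(() -> System.out.println("on loop thread: " + Thread.currentThread().getName()));
        lc.shutdown();
    }
}
```

Creating the promise with frontCtx.channel().eventLoop().newPromise() follows the same idea: the listeners then fire on the front-end channel's own thread.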



2 answers


The no-autoread approach does not work on its own, because HttpRequestDecoder generates multiple messages even if only one read() was performed.



I solved it by chaining CompletableFutures.
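For illustration, here is a minimal, self-contained sketch of what such chaining can look like; Backend and ChainedForwarder are hypothetical stand-ins, not Netty types. Each incoming message appends a stage to a single future, so writes are deferred until the backend is ready and then happen in arrival order:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Sketch: serialize writes to a backend that becomes available
// asynchronously by chaining stages onto one CompletableFuture.
class ChainedForwarder {
    // Stand-in for the real backend channel; here it just records writes.
    static class Backend {
        final List<String> written = new ArrayList<>();
        void write(String msg) { written.add(msg); }
    }

    // Tail of the chain: every forwarded message extends it, so messages
    // are written in arrival order once the backend is up.
    private CompletableFuture<Backend> tail;

    ChainedForwarder(CompletableFuture<Backend> connected) {
        this.tail = connected;
    }

    synchronized void forward(String msg) {
        tail = tail.thenApply(backend -> { backend.write(msg); return backend; });
    }

    public static void main(String[] args) {
        CompletableFuture<Backend> connect = new CompletableFuture<>();
        ChainedForwarder fwd = new ChainedForwarder(connect);
        fwd.forward("request-line"); // buffered: backend not ready yet
        fwd.forward("headers");
        Backend b = new Backend();
        connect.complete(b);         // connection ready: chain drains in order
        fwd.forward("body");         // later messages write immediately
        System.out.println(b.written); // prints [request-line, headers, body]
    }
}
```

In the real proxy the completion would come from the connect/acquire callback, and the stages would call writeAndFlush instead of a list append.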



I was working on a similar proxy application based on the MQTT protocol, used mainly to build a real-time chat application. The application I had to develop was asynchronous in nature, so I naturally did not run into such a problem, because if

outboundChannel.isActive() == false


then I can simply store the messages in a queue or a persistent DB and process them once the outgoing channel comes up. However, since you are talking about an HTTP application, the exchange is synchronous in nature, which means the client cannot continue sending packets until the outgoing channel is up and running. So the option you propose holds: the packet will only be read after the channel is active, and you can process messages manually by disabling automatic reading in the ChannelConfig.
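As an illustration of the queue-until-active idea described above (BufferingRelay and its methods are hypothetical stand-ins, not part of Netty or any MQTT library):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Sketch: messages arriving before the outbound channel is active are
// buffered, then flushed in arrival order when it comes up.
class BufferingRelay {
    private final Queue<String> pending = new ArrayDeque<>();
    private final List<String> delivered = new ArrayList<>(); // stand-in for the outbound channel
    private boolean outboundActive = false;

    synchronized void onMessage(String msg) {
        if (outboundActive) {
            delivered.add(msg);  // channel up: forward immediately
        } else {
            pending.add(msg);    // channel not up yet: queue (or persist)
        }
    }

    synchronized void onOutboundActive() {
        outboundActive = true;
        String m;
        while ((m = pending.poll()) != null) {
            delivered.add(m);    // drain the backlog in arrival order
        }
    }

    synchronized List<String> delivered() { return new ArrayList<>(delivered); }

    public static void main(String[] args) {
        BufferingRelay relay = new BufferingRelay();
        relay.onMessage("CONNECT");  // buffered
        relay.onMessage("PUBLISH");  // buffered
        relay.onOutboundActive();    // backlog drained
        relay.onMessage("PINGREQ");  // forwarded immediately
        System.out.println(relay.delivered()); // prints [CONNECT, PUBLISH, PINGREQ]
    }
}
```

For HTTP this buffering only helps up to a point, since the client still expects a timely response on the same connection.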



However, I would suggest that you check whether the outboundChannel is active. If it is, forward the packet for processing; if it is not, reject the packet by sending an error response, similar to a 404.

Along with this, you should configure the client to re-send packets at regular intervals, and decide what to do if the channel takes too long to become active and readable. Manually managing channelRead is usually not preferred and is an anti-pattern; you should let Netty handle this in the most efficient way for you.







