Echo Server simultaneously with 1000 clients (lost messages + connecting to errors)

I am reading "Netty In Action V5". Reading to chapters 2.3 and 2.4, I tried with the EchoServer and EchoClient examples, when I tested one client connected to the server everything worked fine ... then I modified the example to allow multiple clients to connect to the server. My goal was to run a stresstest: 1000 clients will connect to the server, and each client will echo 100 messages to the server, and when all clients have finished, I get the total time of the whole process. The server was deployed on a Linux machine (VPS) and the clients were deployed on a window machine.

I got 2 problems when running stresstest:

Some clients received an error message:

java.io.IOException: An existing connection was forcibly closed by the remote host 
    at sun.nio.ch.SocketDispatcher.read0(Native Method)
    at sun.nio.ch.SocketDispatcher.read(SocketDispatcher.java:43)
    at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
    at sun.nio.ch.IOUtil.read(IOUtil.java:192)
    at sun.nio.ch.SocketChannelImpl.read(SocketChannelImpl.java:379)
    at io.netty.buffer.UnpooledUnsafeDirectByteBuf.setBytes(UnpooledUnsafeDirectByteBuf.java:447)
    at io.netty.buffer.AbstractByteBuf.writeBytes(AbstractByteBuf.java:881)
    at io.netty.channel.socket.nio.NioSocketChannel.doReadBytes(NioSocketChannel.java:242)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:119)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:511)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:468)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:382)\at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:354)
    at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:110)
    at io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:137)
    at java.lang.Thread.run(Thread.java:745)

      

But some clients didn't receive the message from the server

Working environment:

  • Netty-all-4.0.30.Final

  • JDK1.8.0_25

  • Echo clients have been deployed on Window 7 Ultimate

  • Echo Server has been deployed on Linux Centos 6

NettyClient class:

public class NettyClient {
    private Bootstrap bootstrap;
    private EventLoopGroup group;

    public NettyClient(final ChannelInboundHandlerAdapter handler) {
        group = new NioEventLoopGroup();
        bootstrap = new Bootstrap();
        bootstrap.group(group);
        bootstrap.channel(NioSocketChannel.class);
        bootstrap.handler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(String host, int port) throws Exception {
        bootstrap.remoteAddress(new InetSocketAddress(host, port));
        bootstrap.connect();
    }

    public void stop() {
        try {
            group.shutdownGracefully().sync();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

      

NettyServer class:

public class NettyServer {
    private EventLoopGroup parentGroup;
    private EventLoopGroup childGroup;
    private ServerBootstrap boopstrap;

    public NettyServer(final ChannelInboundHandlerAdapter handler) {
        parentGroup = new NioEventLoopGroup(300);
        childGroup = new NioEventLoopGroup(300);
        boopstrap = new ServerBootstrap();
        boopstrap.group(parentGroup, childGroup);
        boopstrap.channel(NioServerSocketChannel.class);
        boopstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel channel) throws Exception {
                channel.pipeline().addLast(handler);
            }
        });
    }

    public void start(int port) throws Exception {
        boopstrap.localAddress(new InetSocketAddress(port));
        ChannelFuture future = boopstrap.bind().sync();
        System.err.println("Start Netty server on port " + port);
        future.channel().closeFuture().sync();
    }

    public void stop() throws Exception {
        parentGroup.shutdownGracefully().sync();
        childGroup.shutdownGracefully().sync();
    }
}

      

EchoClient class

public class EchoClient {
    private static final String HOST = "203.12.37.22";
    private static final int PORT = 3344;
    private static final int NUMBER_CONNECTION = 1000;
    private static final int NUMBER_ECHO = 10;
    private static CountDownLatch counter = new CountDownLatch(NUMBER_CONNECTION);

    public static void main(String[] args) throws Exception {
        List<NettyClient> listClients = Collections.synchronizedList(new ArrayList<NettyClient>());
        for (int i = 0; i < NUMBER_CONNECTION; i++) {
            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        NettyClient client = new NettyClient(new EchoClientHandler(NUMBER_ECHO) {
                            @Override
                            protected void onFinishEcho() {
                                counter.countDown();
                                System.err.println((NUMBER_CONNECTION - counter.getCount()) + "/" + NUMBER_CONNECTION);
                            }
                        });
                        client.start(HOST, PORT);
                        listClients.add(client);
                    } catch (Exception ex) {
                        ex.printStackTrace();
                    }
                }
            }).start();
        }

        long t1 = System.currentTimeMillis();
        counter.await();
        long t2 = System.currentTimeMillis();
        System.err.println("Totla time: " + (t2 - t1));

        for (NettyClient client : listClients) {
            client.stop();
        }
    }

    private static class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {

        private static final String ECHO_MSG = "Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo Echo";
        private int numberEcho;
        private int curNumberEcho = 0;

        public EchoClientHandler(int numberEcho) {
            this.numberEcho = numberEcho;
        }

        @Override
        public void channelActive(ChannelHandlerContext ctx) throws Exception {
            ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
        }

        @Override
        protected void channelRead0(ChannelHandlerContext ctx, ByteBuf in) throws Exception {
            curNumberEcho++;
            if (curNumberEcho >= numberEcho) {
                onFinishEcho();
            } else {
                ctx.writeAndFlush(Unpooled.copiedBuffer(ECHO_MSG, CharsetUtil.UTF_8));
            }
        }

        protected void onFinishEcho() {

        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            cause.printStackTrace();
            ctx.close();
        }
    }
}

      

EchoServer class:

public class EchoServer {
    private static final int PORT = 3344;

    public static void main(String[] args) throws Exception {
        NettyServer server = new NettyServer(new EchoServerHandler());
        server.start(PORT);
        System.err.println("Start server on port " + PORT);
    }

    @Sharable
    private static class EchoServerHandler extends ChannelInboundHandlerAdapter {
        @Override
        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
            ctx.write(msg);
        }

        @Override
        public void channelReadComplete(ChannelHandlerContext ctx) throws Exception {
            ctx.flush();
        }

        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
            ctx.close();
        }
    }
}

      

+3


source to share


1 answer


You can change 2 things:

  • Create just one boot client and reuse it for all of your clients, not every client. So extract your download from the client side and only keep the connection as you did in the beginning. This will limit the number of threads inside.

  • Close the client side connection when the ping pong count is reached. Currently you are only making a call to the empty onFinishEcho method, which does not cause any crashes on the client side, so the client does not stop ... And so the channel does not close ...

Perhaps you have some limitation on the number of threads on the client side.



Another element may also arise: you are not specifying any codec (string codec or whatever), which may result in a partial transmission from the client or server, treated as a full response.

For example, you might have the first "Echo Echo Echo" block sending one packet containing the beginning of your buffer, while the other parts (more than "Echo") will be sent through later packets.

To prevent this, you must use a single codec so that your final handler receives the actual complete message, not the partial one. If not, you may run into other issues, such as a server side error trying to send an extra packet while the channel is being closed by the client earlier than expected ...

+1


source







All Articles