How to store data with each channel in a Java NIO server

I have a Java NIO server that receives data from clients.

When a channel is ready to read, i.e. key.isReadable() returns true, read(key) is called to read the data.

I am currently using a single read buffer for all channels. In the read() method I clear the buffer, read into it, and finally copy the contents into a byte array, believing that I will get all of the data in one shot.
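The read method currently looks roughly like this (a minimal sketch of my setup; names and buffer size are illustrative):

// one buffer shared by every channel -- this turns out to be the problem
private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

private void read(SelectionKey key) throws IOException {
    SocketChannel channel = (SocketChannel) key.channel();

    readBuffer.clear();
    channel.read(readBuffer);
    readBuffer.flip();

    byte[] data = new byte[readBuffer.remaining()];
    readBuffer.get(data);
    // ...assumes the whole message arrived in one read, which is not guaranteed
}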

But suppose I do not get the full data in one shot (my protocol uses special characters to mark the end of a message).

Problem:

So how do you store this partial data per channel, and how do you deal with the partial-read problem in general?

I have also read that using key attachments is not a good idea.

+3




2 answers


Take a look at the Reactor pattern. Here is a link to a basic implementation by Doug Lea:

http://gee.cs.oswego.edu/dl/cpjslides/nio.pdf

The idea is to have a single reactor thread that blocks on the Selector's select() call. When I/O events are ready, the reactor thread dispatches them to the appropriate handlers. In the PDF above, the Reactor has an inner Acceptor class that accepts new connections.

The author uses a single handler for both read and write events and keeps state inside that handler, i.e. a "state machine". I prefer to have separate read and write handlers, but that is not as straightforward as the state-machine approach: there can only be one attachment per selection key, so some switching mechanism is required to swap the read and write handlers.

To maintain state between subsequent read / write, you need to do a couple of things:

  • Introduce a framing protocol that tells you when a message has been fully read (see the sketch after this list).
  • Have a timeout or cleanup mechanism for stale connections.
  • Maintain client sessions between events.
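For the first point, here is a minimal sketch of delimiter-based framing (the extractMessage helper and the delimiter byte are illustrative, not part of any standard API):

// returns a complete message if the buffer contains the delimiter,
// or null if more reads are needed; leftover bytes stay in the buffer
private static byte[] extractMessage(ByteBuffer buffer, byte delimiter) {
    buffer.flip(); // switch from fill mode to drain mode
    for (int i = 0; i < buffer.limit(); i++) {
        if (buffer.get(i) == delimiter) {
            byte[] message = new byte[i];
            buffer.get(message);   // copy the message body
            buffer.get();          // skip the delimiter itself
            buffer.compact();      // keep any bytes of the next message
            return message;
        }
    }
    buffer.compact(); // no delimiter yet: back to fill mode, data preserved
    return null;
}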

The overall Reactor skeleton can then look something like this:

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Reactor implements Runnable {

    // the handler interface used by dispatch(); added here for completeness
    interface EventHandler {
        void processEvent();
    }

    Selector selector = Selector.open();

    ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();

    public Reactor(int port) throws IOException {

        serverSocketChannel.socket().bind(new InetSocketAddress(port));
        serverSocketChannel.configureBlocking(false);

        // let the Reactor handle new connection events
        registerAcceptor();
    }

    /**
     * Registers the Acceptor as the handler for new client connections.
     *
     * @throws ClosedChannelException
     */
    private void registerAcceptor() throws ClosedChannelException {

        SelectionKey selectionKey0 = serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
        selectionKey0.attach(new Acceptor());
    }

    @Override
    public void run() {

        while (!Thread.interrupted()) {
            startReactorLoop();
        }
    }

    private void startReactorLoop() {

        try {
            // wait for new events on registered channels
            selector.select();

            // get the selection keys of the pending events
            Set<SelectionKey> selectedKeys = selector.selectedKeys();
            Iterator<SelectionKey> selectedKeysIterator = selectedKeys.iterator();

            while (selectedKeysIterator.hasNext()) {

                // dispatch the event to the handler attached to its key
                dispatch(selectedKeysIterator.next());

                // remove the dispatched key from the collection
                selectedKeysIterator.remove();
            }

        } catch (IOException e) {
            // TODO add real handling of this exception
            e.printStackTrace();
        }
    }

    private void dispatch(SelectionKey interestedEvent) {

        if (interestedEvent.attachment() != null) {

            EventHandler handler = (EventHandler) interestedEvent.attachment();
            handler.processEvent();
        }
    }

    private class Acceptor implements EventHandler {

        @Override
        public void processEvent() {

            try {
                SocketChannel clientConnection = serverSocketChannel.accept();

                if (clientConnection != null) {

                    // must be non-blocking before it can be registered with the Selector
                    clientConnection.configureBlocking(false);
                    registerChannel(clientConnection);
                }

            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        /**
         * Save the channel-key association - in a Map, perhaps.
         * This is required for subsequent/partial reads and writes.
         */
        private void registerChannel(SocketChannel clientChannel) {

            // notify the switching mechanism of the new connection
            // (so it can create and attach the read handler)
        }
    }
}

Once the read event has been fully processed, notify the switching mechanism so that the write handler can be attached.

New instances of the read and write handlers are created once by this switching mechanism, when a new connection is accepted. The mechanism then swaps the handlers as needed. The handlers for each channel are looked up in a map that is populated when a connection is accepted, in the registerChannel() method.

The read and write handlers each hold their own ByteBuffer instances, and since each socket has its own pair of handlers, you can now maintain state between partial reads and writes.
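To make this concrete, here is a hedged sketch of such a read handler as another inner class of the Reactor above (handleMessage() is a hypothetical application callback, and extractMessage() is the framing check sketched earlier):

private class ReadHandler implements EventHandler {

    private final SocketChannel channel;

    // per-channel buffer: partial data survives between read events
    private final ByteBuffer readBuffer = ByteBuffer.allocate(1024);

    ReadHandler(SocketChannel channel) {
        this.channel = channel;
    }

    @Override
    public void processEvent() {
        try {
            if (channel.read(readBuffer) == -1) {
                channel.close(); // client closed the connection
                return;
            }
            byte[] message = extractMessage(readBuffer, (byte) '\n');
            if (message != null) {
                // a full message arrived: hand it to the application and
                // let the switching mechanism attach the write handler
                handleMessage(message);
            }
            // otherwise the partial data stays in readBuffer until the
            // next read event on this channel
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}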



Two tips for improving performance:

  • Try to read immediately when the connection is accepted. Only if you do not receive the complete data, as defined by the header or delimiter in your custom protocol, register the channel for read events.

  • Try to write first without registering interest in write events, and only if you could not write all the data, register interest in write events.

This reduces the number of Selector wakeups.

Something like this:

SocketChannel socketChannel;

byte[] outData;

static final int MAX_OUTPUT = 1024;

ByteBuffer output = ByteBuffer.allocate(MAX_OUTPUT);

// fill the buffer with the outgoing message and prepare it for draining
output.put(outData);
output.flip();

// try to write immediately, without registering interest in write events
socketChannel.write(output);

// if the message was not written fully, register interest in write events
if (output.hasRemaining()) {

    SelectionKey selectionKey = socketChannel.register(selector, SelectionKey.OP_WRITE);
    selectionKey.attach(writeHandler);
    selector.wakeup();
}
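The read-side counterpart of the first tip might look like this (a sketch; readBuffer, readHandler and the messageComplete() check are placeholders for your own protocol logic):

// inside Acceptor.processEvent(), right after accept():
clientConnection.configureBlocking(false);
clientConnection.read(readBuffer);

// only register for read events if the message did not arrive in one shot
if (!messageComplete(readBuffer)) {
    SelectionKey key = clientConnection.register(selector, SelectionKey.OP_READ);
    key.attach(readHandler);
}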

Finally, a periodic task should be set up that checks whether connections/SelectionKeys have gone stale. If a client drops the TCP connection, the server usually does not find out about it. As a result, a number of event handlers remain in memory, attached to dead connections, which leads to a memory leak.

This is why you might hear that attachments are not very good, but the problem can be solved.

There are two easy ways to deal with this:

  • TCP keep-alive can be enabled

  • A periodic task can check the timestamp of the last activity on a given channel. If it has been idle for too long, the server should close the connection (see the sketch below).
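A sketch of the second option, assuming the attachment records a lastActivity timestamp (run it on the reactor thread, since selector.keys() is not safe to touch concurrently):

private void closeIdleConnections(long maxIdleMillis) {
    long now = System.currentTimeMillis();
    for (SelectionKey key : selector.keys()) {
        Object attachment = key.attachment();
        // Session is hypothetical: any attachment that records the
        // time of the last read/write on the channel will do
        if (attachment instanceof Session
                && now - ((Session) attachment).lastActivity > maxIdleMillis) {
            try {
                key.channel().close(); // closing also cancels the key
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}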

+4




There's an ancient and very inaccurate NIO blog by someone at Amazon that wrongly claims key attachments are memory leaks. Complete and utter BS; it isn't even logical. It's also the one that states you need all sorts of additional queues. I've never had to do that in about 13 years of NIO.

You need a ByteBuffer per channel, or maybe two: one for reading and one for writing. You can store a single buffer as the attachment itself; if you need two, or have other data to store, define a Session class that contains both buffers and anything else you want to bind to the channel, such as client credentials, and use the Session object as the attachment.
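For example (a minimal sketch; the exact fields are up to your application):

// everything the server needs to remember about one client connection
class Session {
    final ByteBuffer readBuffer = ByteBuffer.allocate(1024);
    final ByteBuffer writeBuffer = ByteBuffer.allocate(1024);
    Object clientCredentials;  // or whatever else belongs to the channel
    long lastActivity;         // handy for an idle-timeout task
}

// attach it when the channel is registered:
SelectionKey key = socketChannel.register(selector, SelectionKey.OP_READ);
key.attach(new Session());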



You really can't get very far in NIO with one buffer for all channels.

+1








