Closing all open streams in GRPC-Java from the client end

I am using gRPC-Java 1.1.2. In an active gRPC session, I have several bidirectional streams open. Is there a way to close them from the client end when the client disconnects? When I disconnect, I run the following loop a fixed number of times and then shut the channel down, but I see the following error on the server side (I am not sure whether it is caused by another problem):

disconnect from client

while (!channel.awaitTermination(3, TimeUnit.SECONDS)) {
// check for upper bound and break if so
}
channel.shutdown().awaitTermination(3, TimeUnit.SECONDS);

      

server error

E0414 11:26:48.787276000 140735121084416 ssl_transport_security.c:439] SSL_read returned 0 unexpectedly.
E0414 11:26:48.787345000 140735121084416 secure_endpoint.c:185]        Decryption error: TSI_INTERNAL_ERROR

      

2 answers


If you use shutdownNow(), it will shut down your open RPC streams more aggressively. Also, you need to call shutdown() or shutdownNow() before calling awaitTermination(); the code in the question waits for termination before any shutdown has been requested.
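The ordering can be sketched as follows. To keep the example compilable without the gRPC dependency, ShutdownableChannel is a hypothetical stand-in that mirrors the three io.grpc.ManagedChannel methods involved; ChannelCloser and close are illustrative names too. In real code you would call the same methods on the ManagedChannel itself.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for the io.grpc.ManagedChannel methods used below. */
interface ShutdownableChannel {
    void shutdown();      // stop accepting new calls, let open calls finish
    void shutdownNow();   // additionally cancel calls that are still open
    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException;
}

final class ChannelCloser {
    private ChannelCloser() {}

    /**
     * Requests shutdown first and only then waits for termination.
     * Returns true if the channel terminated without cancelling open calls.
     */
    static boolean close(ShutdownableChannel channel, long timeout, TimeUnit unit) {
        channel.shutdown(); // must happen before awaitTermination
        try {
            if (channel.awaitTermination(timeout, unit)) {
                return true; // every call finished on its own
            }
            channel.shutdownNow();                   // cancel whatever is still open
            channel.awaitTermination(timeout, unit); // give the cancellation time to take effect
        } catch (InterruptedException e) {
            channel.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
        }
        return false;
    }
}
```

A false return value means that open streams had to be cancelled, which is the case the second paragraph of this answer recommends avoiding.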



That said, a better solution would be to end all your RPCs gracefully before closing the channel.
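One way to do that is to keep track of every open stream and complete each one before shutting the channel down. The sketch below is only an illustration: OpenStreams, register and closeAll are hypothetical names, and each stream is represented by a Runnable that half-closes it. With gRPC, that would typically be requestObserver::onCompleted, where requestObserver is the StreamObserver returned by the bidirectional stub call.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

/** Tracks open streams so they can all be completed before the channel is shut down. */
final class OpenStreams {
    // Each entry half-closes one stream, e.g. requestObserver::onCompleted.
    private final List<Runnable> closers = new CopyOnWriteArrayList<>();

    /** Registers a stream; run the returned handle once the stream ends on its own. */
    Runnable register(Runnable closer) {
        closers.add(closer);
        return () -> closers.remove(closer);
    }

    /** Completes every stream still registered and returns how many were closed. */
    int closeAll() {
        int closed = 0;
        for (Runnable closer : closers) {  // iterates over a snapshot of the list
            if (closers.remove(closer)) {  // skip entries removed in the meantime
                closer.run();
                closed++;
            }
        }
        return closed;
    }
}
```

On disconnect the client would call closeAll() first and then shut the channel down, so the server sees each stream complete normally instead of the transport dropping underneath it.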


If you want to close gRPC streams (server-streaming or bidi) from the client end, you have to run the RPC call inside a Context.CancellableContext, which is found in the io.grpc package.

Let's assume you have this RPC:

service Messaging {
    rpc Listen (ListenRequest) returns (stream Message) {}
}

      



On the client side, you will handle it like this:

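import io.grpc.Context;
import io.grpc.stub.StreamObserver;
import android.util.Log; // assumption: Log.d below looks like the Android logger

public class Messaging {
    private static final String TAG = "Messaging";

    private Context.CancellableContext mListenContext;

    private MessagingGrpc.MessagingStub getMessagingAsyncStub() {
        /* return your async stub */
    }

    public void listen(final ListenRequest listenRequest, final StreamObserver<Message> messageStream) {

        // The RPC is started inside the cancellable context so it can be cancelled later.
        Runnable listenRunnable = new Runnable() {
            @Override
            public void run() {
                Messaging.this.getMessagingAsyncStub().listen(listenRequest, messageStream);
            }
        };
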
        if (mListenContext != null && !mListenContext.isCancelled()) {
            Log.d(TAG, "listen: already listening");
            return;
        }

        mListenContext = Context.current().withCancellation();
        mListenContext.run(listenRunnable);
    }

    public void cancelListen() {
        if (mListenContext != null) {
            mListenContext.cancel(null);
            mListenContext = null;
        }
    }
}

      

Calling cancelListen() cancels the call, which surfaces as a "CANCELLED" error: the stream is closed, and onError of your StreamObserver<Message> messageStream is invoked with a throwable carrying the status "CANCELLED".
