C # GRPC asynchronous response stream

How do I generate the values โ€‹โ€‹of the response streams for RPC from outside the handler? (specifically from IObservable), I do the following, but it creates cross-threading problems because it AnRxObservable

is shared with RPC handlers ...

public override Task GetTicker(RequestProto request, ServerCallContext context)
{
    var subscription = AnRxObservable.Subscribe(value =>
    {
        responseStream.WriteAsync(new ResponseProto
        {
            Value = value
        });
    });

    // Wait for the RPC to be canceled (my extension method
    // that returns a task that completes when the CancellationToken
    // is cancelled)
    await context.CancellationToken.WhenCancelled();

    // Dispose of the buffered stream
    bufferedStream.Dispose();

    // Dispose subscriber (tells rx that we aren't subscribed anymore)
    subscription.Dispose();

    return Task.FromResult(1);
}

      

This code doesn't feel good ... but I don't see any other way to stream RPC responses from a shared source created outside of the RPC handler.

+3


source to share


1 answer


Generally speaking, when you are trying to convert from a push (IObservable) model to a pull model (enumerating the responses to write and write them), you need an intermediate buffer for the message - for example, blockingQueue. The body of the handler can be an asynchronous loop that tries to get the next message for the queue (preferably in asynchronous mode) and writes it to responseStream.

Also, keep in mind that the gRPC API only allows you to have one response in flight at any given time - and your snippet does not honor that. Therefore, you need to wait for WriteAsync () before starting another write (this is another reason why you need an intermediate queue).



This link might be helpful in explaining the push vs pull paradigms: When to use IEnumerable vs IObservable?

+2


source







All Articles