Creating an Observable from an awaitable C# method

Observable.FromAsyncPattern can be used to create an observable from BeginX/EndX-style asynchronous methods.

I may be misunderstanding things, but is there a similar function for creating an observable from the newer Task-based asynchronous methods, e.g. Stream.ReadAsync?

2 answers


You can create an IObservable<T> from a Task<T> using ToObservable:



using System.Reactive.Threading.Tasks;

Stream s = ...;
IObservable<int> o = s.ReadAsync(buffer, offset, count).ToObservable();
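
As a fuller illustration, here is a minimal sketch that assumes the System.Reactive package is referenced and uses a MemoryStream with made-up sample bytes in place of a real stream. It contrasts ToObservable, which wraps a task that is already running, with Observable.FromAsync, which defers the ReadAsync call until subscription and passes along a cancellation token.

```csharp
using System;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Threading.Tasks;
using System.Threading.Tasks;

// Hypothetical sample data standing in for a real stream.
var stream = new MemoryStream(new byte[] { 1, 2, 3, 4, 5 });
var buffer = new byte[16];

// ToObservable wraps a task that has already been started.
IObservable<int> started = stream.ReadAsync(buffer, 0, buffer.Length).ToObservable();
int bytesRead = await started; // awaiting an observable yields its last element

// FromAsync defers the call: ReadAsync runs once per subscription, and the
// token is cancelled if the subscription is disposed before the read finishes.
stream.Position = 0;
IObservable<int> deferred = Observable.FromAsync(ct => stream.ReadAsync(buffer, 0, buffer.Length, ct));
int bytesReadAgain = await deferred;

Console.WriteLine($"{bytesRead} {bytesReadAgain}"); // prints "5 5"
```

Either form produces a single value (the number of bytes read) and then completes, so neither reads the stream continuously on its own.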

      



Lee's answer is correct, but what I ultimately ended up doing was using Observable.Create to read continuously from the stream:



    using System;
    using System.Diagnostics;
    using System.Reactive.Linq;
    using System.Reactive.Subjects;

    public static IConnectableObservable<Command> GetReadObservable(this CommandReader reader)
    {
        // Publish() returns a connectable observable: reading starts only once Connect() is called.
        return Observable.Create<Command>(async (observer, token) =>
        {
            try
            {
                // Keep reading commands until the subscription is disposed.
                while (!token.IsCancellationRequested)
                {
                    Command cmd = await reader.ReadCommandAsync();
                    observer.OnNext(cmd);
                }

                observer.OnCompleted();
            }
            catch (Exception ex)
            {
                try
                {
                    // Forward read failures to subscribers.
                    observer.OnError(ex);
                }
                catch (Exception)
                {
                    Debug.WriteLine("An exception was thrown while calling OnError on the observer, which means exceptions are not being caught everywhere");
                    throw;
                }
            }
        }).Publish();
    }
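
Because Publish() returns an IConnectableObservable, nothing is read until Connect() is called. The following is a rough, self-contained sketch of that usage, assuming the System.Reactive package is referenced. CommandReader is not shown in this answer, so a StringReader whose lines play the role of commands is used here as a hypothetical stand-in.

```csharp
using System;
using System.Collections.Generic;
using System.IO;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

// Hypothetical stand-in for CommandReader: each line acts as one command.
var reader = new StringReader("start\nstop\n");

IConnectableObservable<string> commands = Observable.Create<string>(async (observer, token) =>
{
    while (!token.IsCancellationRequested)
    {
        var line = await reader.ReadLineAsync();
        if (line == null)
        {
            break; // end of input
        }
        observer.OnNext(line);
    }
    observer.OnCompleted();
}).Publish();

var received = new List<string>();
var done = new TaskCompletionSource<bool>();

// Subscribers attach first; no reading happens yet.
commands.Subscribe(received.Add, () => done.SetResult(true));

// Connect() subscribes the published source and starts the read loop.
using (commands.Connect())
{
    await done.Task;
}

Console.WriteLine(string.Join(",", received)); // prints "start,stop"
```

Disposing the value returned by Connect() cancels the token passed to the Observable.Create callback, which is what ends the read loop in the long-running case.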

      
