Canceling a long Rx stream

If I have a long running thread, something like:

inputStream.Select(n => Task.Run(() =>
{
    // Long running operation
    Thread.Sleep(TimeSpan.FromSeconds(5));

    return n * n;
}).ToObservable())
.Switch()
.Subscribe(result => 
{
    // Use result in some way
    Console.WriteLine(result);
});

      

How can I get the CancellationToken inside the call Task.Run

, so that when it Switch

distributes the in-flight checkout subscription, it cancels the CancellationToken as canceled, so I know to abort the checkout.

+3


source to share


1 answer


You can use a method Observable.StartAsync

like

inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
    if (token.IsCancellationRequested)
    {
        // .. don't need to do anything
        return 0;
    }
    else
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        return n * n;

    }                   
}))))
.Switch()
.Subscribe(Console.WriteLine);

      

Alternatively, if you will be creating multiple values, you can use the overload Observable.Create

that works with Task

to get the CancellationToken. For example.



inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
    while (!token.IsCancellationRequested)
    {
        Thread.Sleep(TimeSpan.FromSeconds(1));
        observer.OnNext(n * n);
    }

    observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);

      

Inside your task, you need to call OnNext

to create values. The task return value, if any, is ignored.

+3


source







All Articles