Canceling a long Rx stream
I have a long-running operation inside an Rx pipeline, something like:
inputStream.Select(n => Task.Run(() =>
{
// Long running operation
Thread.Sleep(TimeSpan.FromSeconds(5));
return n * n;
}).ToObservable())
.Switch()
.Subscribe(result =>
{
// Use result in some way
Console.WriteLine(result);
});
How can I get a CancellationToken inside the Task.Run call, so that when Switch disposes of the in-flight subscription, the token is marked as canceled and I know to abort the operation?
You can use the Observable.StartAsync method, which passes a CancellationToken to your function, like this:
inputStream.Select(n => Observable.StartAsync((token => Task.Run(() =>
{
if (token.IsCancellationRequested)
{
// .. don't need to do anything
return 0;
}
else
{
Thread.Sleep(TimeSpan.FromSeconds(1));
return n * n;
}
}))))
.Switch()
.Subscribe(Console.WriteLine);
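Note that the snippet above only checks the token once, before the sleep starts, so a cancellation that arrives mid-operation is not noticed. A minimal sketch of one way around that, assuming the long-running work can be modeled as an awaitable delay: pass the token into Task.Delay so the wait itself is aborted. The class name SwitchCancellationSketch, the helper SquareAsync, and the delay parameter are hypothetical names introduced for illustration only.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, not part of Rx.
public static class SwitchCancellationSketch
{
    // Stand-in for the long-running operation (assumption: it can be awaited).
    public static async Task<int> SquareAsync(int n, TimeSpan delay, CancellationToken token)
    {
        // Task.Delay observes the token, so the wait ends early when the token is canceled.
        await Task.Delay(delay, token);
        return n * n;
    }

    // StartAsync supplies a token that is canceled when the subscription is disposed,
    // which is what Switch does to the previous inner sequence when a new value arrives.
    public static IObservable<int> LatestSquares(IObservable<int> inputStream, TimeSpan delay) =>
        inputStream
            .Select(n => Observable.StartAsync(token => SquareAsync(n, delay, token)))
            .Switch();
}
```

You would subscribe to it the same way as before, for example SwitchCancellationSketch.LatestSquares(inputStream, TimeSpan.FromSeconds(1)).Subscribe(Console.WriteLine). If the real work is CPU-bound rather than a delay, checking token.IsCancellationRequested (or calling token.ThrowIfCancellationRequested()) at intervals inside the work is the equivalent.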
Alternatively, if you will be producing multiple values, you can use the Observable.Create overload that works with Task to get the CancellationToken. For example:
inputStream.Select(n => Observable.Create<int>((observer, token) => Task.Run(() =>
{
while (!token.IsCancellationRequested)
{
Thread.Sleep(TimeSpan.FromSeconds(1));
observer.OnNext(n * n);
}
observer.OnCompleted();
})))
.Switch()
.Subscribe(Console.WriteLine);
Inside your task, you need to call OnNext on the observer to produce values. The task's return value, if any, is ignored.
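The same Observable.Create overload also accepts an async lambda, which avoids blocking a thread-pool thread in Thread.Sleep. The following is only a sketch under that assumption. RepeatedSquaresSketch, RepeatedSquares, and the interval parameter are hypothetical names, and it relies on my understanding that Rx completes the sequence when the returned task finishes.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper class, not part of Rx.
public static class RepeatedSquaresSketch
{
    public static IObservable<int> RepeatedSquares(IObservable<int> inputStream, TimeSpan interval) =>
        inputStream
            .Select(n => Observable.Create<int>(async (observer, token) =>
            {
                // Keep producing until Switch (or the subscriber) disposes this inner sequence.
                while (!token.IsCancellationRequested)
                {
                    // The delay is canceled together with the token, so the loop exits promptly.
                    await Task.Delay(interval, token);
                    // Values reach the subscriber only through the observer.
                    observer.OnNext(n * n);
                }
            }))
            .Switch();
}
```

Because the values go through the observer, the lambda returns a plain Task and has no result of its own.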