Wait for an Rx observer to complete without blocking

I have an application that runs an observable interval with multiple observers. Every 0.5 seconds, the interval downloads some XML data from a web server, and the observers then do some application-specific processing on a background thread. Once the data is no longer needed, the subscriptions and the interval observable are disposed, so the observers' OnNext / OnCompleted / OnError will no longer be called. So far so good.

My problem: in some rare cases, an observer's OnNext may still be running after Dispose has been called! Before proceeding with further operations after disposal, I would like to make sure that OnNext has completed.

My current solution: I introduced a locker field in my observer class (see code). After disposing, I try to acquire the lock and continue only once the lock has been acquired. While this solution works (?), it somehow just feels wrong to me.

Question: Is there a more elegant, more "Rx Way" to solve this problem?

using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace RxExperimental
{
    internal sealed class MyXmlDataFromWeb
    {
        public string SomeXmlDataFromWeb { get; set; }
    }

    internal sealed class MyObserver : IObserver<MyXmlDataFromWeb>
    {
        private readonly object _locker = new object();
        private readonly string _observerName;

        public MyObserver(string observerName) {
            this._observerName = observerName;
        }

        public object Locker {
            get { return this._locker; }
        }

        public void OnCompleted() {
            lock (this._locker) {
                Console.WriteLine("{0}: Completed.", this._observerName);
            }
        }

        public void OnError(Exception error) {
            lock (this._locker) {
                Console.WriteLine("{0}: An error occurred: {1}", this._observerName, error.Message);
            }
        }

        public void OnNext(MyXmlDataFromWeb value) {
            lock (this._locker) {
                Console.WriteLine("  {0}: OnNext running on thread {1}... ", this._observerName, Thread.CurrentThread.ManagedThreadId);
                Console.WriteLine("  {0}: XML received: {1}", this._observerName, value.SomeXmlDataFromWeb);
                Thread.Sleep(5000); // simulate some long running operation
                Console.WriteLine("  {0}: OnNext running on thread {1}... Done.", this._observerName, Thread.CurrentThread.ManagedThreadId);
            }
        }
    }

    internal sealed class Program
    {
        private static void Main() {
            const int interval = 500;
            //
            var dataSource = Observable.Interval(TimeSpan.FromMilliseconds(interval), NewThreadScheduler.Default).Select(_ => {
                var data = new MyXmlDataFromWeb {
                    SomeXmlDataFromWeb = String.Format("<timestamp>{0:yyyy.MM.dd HH:mm:ss:fff}</timestamp>", DateTime.Now)
                };
                return data;
            }).Publish();
            //
            var observer1 = new MyObserver("Observer 1");
            var observer2 = new MyObserver("Observer 2");
            //
            var subscription1 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer1);
            var subscription2 = dataSource.ObserveOn(NewThreadScheduler.Default).Subscribe(observer2);
            //
            var connection = dataSource.Connect();
            //
            Console.WriteLine("Press any key to cancel ...");
            Console.ReadLine();
            //
            subscription1.Dispose();
            subscription2.Dispose();
            connection.Dispose();
            //
            lock (observer1.Locker) {
                Console.WriteLine("Observer 1 completed.");
            }
            lock (observer2.Locker) {
                Console.WriteLine("Observer 2 completed.");
            }
            //
            Console.WriteLine("Can only be executed, after all observers completed.");
        }
    }
}

      


1 answer


Yes, there is a more Rx way to do this.

The first point to note is that unsubscribing from an observable stream is essentially independent of what is currently happening in the observer. There is no feedback. Since you have a requirement to know definitively when the observations have ended, you need to model this in your observable stream. In other words, instead of unsubscribing from the stream, you should end the stream so that you can observe the OnCompleted event. In your case, you can use TakeUntil to terminate the observable rather than unsubscribing from it.
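
To make the difference concrete, here is a minimal sketch (not part of the original answer) that contrasts disposing a subscription with ending the stream via TakeUntil. It assumes the System.Reactive package is referenced; the class name TakeUntilSketch, the Subject used as a stand-in source, and the log labels are all hypothetical and chosen only for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class TakeUntilSketch
{
    // Returns a log of the notifications each subscriber received.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();       // hypothetical stand-in for the real data source
        var endSignal = new Subject<Unit>();   // firing this ends the stream for subscriber B

        // Subscriber A will be unsubscribed with Dispose: it never sees OnCompleted.
        IDisposable subscriptionA = source.Subscribe(
            x => log.Add("A:" + x),
            () => log.Add("A:completed"));

        // Subscriber B sits behind TakeUntil: it sees OnCompleted when endSignal fires.
        source.TakeUntil(endSignal).Subscribe(
            x => log.Add("B:" + x),
            () => log.Add("B:completed"));

        source.OnNext(1);                 // both A and B receive 1
        subscriptionA.Dispose();          // A is detached silently, no feedback
        endSignal.OnNext(Unit.Default);   // B receives OnCompleted
        source.OnNext(2);                 // nobody is listening any more
        return log;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(", ", Run()));
    }
}
```

Only subscriber B gets a completion notification, which is the signal the rest of the program can wait on.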

The second point is that your main program needs to observe when your "observer" finishes its work. But since you've made your "observer" an actual IObserver, you don't really have a way to do that. This is a common source of confusion that I see when people first start using Rx. If you model your "observer" as just another link in the observable chain, then your main program can observe it. Specifically, your "observer" is nothing more than a map operation (with side effects) that maps incoming XML data to "done" messages.



So if you refactor your code you can get what you want ...

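// Requires: using System; using System.Reactive; using System.Reactive.Linq; using System.Threading;
// internal, to match the accessibility of MyXmlDataFromWeb in the question
internal sealed class MyObserver
{
    private readonly string _name;

    public MyObserver(string name) { _name = name; }

    // Maps each incoming item to a "done" signal once the (side-effecting) work has finished.
    public IObservable<Unit> Handle(IObservable<MyXmlDataFromWeb> source)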
    {
        return source.Select(value =>
        {
            Thread.Sleep(5000); // simulate work
            return Unit.Default;
        });
    }
}

// main (also requires: using System.Reactive.Subjects; using System.Reactive.Threading.Tasks;)
var endSignal = new Subject<Unit>();
var dataSource = Observable
    .Interval(...)
    .Select(...)
    .TakeUntil(endSignal)
    .Publish();
var observer1 = new MyObserver("Observer 1");
var observer2 = new MyObserver("Observer 2");
var results1 = observer1.Handle(dataSource.ObserveOn(...));
var results2 = observer2.Handle(dataSource.ObserveOn(...));
// since you just want to know when they are all done
// just merge them.
// use ToTask() to subscribe them and collect the results
// as a Task
var processingDone = results1.Merge(results2).Count().ToTask();

dataSource.Connect();

Console.WriteLine("Press any key to cancel ...");
Console.ReadLine();

// end the stream
endSignal.OnNext(Unit.Default);

// wait for the processing to complete.
// use await, or Task.Result
var numProcessed = await processingDone;
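
For readers who want something they can paste and run, the following is a self-contained sketch of the same idea, again assuming the System.Reactive package. The concrete values are assumptions made only for the demo and are not from the question or the answer: a 50 ms interval, 20 ms of simulated work, a plain long as the payload instead of MyXmlDataFromWeb, and a Task.Delay standing in for the key press. The class name WaitForObserversSketch is likewise hypothetical.

```csharp
using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;

public static class WaitForObserversSketch
{
    // The "observer" as an operator: a Select with a side effect that
    // turns each incoming item into a "done" signal.
    private static IObservable<Unit> Handle(IObservable<long> source, int workMs)
    {
        return source.Select(_ =>
        {
            Thread.Sleep(workMs); // simulate work (assumed duration)
            return Unit.Default;
        });
    }

    public static async Task<int> RunAsync()
    {
        var endSignal = new Subject<Unit>();
        var dataSource = Observable
            .Interval(TimeSpan.FromMilliseconds(50)) // assumed interval
            .TakeUntil(endSignal)                     // ends the stream instead of unsubscribing
            .Publish();

        var results1 = Handle(dataSource.ObserveOn(NewThreadScheduler.Default), 20);
        var results2 = Handle(dataSource.ObserveOn(NewThreadScheduler.Default), 20);

        // ToTask() subscribes right away; the task completes once both
        // chains have handled their last item and seen OnCompleted.
        Task<int> processingDone = results1.Merge(results2).Count().ToTask();

        using (dataSource.Connect())
        {
            await Task.Delay(300);           // stand-in for "press any key"
            endSignal.OnNext(Unit.Default);  // end the stream
            return await processingDone;     // waits without blocking a thread
        }
    }

    public static void Main()
    {
        // A non-async Main can block on the task instead of awaiting it.
        int handled = RunAsync().GetAwaiter().GetResult();
        Console.WriteLine("Handled {0} items; all observers have finished.", handled);
    }
}
```

The number of handled items depends on timing, so the sketch prints it rather than assuming a particular value. The point is that the final line runs only after both processing chains have completed.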

      
