Throttle RX task based on CPU usage

I have a long-running task (creating textures from depth images from Kinect One) that is implemented using Reactive Extensions. The gist of it is shown below:

kinectWrapper.DepthFrames
    .ObserveOn(new EventLoopScheduler())
    .Select(f => /* CPU-intensive data manipulation to create the color texture I want */)
    .Subscribe(colorFrame => /* fill texture on GPU */);

      

The problem is that both the Select and the Subscribe steps are quite heavy on the system and can't keep up at the full frame rate. I got it running at an acceptable speed on my development PC by adding .Sample(TimeSpan.FromMilliseconds(100)), but I'd rather lower the frame rate based on CPU usage.

I think there are two possibilities:

  • Create a secondary IObservable as an input to Sample that dynamically throttles the main event loop.
  • Write my own IScheduler that drops scheduled tasks when it is overloaded with tasks.
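
To make the first possibility concrete, here is a rough sketch, not a tested solution. It assumes a caller-supplied readCpuPercent delegate that returns the current CPU load as 0..100; the names AdaptiveSampling, IntervalForLoad and SampleByLoad, and the 50% / 100 ms / 500 ms numbers, are all made up for illustration. Observable.Generate with a time selector produces ticks whose spacing is recomputed from the load each time, and those ticks drive Sample:

```csharp
using System;
using System.Reactive.Linq;

public static class AdaptiveSampling
{
    // Hypothetical mapping: 100 ms at or below 50% CPU,
    // rising linearly to 500 ms at 100% CPU.
    public static TimeSpan IntervalForLoad(double cpuPercent)
    {
        var clamped = Math.Max(0.0, Math.Min(100.0, cpuPercent));
        var over = Math.Max(0.0, clamped - 50.0) / 50.0; // 0..1
        return TimeSpan.FromMilliseconds(100 + 400 * over);
    }

    // Samples the source with a tick stream whose period follows the CPU load.
    // readCpuPercent is an assumed delegate supplied by the caller.
    public static IObservable<T> SampleByLoad<T>(
        this IObservable<T> source,
        Func<double> readCpuPercent)
    {
        var ticks = Observable.Generate(
            0L,
            _ => true,
            i => i + 1,
            i => i,
            _ => IntervalForLoad(readCpuPercent())); // delay before each tick

        return source.Sample(ticks);
    }
}
```

It would replace the fixed .Sample(TimeSpan.FromMilliseconds(100)) call, for example kinectWrapper.DepthFrames.SampleByLoad(() => currentCpuPercent), where currentCpuPercent stands for whatever CPU measurement is available.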


1 answer


One solution is to modify the behavior of the extension method found here: http://rxx.codeplex.com/workitem/20724

Below is an example. In this case, I changed the behavior so that the extension method limits the number of notifications in the queue, discarding the oldest until the queue size is acceptable.

To suit your requirements, you can change this to drop certain notifications based on CPU metrics, which you can read using the System.Diagnostics.PerformanceCounter class.
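
As a rough illustration of the CPU-metric idea (a sketch under assumptions, not part of the extension method below): the counter names "Processor" / "% Processor Time" / "_Total" are the usual Windows ones, the one-second polling period and the threshold are arbitrary, CpuGate, ShouldKeep and DropWhenBusy are hypothetical names, and WithLatestFrom assumes a reasonably recent System.Reactive version.

```csharp
using System;
using System.Diagnostics;
using System.Reactive.Linq;

public static class CpuGate
{
    // Hypothetical policy: keep a frame only while the last sampled
    // CPU load is below the threshold.
    public static bool ShouldKeep(float cpuPercent, float thresholdPercent)
    {
        return cpuPercent < thresholdPercent;
    }

    public static IObservable<T> DropWhenBusy<T>(
        this IObservable<T> source,
        float thresholdPercent)
    {
        // Using ties the counter's lifetime to the subscription.
        return Observable.Using(
            () => new PerformanceCounter("Processor", "% Processor Time", "_Total"),
            counter =>
            {
                // Poll once per second; the first NextValue() call returns 0,
                // so the gate starts open.
                var load = Observable.Interval(TimeSpan.FromSeconds(1))
                    .Select(_ => counter.NextValue())
                    .StartWith(0f);

                // Pair each frame with the most recent load reading and
                // drop it when the CPU is too busy.
                return source
                    .WithLatestFrom(load, (frame, cpu) => new { frame, cpu })
                    .Where(x => ShouldKeep(x.cpu, thresholdPercent))
                    .Select(x => x.frame);
            });
    }
}
```

Something like kinectWrapper.DepthFrames.DropWhenBusy(80f) would then discard frames while total CPU usage is at 80% or above. PerformanceCounter is Windows-specific.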



However, you could also abstract away from such specifics: use the extension method below with a scheduler that runs on a low-priority thread.

When the processor is busy, that thread gets less CPU time, the queue fills up faster than it is drained, and so notifications are more likely to be dropped.

kinectWrapper.DepthFrames.ThrottledObserveOn(
    new EventLoopScheduler(start => new Thread(start) {Priority = ThreadPriority.Lowest, IsBackground = true}),
    5).Select(...

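// Requires: System, System.Collections.Generic, System.Diagnostics.Contracts, System.Linq,
// System.Reactive, System.Reactive.Concurrency, System.Reactive.Disposables,
// System.Reactive.Linq (and System.Threading for the usage above).
// As an extension method, this must be declared inside a static class.
public static IObservable<TSource> ThrottledObserveOn<TSource>(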
    this IObservable<TSource> source,
    IScheduler scheduler,
    int maximumQueuedNotifications)
{
    Contract.Requires(source != null);
    Contract.Requires(scheduler != null);
    Contract.Requires(maximumQueuedNotifications >= 0);

    return Observable.Create<TSource>(observer =>
    {
        var notificationsGate = new object();
        var acceptingNotification = false;
        var nextNotifications = new Queue<Notification<TSource>>();
        Notification<TSource> completionNotification = null;
        var schedulerDisposable = new MultipleAssignmentDisposable();

        // Materialize wraps OnNext/OnError/OnCompleted in Notification objects so they can be queued.
        var subscriptionDisposable = source.Materialize().Subscribe(notification =>
        {
            bool startAcceptingNotifications;

            lock (notificationsGate)
            {
                startAcceptingNotifications = !acceptingNotification;
                acceptingNotification = true;

                if (notification.Kind == NotificationKind.OnNext)
                {
                    nextNotifications.Enqueue(notification);
                }
                else
                {
                    completionNotification = notification;
                }
            }

            // Start the drain loop on the scheduler only if it is not already running.
            if (startAcceptingNotifications)
            {
                schedulerDisposable.Disposable = scheduler.Schedule(rescheduleAction =>
                {
                    Notification<TSource> notificationToAccept;
                    lock (notificationsGate)
                    {
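                        if (nextNotifications.Any())
                        {
                            // Dequeue until at most maximumQueuedNotifications remain.
                            // Only the last item dequeued is delivered; older ones are dropped.
                            do
                            {
                                notificationToAccept = nextNotifications.Dequeue();
                            }
                            while (nextNotifications.Count > maximumQueuedNotifications);
                        }
                        else
                        {
                            // Queue drained: deliver the pending OnCompleted/OnError notification.
                            notificationToAccept = completionNotification;
                            completionNotification = null;
                        }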
                    }

                    notificationToAccept.Accept(observer);

                    bool continueAcceptingNotification;

                    lock (notificationsGate)
                    {
                        continueAcceptingNotification = acceptingNotification = nextNotifications.Any() || completionNotification != null;
                    }

                    if (continueAcceptingNotification)
                    {
                        rescheduleAction();
                    }
                });
            }
        });
        return new CompositeDisposable(subscriptionDisposable, schedulerDisposable);
    });
}

      
