Is there something like ThrottleOrMax in rx?

Use case: I am writing a thing that tracks changes and saves automatically. I want Throttle so that I don't save more often than every five seconds. I want to save every 30 seconds if there is a continuous stream of changes.

Couldn't find observable .Throttle (mergeTime, maxTime) in the docs and could only think of ugly ways to write my own, so from this question.


Here's how you can do it with GroupByUntil


public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    return source
            t => 0, // they all get the same key
            t => t, // the element is the element
            g =>
                // expire the group when it slows down for throttle
                // or when it exceeds maxTime
                return g
                    .Throttle(throttle, scheduler ?? Scheduler.Default)
                    .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
        .SelectMany(g => g.LastAsync());


And here's a way to do it with Window


public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
    return source.Publish(p => p
            .Window(() =>
                // close the window when p slows down for throttle
                // or when it exceeds maxTime.
                // do not start throttling or the maxTime timer
                // until the first p of the new window arrives
                var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
                var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
                // signal when either timer signals
                return throttleTimer.Amb(timeoutTimer);
            .SelectMany(w => w.TakeLast(1)));


Here is an interactive marble chart (drag the input marbles around):

Examples["throttleWithMax"] = {
  category: "Custom",
  label: "throttleWithMax(5, 10)",
  inputs: [
    [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
      return {
        d: i,
        t: i
  apply: function(inputs, scheduler, Rx) {
    Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
      var s = scheduler || Rx.Scheduler.timeout;
      return this
        .publish(function(p) {
          return p
            .window(function() {
              var throttleTimer = p.debounce(throttle, s);
              var timeoutTimer = p.delay(maxTime, s);
              return Rx.Observable.amb(throttleTimer, timeoutTimer);
            .flatMap(function(w) {
              return w.takeLast(1);

    return inputs[0].throttleWithMax(5, 10, scheduler);

var d = document.createElement("div");
d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';

And here is a unit test that uses TestScheduler

to manage the clock and extract the randomness of the system clock from it:

private const int _THROTTLE = 50;
private const int _TIMEOUT = 100;
private const int _COMPLETE = 100000;
[TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
[TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
[TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
[TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
[TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
[TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
    var scheduler = new TestScheduler();
    var completeEvent = new[] { ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE) };
    var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
    var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), scheduler);
    var observer = scheduler.CreateObserver<int>();

    // start the clock

    // check the results
    var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
    CollectionAssert.AreEqual(expected, observer.Messages);


Here's the complete unit test code.



