Is there something like ThrottleOrMax in Rx?
Use case: I am writing a component that tracks changes and saves automatically. I want Throttle so that I don't save more often than every five seconds, but I also want to save at least every 30 seconds if there is a continuous stream of changes.
I couldn't find an Observable.Throttle(mergeTime, maxTime) overload in the docs and could only think of ugly ways to write my own, hence this question.
Here's how you can do it with GroupByUntil:
public static IObservable<T> ThrottleWithMax_GroupBy<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source
        .GroupByUntil(
            t => 0, // every element gets the same key, so only one group is open at a time
            t => t, // pass each element through unchanged
            g =>
            {
                // expire the group when the source goes quiet for `throttle`
                // or when the group has been open for `maxTime`
                return g
                    .Throttle(throttle, scheduler ?? Scheduler.Default)
                    .Timeout(maxTime, Observable.Empty<T>(), scheduler ?? Scheduler.Default);
            })
        // emit only the last element of each expired group
        .SelectMany(g => g.LastAsync());
}
And here's a way to do it with Window:
public static IObservable<T> ThrottleWithMax_Window<T>(this IObservable<T> source, TimeSpan throttle, TimeSpan maxTime, IScheduler scheduler = null)
{
    return source.Publish(p => p
        .Window(() =>
        {
            // close the window when p slows down for throttle
            // or when it exceeds maxTime.
            // do not start throttling or the maxTime timer
            // until the first p of the new window arrives
            var throttleTimer = p.Throttle(throttle, scheduler ?? Scheduler.Default);
            var timeoutTimer = p.Delay(maxTime, scheduler ?? Scheduler.Default);
            // signal when either timer signals
            return throttleTimer.Amb(timeoutTimer);
        })
        .SelectMany(w => w.TakeLast(1)));
}
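Both versions close the current group or window at whichever comes first: `throttle` after the most recent element, or `maxTime` after the first element. As a rough way to reason about that rule without Rx or real timers, here is a small reference model in plain JavaScript. The function name `simulateThrottleWithMax` is made up for this sketch, each element's value is simply its timestamp, and an element that arrives exactly on the deadline is assumed to start the next window (the real operators may order such ties differently).

```javascript
// Reference model only (hypothetical helper, not part of Rx):
// computes what a throttle-with-max operator would emit for sorted event times.
function simulateThrottleWithMax(times, throttle, maxTime, completeAt = Infinity) {
  const emissions = [];
  let first = null; // time of the first event in the open window
  let last = null;  // time of the most recent event in the open window

  // The open window closes at the earlier of: quiet period elapsed, max age reached.
  const deadline = () => Math.min(last + throttle, first + maxTime);

  for (const t of times) {
    if (first !== null && t >= deadline()) {
      // The window closed before this event arrived: emit its last event.
      emissions.push({ value: last, time: deadline() });
      first = null;
    }
    if (first === null) first = t; // this event opens a new window
    last = t;
  }
  if (first !== null) {
    // Flush the final window at its deadline, or at source completion if earlier.
    emissions.push({ value: last, time: Math.min(deadline(), completeAt) });
  }
  return emissions;
}

// The question's numbers: throttle 5 s, max 30 s, one change every 4 s.
const everyFourSeconds = [0, 4, 8, 12, 16, 20, 24, 28, 32, 36, 40];
console.log(simulateThrottleWithMax(everyFourSeconds, 5, 30));
// [ { value: 28, time: 30 }, { value: 40, time: 45 } ]
```

With a steady stream the quiet period never elapses, so the first save is forced at the 30-second mark; once the changes stop, the final save happens 5 seconds after the last one.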
Here is the RxJS version of the Window approach, set up as an interactive marble diagram (when the snippet runs, you can drag the input marbles around):
Examples["throttleWithMax"] = {
  category: "Custom",
  label: "throttleWithMax(5, 10)",
  inputs: [
    [1, 4, 8, 12, 20, 24, 28, 50].map(function(i) {
      return {
        d: i,
        t: i
      };
    }).concat([55])
  ],
  apply: function(inputs, scheduler, Rx) {
    Rx.Observable.prototype.throttleWithMax = function(throttle, maxTime, scheduler) {
      var s = scheduler || Rx.Scheduler.timeout;
      return this
        .publish(function(p) {
          return p
            .window(function() {
              var throttleTimer = p.debounce(throttle, s);
              var timeoutTimer = p.delay(maxTime, s);
              return Rx.Observable.amb(throttleTimer, timeoutTimer);
            })
            .flatMap(function(w) {
              return w.takeLast(1);
            });
        });
    };
    return inputs[0].throttleWithMax(5, 10, scheduler);
  }
};
var d = document.createElement("div");
document.body.appendChild(d);
d.innerHTML = '<rx-marbles key="throttleWithMax"></rx-marbles>';
<script src="http://bman654.github.io/samples/rxmarbles-old/element.js"></script>
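For readers who cannot run the marble diagram, the grouping it would show for the inputs above can be worked out by hand. The sketch below does that in plain JavaScript with no Rx dependency: it splits the input times into windows using the same closing rule (quiet for `throttle`, or open for `maxTime` since the window's first element). `groupIntoWindows` is a hypothetical helper written for this illustration, and it assumes that an element arriving exactly at a window's closing time belongs to the next window.

```javascript
// Hypothetical helper: split sorted event times into the windows that a
// throttle-with-max operator would form, recording when each window closes.
function groupIntoWindows(times, throttle, maxTime, completeAt = Infinity) {
  const windows = [];
  let items = []; // event times collected in the currently open window

  // Closing time of the open window: last event + throttle, capped at first event + maxTime.
  const deadline = () =>
    Math.min(items[items.length - 1] + throttle, items[0] + maxTime);

  for (const t of times) {
    if (items.length > 0 && t >= deadline()) {
      windows.push({ items, closesAt: deadline() });
      items = [];
    }
    items.push(t);
  }
  if (items.length > 0) {
    // The last window also closes when the source completes.
    windows.push({ items, closesAt: Math.min(deadline(), completeAt) });
  }
  return windows;
}

// Inputs from the marble example: throttleWithMax(5, 10), source completes at 55.
const marbles = [1, 4, 8, 12, 20, 24, 28, 50];
for (const w of groupIntoWindows(marbles, 5, 10, 55)) {
  const lastItem = w.items[w.items.length - 1];
  console.log(`t=${w.closesAt}: emit ${lastItem} (from ${w.items.join(", ")})`);
}
// t=11: emit 8 (from 1, 4, 8)
// t=17: emit 12 (from 12)
// t=30: emit 28 (from 20, 24, 28)
// t=55: emit 50 (from 50)
```

The first and third windows are cut off by the max time (10 after their first element), while the second and fourth close because the source went quiet for 5.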
And here is a unit test that uses TestScheduler to control the clock, which removes the nondeterminism of the system clock from the test:
private const int _THROTTLE = 50;
private const int _TIMEOUT = 100;
private const int _COMPLETE = 100000;

[TestCase("groupby", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "g1")]
[TestCase("groupby", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "g2")]
[TestCase("groupby", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "g3")]
[TestCase("window", new[] { 1, 10 }, new[] { 10 }, new[] { 10 + _THROTTLE }, TestName = "w1")]
[TestCase("window", new[] { 1, 10, 40, 60 }, new[] { 60 }, new[] { 1 + _TIMEOUT }, TestName = "w2")]
[TestCase("window", new[] { 1, 45, 1000, 1040, 1080, 1110, }, new[] { 45, 1080, 1110 }, new[] { 45 + _THROTTLE, 1000 + _TIMEOUT, 1110 + _THROTTLE }, TestName = "w3")]
public void Throttle(string which, int[] pattern, int[] expectedPattern, int[] expectedTimes)
{
    var scheduler = new TestScheduler();
    var completeEvent = new[] { ReactiveTest.OnCompleted(_COMPLETE, _COMPLETE) };
    // each value in the pattern is emitted at the tick equal to its own value
    var source = scheduler.CreateColdObservable(pattern.Select(v => ReactiveTest.OnNext(v, v)).Concat(completeEvent).ToArray());
    // ThrottleWithMax(which, ...) is not shown here; presumably it dispatches
    // to the _GroupBy or _Window variant based on the `which` argument
    var throttled = source.ThrottleWithMax(which, TimeSpan.FromTicks(_THROTTLE), TimeSpan.FromTicks(_TIMEOUT), scheduler);
    var observer = scheduler.CreateObserver<int>();
    throttled.Subscribe(observer);

    // start the clock
    scheduler.Start();

    // check the results
    var expected = expectedPattern.Zip(expectedTimes, (v, t) => ReactiveTest.OnNext(t, v)).Concat(completeEvent).ToList();
    CollectionAssert.AreEqual(expected, observer.Messages);
}
Here's the complete unit test code.