Rx to trigger an action after a specified time
I have a class that has a couple of bool properties and subscribes to an observable that supplies objects that contain various values ββ(at an indeterminate rate). For example:
bool IsActive {get; private set;}
bool IsBroken {get; private set;}
bool Status {get; private set;}
...
valueStream.GetValues().Subscribe(UpdateValues);
UpdateValues ββdoes some work based on the passed object. In particular, there is one value that I am interested in using for some specific logic. Let's call it obj.SpecialValue:
private void UpdateValues(MyObject obj)
{
...
Status = obj.SpecialValue;
...
}
Now, if IsActive is true and Status is false, I want to wait three seconds before setting IsBroken to true to give the thread a chance to return true obj.SpecialValue during this time. If within three seconds it returns true then I just do nothing, otherwise set IsBroken to true. When status or IsActive is updated, check again.
At first I had:
Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3)).Subscribe(SetIsBroken);
private void SetIsBroken()
{
IsBroken = IsActive && !Status;
}
but it does more checks than necessary. This really needs to be checked when the thread or IsActive is updated.
Any advice on how to do this correctly?
Use BehavourSubject<T>
to return properties
A useful idea for this problem is to support your properties with types BehaviorSubject<bool>
. They are useful for the dual purpose of both the active property and the stream of that property's values.
You can subscribe to them as observables, but you can also access their current value through a property Value
. You change them by sending the new value through OnNext
.
For example, we could do this:
private BehaviorSubject<bool> _isActive;
public bool IsActive
{
get { return _isActive.Value; }
set { _isActive.OnNext(value); }
}
By using this on all of your properties, it becomes a fairly simple exercise to view properties for the complex state you are declaring. Assuming that _status
and _isBroken
similarly implemented forgery, we can configure the subscription as follows:
Observable.CombineLatest(_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Where(p => p)
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
.TakeUntil(_status.Where(st => st)))
.Subscribe(_ => _isBroken.OnNext(true));
Part line uses CombineLatest
and subscribes to streams _isActive
and _status
. It is emitted when any of these changes - and the result function sets true exactly when _isActive
true, and _status
false. DistinctUntilChanged()
prevents setting _isActive
and _status
values ββthey already have by starting a new timer.
Then we use Where
to filter only for that condition.
Now SelectMany
will take true values ββand project each of them into a stream that emits after 3 seconds using Timer
- but we use TakeUntil
to return this value in case it _status
becomes true. SelectMany
also flattens the stream of streams back to a single boolean stream.
Not sure here - you didn't mention this, but you might be wondering if _isActive
false should go, also stop the timer. If so, you can use Merge
to combine the clock for that and _status in TakeUntil
.
We can now subscribe to all of this to set _isBroken
true if this request fires, indicating that the timer has expired.
Note the argument scheduler
to Timer
- this exists, so we can pass in the test scheduler.
I'm not sure if I have written your logic correctly, but if I don't hope you can see how to amend it as needed.
Here's a complete example. Use the nuget package rx-testing
and it will run in LINQPad as written:
void Main()
{
var tests = new Tests();
tests.Test();
}
public class Foo
{
private BehaviorSubject<bool> _isActive;
private BehaviorSubject<bool> _isBroken;
private BehaviorSubject<bool> _status;
public bool IsActive
{
get { return _isActive.Value; }
set { _isActive.OnNext(value); }
}
public bool IsBroken { get { return _isBroken.Value; } }
public bool Status { get { return _status.Value; } }
public Foo(IObservable<MyObject> valueStream, IScheduler scheduler)
{
_isActive = new BehaviorSubject<bool>(false);
_isBroken = new BehaviorSubject<bool>(false);
_status = new BehaviorSubject<bool>(false);
// for debugging purposes
_isActive.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " IsActive: " + a));
_isBroken.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " IsBroken: " + a));
_status.Subscribe(a => Console.WriteLine(
"Time: " + scheduler.Now.TimeOfDay + " Status: " + a));
valueStream.Subscribe(UpdateValues);
Observable.CombineLatest(
_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Where(p => p)
.SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3),
scheduler)
.TakeUntil(_status.Where(st => st)))
.Subscribe(_ => _isBroken.OnNext(true));
}
private void UpdateValues(MyObject obj)
{
_status.OnNext(obj.SpecialValue);
}
}
public class MyObject
{
public MyObject(bool specialValue)
{
SpecialValue = specialValue;
}
public bool SpecialValue { get; set; }
}
public class Tests : ReactiveTest
{
public void Test()
{
var testScheduler = new TestScheduler();
var statusStream = testScheduler.CreateColdObservable<bool>(
OnNext(TimeSpan.FromSeconds(1).Ticks, false),
OnNext(TimeSpan.FromSeconds(3).Ticks, true),
OnNext(TimeSpan.FromSeconds(5).Ticks, false));
var activeStream = testScheduler.CreateColdObservable<bool>(
OnNext(TimeSpan.FromSeconds(1).Ticks, false),
OnNext(TimeSpan.FromSeconds(6).Ticks, true));
var foo = new Foo(statusStream.Select(b => new MyObject(b)), testScheduler);
activeStream.Subscribe(b => foo.IsActive = b);
testScheduler.Start();
}
}
Reply to comment
If you want isActive false to set isBroken false, then I think it adds the following so far:
isActive isStatus Action
T F Set Broken True after 3 seconds unless any other result occurs
T T Set Broken False immediately if not already false, cancel timer
F F Set Broken False immediately if not already false, cancel timer
F T Set Broken False immediately if not already false, cancel timer
In this case, use the following query:
Observable.CombineLatest(
_isActive,
_status,
(a,s) => a & !s).DistinctUntilChanged()
.Select(p => p ? Observable.Timer(TimeSpan.FromSeconds(3),
scheduler)
.Select(_ => true)
: Observable.Return(false))
.Switch()
.DistinctUntilChanged()
.Subscribe(res => _isBroken.OnNext(res));
Please note the changes:
-
SelectMany
is nowSelect
, which turns every event into- Timer that returns
true
after 3 seconds - Or immediate
false
- Timer that returns
- The result
Select
is a stream of bool: streamsIObservable<IObservable<bool>>
. We want any new thread to reflect the undo of any previous thread. This is what it will doSwitch
- smoothing the result in the process. - We now apply the second one
DistinctUntilChanged()
, as a canceled timer can result in two false values ββin the stream sequentially - Finally, we assign a bool value to
isBroken
.