Rx to trigger an action after a specified time

I have a class that has a couple of bool properties and subscribes to an observable that supplies objects that contain various values ​​(at an indeterminate rate). For example:

bool IsActive {get; private set;}
bool IsBroken {get; private set;}
bool Status {get; private set;}
...
valueStream.GetValues().Subscribe(UpdateValues);

      

UpdateValues ​​does some work based on the passed object. In particular, there is one value that I am interested in using for some specific logic. Let's call it obj.SpecialValue:

private void UpdateValues(MyObject obj)
{
  ...
  Status = obj.SpecialValue;
  ...
}

      

Now, if IsActive is true and Status is false, I want to wait three seconds before setting IsBroken to true to give the thread a chance to return true obj.SpecialValue during this time. If within three seconds it returns true then I just do nothing, otherwise set IsBroken to true. When status or IsActive is updated, check again.

At first I had:

Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(3)).Subscribe(SetIsBroken);

private void SetIsBroken()
{
  IsBroken = IsActive && !Status;
}

      

but it does more checks than necessary. This really needs to be checked when the thread or IsActive is updated.

Any advice on how to do this correctly?

+3


source to share


1 answer


Use BehavourSubject<T>

to return properties

A useful idea for this problem is to support your properties with types BehaviorSubject<bool>

. They are useful for the dual purpose of both the active property and the stream of that property's values.

You can subscribe to them as observables, but you can also access their current value through a property Value

. You change them by sending the new value through OnNext

.

For example, we could do this:

private BehaviorSubject<bool> _isActive;

public bool IsActive
{
    get { return _isActive.Value; }
    set { _isActive.OnNext(value); }
}

      

By using this on all of your properties, it becomes a fairly simple exercise to view properties for the complex state you are declaring. Assuming that _status

and _isBroken

similarly implemented forgery, we can configure the subscription as follows:

Observable.CombineLatest(_isActive,
                         _status,
                        (a,s) => a & !s).DistinctUntilChanged()
          .Where(p => p)
          .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3), scheduler)
                                     .TakeUntil(_status.Where(st => st)))
          .Subscribe(_ => _isBroken.OnNext(true));  

      

Part line uses CombineLatest

and subscribes to streams _isActive

and _status

. It is emitted when any of these changes - and the result function sets true exactly when _isActive

true, and _status

false. DistinctUntilChanged()

prevents setting _isActive

and _status

values ​​they already have by starting a new timer.

Then we use Where

to filter only for that condition.

Now SelectMany

will take true values ​​and project each of them into a stream that emits after 3 seconds using Timer

- but we use TakeUntil

to return this value in case it _status

becomes true. SelectMany

also flattens the stream of streams back to a single boolean stream.

Not sure here - you didn't mention this, but you might be wondering if _isActive

false should go, also stop the timer. If so, you can use Merge

to combine the clock for that and _status in TakeUntil

.

We can now subscribe to all of this to set _isBroken

true if this request fires, indicating that the timer has expired.



Note the argument scheduler

to Timer

- this exists, so we can pass in the test scheduler.

I'm not sure if I have written your logic correctly, but if I don't hope you can see how to amend it as needed.

Here's a complete example. Use the nuget package rx-testing

and it will run in LINQPad as written:

void Main()
{
    var tests = new Tests();
    tests.Test();
}

public class Foo
{
    private BehaviorSubject<bool> _isActive;
    private BehaviorSubject<bool> _isBroken;
    private BehaviorSubject<bool> _status;

    public bool IsActive
    {
        get { return _isActive.Value; }
        set { _isActive.OnNext(value); }
    }

    public bool IsBroken { get { return _isBroken.Value; } }
    public bool Status { get { return _status.Value; } }

    public Foo(IObservable<MyObject> valueStream, IScheduler scheduler)
    {
        _isActive = new BehaviorSubject<bool>(false);
        _isBroken = new BehaviorSubject<bool>(false);
        _status = new BehaviorSubject<bool>(false);

        // for debugging purposes
        _isActive.Subscribe(a => Console.WriteLine(
             "Time: " + scheduler.Now.TimeOfDay + " IsActive: " + a));
        _isBroken.Subscribe(a => Console.WriteLine(
             "Time: " + scheduler.Now.TimeOfDay + " IsBroken: " + a));
        _status.Subscribe(a => Console.WriteLine(
              "Time: " + scheduler.Now.TimeOfDay + " Status: " + a));

        valueStream.Subscribe(UpdateValues);

        Observable.CombineLatest(
                    _isActive,
                    _status,
                    (a,s) => a & !s).DistinctUntilChanged()
                .Where(p => p)
                .SelectMany(_ => Observable.Timer(TimeSpan.FromSeconds(3),
                                                  scheduler)
                                           .TakeUntil(_status.Where(st => st)))
                .Subscribe(_ => _isBroken.OnNext(true));            
    }

    private void UpdateValues(MyObject obj)
    {
        _status.OnNext(obj.SpecialValue);
    }   
}   

public class MyObject
{
    public MyObject(bool specialValue)
    {
        SpecialValue = specialValue;
    }

    public bool SpecialValue { get; set; }
}

public class Tests : ReactiveTest
{
    public void Test()
    {
        var testScheduler = new TestScheduler();

        var statusStream = testScheduler.CreateColdObservable<bool>(
            OnNext(TimeSpan.FromSeconds(1).Ticks, false),
            OnNext(TimeSpan.FromSeconds(3).Ticks, true),
            OnNext(TimeSpan.FromSeconds(5).Ticks, false));

        var activeStream = testScheduler.CreateColdObservable<bool>(
            OnNext(TimeSpan.FromSeconds(1).Ticks, false),
            OnNext(TimeSpan.FromSeconds(6).Ticks, true));           

        var foo = new Foo(statusStream.Select(b => new MyObject(b)), testScheduler);

        activeStream.Subscribe(b => foo.IsActive = b);       

        testScheduler.Start();               
    }        
}

      

Reply to comment

If you want isActive false to set isBroken false, then I think it adds the following so far:

isActive isStatus Action
T        F        Set Broken True after 3 seconds unless any other result occurs
T        T        Set Broken False immediately if not already false, cancel timer
F        F        Set Broken False immediately if not already false, cancel timer
F        T        Set Broken False immediately if not already false, cancel timer

      

In this case, use the following query:

Observable.CombineLatest(
            _isActive,
            _status,
            (a,s) => a & !s).DistinctUntilChanged()                    
        .Select(p => p ? Observable.Timer(TimeSpan.FromSeconds(3),
                                               scheduler)
                                   .Select(_ => true)
                       : Observable.Return(false))
        .Switch()
        .DistinctUntilChanged()
        .Subscribe(res => _isBroken.OnNext(res));

      

Please note the changes:

  • SelectMany

    is now Select

    , which turns every event into
    • Timer that returns true

      after 3 seconds
    • Or immediate false

  • The result Select

    is a stream of bool: streams IObservable<IObservable<bool>>

    . We want any new thread to reflect the undo of any previous thread. This is what it will do Switch

    - smoothing the result in the process.
  • We now apply the second one DistinctUntilChanged()

    , as a canceled timer can result in two false values ​​in the stream sequentially
  • Finally, we assign a bool value to isBroken

    .
+1


source







All Articles