What's a good way to run periodic tasks in C# using Rx, with at most one execution running at a time?

I want to run periodic tasks with a constraint whereby only one method execution is performed at any given time.

I've experimented with Rx, but I'm not sure how to limit it to at most one concurrent execution.

var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
timer.Subscribe(tick => DoSomething());

      


Also, if a task is still running when the next tick is due, I want that tick to pass without starting another run. I don't want tasks to queue up and cause problems.

I have two such tasks to run periodically. The tasks are currently synchronous, but I could make them asynchronous if needed.

+3




3 answers


You should have tested your code as it is, because this is exactly what Rx already enforces.

Try this as a test:

void Main()
{
    var timer = Observable.Interval(TimeSpan.FromMilliseconds(100));
    using (timer.Do(x => Console.WriteLine("!")).Subscribe(tick => DoSomething()))
    {
        Console.ReadLine();
    }
}

private void DoSomething()
{
    Console.Write("<");
    Console.Write(DateTime.Now.ToString("HH:mm:ss.fff"));
    Thread.Sleep(1000);
    Console.WriteLine(">");
}

      



When you run this, you get this output:

!
<16:54:57.111>
!
<16:54:58.112>
!
<16:54:59.113>
!
<16:55:00.113>
!
<16:55:01.114>
!
<16:55:02.115>
!
<16:55:03.116>
!
<16:55:04.117>
!
<16:55:05.118>
!
<16:55:06.119

      

It already ensures that there is no overlap.
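To check the same thing without reading timestamps, the handler can count how many invocations are active at once. This is only a minimal sketch: `OverlapCheck` and `MaxConcurrentHandlers` are hypothetical names, not part of Rx, and the expectation that the result is 1 rests on Rx delivering notifications to an observer one at a time.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;

public static class OverlapCheck
{
    // Hypothetical helper: runs `ticks` slow handlers off Observable.Interval and
    // returns the highest number of handlers that were ever active at the same time.
    public static int MaxConcurrentHandlers(TimeSpan period, TimeSpan workTime, int ticks)
    {
        var gate = new object();
        int running = 0, maxRunning = 0;

        Observable.Interval(period)
            .Take(ticks)
            .Do(_ =>
            {
                // Record entry into the handler and track the peak.
                lock (gate) { running++; maxRunning = Math.Max(maxRunning, running); }
                Thread.Sleep(workTime); // simulate work that outlasts the period
                lock (gate) { running--; }
            })
            .Wait(); // block until the last tick has been handled

        lock (gate) { return maxRunning; }
    }
}
```

Calling `OverlapCheck.MaxConcurrentHandlers(TimeSpan.FromMilliseconds(100), TimeSpan.FromSeconds(1), 5)` mirrors the test above; a result greater than 1 would indicate overlapping handlers.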

+5




You are on the right track: you can use Select + Concat to flatten the observable and limit the number of in-flight requests (note: if your task takes longer than the interval, executions will start to stack up because they cannot run fast enough):



var source = Observable.Interval(TimeSpan.FromMilliseconds(100))
          //I assume you are doing async work since you want to limit concurrency
          .Select(_ => Observable.FromAsync(() => DoSomethingAsync()))
          //This is equivalent to calling Merge(1)
          .Concat();

source.Subscribe(/*Handle the result of each operation*/);
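If ticks should be dropped rather than queued while a run is in progress, one option is to guard the pipeline with a busy flag. The following is a minimal sketch under stated assumptions, not an Rx built-in: `RunExclusive` is a hypothetical helper name, and the work passed in (for example `DoSomethingAsync`) is assumed to return a `Task` that completes asynchronously instead of blocking the tick thread.

```csharp
using System;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ExclusiveExtensions
{
    // Hypothetical helper (not an Rx operator): starts `work` for a tick only when
    // no earlier run is still in flight; ticks that arrive meanwhile are dropped.
    public static IObservable<T> RunExclusive<T>(this IObservable<T> ticks, Func<Task> work)
    {
        // Defer gives every subscription its own busy flag.
        return Observable.Defer<T>(() =>
        {
            int busy = 0; // 0 = idle, 1 = a run is in flight

            return ticks
                // Atomically claim the flag; ticks that fail to claim it are skipped.
                .Where(_ => Interlocked.CompareExchange(ref busy, 1, 0) == 0)
                .SelectMany(tick => Observable.FromAsync(work)
                    // Release the flag when the run completes, fails, or is disposed.
                    .Finally(() => Interlocked.Exchange(ref busy, 0))
                    // Emit the tick that started the run once the run has finished.
                    .Select(_ => tick));
        });
    }
}
```

Usage would look like `Observable.Interval(TimeSpan.FromMilliseconds(100)).RunExclusive(DoSomethingAsync).Subscribe(...)`. Unlike Concat, nothing accumulates here: a tick either starts a run or is discarded. An exception thrown by the work still terminates the sequence, so error handling would need to be added separately.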

      

+2




Here is a factory function that does exactly what you are asking for.

public static IObservable<Unit> Periodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Concat(Observable.Return(Unit.Default).Delay(timeSpan).Repeat());
}

      

Here is a usage example

Periodic(TimeSpan.FromSeconds(1))
    .Subscribe(x =>
    {
        Console.WriteLine(DateTime.Now.ToString("mm:ss:fff"));
        Thread.Sleep(500);
    });

      

If you run this, the console prints will be about 1.5 seconds apart.

Note: if you don't want the first tick to fire right away, you could instead use this factory, which won't emit the first value until the time span has elapsed.

public static IObservable<Unit> DelayedPeriodic(TimeSpan timeSpan)
{
    return Observable.Return(Unit.Default).Delay(timeSpan).Repeat();
}

      

0

