Reactive expansion events

I am receiving messages over UDP on multiple streams. After each appointment, I raise MessageReceived.OnNext(message)

.

Since I am using multiple threads, messages caused by out-of-order are the problem.

How can I order a message raise using the message counter? (let's say there is a message.counter property)

Keep in mind that the message might get lost in the message (say if we have a counter hole after the X messages that the hole is not filled, I bring up the following message)

Posts should be picked up as soon as possible (if next counter received)

+3


source to share


2 answers


When specifying the requirement to detect lost messages, you did not consider the possibility that the last message did not arrive; I added timeoutDuration

one that flushes buffered messages if nothing arrives at a given time - you might think about this error instead, see the comments for how to do this.

I will resolve this by defining an extension method with the following signature:

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    TimeSpan timeoutDuration = new TimeSpan(),
    int gapTolerance = 0)

      

  • source

    - stream of unsorted messages
  • keySelector

    is a function that extracts a key int

    from a message. I assume the first key requested is 0; change if necessary.
  • timeoutDuration

    discussed above, if omitted, no timeout
  • tolerance

    - the maximum number of messages held pending an out of order message. Pass 0

    to store any number of messages
  • scheduler

    - this is the scheduler to be used for timeout and provided for testing purposes, defaults to use if not specified.

Walkthrough

Below I will give a step-by-step guide to this. The complete implementation is repeated below.

Assign a default scheduler

First of all, we have to assign a default scheduler if none were provided:

scheduler = scheduler ?? Scheduler.Default;

      

Waiting time

Now, if a timeout is requested, we'll replace the source with a copy, which will just terminate and dispatch OnCompleted

if the message doesn't arrive at timeoutDuration

.

if(timeoutDuration != TimeSpan.Zero)
    source = source.Timeout(
        timeoutDuration,
        Observable.Empty<TSource>(),
        scheduler);

      

If you want to send TimeoutException

instead, just remove the second parameter Timeout

- empty stream to select the overload that does it. Please note that we can safely share this with all subscribers, so it is located outside of the call Observable.Create

.

Create a subscription handler

We use Observable.Create

to create our stream. The lambda function, which is an argument Create

, is called whenever a subscription occurs and we pass the calling observer ( o

). Create

returns ours IObservable<T>

, so we return it here.

return Observable.Create<TSource>(o => { ...

      

Initialize some variables

We will keep track of the next expected key value in nextKey

and create SortedDictionary

to store messages out of order until they are sent.

int nextKey = 0;  
var buffer = new SortedDictionary<int, TSource>();

      

Subscribe to the source and process messages

We can now subscribe to the message flow (possibly using a timeout). First, we introduce a handler OnNext

. The following message is assigned x

:

return source.Subscribe(x => { ...

      

We call a function keySelector

to extract the key from the message:



var key = keySelector(x);

      

If a message has an old key (because it has exceeded our out-of-order message tolerance), we are just going to reset it and do it with this message (you can act differently):

// drop stale keys
if(key < nextKey) return;

      

Otherwise, we may have the expected key, in which case we can increase the nextKey

send message:

if(key == nextKey)
{
    nextKey++;
    o.OnNext(x);                    
}

      

Or we may have a message about the future out of order, in which case we have to add it to our buffer. If we do this, we also need to make sure that our buffer has not exceeded our allowance for storing out of order messages - in which case, we will also encounter the nextKey

first key in the buffer, because it is SortedDictionary

usually the next bottom key:

else if(key > nextKey)
{
    buffer.Add(key, x);
    if(gapTolerance != 0 && buffer.Count > gapTolerance)
        nextKey = buffer.First().Key;
}

      

Now, regardless of the result above, we need to clear the buffer of any keys that are now ready to go. For this we use a helper method. Note that it is configurable nextKey

, so we must be careful to pass it by reference. We simply iterate over the buffer, reading, deleting, and sending messages as the keys follow each other, incrementing nextKey

each time:

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

      

Dealing with errors

Next, we supply a handler OnError

- it will just go through any error, including a timeout exception if you decide to go that route.

Buffer wash

Finally, we have to process OnCompleted

. Here I decided to free the buffer - this would be necessary if the message out of order delayed the messages and never came. This is why we need a timeout:

() => {
    // empty buffer on completion
    foreach(var item in buffer)
        o.OnNext(item.Value);                
    o.OnCompleted();
});

      

Full implementation

Here is the complete implementation.

public static IObservable<TSource> Sort<TSource>(
    this IObservable<TSource> source,
    Func<TSource, int> keySelector,
    int gapTolerance = 0,
    TimeSpan timeoutDuration = new TimeSpan(),
    IScheduler scheduler = null)
{       
    scheduler = scheduler ?? Scheduler.Default;

    if(timeoutDuration != TimeSpan.Zero)
        source = source.Timeout(
            timeoutDuration,
            Observable.Empty<TSource>(),
            scheduler);

    return Observable.Create<TSource>(o => {
        int nextKey = 0;  
        var buffer = new SortedDictionary<int, TSource>();

        return source.Subscribe(x => {
            var key = keySelector(x);

            // drop stale keys
            if(key < nextKey) return;

            if(key == nextKey)
            {
                nextKey++;
                o.OnNext(x);                    
            }
            else if(key > nextKey)
            {
                buffer.Add(key, x);
                if(gapTolerance != 0 && buffer.Count > gapTolerance)
                    nextKey = buffer.First().Key;
            }
            SendNextConsecutiveKeys(ref nextKey, o, buffer);
        },
        o.OnError,
        () => {
            // empty buffer on completion
            foreach(var item in buffer)
                o.OnNext(item.Value);                
            o.OnCompleted();
        });
    });
}

private static void SendNextConsecutiveKeys<TSource>(
    ref int nextKey,
    IObserver<TSource> observer,
    SortedDictionary<int, TSource> buffer)
{
    TSource x;
    while(buffer.TryGetValue(nextKey, out x))
    {
        buffer.Remove(nextKey);
        nextKey++;
        observer.OnNext(x);                        
    }
}

      

Wire harness

If you include nuget rx-testing

in a console application, the following task will run: test harness for the game:

public static void Main()
{
    var tests = new Tests();
    tests.Test();
}

public class Tests : ReactiveTest
{
    public void Test()
    {
        var scheduler = new TestScheduler();

        var xs = scheduler.CreateColdObservable(
            OnNext(100, 0),
            OnNext(200, 2),
            OnNext(300, 1),
            OnNext(400, 4),
            OnNext(500, 5),
            OnNext(600, 3),
            OnNext(700, 7),
            OnNext(800, 8),
            OnNext(900, 9),            
            OnNext(1000, 6),
            OnNext(1100, 12),
            OnCompleted(1200, 0));

        //var results = scheduler.CreateObserver<int>();

        xs.Sort(
            keySelector: x => x,
            gapTolerance: 2,
            timeoutDuration: TimeSpan.FromTicks(200),
            scheduler: scheduler).Subscribe(Console.WriteLine);

        scheduler.Start();
    }
}

      

Closing comments

There are all sorts of interesting alternative approaches here. I went for this largely imperative approach because I find it easiest to follow - but there are probably some fancy groupings you can use to do this. One thing I know to be a consistent truth about Rx is there are always many ways to dump a cat!

I also don't quite agree with the idea of ​​a timeout here - in a production system I would like to implement some means of checking connectivity like heartbeat or the like. I didn't get into this because obviously it will be an application specific. Also, heart beats have been discussed on these boards and elsewhere before ( like my blog, for example ).

+6


source


Think hard about using TCP instead if you want reliable ordering - what is it for; otherwise, you will be forced to play a guessing game with UDP, and sometimes you are wrong.

For example, imagine you receive the following datagrams in the following order: [A, B, D]

When you get D, how long should you wait for C to appear before pressing D?

Whichever duration you choose, you can be wrong:



  • What if C was lost during transmission and therefore never appears?
  • What if the duration you chose is too short and you end up pressing D and then getting C?

Perhaps you could choose the duration that works best heuristically, but why not just use TCP instead?

Side note:

MessageReceived.OnNext

implies that you are using Subject<T>

, which is probably not necessary. Try converting async methods UdpClient

to observables directly, or converting them instead by writing an asynchronous iterator with Observable.Create<T>(async (observer, cancel) => { ... })

.

+4


source







All Articles