IObservable?

IObservable

?

. James , - , . , .

# F # ( FSharpx), , , Observables?.

, , , , F #? ( "RetryAfterDelay" ) ( RetryAfterDelay

) . , , . , , , .:)... , , ...

...

,

/// <summary>
/// Classification carried by an <see cref="Event"/>.
/// </summary>
public enum EventTypeEnum
{
    /// <summary>No classification; this is the zero (default) value.</summary>
    None = 0,

    /// <summary>An ordinary element of the stream.</summary>
    Normal = 1,

    /// <summary>
    /// A fault. Per the accompanying comments, the intent is for exceptions
    /// to be surfaced to subscribers as events of this type.
    /// </summary>
    Faulted = 2,
}

/// <summary>
/// Domain event whose only payload is its <see cref="EventTypeEnum"/> classification.
/// </summary>
public class Event
{
    // Backing storage for Type; starts at the enum's zero value (None).
    private EventTypeEnum _type;

    /// <summary>Gets or sets the classification of this event.</summary>
    public EventTypeEnum Type
    {
        get { return _type; }
        set { _type = value; }
    }
}

/// <summary>
/// Builds an observable that has already been fed the values 1 and 2
/// followed by an <see cref="InvalidOperationException"/>.
/// </summary>
/// <returns>
/// The <c>ReplaySubject&lt;int&gt;</c> holding those notifications, exposed as
/// <see cref="IObservable{T}"/>.
/// </returns>
private static IObservable<int> FaultingSequence1()
{
    var replay = new ReplaySubject<int>();

    // Push the regular elements first, in order.
    foreach (var value in new[] { 1, 2 })
    {
        replay.OnNext(value);
    }

    // Terminate the subject with an error so that subscribers observe a fault.
    replay.OnError(new InvalidOperationException("Something went wrong!"));
    return replay;
}

/// <summary>
/// Lazily yields the constant value 1 three times (not the loop counter)
/// and then throws.
/// </summary>
/// <returns>An iterator producing 1, 1, 1 before faulting.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown when enumeration advances past the third element.
/// </exception>
private static IEnumerable<int> FaultingSequence2()
{
    var remaining = 3;
    while (remaining-- > 0)
    {
        yield return 1;
    }

    // Being an iterator, this only fires once the consumer moves past the last item.
    throw new InvalidOperationException("Something went wrong!");
}

// Side question: why isn't FaultingSequence2().ToObservable() protected by Catch as well?
//
// The pipeline below is for illustration only. The goal is for exceptions to be
// transformed into Events with EventTypeEnum.Faulted and passed along the stream
// that has been subscribed to, while resubscribing to FaultingSequence1. In other
// words, the subscriber should learn about the fault through a domain event type.
// Retry does the resubscribing, but only in response to OnError.
var loggedSource = FaultingSequence1().Catch<int, Exception>(error =>
{
    // Log the failure, then re-raise it so that Retry still sees an OnError.
    Console.WriteLine("Exception: {0}", error);
    return Observable.Throw<int>(error);
});
var stream = loggedSource
    .Retry()
    .Select(_ => new Event { Type = EventTypeEnum.Normal });

// How can this be made to print "Event type: Normal", "Event type: Normal", "Event type: Faulted"?
stream.Subscribe(evt => Console.WriteLine("Event type: {0}", evt.Type));

      

! ?

+3




1


Materialize

converts every OnNext, OnError and OnCompleted message into an OnNext message carrying a Notification<T>

as follows:

OnNext:
    OnNext a Notification<T> with Kind OnNext containing a value.

OnError:
    OnNext a Notification<T> with Kind OnError containing an exception.
    OnCompleted.

OnCompleted:
    OnNext a Notification<T> with Kind OnCompleted.
    OnCompleted.

      

, - OnError OnCompleted, OnError . , - ...

// Materialize re-emits every notification from source as an OnNext carrying a
// Notification<T>, so an error arrives as a value followed by OnCompleted.
// Repeat then resubscribes to source each time that materialized sequence completes.
// NOTE(review): a normal completion of source also ends in OnCompleted, so it
// appears this resubscribes in that case as well.
source
    .Materialize()
    .Repeat();

      

, , ( OnCompleted).



, , , OnError , , OnError OnNext Notification<T>

. - :

// Forward every notification as a value, but re-raise errors *after* their
// notification has been forwarded: the subscriber sees the OnError
// notification as data, and Retry sees a real OnError and resubscribes.
// OnNext and OnCompleted notifications pass through unchanged, so a normal
// completion is not retried.
source
    .Materialize()
    .SelectMany(notification =>
        notification.Kind == NotificationKind.OnError
            // Observable.Throw (as used elsewhere in this file) replaces the
            // non-existent Observable.Exception. The second argument is only a
            // type witness so the element type is inferred.
            ? Observable.Return(notification).Concat(Observable.Throw(notification.Exception, notification))
            : Observable.Return(notification)
    )
    .Retry();

      

, ( OnCompleted), .

, , , , :

// Same pipeline as before: forward each notification as a value, re-raise
// errors after forwarding them so that Retry resubscribes, and finally project
// each Notification<T> into whatever domain value the subscriber should see.
source
    .Materialize()
    .SelectMany(notification =>
        notification.Kind == NotificationKind.OnError
            // Observable.Throw (as used elsewhere in this file) replaces the
            // non-existent Observable.Exception. The second argument is only a
            // type witness so the element type is inferred.
            ? Observable.Return(notification).Concat(Observable.Throw(notification.Exception, notification))
            : Observable.Return(notification)
    )
    .Retry()
    // Rx.NET's projection operator is Select (there is no Map).
    .Select(notification => {
        // The returns below are placeholders: fill in the domain value for each kind.
        switch (notification.Kind) {
            case (NotificationKind.OnNext):      return // something.
            case (NotificationKind.OnError):     return // something.
            case (NotificationKind.OnCompleted): return // something.
            default: throw new NotImplementedException();
        }
    });

      

+4









All Articles