IObservable?
IObservable
?
. James , - , . , .
# F # ( FSharpx), , , Observables?.
, , , , F #? ( "RetryAfterDelay" ) ( RetryAfterDelay
) . , , . , , , .:)... , , ...
...
,
/// <summary>
/// Kind of a domain <c>Event</c> delivered to subscribers of the event stream.
/// </summary>
public enum EventTypeEnum
{
/// <summary>Zero value; not assigned explicitly anywhere in the visible code.</summary>
None = 0,
/// <summary>Event created for an ordinary value coming from the source sequence.</summary>
Normal = 1,
/// <summary>Event intended to stand for an exception raised by the source sequence.</summary>
Faulted = 2
}
/// <summary>
/// Domain event pushed to subscribers; the only data it carries is its kind.
/// </summary>
public class Event
{
/// <summary>Kind of this event (see <see cref="EventTypeEnum"/>).</summary>
public EventTypeEnum Type { get; set; }
}
/// <summary>
/// Builds an observable that has already been fed the values 1 and 2 followed by an
/// <see cref="InvalidOperationException"/>.
/// </summary>
/// <returns>
/// The <c>ReplaySubject&lt;int&gt;</c> holding those three notifications, exposed as
/// <see cref="IObservable{T}"/>.
/// </returns>
private static IObservable<int> FaultingSequence1()
{
    ReplaySubject<int> replay = new ReplaySubject<int>();

    // Push the values in order before the terminal error.
    foreach (int value in new[] { 1, 2 })
    {
        replay.OnNext(value);
    }

    var failure = new InvalidOperationException("Something went wrong!");
    replay.OnError(failure);

    return replay;
}
/// <summary>
/// Lazily yields the value 1 three times and then fails.
/// </summary>
/// <returns>A deferred sequence; nothing runs until it is enumerated.</returns>
/// <exception cref="InvalidOperationException">
/// Thrown while enumerating, when the consumer asks for a fourth element.
/// </exception>
private static IEnumerable<int> FaultingSequence2()
{
    int remaining = 3;
    while (remaining-- > 0)
    {
        // The yielded value is the constant 1, not the loop counter.
        yield return 1;
    }

    throw new InvalidOperationException("Something went wrong!");
}
//Additional pondering: Why isn't FaultingSequence2().ToObservable() also protected by Catch?
//
//This part is here for illustrative purposes. This is the piece I'd like to
//behave so that exceptions get transformed into Events with EventTypeEnum.Faulted
//and passed along to the stream that has been subscribed to, while resubscribing to
//FaultingSequence1. That is, the subscriber would learn about the fault through a
//domain event type.
//Retry does the resubscribing, but only on OnError.
// Error handler: log the exception, then re-raise it as OnError so the Retry() below
// still sees the failure and resubscribes.
Func<Exception, IObservable<int>> logAndRethrow = ex =>
{
    Console.WriteLine("Exception: {0}", ex);
    return Observable.Throw<int>(ex);
};

IObservable<int> guarded = FaultingSequence1().Catch<int, Exception>(logAndRethrow);

// NOTE(review): Retry() has no retry count and FaultingSequence1 always ends in an error,
// so this looks like it resubscribes indefinitely - confirm before running.
IObservable<int> resubscribing = guarded.Retry();

// Every value is mapped to a Normal event; nothing in this pipeline produces Faulted.
var stream = resubscribing.Select(value => new Event { Type = EventTypeEnum.Normal });

//How to get this to print "Event type: Normal", "Event type: Normal", "Event type: Faulted"?
stream.Subscribe(evt => Console.WriteLine("Event type: {0}", evt.Type));
! ?
Materialize
turns every notification from the source into an OnNext carrying a Notification<T>:
OnNext:
emits OnNext with a Notification<T> of Kind OnNext containing the value.
OnError:
emits OnNext with a Notification<T> of Kind OnError containing the exception,
followed by OnCompleted.
OnCompleted:
emits OnNext with a Notification<T> of Kind OnCompleted,
followed by OnCompleted.
, - OnError OnCompleted, OnError . , - ...
// Materialize() delivers errors and completion as Notification<T> values and then completes,
// so Repeat() resubscribes to the source after either kind of termination.
var notifications = source.Materialize();
notifications.Repeat();
, , ( OnCompleted).
, , , OnError , , OnError OnNext Notification<T>
. - :
// Surface every notification as a value, but re-raise errors as a real OnError right after
// passing the error notification downstream, so that Retry() resubscribes only on failure.
source
    .Materialize()
    .SelectMany(notification =>
        notification.Kind == NotificationKind.OnError
            // Observable.Throw(exception, witness): the witness argument only lets the compiler
            // infer the element type (Notification<T>); its value is never emitted.
            ? Observable.Return(notification).Concat(Observable.Throw(notification.Exception, notification))
            : Observable.Return(notification)
    )
    .Retry();
, ( OnCompleted), .
, , , , :
// Surface every notification as a value, re-raise errors after forwarding them so that
// Retry() resubscribes on failure, then project each notification to a domain value.
source
    .Materialize()
    .SelectMany(notification =>
        notification.Kind == NotificationKind.OnError
            // Observable.Throw(exception, witness): the witness argument only lets the compiler
            // infer the element type (Notification<T>); its value is never emitted.
            ? Observable.Return(notification).Concat(Observable.Throw(notification.Exception, notification))
            : Observable.Return(notification)
    )
    .Retry()
    // Select is the Rx.NET projection operator.
    .Select(notification => {
        // The "return // something." lines are placeholders to be filled in by the reader.
        switch (notification.Kind) {
            case (NotificationKind.OnNext): return // something.
            case (NotificationKind.OnError): return // something.
            case (NotificationKind.OnCompleted): return // something.
            default: throw new NotImplementedException();
        }
    });