Correct pattern for a chain of transformations (pipeline) on an observable stream?
I have the following scenario:
- given a stream of objects, IObservable<E>
- process each E to get either an E1 or an error state, in which case I need an error message M1
- process each E1 to get either an E2 or an error message M2
- ...
An additional complication is that the result En and/or the error message Mn may depend on the values of E, E1, ..., En-1, not only on En-1.
With all of this in mind, is there a better pattern than the one I am using?
[Edit] As requested, I've added a fully working example; unfortunately, this made the post quite large.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

internal class Program
{
    private static void Main()
    {
        var stream = Enumerable.Range(1, 10).Select(i => new Record { Id = i }).ToObservable();
        stream
            .Select(it => new ComplexType { Item = it })
            .SelectIfOk(Process1)
            .SelectIfOk(Process2)
            .SelectIfOk(ProcessN)
            .Subscribe(DisplayResult);
        Console.ReadLine();
    }

    private static ComplexType Process1(ComplexType data)
    {
        // do some processing
        data.E1 = data.Item.Id * 10;
        // check for errors in output
        if (data.E1 == 30 || data.E1 == 70)
        {
            data.Errors.Add("Error");
        }
        return data;
    }

    private static ComplexType Process2(ComplexType data)
    {
        // do some processing
        data.E2 = (data.E1 - 3).ToString();
        // check for errors in output
        // can generate multiple errors for the same item
        if (data.E2.StartsWith("4"))
        {
            // does not only depend on the immediate precursor, E1 in this case
            data.Errors.Add("Starts with 4 -- " + data.Item.Id);
        }
        if (data.E2.StartsWith("8"))
        {
            data.Errors.Add("Starts with 8");
        }
        return data;
    }

    private static ComplexType ProcessN(ComplexType data)
    {
        // do some processing
        data.EN = "Success " + data.E2;
        // this one doesn't generate errors
        return data;
    }

    private static void DisplayResult(ComplexType data)
    {
        if (data.Errors.Any())
        {
            // pass the id as a format argument; a single concatenated string is printed unformatted
            Console.WriteLine("{0:##0} has errors: {1}", data.Item.Id, string.Join(",", data.Errors));
        }
        else
        {
            Console.WriteLine("{0:##0}: {1}", data.Item.Id, data.EN);
        }
    }
}
These are the classes used in the above code example:
public class Record
{
    public int Id { get; set; }
    public string FullName { get; set; }
    public string OtherStuff { get; set; }
}

public class ComplexType
{
    public Record Item { get; set; }
    // intermediary results
    public int E1 { get; set; }
    public string E2 { get; set; }
    // final result
    public string EN { get; set; }
    public List<string> Errors { get; set; }

    public ComplexType()
    {
        Errors = new List<string>();
    }
}
Note that there is no relationship between the types E1, E2, ..., En (in particular, they do not all inherit from a common base type).
SelectIfOk is an extension method:
public static IObservable<T> SelectIfOk<T>(this IObservable<T> observable,
    Func<T, T> selector)
    where T : ComplexType
{
    return observable.Select(item => item.Errors.Any() ? item : selector(item));
}
The result of running this code is:
1: Success 7
2: Success 17
3 has errors: Error
4: Success 37
5 has errors: Starts with 4 -- 5
6: Success 57
7 has errors: Error
8: Success 77
9 has errors: Starts with 8
10: Success 97
I am using ComplexType so that I can carry both the intermediate results and the error state, and it just looks... fishy. I've been looking at this code for a week (it's a hobby project) and I still feel like I'm missing the right way to do things with Rx.
[Edit] I forgot to mention one very important thing: I must process all items in the stream, even if some of them generate errors, so I can't just use the Subscribe overload that takes an exception handler, because an error terminates the stream. Abandoning a single item on error is fine (if Process1 generates an error, then Process2, ..., ProcessN are no longer executed for that item), but the stream as a whole must not be abandoned.
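The termination behaviour described above can be seen in a minimal sketch. It assumes the Rx assemblies are referenced; `TerminationDemo` and `Run` are hypothetical names used only for illustration. An exception thrown inside `Select` is delivered to the error handler, and the remaining items are never processed:

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq; // assumption: the Rx library is referenced

public static class TerminationDemo
{
    // Returns everything the subscriber observed, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        Observable.Range(1, 5)
            .Select(i =>
            {
                // Throwing here ends the whole sequence, not just this item.
                if (i == 3)
                    throw new Exception("error 1");
                return i * 2;
            })
            .Subscribe(
                value => log.Add(value.ToString()),
                ex => log.Add("terminated: " + ex.Message),
                () => log.Add("completed"));
        return log;
    }
}
```

With this sketch the log should contain the values for items 1 and 2 followed by the error entry; items 4 and 5 never reach the selector, which is exactly what the requirement above rules out.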
[Edit] One more clarification: if it helps, the processing I have in mind is a more natural fit for the TPL Dataflow library, except that I'm limited to .NET 4.0, so I can't use it.
BTW, I couldn't find any serious discussion of error handling in Rx; usually it just mentions the Subscribe overload / the OnError call and that's it. Does anyone have pointers to a more in-depth treatment of the topic?
Knowing every ProcessN ahead of time means you can get rid of the multiple Select operators, simplifying the query and revealing your true intent.
stream.Select(e => Process(e));
...
? Process(E e)
{
    // Process1, Process2, ...ProcessN
}
Now we can see that this is not a reactive problem. It looks more like an interactive aggregation problem.
You also haven't specified what result you ultimately expect to see in Subscribe, so I'm just assuming it's an aggregated result of the processes.
To determine the return type, you first need to determine the return type of ProcessN. Instead of using ComplexType, I will use a type with better semantics:
Either<E, Exception> Process(?);
So each ProcessN function can return either an E or an Exception (no throwing).
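The Either type is not defined in this answer. The following is a minimal sketch of what such a type could look like, written in C# 4 syntax because of the .NET 4.0 constraint. All names here (`Left`, `Right`, `Match`, `EitherDemo`, `TimesTen`, `Describe`) are assumptions made for illustration, not an existing library API:

```csharp
using System;

// Hypothetical minimal Either: holds exactly one of a Left (success)
// or a Right (failure) value.
public sealed class Either<TLeft, TRight>
{
    private readonly TLeft _left;
    private readonly TRight _right;
    private readonly bool _isLeft;

    private Either(TLeft left, TRight right, bool isLeft)
    {
        _left = left;
        _right = right;
        _isLeft = isLeft;
    }

    public static Either<TLeft, TRight> Left(TLeft value)
    {
        return new Either<TLeft, TRight>(value, default(TRight), true);
    }

    public static Either<TLeft, TRight> Right(TRight value)
    {
        return new Either<TLeft, TRight>(default(TLeft), value, false);
    }

    public bool IsLeft { get { return _isLeft; } }

    // Forces the caller to supply a handler for both cases.
    public TResult Match<TResult>(Func<TLeft, TResult> onLeft, Func<TRight, TResult> onRight)
    {
        return _isLeft ? onLeft(_left) : onRight(_right);
    }
}

public static class EitherDemo
{
    // A stage that reports failure as a value instead of throwing,
    // mirroring the error condition of Process1 in the question.
    public static Either<int, Exception> TimesTen(int id)
    {
        int e1 = id * 10;
        if (e1 == 30 || e1 == 70)
            return Either<int, Exception>.Right(new Exception("Error"));
        return Either<int, Exception>.Left(e1);
    }

    public static string Describe(Either<int, Exception> result)
    {
        return result.Match(
            value => "ok: " + value,
            error => "failed: " + error.Message);
    }
}
```

For example, `EitherDemo.Describe(EitherDemo.TimesTen(2))` yields "ok: 20", while an id of 3 yields "failed: Error". Because the failure is an ordinary value, nothing is thrown and a stream carrying these results keeps running.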
Also, per your requirement, every ProcessN should receive the current result of the running aggregation as its input. So, in the definition above, we should replace ? with a list of the return values of the ProcessN functions called before it.
Either<E, Exception> Process(IList<Either<E, Exception>> results);
Now we can define the return type of the aggregator (defined above):
IList<Either<E, Exception>> Process(E e)
{
    // Process1, Process2, ...ProcessN
}
The body of the aggregator can be implemented as follows:
IList<Either<E, Exception>> Process(E e)
{
    var results = new List<Either<E, Exception>>();
    results.Add(Process1(results.AsReadOnly()));
    results.Add(Process2(results.AsReadOnly()));
    ...
    results.Add(ProcessN(results.AsReadOnly()));
    return results.AsReadOnly();
}
Rx error handling
Start by reading the Rx Design Guidelines.
Intro to Rx contains a section on error handling operators.
Here are some detailed discussions / comments on the semantics of error handling and contracts in Rx:
- About contracts in Rx
- Rx ++ error ?! Catching OnNext Exceptions (Discussion is broader than Rx ++)
- Reactive Extensions: catch exceptions from OnNext() thrown by a thread pool thread
- Exception Handling and EventArgs.Error (WCF, but with a broader application)
(Full disclosure: I've contributed to all of these specific discussions.)
When I asked this question, I was not aware of the Notification<T> class and the Materialize method in Rx. Here is the solution I came up with. It primarily solves the "pipeline" aspect of the problem, but I can solve the "depends on intermediate results" aspect using tuples:
private static void Main()
{
    var source = new Subject<int>();
    source
        .Materialize()
        .SelectIfOk(Process1)
        .SelectIfOk(Process2)
        .Subscribe(it =>
            Console.WriteLine(it.HasValue
                ? it.Value.ToString()
                : it.Exception != null ? it.Exception.Message : "Completed."));
    source.OnNext(1);
    source.OnNext(2);
    source.OnNext(3);
    source.OnNext(4);
    source.OnNext(5);
    source.OnCompleted();
    Console.ReadLine();
}

private static int Process1(int value)
{
    if (value == 3)
        throw new Exception("error 1");
    // do some processing
    return value * 2;
}

private static string Process2(int value)
{
    if (value == 4)
        throw new Exception("error 2");
    // do some processing
    return value + " good";
}

private static IObservable<Notification<TR>> SelectIfOk<T, TR>(this IObservable<Notification<T>> stream,
    Func<T, TR> selector)
{
    // run the selector, converting a thrown exception into an OnError notification
    Func<T, Notification<TR>> trySelector = it =>
    {
        try
        {
            var value = selector(it);
            return Notification.CreateOnNext(value);
        }
        catch (Exception ex)
        {
            return Notification.CreateOnError<TR>(ex);
        }
    };
    // only values are processed; earlier errors and completion are passed through
    return stream.Select(it =>
        it.HasValue
            ? trySelector(it.Value)
            : it.Exception != null
                ? Notification.CreateOnError<TR>(it.Exception)
                : Notification.CreateOnCompleted<TR>());
}
If I want to use intermediate results, then, as I said, I will use tuples:
private static Tuple<int, string> Process2(int value)
{
    if (value == 4)
        throw new Exception("error 2");
    // do some processing
    return Tuple.Create(value, value * 3 + " good");
}

private static string Process3(Tuple<int, string> value)
{
    return value.Item1 + " -> " + value.Item2;
}
(I need to add .SelectIfOk(Process3) to the pipeline.)
I don't want to mark my own answer as correct, so I'll leave this open for a while; however, as far as I can tell, it fulfills my requirements.