Correct pattern for a chain of transformations (pipeline) on an observable stream?

I have the following scenario:

  • given a stream of objects, IObservable<E>

  • process each E to get either an E1 or an error state, in which case I need an error message M1

  • process each E1 to get either an E2 or an error message M2

  • ....

An additional complication is that the result En and/or the error message Mn may depend on the values of E, E1, ..., En-1, not only on En-1.

With all of this in mind, is there a better pattern than the one I am using?

[Edit] As requested, I've added a fully working example; unfortunately, it made this post quite large.

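using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq; // Rx: ToObservable, Select

internal class Program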
{
  private static void Main()
  {
    var stream = Enumerable.Range(1, 10).Select(i => new Record { Id = i }).ToObservable();

    stream
      .Select(it => new ComplexType { Item = it })
      .SelectIfOk(Process1)
      .SelectIfOk(Process2)
      .SelectIfOk(ProcessN)
      .Subscribe(DisplayResult);

    Console.ReadLine();
  }

  private static ComplexType Process1(ComplexType data)
  {
    // do some processing
    data.E1 = data.Item.Id * 10;

    // check for errors in output
    if (data.E1 == 30 || data.E1 == 70)
    {
      data.Errors.Add("Error");
    }

    return data;
  }

  private static ComplexType Process2(ComplexType data)
  {
    // do some processing
    data.E2 = (data.E1 - 3).ToString();

    // check for errors in output
    // can generate multiple errors for the same item
    if (data.E2.StartsWith("4"))
    {
      // does not only depend on the immediate precursor, E1 in this case
      data.Errors.Add("Starts with 4 -- " + data.Item.Id);
    }

    if (data.E2.StartsWith("8"))
    {
      data.Errors.Add("Starts with 8");
    }

    return data;
  }

  private static ComplexType ProcessN(ComplexType data)
  {
    // do some processing
    data.EN = "Success " + data.E2;

    // this one doesn't generate errors
    return data;
  }

  private static void DisplayResult(ComplexType data)
  {
    if (data.Errors.Any())
    {
      // pass the Id as the {0} argument; otherwise the placeholder is printed literally
      Console.WriteLine("{0:##0} has errors: {1}", data.Item.Id, string.Join(",", data.Errors));
    }
    else
    {
      Console.WriteLine("{0:##0}: {1}", data.Item.Id, data.EN);
    }
  }
}

      

These are the classes used in the above code example:

public class Record
{
  public int Id { get; set; }
  public string FullName { get; set; }
  public string OtherStuff { get; set; }
}

public class ComplexType
{
  public Record Item { get; set; }

  // intermediary results
  public int E1 { get; set; }
  public string E2 { get; set; }

  // final result
  public string EN { get; set; }

  public List<string> Errors { get; set; }

  public ComplexType()
  {
    Errors = new List<string>();
  }
}

      

Note that there is no relationship between the types E1, E2, ..., En (in particular, they do not all inherit from a common base type).

SelectIfOk is an extension method:

public static IObservable<T> SelectIfOk<T>(this IObservable<T> observable,
  Func<T, T> selector)
  where T : ComplexType
{
  return observable.Select(item => item.Errors.Any() ? item : selector(item));
}

      

The result of running this code is:

1: Success 7
2: Success 17
3 has errors: Error
4: Success 37
5 has errors: Starts with 4 -- 5
6: Success 57
7 has errors: Error
8: Success 77
9 has errors: Starts with 8
10: Success 97

      

I am using ComplexType so that I can carry both the intermediate results and the error state, and it just looks ... fishy. I've been looking at this code for a week (it is a hobby project) and I still feel like I'm missing the right way to do things with Rx.

[Edit] I forgot to mention a very important thing: I must process all elements in the stream, even if some of them generate errors, so I can't just use the Subscribe overload that takes an exception handler, because an error terminates the stream. Abandoning a single item on error is fine (if Process1 generates an error, then Process2, ..., ProcessN are no longer executed for that item), but abandoning the whole stream is not.
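To illustrate the termination behaviour described above, here is a minimal sketch. It assumes the Rx library (System.Reactive) is referenced; the class name OnErrorDemo and the Run helper are hypothetical and exist only for this illustration. A Subject is fed five items by hand, and the selector throws for the third one.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;     // Select (assumes the Rx / System.Reactive library)
using System.Reactive.Subjects; // Subject<T>

public static class OnErrorDemo
{
    // Hypothetical helper: returns everything the subscriber observed, in order.
    public static List<string> Run()
    {
        var log = new List<string>();
        var source = new Subject<int>();

        source
            .Select(i =>
            {
                // An exception thrown by the selector is routed to OnError.
                if (i == 3) throw new InvalidOperationException("item 3 failed");
                return i * 10;
            })
            .Subscribe(
                value => log.Add("OnNext: " + value),
                error => log.Add("OnError: " + error.Message),
                () => log.Add("OnCompleted"));

        for (int i = 1; i <= 5; i++)
            source.OnNext(i);
        source.OnCompleted();

        return log;
    }

    public static void Main()
    {
        foreach (string line in Run())
            Console.WriteLine(line);
        // prints "OnNext: 10", "OnNext: 20", "OnError: item 3 failed"
    }
}
```

Items 4 and 5 never reach the subscriber and OnCompleted is never delivered, because OnError is terminal for that subscription. This is why per-item errors have to travel as data if the rest of the stream must keep flowing.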

[Edit] One more clarification: if it helps, the processing I have in mind is a more natural fit for the TPL Dataflow library, except that I'm limited to .NET 4.0, so I can't use it.

BTW, I couldn't find any serious discussion of error handling in Rx; most material just mentions the Subscribe overload / the OnError call and leaves it at that. Does anyone have any pointers to an in-depth treatment of the topic?


2 answers


Knowing every ProcessN ahead of time means you can get rid of the multiple Select operators, simplifying the query and revealing your true intent.

stream.Select(e => Process(e));

...

? Process(E e)
{
  // Process1, Process2, ...ProcessN
}

      

Now we can see that this is not a reactive problem. It looks more like an interactive aggregation problem.

You also haven't specified which result you ultimately want to see in Subscribe, so I'm just assuming it's the aggregated result of the processes.

To determine the return type, you first need to determine the return type of ProcessN. Instead of using ComplexType, I will use a type with better semantics:

Either<E, Exception> Process(?);

      

So each ProcessN function can return either E or Exception (no throwing).

Also, as per your requirement, every ProcessN should receive the current result of the running aggregation as its input. So, in the definition above, we should replace ? with a list of the return values of the ProcessN functions called before it.

Either<E, Exception> Process(IList<Either<E, Exception>> results);

      

Now we can define the return type of the aggregator (defined above):



IList<Either<E, Exception>> Process(E e)
{
  // Process1, Process2, ...ProcessN
}

      

The body of the aggregator can be implemented as follows:

IList<Either<E, Exception>> Process(E e)
{
  var results = new List<Either<E, Exception>>();
  results.Add(Process1(results.AsReadOnly()));
  results.Add(Process2(results.AsReadOnly()));
  ...
  results.Add(ProcessN(results.AsReadOnly()));
  return results.AsReadOnly();
}
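The aggregator above is pseudocode, so here is a runnable sketch of the same idea under some assumptions. Either is not a BCL type, so a minimal hypothetical version is defined inline (the names Ok, Fail, Pipeline, Step, Step1, Step2 and Describe are all invented for this illustration). Because the question says E1, E2, ... share no common type, the sketch stores values as object. It also seeds the list with the source item and stops at the first failure, which is one possible reading of the requirement that later steps are skipped for a failed item.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical minimal Either: holds a value (Left) or an error (Right), never both.
public sealed class Either<TLeft, TRight>
{
    public readonly bool IsLeft;
    public readonly TLeft Left;
    public readonly TRight Right;

    private Either(bool isLeft, TLeft left, TRight right)
    {
        IsLeft = isLeft;
        Left = left;
        Right = right;
    }

    public static Either<TLeft, TRight> Ok(TLeft value)
    {
        return new Either<TLeft, TRight>(true, value, default(TRight));
    }

    public static Either<TLeft, TRight> Fail(TRight error)
    {
        return new Either<TLeft, TRight>(false, default(TLeft), error);
    }
}

public static class Pipeline
{
    // A step sees every earlier result, not only the previous one.
    public delegate Either<object, Exception> Step(IList<Either<object, Exception>> results);

    public static IList<Either<object, Exception>> Process(int id, params Step[] steps)
    {
        var results = new List<Either<object, Exception>>();
        results.Add(Either<object, Exception>.Ok(id)); // results[0] is the source item
        foreach (Step step in steps)
        {
            // Stop at the first failure; later steps never run for this item.
            if (!results[results.Count - 1].IsLeft) break;
            results.Add(step(results.AsReadOnly()));
        }
        return results.AsReadOnly();
    }

    public static Either<object, Exception> Step1(IList<Either<object, Exception>> r)
    {
        int e1 = (int)r[0].Left * 10;
        if (e1 == 30 || e1 == 70)
            return Either<object, Exception>.Fail(new Exception("Error"));
        return Either<object, Exception>.Ok(e1);
    }

    public static Either<object, Exception> Step2(IList<Either<object, Exception>> r)
    {
        string e2 = ((int)r[1].Left - 3).ToString();
        if (e2.StartsWith("4")) // the message uses r[0], not just the previous result
            return Either<object, Exception>.Fail(new Exception("Starts with 4 -- " + r[0].Left));
        return Either<object, Exception>.Ok(e2);
    }

    public static string Describe(int id)
    {
        IList<Either<object, Exception>> results = Process(id, Step1, Step2);
        Either<object, Exception> last = results[results.Count - 1];
        return last.IsLeft
            ? id + ": Success " + last.Left
            : id + " has errors: " + last.Right.Message;
    }

    public static void Main()
    {
        foreach (int id in new[] { 1, 3, 5 })
            Console.WriteLine(Describe(id));
        // prints "1: Success 7", "3 has errors: Error", "5 has errors: Starts with 4 -- 5"
    }
}
```

With something like this in place, the Rx side reduces to a single stream.Select(e => Pipeline.Process(e.Id, ...)), and a failed item is just another value in the stream rather than an OnError call. The casts from object are the price of keeping unrelated result types in one list; a tuple or a dedicated type per stage would avoid them.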

      

Rx error handling

Start by reading the Rx Design Guidelines.

Intro to Rx contains a section on error handling operators.

Here are some detailed discussions / comments on the semantics of error handling and contracts in Rx:

(Full disclosure: I've contributed to all of these specific discussions.)

0


source


When I asked this question, I was not aware of the Notification<T> class and the Materialize method in Rx. Here is the solution I came up with. It primarily solves the "pipelined" aspect of the problem, but I can solve the "depends on intermediate results" aspect using tuples:

private static void Main()
{
  var source = new Subject<int>();

  source
    .Materialize()
    .SelectIfOk(Process1)
    .SelectIfOk(Process2)
    .Subscribe(it =>
      Console.WriteLine(it.HasValue
        ? it.Value.ToString()
        : it.Exception != null ? it.Exception.Message : "Completed."));

  source.OnNext(1);
  source.OnNext(2);
  source.OnNext(3);
  source.OnNext(4);
  source.OnNext(5);
  source.OnCompleted();

  Console.ReadLine();
}

private static int Process1(int value)
{
  if (value == 3)
    throw new Exception("error 1");

  // do some processing
  return value * 2;
}

private static string Process2(int value)
{
  if (value == 4)
    throw new Exception("error 2");

  // do some processing
  return value + " good";
}

// Note: as an extension method, this must be declared inside a static class.
private static IObservable<Notification<TR>> SelectIfOk<T, TR>(this IObservable<Notification<T>> stream,
  Func<T, TR> selector)
{
  Func<T, Notification<TR>> trySelector = it =>
  {
    try
    {
      var value = selector(it);
      return Notification.CreateOnNext(value);
    }
    catch (Exception ex)
    {
      return Notification.CreateOnError<TR>(ex);
    }
  };

  return stream.Select(it =>
    it.HasValue
      ? trySelector(it.Value)
      : it.Exception != null
        ? Notification.CreateOnError<TR>(it.Exception)
        : Notification.CreateOnCompleted<TR>());
}

      

If I want to use intermediate results, then, as I said, I will use tuples:



private static Tuple<int, string> Process2(int value)
{
  if (value == 4)
    throw new Exception("error 2");

  // do some processing
  return Tuple.Create(value, value * 3 + " good");
}

private static string Process3(Tuple<int, string> value)
{
  return value.Item1 + " -> " + value.Item2;
}

      

(I need to add .SelectIfOk(Process3) to the pipeline.)

I don't want to mark my own answer as correct, so I'll leave this open for a while; however, as far as I can tell, it fulfills my requirements.
