Split an IObservable<byte[]> into characters, then into lines

Rx is great, but sometimes it's hard to find an elegant way to do things. The idea is pretty simple. I am receiving events that carry a byte[]; each array can contain part of a line, exactly one line, or several lines. I want to turn this into an IObservable<String> where each element of the sequence is one line.

After a few hours, the closest solution I found is quite ugly and of course doesn't work, because Scan triggers OnNext for every char:

//Intermediate subject used to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    //Scan doesn't work: it triggers OnNext on every char
    //Aggregate doesn't work either, as it doesn't return intermediate results
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);


Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
    //Data is a byte[]
    .Select(_ => _.EventArgs.Data)
    .Subscribe(array => array.ToObservable()
        //Convert each byte into a char
        .ForEach(c => outputStream.OnNext((char)c)));

      

Note: _reactiveSubcription should be IObservable<String>.

Without considering character encoding issues, what am I missing to make this work?

1 answer


This works for me.

First convert each byte[] to a string and split the string on \r (Regex.Split with a capturing group preserves the delimiters). There is now a stream of strings, some of which end in \r.

Then Concat to keep them in order. Also, since strings needs to be hot for the next step, Publish it.

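var strings = bytes.
  // Decode the whole chunk: the third argument of GetString(arr, index, count)
  // is a byte count, so passing arr.Length - 1 would drop the last byte.
  Select(arr => Regex.Split(Encoding.Default.GetString(arr), "(\r)").
    Where(s => s.Length != 0).
    ToObservable()).
  Concat().
  Publish().
  RefCount();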
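
The delimiter-preserving split that this step relies on can be checked in isolation, without Rx. This is only a minimal sketch: the class name SplitDemo and the method name SplitKeepingDelimiters are made up for illustration and are not part of the answer.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

public static class SplitDemo
{
    // Splits on '\r' but keeps each '\r' as its own element, because the
    // pattern wraps the delimiter in a capturing group. Empty pieces
    // (for example after a trailing '\r') are filtered out.
    public static string[] SplitKeepingDelimiters(string text) =>
        Regex.Split(text, "(\r)").Where(s => s.Length != 0).ToArray();

    public static void Main()
    {
        foreach (var piece in SplitKeepingDelimiters("ab\rcd\ref"))
            Console.WriteLine(piece.Replace("\r", "\\r"));
        // prints, one per line: ab  \r  cd  \r  ef
    }
}
```

Because each \r comes out as a separate element, the later windowing step can use "ends with \r" as its end-of-line signal.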

      

Make a window of strings that closes whenever a string ends with \r. strings needs to be hot because it is used both for the window contents and for the trigger that closes the window.



var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));

      

Aggregate each window into a single line.

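// The seeded Aggregate tolerates an empty window (for example the one left
// open after the final \r), which the unseeded overload reports as an error.
var lines = linewindows.
  SelectMany(w => w.Aggregate(string.Empty, (l, r) => l + r)).
  Where(line => line.Length != 0);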

      

lines is an IObservable<String>, and each element contains one line.
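
To check what the pipeline is expected to emit for a given set of chunks, the same chunk-to-line logic can be written as plain stateful code. This is a different technique from the Window/Aggregate approach above, not a restatement of it, and it is only a sketch: LineAssembler, Feed and LineAssemblerDemo are hypothetical names, and ASCII decoding is an assumption made to avoid characters being cut in half at a chunk boundary.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper: accumulates decoded text across chunks and returns
// every line completed by a '\r' (the '\r' stays attached to the line).
public sealed class LineAssembler
{
    private readonly StringBuilder _pending = new StringBuilder();

    public List<string> Feed(byte[] chunk)
    {
        var completed = new List<string>();
        // Assumption: single-byte ASCII input.
        foreach (char c in Encoding.ASCII.GetString(chunk))
        {
            _pending.Append(c);
            if (c == '\r')
            {
                completed.Add(_pending.ToString());
                _pending.Clear();
            }
        }
        return completed;
    }
}

public static class LineAssemblerDemo
{
    public static void Main()
    {
        var assembler = new LineAssembler();
        var chunks = new[] { "AB", "C\rDE", "F\rG" };
        foreach (var chunk in chunks)
            foreach (var line in assembler.Feed(Encoding.ASCII.GetBytes(chunk)))
                Console.WriteLine(line.TrimEnd('\r'));
        // prints ABC, then DEF; "G" stays buffered until a '\r' arrives
    }
}
```

A helper like this could probably also be plugged into an Rx pipeline with bytes.SelectMany(chunk => assembler.Feed(chunk)), at the cost of mutable state shared between subscribers.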

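To test this, I used the following generator to create an IObservable<byte[]>. It produces ten runs of 24 identical letters, each run followed by \r, and regroups the bytes into arrays of up to 17 elements so that lines straddle array boundaries: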

var bytes = Observable.
Range(1, 10).
SelectMany(i => Observable.
    Return((byte)('A' + i)).
    Repeat(24).
    Concat(Observable.
        Return((byte)'\r'))).
Window(17).
SelectMany(w => w.ToArray());

      
