Split IObservable<byte[]> into characters, then strings
Rx is great, but sometimes it's hard to find an elegant way to do things. The idea is pretty simple: I am receiving events that carry a byte[]. Each array can contain part of a line, exactly one line, or multiple lines. I want to turn this into an observable of lines, that is, an IObservable<String> where each element of the sequence is one line.
After a few hours, the closest solution I found is quite ugly and, of course, doesn't work, because Scan calls OnNext for every char:
// Intermediate subject used to transform byte[] into char
var outputStream = new Subject<char>();
_reactiveSubcription = outputStream
    // Scan doesn't work: it triggers OnNext on every char
    // Aggregate doesn't work either, as it doesn't return intermediate results
    .Scan(new StringBuilder(), (builder, c) => c == '\r' ? new StringBuilder() : builder.Append((char)c))
    .Subscribe(this);
Observable.FromEventPattern<ShellDataEventArgs>(shell, "DataReceived")
    // Data is a byte[]
    .Select(_ => _.EventArgs.Data)
    .Subscribe(array => array.ToObservable()
        // Convert into char
        .ForEach(c => outputStream.OnNext((char)c)));
Note: _reactiveSubcription should be IObservable<String>.
Without considering character encoding issues, what am I missing to make this work?
This works for me.
First, convert each byte[] to a string and split it on \r (Regex.Split preserves the delimiters when the pattern is wrapped in a capture group).
This gives a stream of strings, some of which end in \r.
Then Concat them to keep them in order. Also, since strings needs to be hot for the next step, Publish it.
var strings = bytes.
    // Decode the entire chunk, then split on \r, keeping each delimiter as its own element
    Select(arr => Regex.Split(Encoding.Default.GetString(arr), "(\r)").
        Where(s => s.Length != 0).
        ToObservable()).
    Concat().
    Publish().
    RefCount();
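As a quick illustration of the splitting step, the sketch below runs Regex.Split on a hard-coded sample string. The sample input and the SplitDemo class name are made up for this example. Because the pattern is wrapped in a capture group, the \r delimiters come back as separate elements, and the Where filter drops any empty strings that appear at the edges.

```csharp
using System;
using System.Linq;
using System.Text.RegularExpressions;

static class SplitDemo
{
    static void Main()
    {
        // Hypothetical sample chunk: one complete line, then the start of another.
        string chunk = "AB\rCD";

        // The capture group makes Regex.Split keep the delimiter as its own element.
        string[] parts = Regex.Split(chunk, "(\r)")
            .Where(s => s.Length != 0)   // drop empty strings produced at the edges
            .ToArray();

        // Print each part with the carriage return made visible.
        foreach (string part in parts)
            Console.WriteLine(part.Replace("\r", "\\r"));
        // Prints three lines: AB, then \r, then CD
    }
}
```

Note that the unfinished "CD" is emitted as well, which is why the later windowing step is needed to glue partial strings back together.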
Create a window that closes whenever a string ends with \r. strings must be hot because it is used both for the window contents and for the window-closing trigger.
var linewindows = strings.Window(strings.Where(s => s.EndsWith("\r")));
Aggregate the contents of each window into a single line.
var lines = linewindows.SelectMany(w => w.Aggregate((l, r) => l + r));
lines is an IObservable<String>, and each element contains one line.
To test this, I used the following generator to create an IObservable<byte[]>:
var bytes = Observable.
    Range(1, 10).
    SelectMany(i => Observable.
        Return((byte)('A' + i)).
        Repeat(24).
        Concat(Observable.
            Return((byte)'\r'))).
    Window(17).
    SelectMany(w => w.ToArray());
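For comparison, here is a minimal sketch of a different approach that keeps the line-assembly state outside of Rx. It is not the method used in the answer above. LineAssembler and Feed are hypothetical names introduced for this example. The sketch uses the same naive byte-to-char cast as the question, so it ignores encoding, and unlike the windowing approach it strips the trailing \r from each line.

```csharp
using System;
using System.Collections.Generic;
using System.Text;

// Hypothetical helper, not part of Rx: buffers characters across chunks
// and returns every line that the current chunk completes.
sealed class LineAssembler
{
    private readonly StringBuilder _pending = new StringBuilder();

    public IEnumerable<string> Feed(byte[] chunk)
    {
        var completed = new List<string>();
        foreach (byte b in chunk)
        {
            char c = (char)b;   // naive cast, as in the question; ignores encoding
            if (c == '\r')
            {
                // A carriage return closes the current line.
                completed.Add(_pending.ToString());
                _pending.Clear();
            }
            else
            {
                _pending.Append(c);
            }
        }
        return completed;       // may be empty if no line ended in this chunk
    }
}

static class LineAssemblerDemo
{
    static void Main()
    {
        var assembler = new LineAssembler();

        // The first chunk holds only part of a line, so nothing is printed.
        foreach (string line in assembler.Feed(Encoding.ASCII.GetBytes("AB")))
            Console.WriteLine(line);

        // The second chunk finishes "ABC" and contains all of "DE".
        foreach (string line in assembler.Feed(Encoding.ASCII.GetBytes("C\rDE\r")))
            Console.WriteLine(line);
        // Prints two lines: ABC, then DE
    }
}
```

Assuming a single subscriber, this could be plugged into the pipeline with something like bytes.SelectMany(chunk => assembler.Feed(chunk)), which would yield an IObservable<String>. Because the assembler is stateful, sharing one instance across several subscriptions would mix their data.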