Passing Console.ReadLine() to a C# event
I am learning Rx and would like to use Console.ReadLine as a source for observable sequences.
I know I can create an "IEnumerable" using "yield return", but for my specific use case I decided to create a C# event so that potentially many observers can share the same keyboard input.
Here is my code:
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    private delegate void OnNewInputLineHandler(string line);
    private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {};

    static void Main(string[] args)
    {
        Task.Run((Action) GetInput);
        var input = ConsoleInput();
        input.Subscribe(s => Console.WriteLine("1: " + s));
        Thread.Sleep(30000);
    }

    private static void GetInput()
    {
        while (true)
            OnNewInputLineEvent(Console.ReadLine());
    }

    private static IObservable<string> ConsoleInput()
    {
        return Observable.Create<string>(
            (IObserver<string> observer) =>
            {
                OnNewInputLineHandler h = observer.OnNext;
                OnNewInputLineEvent += h;
                return Disposable.Create(() => { OnNewInputLineEvent -= h; });
            });
    }
}
My problem is that when I run the GetInput method as shown above, the first line of input is not sent to the sequence (although it is sent to the event handler).
However, if I replace it with the following version, everything works as expected:
private static void GetInput()
{
    while (true)
    {
        var s = Console.ReadLine();
        OnNewInputLineEvent(s);
    }
}
Can someone shed some light on why this might be happening?
You are trying to make life difficult for yourself. There is almost always a way to keep things simple with Rx. It's just a matter of learning to think more functionally rather than procedurally.
That's all you need:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

class Program
{
    static void Main(string[] args)
    {
        var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s));
        Thread.Sleep(30000);
        subscription.Dispose();
    }

    private static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}
This allows multiple subscribers to share the same input through .Publish().RefCount(). And .SubscribeOn(Scheduler.Default) moves the subscription onto a background thread; without it, the call to Subscribe blocks the calling thread.
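As for why the first version of GetInput drops the first line: a likely explanation is evaluation order. In OnNewInputLineEvent(Console.ReadLine()), the delegate stored in the event is read before the argument is evaluated, and delegates are immutable, so a handler added while ReadLine is blocked is not part of the delegate that ends up being invoked. The sketch below reproduces this without Rx or the console. The names EvaluationOrderDemo, handler, ReadWhileSubscriberAttaches and Run are hypothetical, and a plain delegate field stands in for the field-like event in the question:

```csharp
using System;
using System.Collections.Generic;

static class EvaluationOrderDemo
{
    // Hypothetical stand-in for the field-like event in the question.
    private static Action<string> handler;
    private static readonly List<string> log = new List<string>();

    // Stands in for Console.ReadLine(): a subscriber attaches
    // while the "read" is still in progress.
    private static string ReadWhileSubscriberAttaches()
    {
        handler += s => log.Add("subscriber got: " + s);
        return "hello";
    }

    // Returns what the handlers recorded for one simulated line of input.
    public static List<string> Run(bool inlineCall)
    {
        log.Clear();
        handler = s => log.Add("default handler ran");
        if (inlineCall)
        {
            // The handler field is read first, then the argument is evaluated,
            // so the delegate invoked here predates the subscription.
            handler(ReadWhileSubscriberAttaches());
        }
        else
        {
            // The argument is computed first; the field is read afterwards
            // and already includes the subscriber.
            var line = ReadWhileSubscriberAttaches();
            handler(line);
        }
        return new List<string>(log);
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(inlineCall: true)));
        // prints "default handler ran"
        Console.WriteLine(string.Join(", ", Run(inlineCall: false)));
        // prints "default handler ran, subscriber got: hello"
    }
}
```

Storing the line in a local variable first, as in the second version of GetInput, means the event is only read after ReadLine returns, by which time the subscription has normally been added.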