Console.ReadLine () is passed to C # event

I am learning RX and would like to use Console.ReadLine as source for observable sequences.

I know I can create an "IEnumerable" using "yield return", but for my specific use case, I decided to create a C # event so that potentially many observers would be able to use the same keyboard input.

Here is my code:

class Program
{
    private delegate void OnNewInputLineHandler(string line);

    private static event OnNewInputLineHandler OnNewInputLineEvent = _ => {};

    static void Main(string[] args)
    {
        Task.Run((Action) GetInput);

        var input = ConsoleInput();
        input.Subscribe(s=>Console.WriteLine("1: " + s));

        Thread.Sleep(30000);
    }

    private static void GetInput()
    {
        while (true)
            OnNewInputLineEvent(Console.ReadLine());
    }

    private static IObservable<string> ConsoleInput()
    {
        return Observable.Create<string>(
        (IObserver<string> observer) =>
        {
            OnNewInputLineHandler h = observer.OnNext;
            OnNewInputLineEvent += h;
            return Disposable.Create(() => { OnNewInputLineEvent -= h; });
        });
    }
}

      

My problem is when I run the GetInput method as shown above, the first line of input is not sent to the sequence (but it is sent to the event handler).

However, if I replace it with the next version, everything works as expected:

private static void GetInput()
{
    while (true)
    {
        var s = Console.ReadLine();
        OnNewInputLineEvent(s);
    }
}

      

Can someone shed some light on why this might be happening?

+3


source to share


2 answers


You are trying to make life difficult for yourself. There is almost always a way to keep things simple with Rx. It's just a matter of learning to think more functionally rather than procedurally.

That's all you need:



class Program
{
    static void Main(string[] args)
    {
        var subscription = ConsoleInput().Subscribe(s => Console.WriteLine("1: " + s));
        Thread.Sleep(30000);
        subscription.Dispose();
    }

    private static IObservable<string> ConsoleInput()
    {
        return
            Observable
                .FromAsync(() => Console.In.ReadLineAsync())
                .Repeat()
                .Publish()
                .RefCount()
                .SubscribeOn(Scheduler.Default);
    }
}

      

This allows multiple subscribers to share the same login through .Publish().RefCount()

. And .SubscribeOn(Scheduler.Default)

pushes the subscription to a new thread - without that, you block the subscription.

+5


source


If you move Task.Run((Action) GetInput);

, then after subscribing your code will work as desired. This is because in the original version, the first call OnNewInputEvent(Console.ReadLine())

is made before you hooked OnNewInputLineEvent to observer.OnNext

.



+1


source







All Articles