RxJava Chained Observables and NetworkMainThreadException

So, I have this code:

public Observable<AbstractXMPPConnection> connect(final AbstractXMPPConnection connection) {
    return Observable.<AbstractXMPPConnection>create(subscriber -> {
        try {
            AbstractXMPPConnection connection2 = connection.connect();
            if (connection2.isConnected()) {
                subscriber.onNext(connection2);
                subscriber.onCompleted();
            }
        } catch (SmackException | IOException | XMPPException e) {
            e.printStackTrace();
            subscriber.onError(e);
        }
    })
    .doOnError(throwable -> LOGI("111", "Connection OnError called"));
}


public Observable<AbstractXMPPConnection> connectWithRetry(final AbstractXMPPConnection connection) {
       return connect(connection)
               .retryWhen(attempts -> attempts.zipWith(Observable.range(1, MAX_CONNECTION_TRIES), (throwable, integer) -> new Pair<>(throwable, integer))
                       .flatMap(pair -> {
                           if (pair.second == MAX_LOGIN_TRIES)
                               return Observable.error(pair.first);
                           return Observable.timer(pair.second, TimeUnit.SECONDS);
                       }));
    }


public void connect() {
        assertTrue("To start a connection to the server, you must first call init() method!",
                this.connectionConfig != null);

        connectionHelper.connectWithRetry(connection)
                .observeOn(Schedulers.newThread())
                .subscribeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<AbstractXMPPConnection>() {
                    @Override
                    public void onCompleted() {
                    }

                    @Override
                    public void onError(Throwable e) {
                        LOGI(TAG, "ConnectionHelper Connection onError\n");

                        /**{@link LoginActivity#onConnectionFailure(OnConnectionFailureEvent)} */
                        MainApplication.getInstance().getBusInstance().post(new OnConnectionFailureEvent());
                    }

                    @Override
                    public void onNext(AbstractXMPPConnection connection) {
                        LOGI(TAG, "ConnectionHelper Connection onNext");
//                        onConnected();
                    }
                });
    }

      

I have some questions about chain of observables. Imagining this scenario where I have an Observable connection that I use sometimes, but I mostly use connectWithRetry()

Observable.

My question is what happens if this is added:

.observeOn(Schedulers.newThread())
.subscribeOn(AndroidSchedulers.mainThread())

      

Both for connect()

and for connectWithRetry()

? In this case, when I call public void connect and specify the scheduler, are the previous ones being ignored?

And why am I getting NetworkOnMainThreadException

? Explicit observeOn(Schedulers.newThread())

is, it shouldn't give me this error

+3


source to share


2 answers


I'll go over your problem first NetworkOnMainThread

.

observeOn(Schedulers.newThread())

means that the output will be watched on a new thread, that is, your subscriber ( onComplete/Error/Next

) code will run on that thread.

subscribeOn(AndroidSchedulers.mainThread()

means the subscription will happen on the main thread - the code in your created observable ( connection.connect()

and so on) is what gets executed when you subscribe.

So just change the schedulers:

.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())

      

So, to answer your first question, they are not ignored, they are simply misused. Hopefully from this you can see what happens if you chained similar calls inside your methods that return observables: nothing is different from what you've already done. The calls will simply be elsewhere.



So where to put the scheduler choice? This is for you. You can get more clarity by not having a call subscribeOn

inside the create methods of your observables:

 connectionHelper.connectWithRetry(connection)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())

      

However, if you feel like you are calling this everywhere for no reason, you can instead move the call subscribeOn

inside your methods:

return connect(connection)
           .retryWhen(...)
           .flatMap(...)
           .subscribeOn(Schedulers.io())
           .observeOn(AndroidSchedulers.mainThread());

      

Note that they don't need to be chained together like this: you can subscribeOn

inside your method, but leave it observeOn

up to any callers who want their results in a particular thread.

+1


source


Please try Schedulers.io () there might be a problem.



0


source







All Articles