RxJava onErrorResumeNext scheduler

I have an observable that might terminate with a special exception, in which case I want to show a dialog with a retry button. I've seen this answer, but it doesn't quite do what I want. I was unable to use retryWhen to solve my problem, so I used onErrorResumeNext instead. If you can think of a way to do the same with retryWhen, please advise.

Now I have this piece of code:

public Observable<Order> proceedWithOrdering(Activity activity) {
    return apiService.createOrder()
            // The rest of this chain was lost; per the text below, subscribeOn
            // on the main thread was tried right after compose.
            .compose(applyRetryLogic(activity))
            .subscribeOn(AndroidSchedulers.mainThread());
}

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .onErrorResumeNext(retry(observable, activity))
            // Also tried here, with no effect on where the resume function runs.
            .subscribeOn(AndroidSchedulers.mainThread());
}

public <T> Func1<Throwable, ? extends Observable<? extends T>> retry(Observable toRetry, Activity activity) {
    return throwable -> {
        if (throwable instanceof NetworkException) {
            MaterialDialog dialog = retryDialog(activity);
            View retry = dialog.getActionButton(DialogAction.POSITIVE);
            View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
            Observable<Object> retryClick = RxView.clicks(retry).map(o -> {
                return o;
            });
            Observable<Object> cancelClick = RxView.clicks(cancel).flatMap(o -> {
                return Observable.error(throwable);
            });

            // Whichever button is clicked first wins; a retry click resubscribes to the source.
            return Observable.amb(retryClick, cancelClick)
                    .flatMap(o -> toRetry.compose(applyRetryLogic(activity)));
        } else {
            return Observable.error(throwable);
        }
    };
}


The problem is that the call inside retry is not executed on the main thread, so it throws the exception Can't create handler inside thread that has not called Looper.prepare()


The question is: how do I make it execute on the main thread? As you can see, I've already tried adding subscribeOn(AndroidSchedulers.mainThread()) right after compose and onErrorResumeNext, with no luck.

I tested my code with simple observables that don't run on separate threads, and it works fine.


You can achieve this by flatMapping a PublishSubject, which is then signalled once the relevant button is clicked. Here is a classic Java Swing example.

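import java.util.concurrent.atomic.AtomicInteger;
import javax.swing.JOptionPane;
import javax.swing.SwingUtilities;
import rx.Observable;
import rx.subjects.PublishSubject;

public class RetryWhenEnter {
    public static void main(String[] args) {
        AtomicInteger d = new AtomicInteger();
        Observable<Integer> source = Observable.just(1);

        source.flatMap(v -> {
            if (d.incrementAndGet() < 3) {
                return Observable.error(new RuntimeException());
            }
            return Observable.just(v);
        })
        .retryWhen(err -> {
            return err.flatMap(e -> {
                System.out.println(Thread.currentThread() + " Error!");
                PublishSubject<Integer> choice = PublishSubject.create();
                SwingUtilities.invokeLater(() -> {
                    int c = JOptionPane.showConfirmDialog(null, 
                        e.toString() + "\r\nRetry?", "Error",
                        JOptionPane.YES_NO_OPTION);
                    // Both branch bodies were lost in the original; these are assumed.
                    if (c == JOptionPane.YES_OPTION) {
                        choice.onNext(1);   // any emission triggers a resubscription
                    } else {
                        choice.onError(e);  // give up and pass the error downstream
                    }
                });
                return choice;
            });
        })
        // The terminal subscribe call was also lost; assumed.
        .subscribe(System.out::println, Throwable::printStackTrace);
    }
}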



Alternatively, use observeOn(AndroidSchedulers.mainThread()) just before onErrorResumeNext, or inside retryWhen: retryWhen(o -> o.observeOn(AndroidSchedulers.mainThread())...)
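The hand-off used in the Swing example (fail on a worker thread, ask the question on the UI thread, then signal the decision back) can also be sketched without RxJava. This is only a library-free analogy, not the answerer's code: RetryHandoffSketch, callWithPrompt, demo and the "fake-ui" thread are hypothetical names, a single-thread executor stands in for the main thread, and a CompletableFuture plays the role of the PublishSubject. Unlike retryWhen, this sketch blocks the calling thread while it waits for the answer.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Predicate;
import java.util.function.Supplier;

final class RetryHandoffSketch {

    /**
     * Runs the task on the calling thread. On failure, evaluates askRetry on
     * uiThread and retries only if it answers true; otherwise rethrows.
     */
    static <T> T callWithPrompt(Supplier<T> task,
                                ExecutorService uiThread,
                                Predicate<RuntimeException> askRetry) {
        while (true) {
            try {
                return task.get();
            } catch (RuntimeException e) {
                // Hand the decision to the UI thread, much like runOnUiThread
                // or SwingUtilities.invokeLater would.
                CompletableFuture<Boolean> choice =
                        CompletableFuture.supplyAsync(() -> askRetry.test(e), uiThread);
                if (!choice.join()) {
                    throw e; // "cancel": propagate the original error
                }
                // "retry": fall through and run the task again
            }
        }
    }

    /** Fails twice, then succeeds; reports which thread answered the prompt. */
    static String demo() {
        ExecutorService ui = Executors.newSingleThreadExecutor(r -> new Thread(r, "fake-ui"));
        AtomicInteger attempts = new AtomicInteger();
        AtomicReference<String> promptThread = new AtomicReference<>();
        try {
            Supplier<Integer> flaky = () -> {
                if (attempts.incrementAndGet() < 3) {
                    throw new IllegalStateException("simulated network error");
                }
                return 42;
            };
            int value = callWithPrompt(flaky, ui, error -> {
                // A real dialog would be shown here; this stand-in always says "retry".
                promptThread.set(Thread.currentThread().getName());
                return true;
            });
            return value + " after " + attempts.get()
                    + " attempts, prompted on " + promptThread.get();
        } finally {
            ui.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The point of the sketch is that the prompt always runs on the designated thread regardless of where the failure happened, which is the same property that observeOn or runOnUiThread provides in the RxJava versions.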


Edit 2: I reverted the change, so the answer makes sense again.



There is a way to solve my problem using retryWhen (thanks @akarnokd):

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .retryWhen(err -> err.flatMap(throwable -> {
                L.d(Thread.currentThread() + " Error!");
                if (throwable instanceof NetworkException) {
                    PublishSubject<Integer> choice = PublishSubject.create();
                    activity.runOnUiThread(() -> {
                        MaterialDialog dialog = retryDialog(activity);
                        View retry = dialog.getActionButton(DialogAction.POSITIVE);
                        View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
                        RxView.clicks(retry).subscribe(o -> {
                            // Body lost in the original; assumed: emit to trigger a retry.
                            choice.onNext(1);
                        });
                        RxView.clicks(cancel).subscribe(o -> {
                            // Body lost in the original; assumed: give up with the original error.
                            choice.onError(throwable);
                        });
                    });
                    return choice;
                } else {
                    return Observable.error(throwable);
                }
            }));
}




