RxJava onErrorResumeNext scheduler

I have an observable that might terminate with a special exception, in which case I want to show a dialog with a redo button. I've seen this answer, but it doesn't quite do what I want. I was unable to use retryWhen

to solve my problem, so I used instead onErrorResumeNext

. If you can think of a way to do the same with retryWhen

, please advise.

Now I have this piece of code:

public Observable<Order> proceedWithOrdering(Activity activity) {
    return apiService.createOrder()
            .subscribeOn(Schedulers.io())
            .compose(applyRetryLogic(activity))
            .subscribeOn(AndroidSchedulers.mainThread())
            .observeOn(AndroidSchedulers.mainThread());
}

public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .onErrorResumeNext(retry(observable, activity))
            .subscribeOn(AndroidSchedulers.mainThread());
}

public <T> Func1<Throwable, ? extends Observable<? extends T>> retry(Observable toRetry, Activity activity) {
    return throwable -> {
        if (throwable instanceof NetworkException) {
            MaterialDialog dialog = retryDialog(activity);
            View retry = dialog.getActionButton(DialogAction.POSITIVE);
            View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
            Observable<Object> retryClick = RxView.clicks(retry).map(o -> {
                dialog.dismiss();
                return o;
            });
            Observable<Object> cancelClick = RxView.clicks(cancel).flatMap(o -> {
                dialog.dismiss();
                return Observable.error(throwable);
            });

            dialog.show();

            return Observable.amb(retryClick, cancelClick)
                    .flatMap(o -> toRetry.compose(applyRetryLogic(activity)));
        } else {
            return Observable.error(throwable);
        }
    };
}

      

The problem is that call

internally it retry

is not executed in the main thread, but throws an exception Can't create handler inside thread that has not called Looper.prepare()

.

The question is: how to make it execute on the main thread? As you can see, I've already tried doing it subscribeOn(AndroidSchedulers.mainThread())

right after compose

and onErrorResumeNext

with no luck.

I tested my code with simple observables that don't work on separate threads and it works fine.

+3


source to share


2 answers


You can accomplish this with flatMap

ping a PublishSubject

, which is then refreshed upon clicking the appropriate button. Here's a classic Java Swing example.

public class RetryWhenEnter {
    public static void main(String[] args) {
        AtomicInteger d = new AtomicInteger();
        Observable<Integer> source = Observable.just(1);

        source.flatMap(v -> {
            if (d.incrementAndGet() < 3) {
                return Observable.error(new RuntimeException());
            }
            return Observable.just(v);
        })
        .retryWhen(err -> {
            return err.flatMap(e -> {
                System.out.println(Thread.currentThread() + " Error!");
                PublishSubject<Integer> choice = PublishSubject.create();
                SwingUtilities.invokeLater(() -> {
                    int c = JOptionPane.showConfirmDialog(null, 
                        e.toString() + "\r\nRetry?", "Error",
                        JOptionPane.YES_NO_OPTION);
                    if (c == JOptionPane.YES_OPTION) {
                        choice.onNext(1);
                    } else {
                        choice.onCompleted();
                    }
                });
                return choice;
            });
        }).subscribe(System.out::println, 
                Throwable::printStackTrace);
    }
}

      

Edit:



Or use observeOn(AndroidSchedulers.mainThread())

immediately before onErrorResumeNext

or during use retryWhen

: retryWhen(o -> o.observeOn(AndroidSchedulers.mainThread())...)

.

Edit 2 I reversed the change, so the answer makes sense again.

+2


source


There is a way to solve my problem using retryWhen

(thanks @akarnokd):



public <T extends ApiResponse> Observable.Transformer<T, T> applyRetryLogic(Activity activity) {
    return observable -> observable
            .retryWhen(err -> err.flatMap(throwable -> {
                L.d(Thread.currentThread() + " Error!");
                if (throwable instanceof NetworkException) {
                    PublishSubject<Integer> choice = PublishSubject.create();
                    activity.runOnUiThread(() -> {
                        MaterialDialog dialog = retryDialog(activity);
                        View retry = dialog.getActionButton(DialogAction.POSITIVE);
                        View cancel = dialog.getActionButton(DialogAction.NEGATIVE);
                        RxView.clicks(retry).subscribe(o -> {
                            dialog.dismiss();
                            choice.onNext(1);
                        });
                        RxView.clicks(cancel).subscribe(o -> {
                            dialog.dismiss();
                            choice.onError(throwable);
                        });

                        dialog.show();
                    });
                    return choice;
                } else {
                    return Observable.error(throwable);
                }
            }));
}

      

+2


source







All Articles