Retrofit2 + RxJava2, Invalid token, how to refresh stream on retry () re-subscribe

I have this simple code below that mimics the scenario Im currently trying to execute

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
            .subscribe(new Observer<Void>() {
                public void onSubscribe(Disposable d) {

                public void onNext(Void value) {

                public void onError(Throwable e) {


                public void onComplete() {


I am just listing it to make my purpose clear:

  • make a POST call with an access token
  • if it gets a matching error (404, 403, 401 or such)
  • make a GET call to have a new access token
  • repeat the whole sequence using the new access token

based on the above code and my understanding so far with .retryWhen () is that it will execute if an error occurred on the original Observable (. postSomethingWithAccessToken () ) and retry if necessary (based on your conditions inside the retry). what's going on here so that .retryWhen () is executed first before the outer Observable, causing an unwanted duplicate request, how can I achieve the things I mentioned above based on my current understanding (code)? Any help would be appreciated. :(

Edit: current workaround:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                public void accept(Authentication authentication) throws Exception {

                            return Observable.error(throwable);
            .subscribe(new Observer<Void>() {
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));

                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));

                public void onError(Throwable e) {

                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");


Method that updates the token using general settings

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();


I noticed (I put Log) an external observable subscription and retryWhen was executed on the main thread, but the re-send / re-subscribe thread is jumping over another scheduler thread, it looks like a race condition :(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...



source to share

3 answers

There are several problems here:

  • you need to pass the access token to the method postSomethingWithAccessToken

    on restart, otherwise you just try again with the same old invalid access token.
  • try again when the logic is wrong you should answer the errors Observable

    you get and put your retry logic there. as you said that this method is executed first, not when an error occurs throwableObservable

    - this is an error response, it will reflect errors as outliers ( onNext()

    ), you can flatMap()

    each error and response either with an error (to deliver the error to the original stream) completed or using onNext()

    with some object to signal its restart.
    Great blog bankrupt Dan Liu on this topic.

what you need is: 1) to save the access token somewhere where you can change it with an update to the access token.
2) fix repetition when logic reacts correctly to errors

Here is the suggestion code:

postSomethingWithAccessToken(request, accessToken)
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                   public ObservableSource<?> apply(
                           @NonNull Observable<Throwable> throwableObservable) throws Exception {
                       return throwableObservable.flatMap(
                               new Function<Throwable, ObservableSource<? extends R>>() {
                                   public ObservableSource<? extends R> apply(
                                           @NonNull Throwable throwable) throws Exception {
                                       if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                           return getAccessToken()
                                                           .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                   //or keep accessToken on some field, the point to have mutable
                                                   //var that you can change and postSomethingWithAccessToken can see
                                       return Observable.error(throwable);
        .subscribe(new Consumer<Result>() {
                       public void accept(@NonNull Result result) throws Exception {
                           //handle result




BIG. Thanks to yosriz when he pointed me in the right direction to solve the tooth grinding problem I have to use defer

. So I ended up with this issue on GitHub, Why does re-subscribing the observable source emit the same output when I use the retryWhen operator?

This is exactly the same problem as it is now, for anyone experiencing the same problem here, this is my solution.

    .defer(new Callable<ObservableSource<?>>() {
        public ObservableSource<?> call() throws Exception {
            // return an observable source here, the observable that will be the source of the entire stream;
    .subscribeOn( /*target thread to run*/ )
    .retryWhen( {
        // return a throwable observable here that will perform the logic when an error occurred
    .subscribe( /*subscription here*/ )


or here is the complete non-lambda of my solution

    .defer(new Callable<ObservableSource<?>>() {
        public ObservableSource<?> call() throws Exception {
            return mApiService.api().postSomethingWithAccessToken(
                request, preferences.getString("access_token", ""));
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    if (throwable instanceof HttpException) {
                        HttpException httpException = (HttpException) throwable;
                        if (httpException.code() == 401) {
                            return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
                                    public void accept(Authentication authentication) throws Exception {
                    return Observable.error(throwable);
    .subscribe(new Observer<Void>() {
        public void onSubscribe(Disposable d) {
            Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));

        public void onNext(Void value) {
            Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));

        public void onError(Throwable e) {

        public void onComplete() {
            Log.e("Complete", "____ COMPLETE");


The key point here is "how to modify / update an existing source observable when the operator .retryWhen()

renews the subscription to the observable source"



I am trying to solve the same problem here, I tried to reproduce the above solution that it updated the token but did not try to retry the request when my token was updated.

Here is my code without the lambda:

public Observable<Estabelecimento> listarEstabelecimentos() {
    return Observable.defer(new Callable<ObservableSource<? extends Estabelecimento>>() {
        public ObservableSource<? extends Estabelecimento> call() throws Exception {
            return mGetNetAPI.listarEstabelecimento()
    }).retryWhen(throwableObservable -> throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                                                                        public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                                                                            if (throwable instanceof UnauthorizedException) {
                                                                                return mRequestManager.getTokenObservable(AutoAtendimentoApplication.getContext())
                                                                                        .doOnNext(new Consumer<AuthResponse>() {
                                                                                            public void accept(@NonNull AuthResponse response) throws Exception {
                                                                                                Log.i("NEXT", "OK");
                                                                                        }).doOnError(new Consumer<Throwable>() {
                                                                                            public void accept(@NonNull Throwable throwable) throws Exception {
                                                                                                Log.i("ONERROR", "NOT OK");


                                                                            return Observable.error(throwable);


Any ideas what I might be doing wrong?



All Articles