Retrofit2 + RxJava2, Invalid token, how to refresh stream on retry () re-subscribe

I have this simple code below that mimics the scenario Im currently trying to execute

mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {

                @Override
                public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
                    return mApiService.api().getAccessToken();
                }
            })
            .subscribeOn(Schedulers.io())
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                }

                @Override
                public void onNext(Void value) {
                }

                @Override
                public void onError(Throwable e) {

                    e.printStackTrace();
                    onError(e);
                }

                @Override
                public void onComplete() {
                }
            });

      

I am just listing it to make my purpose clear:

  • make a POST call with an access token
  • if it gets a matching error (404, 403, 401 or such)
  • make a GET call to have a new access token
  • repeat the whole sequence using the new access token

based on the above code and my understanding so far with .retryWhen () is that it will execute if an error occurred on the original Observable (. postSomethingWithAccessToken () ) and retry if necessary (based on your conditions inside the retry). what's going on here so that .retryWhen () is executed first before the outer Observable, causing an unwanted duplicate request, how can I achieve the things I mentioned above based on my current understanding (code)? Any help would be appreciated. :(

Edit: current workaround:

mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
            .subscribeOn(Schedulers.io())
            .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {

                @Override
                public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {

                    return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {

                        @Override
                        public ObservableSource<?> apply(Throwable throwable) throws Exception {

                            if (throwable instanceof HttpException) {

                                HttpException httpException = (HttpException) throwable;

                                if (httpException.code() == 401) {

                                    return mApiService.api().getAccessToken()
                                            .doOnNext(new Consumer<Authentication>() {
                                                @Override
                                                public void accept(Authentication authentication) throws Exception {
                                                    update(authentication);
                                                }
                                            });
                                }
                            }

                            return Observable.error(throwable);
                        }
                    });
                }
            })
            .subscribe(new Observer<Void>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onNext(Void value) {
                    Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
                }

                @Override
                public void onError(Throwable e) {
                    e.printStackTrace();
                }

                @Override
                public void onComplete() {
                    Log.e("Complete", "____ COMPLETE");
                }
            });

      

Method that updates the token using general settings

public void update(Authentication authentication) {
    preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}

      

I noticed (I put Log) an external observable subscription and retryWhen was executed on the main thread, but the re-send / re-subscribe thread is jumping over another scheduler thread, it looks like a race condition :(

    onSubscrbie_outer_observable: Thread[main,5,main]
    RetryWhen: Thread[main,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
    Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
    doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
    // and so on...

      

+3


source to share


3 answers


There are several problems here:

  • you need to pass the access token to the method postSomethingWithAccessToken

    on restart, otherwise you just try again with the same old invalid access token.
  • try again when the logic is wrong you should answer the errors Observable

    you get and put your retry logic there. as you said that this method is executed first, not when an error occurs throwableObservable

    - this is an error response, it will reflect errors as outliers ( onNext()

    ), you can flatMap()

    each error and response either with an error (to deliver the error to the original stream) completed or using onNext()

    with some object to signal its restart.
    Great blog bankrupt Dan Liu on this topic.

So
what you need is: 1) to save the access token somewhere where you can change it with an update to the access token.
2) fix repetition when logic reacts correctly to errors



Here is the suggestion code:

postSomethingWithAccessToken(request, accessToken)
        .subscribeOn(Schedulers.io())
        .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
                   @Override
                   public ObservableSource<?> apply(
                           @NonNull Observable<Throwable> throwableObservable) throws Exception {
                       return throwableObservable.flatMap(
                               new Function<Throwable, ObservableSource<? extends R>>() {
                                   @Override
                                   public ObservableSource<? extends R> apply(
                                           @NonNull Throwable throwable) throws Exception {
                                       if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
                                           return getAccessToken()
                                                           .doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
                                                   //or keep accessToken on some field, the point to have mutable
                                                   //var that you can change and postSomethingWithAccessToken can see
                                       }
                                       return Observable.error(throwable);
                                   }
                               });
                       }
                   }
        )
        .subscribeOn(Schedulers.io())
        .subscribe(new Consumer<Result>() {
                       @Override
                       public void accept(@NonNull Result result) throws Exception {
                           //handle result
                       }
                   }
        );

      

+1


source


BIG. Thanks to yosriz when he pointed me in the right direction to solve the tooth grinding problem I have to use defer

. So I ended up with this issue on GitHub, Why does re-subscribing the observable source emit the same output when I use the retryWhen operator?

This is exactly the same problem as it is now, for anyone experiencing the same problem here, this is my solution.

Observable
    .defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            // return an observable source here, the observable that will be the source of the entire stream;
        }
    })
    .subscribeOn( /*target thread to run*/ )
    .retryWhen( {
        // return a throwable observable here that will perform the logic when an error occurred
    })
    .subscribe( /*subscription here*/ )

      



or here is the complete non-lambda of my solution

Observable
    .defer(new Callable<ObservableSource<?>>() {
        @Override
        public ObservableSource<?> call() throws Exception {
            return mApiService.api().postSomethingWithAccessToken(
                request, preferences.getString("access_token", ""));
        }
    })
    .subscribeOn(Schedulers.io())
    .retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
        @Override
        public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
            return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                @Override
                public ObservableSource<?> apply(Throwable throwable) throws Exception {
                    if (throwable instanceof HttpException) {
                        HttpException httpException = (HttpException) throwable;
                        if (httpException.code() == 401) {
                            return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
                                    @Override
                                    public void accept(Authentication authentication) throws Exception {
                                        update(authentication);
                                    }
                                });
                        }
                    }
                    return Observable.error(throwable);
                }
            });
        }
    })
    .subscribe(new Observer<Void>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
        }

        @Override
        public void onNext(Void value) {
            Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
        }

        @Override
        public void onError(Throwable e) {
            e.printStackTrace();
        }

        @Override
        public void onComplete() {
            Log.e("Complete", "____ COMPLETE");
        }
    });

      

The key point here is "how to modify / update an existing source observable when the operator .retryWhen()

renews the subscription to the observable source"

+1


source


I am trying to solve the same problem here, I tried to reproduce the above solution that it updated the token but did not try to retry the request when my token was updated.

Here is my code without the lambda:

public Observable<Estabelecimento> listarEstabelecimentos() {
    return Observable.defer(new Callable<ObservableSource<? extends Estabelecimento>>() {
        @Override
        public ObservableSource<? extends Estabelecimento> call() throws Exception {
            return mGetNetAPI.listarEstabelecimento()
                    .map(mNetworkErrorHandler::processError);
        }
    }).retryWhen(throwableObservable -> throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
                                                                        @Override
                                                                        public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
                                                                            if (throwable instanceof UnauthorizedException) {
                                                                                return mRequestManager.getTokenObservable(AutoAtendimentoApplication.getContext())
                                                                                        .doOnNext(new Consumer<AuthResponse>() {
                                                                                            @Override
                                                                                            public void accept(@NonNull AuthResponse response) throws Exception {
                                                                                                Log.i("NEXT", "OK");
                                                                                            }
                                                                                        }).doOnError(new Consumer<Throwable>() {
                                                                                            @Override
                                                                                            public void accept(@NonNull Throwable throwable) throws Exception {
                                                                                                Log.i("ONERROR", "NOT OK");
                                                                                            }
                                                                                        });

                                                                            }

                                                                            return Observable.error(throwable);
                                                                        }
                                                                    }
    ));
}

      

Any ideas what I might be doing wrong?

0


source







All Articles