Retrofit2 + RxJava2, Invalid token, how to refresh stream on retry () re-subscribe
I have this simple code below that mimics the scenario Im currently trying to execute
mApiService.api().postSomethingWithAccessToken(request, "some_invalid_access_token")
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<AccessToken>>() {
@Override
public ObservableSource<AccessToken> apply(Observable<Throwable> throwableObservable) throws Exception {
return mApiService.api().getAccessToken();
}
})
.subscribeOn(Schedulers.io())
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Void value) {
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
onError(e);
}
@Override
public void onComplete() {
}
});
I am just listing it to make my purpose clear:
- make a POST call with an access token
- if it gets a matching error (404, 403, 401 or such)
- make a GET call to have a new access token
- repeat the whole sequence using the new access token
based on the above code and my understanding so far with .retryWhen () is that it will execute if an error occurred on the original Observable (. postSomethingWithAccessToken () ) and retry if necessary (based on your conditions inside the retry). what's going on here so that .retryWhen () is executed first before the outer Observable, causing an unwanted duplicate request, how can I achieve the things I mentioned above based on my current understanding (code)? Any help would be appreciated. :(
Edit: current workaround:
mApiService.api().postSomethingWithAccessToken(request, preferences.getString("access_token", ""))
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken()
.doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
Method that updates the token using general settings
public void update(Authentication authentication) {
preferences.edit().putString("access_token", authentication.getAccessToken()).commit();
}
I noticed (I put Log) an external observable subscription and retryWhen was executed on the main thread, but the re-send / re-subscribe thread is jumping over another scheduler thread, it looks like a race condition :(
onSubscrbie_outer_observable: Thread[main,5,main]
RetryWhen: Thread[main,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-2,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-2,5,main]
Throwable_FlatMap: Thread[RxCachedThreadScheduler-1,5,main]
doOnNext(Token_Refresh): Thread[RxCachedThreadScheduler-1,5,main]
// and so on...
source to share
There are several problems here:
- you need to pass the access token to the method
postSomethingWithAccessToken
on restart, otherwise you just try again with the same old invalid access token. - try again when the logic is wrong you should answer the errors
Observable
you get and put your retry logic there. as you said that this method is executed first, not when an error occursthrowableObservable
- this is an error response, it will reflect errors as outliers (onNext()
), you canflatMap()
each error and response either with an error (to deliver the error to the original stream) completed or usingonNext()
with some object to signal its restart.
Great blog bankrupt Dan Liu on this topic.
So
what you need is: 1) to save the access token somewhere where you can change it with an update to the access token.
2) fix repetition when logic reacts correctly to errors
Here is the suggestion code:
postSomethingWithAccessToken(request, accessToken)
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(
@NonNull Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(
new Function<Throwable, ObservableSource<? extends R>>() {
@Override
public ObservableSource<? extends R> apply(
@NonNull Throwable throwable) throws Exception {
if (throwable.code == 401) { //or 404/403, just a pseudo-code, put your real error comparing logic here
return getAccessToken()
.doOnNext(refreshedToken -> accessToken.updateToken(refreshedToken));
//or keep accessToken on some field, the point to have mutable
//var that you can change and postSomethingWithAccessToken can see
}
return Observable.error(throwable);
}
});
}
}
)
.subscribeOn(Schedulers.io())
.subscribe(new Consumer<Result>() {
@Override
public void accept(@NonNull Result result) throws Exception {
//handle result
}
}
);
source to share
BIG. Thanks to yosriz when he pointed me in the right direction to solve the tooth grinding problem I have to use defer
. So I ended up with this issue on GitHub, Why does re-subscribing the observable source emit the same output when I use the retryWhen operator?
This is exactly the same problem as it is now, for anyone experiencing the same problem here, this is my solution.
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
// return an observable source here, the observable that will be the source of the entire stream;
}
})
.subscribeOn( /*target thread to run*/ )
.retryWhen( {
// return a throwable observable here that will perform the logic when an error occurred
})
.subscribe( /*subscription here*/ )
or here is the complete non-lambda of my solution
Observable
.defer(new Callable<ObservableSource<?>>() {
@Override
public ObservableSource<?> call() throws Exception {
return mApiService.api().postSomethingWithAccessToken(
request, preferences.getString("access_token", ""));
}
})
.subscribeOn(Schedulers.io())
.retryWhen(new Function<Observable<Throwable>, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(final Observable<Throwable> throwableObservable) throws Exception {
return throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(Throwable throwable) throws Exception {
if (throwable instanceof HttpException) {
HttpException httpException = (HttpException) throwable;
if (httpException.code() == 401) {
return mApiService.api().getAccessToken().doOnNext(new Consumer<Authentication>() {
@Override
public void accept(Authentication authentication) throws Exception {
update(authentication);
}
});
}
}
return Observable.error(throwable);
}
});
}
})
.subscribe(new Observer<Void>() {
@Override
public void onSubscribe(Disposable d) {
Log.e("subscribe", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onNext(Void value) {
Log.e("onNext", "TOKEN : " + preferences.getString("access_token", ""));
}
@Override
public void onError(Throwable e) {
e.printStackTrace();
}
@Override
public void onComplete() {
Log.e("Complete", "____ COMPLETE");
}
});
The key point here is "how to modify / update an existing source observable when the operator .retryWhen()
renews the subscription to the observable source"
source to share
I am trying to solve the same problem here, I tried to reproduce the above solution that it updated the token but did not try to retry the request when my token was updated.
Here is my code without the lambda:
public Observable<Estabelecimento> listarEstabelecimentos() {
return Observable.defer(new Callable<ObservableSource<? extends Estabelecimento>>() {
@Override
public ObservableSource<? extends Estabelecimento> call() throws Exception {
return mGetNetAPI.listarEstabelecimento()
.map(mNetworkErrorHandler::processError);
}
}).retryWhen(throwableObservable -> throwableObservable.flatMap(new Function<Throwable, ObservableSource<?>>() {
@Override
public ObservableSource<?> apply(@NonNull Throwable throwable) throws Exception {
if (throwable instanceof UnauthorizedException) {
return mRequestManager.getTokenObservable(AutoAtendimentoApplication.getContext())
.doOnNext(new Consumer<AuthResponse>() {
@Override
public void accept(@NonNull AuthResponse response) throws Exception {
Log.i("NEXT", "OK");
}
}).doOnError(new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Log.i("ONERROR", "NOT OK");
}
});
}
return Observable.error(throwable);
}
}
));
}
Any ideas what I might be doing wrong?
source to share