RxJava collect() & takeUntil()
I have a list of users of unknown size. I want to query the first 30 and update the UI. Then I want to query all the others by offset, in increments of 100, until I get the last batch of users (should I use takeUntil() here?). Once I have them all, I want to update the UI by adding the rest of the users (in conjunction with reduce(), I believe).
This is my code:
final int INITIAL_OFFSET = 0;
final int INITIAL_LIMIT = 30;
// Loading the first 30 users to immediately update the UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", INITIAL_OFFSET, INITIAL_LIMIT)
        // Loading the remaining users 100 by 100 and updating the UI after all users have been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .collect(() -> new ArrayList<User>(), (b, s) -> {
                        b.addAll(s);
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> friends.size() == 0);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(users -> getView().appendAllFriends(users),
                throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false));
But it seems that I am doing something wrong, because onNext is called every time a Retrofit request is made.
Answering my own question. Adel's answer is good, but I needed a single subscription (I'm using the Nucleus MVP library), and I wanted to use collect() and takeUntil() instead of a while loop (which requires a blocking Retrofit interface method).
After a few hours of work I finally got it:
final int INITIAL_LIMIT = 30;
// Loading the first 30 users to immediately update the UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", null, INITIAL_LIMIT)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Updating the UI for the first time, or showing an error
        .doOnNext(users -> getView().appendAllFriends(users))
        .doOnError(throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false))
        // Loading the remaining users 100 by 100 and updating the UI after all users have been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            ArrayList<User> remainingUsers = new ArrayList<>();
            AtomicBoolean hasMore = new AtomicBoolean(true);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .collect(() -> remainingUsers, (b, s) -> {
                        // Needed for takeUntil: addAll() returns false when the batch is empty
                        hasMore.set(b.addAll(s));
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> !hasMore.get())
                    // Grab all items emitted by collect()
                    .last()
                    // Updating the UI for the last time
                    .doOnNext(users2 -> getView().appendAllFriends(users2));
        })
        .subscribe();
It might be helpful for other people who also use Nucleus.
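The stop condition above relies on addAll() returning false when it is handed an empty batch. To check that paging logic in isolation, here is a minimal plain-Java sketch with no RxJava or Retrofit involved. Everything in it is an assumption made for illustration: PagingSketch, fetchPage, loadRemaining and makeServer are hypothetical names, the "server" is just an in-memory list of integers, and PAGE_SIZE stands in for Config.DEFAULT_FRIEND_REQUEST_COUNT.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

final class PagingSketch {
    static final int INITIAL_LIMIT = 30;
    // Assumed to play the role of Config.DEFAULT_FRIEND_REQUEST_COUNT
    static final int PAGE_SIZE = 100;

    // Hypothetical stand-in for getAllFriends(...): returns a slice of an in-memory list
    static List<Integer> fetchPage(List<Integer> server, int offset, int limit) {
        if (offset >= server.size()) {
            return Collections.emptyList();
        }
        int end = Math.min(offset + limit, server.size());
        return new ArrayList<>(server.subList(offset, end));
    }

    // Loads everything after the first INITIAL_LIMIT items, one page at a time
    static List<Integer> loadRemaining(List<Integer> server) {
        List<Integer> remaining = new ArrayList<>();
        int offset = INITIAL_LIMIT;
        boolean hasMore = true;
        while (hasMore) {
            List<Integer> page = fetchPage(server, offset, PAGE_SIZE);
            // addAll() returns false only when the page is empty, which ends the loop
            hasMore = remaining.addAll(page);
            offset += PAGE_SIZE;
        }
        return remaining;
    }

    // Builds a fake "server" holding the integers 0..n-1
    static List<Integer> makeServer(int n) {
        List<Integer> server = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            server.add(i);
        }
        return server;
    }

    public static void main(String[] args) {
        List<Integer> server = makeServer(245);
        List<Integer> first = fetchPage(server, 0, INITIAL_LIMIT);
        List<Integer> rest = loadRemaining(server);
        System.out.println(first.size() + " + " + rest.size()); // prints "30 + 215"
    }
}
```

Note that this approach always costs one extra request: the loop only stops after a page comes back empty, which is the same behaviour as the takeUntil(friends -> !hasMore.get()) check.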
// cache() will ensure that we load the first pack only once
Observable<List<User>> firstPack = firstPack().cache();
// this subscription is for updating the UI on the first batch
firstPack
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(x -> draw(x), e -> whoops(e));
// this subscription is for collecting all the stuff
// do whatever tricks you need to do with your backend API to get the full list of stuff
firstPack
        .flatMap(fp -> rest(fp))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(x -> allUsers(x), e -> whoops(e));
// I would do this in a simple while loop
Observable<List<User>> rest(List<User> firstPack) {
    return Observable.create(sub -> {
        // copy, so the cached first pack shared with the other subscription is not mutated
        final List<User> total = new ArrayList<>(firstPack);
        try {
            while (!sub.isUnsubscribed()) {
                final List<User> friends = api.getFriendsBlocking(total.size());
                if (friends.isEmpty()) {
                    sub.onNext(total);
                    sub.onCompleted();
                    return; // done: do not rely on unsubscription to end the loop
                }
                total.addAll(friends);
            }
        } catch (IOException e) {
            sub.onError(e);
        }
    });
}