RxJava collect() & takeUntil()

I have a list of users of unknown size. I want to query the first 30 and update the UI. Then I want to query all the others, 100 at a time, advancing the offset until I get the last batch of users (should I use takeUntil() here?). Once I have them all, I want to update the UI by adding the rest of the users (in conjunction with reduce(), I believe).

This is my code:

final int INITIAL_OFFSET = 0;
final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", INITIAL_OFFSET, INITIAL_LIMIT)
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(Schedulers.io())
                    .collect(() -> new ArrayList<User>(), (b, s) -> {
                        b.addAll(s);
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> friends.size() == 0);
        })
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(users -> getView().appendAllFriends(users),
                throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false));

      

But it seems that I am doing something wrong, because onNext is called every time a Retrofit request completes.

+3




2 answers


Answering my own question. Adel's answer is good, but I needed a single subscription (I'm using the Nucleus MVP library), and I wanted to use collect() and takeUntil() instead of a while loop (which requires a blocking Retrofit interface method).

After a few hours of work I finally got it:



final int INITIAL_LIMIT = 30;
// Loading first 30 users to immediately update UI (better UX)
getServerApi().getAllFriends(userId, "photo_50", null, INITIAL_LIMIT)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        // Updating UI 1st time or show error
        .doOnNext(users -> getView().appendAllFriends(users))
        .doOnError(throwable -> getView().setError(processFail(throwable, ServerApi.Action.GET_ALL_FRIENDS), false))
        // Loading remaining users 100 by 100 and updating UI after all users been loaded
        .flatMap(users -> {
            AtomicInteger newOffset = new AtomicInteger(INITIAL_LIMIT);
            ArrayList<User> remainingUsers = new ArrayList<>();
            AtomicBoolean hasMore = new AtomicBoolean(true);
            return Observable.just(users)
                    .flatMap(users1 -> getServerApi().getAllFriends(userId, "photo_50", newOffset.get(), Config.DEFAULT_FRIEND_REQUEST_COUNT))
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .collect(() -> remainingUsers, (b, s) -> {
                        // Needed for takeUntil
                        hasMore.set(b.addAll(s));
                        newOffset.set(newOffset.get() + Config.DEFAULT_FRIEND_REQUEST_COUNT);
                    })
                    .repeat()
                    .takeUntil(friends -> !hasMore.get())
                    // Grab all items emitted by collect()
                    .last()
                    // Updating UI last time
                    .doOnNext(users2 -> getView().appendAllFriends(users2));
        })
        .subscribe();

      

It might be helpful for other people who also use Nucleus.
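
For anyone trying to follow what the chain above is meant to do, here is the paging logic on its own as a minimal plain-Java sketch, without RxJava or Retrofit. Everything in it is an assumption for illustration: `PagingSketch`, `fetchPage` and `loadRemaining` are made-up names, the in-memory list stands in for the server, and `BATCH_SIZE` plays the role of Config.DEFAULT_FRIEND_REQUEST_COUNT (assumed to be 100, as in the question).

```java
import java.util.ArrayList;
import java.util.List;

class PagingSketch {
    static final int INITIAL_LIMIT = 30;  // size of the first request
    static final int BATCH_SIZE = 100;    // assumed size of every later request

    // Hypothetical stand-in for the server call: returns at most `limit`
    // items starting at `offset`, or an empty list past the end.
    static List<Integer> fetchPage(List<Integer> all, int offset, int limit) {
        if (offset >= all.size()) {
            return new ArrayList<>();
        }
        int end = Math.min(all.size(), offset + limit);
        return new ArrayList<>(all.subList(offset, end));
    }

    // Loads everything after the first INITIAL_LIMIT items, batch by batch,
    // and stops at the first empty batch.
    static List<Integer> loadRemaining(List<Integer> all) {
        List<Integer> remaining = new ArrayList<>();
        int offset = INITIAL_LIMIT;
        while (true) {
            List<Integer> batch = fetchPage(all, offset, BATCH_SIZE);
            if (batch.isEmpty()) {
                break; // same condition takeUntil() checks via hasMore
            }
            remaining.addAll(batch);
            offset += BATCH_SIZE;
        }
        return remaining;
    }

    public static void main(String[] args) {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            all.add(i);
        }
        List<Integer> first = fetchPage(all, 0, INITIAL_LIMIT); // first UI update
        List<Integer> rest = loadRemaining(all);                // second UI update
        System.out.println("first batch: " + first.size() + ", remaining: " + rest.size());
    }
}
```

In the Rx version, collect() does the job of `remaining.addAll(batch)`, repeat() is the loop, and takeUntil() is the empty-batch check.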

+2




// cache() will ensure that we load the first pack only once
Observable<List<User>> firstPack = firstPack().cache();

// this subscription is for updating the UI on the first batch
firstPack
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> draw(x), e -> whoops(e));

// this subscription is for collecting all the stuff
// do whatever tricks you need to do with your backend API to get the full list of stuff
firstPack
  .flatMap(fp -> rest(fp))
  .observeOn(AndroidSchedulers.mainThread())
  .subscribe(x -> allUsers(x), e -> whoops(e));

// I would do this in a simple while loop
Observable<List<User>> rest(List<User> firstPack) {
  return Observable.create(sub -> {
    // copy, so the cached first pack shared with the UI is not mutated
    final List<User> total = new ArrayList<>(firstPack);
    try {
      while (!sub.isUnsubscribed()) {
        final List<User> friends = api.getFriendsBlocking(total.size());
        if (friends.isEmpty()) {
          // an empty batch means everything has been loaded
          sub.onNext(total);
          sub.onCompleted();
          return; // stop looping once the stream has completed
        }
        total.addAll(friends);
      }
    } catch (IOException e) {
      sub.onError(e);
    }
  });
}
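
The point of cache() above is that two subscribers share one load of the first pack. As a rough plain-Java analogy (not RxJava, and not a replacement for the code above), the same idea can be sketched with a single CompletableFuture that has two dependents. The names `SharedFirstPackSketch`, `loadFirstPack` and `run`, and the sample data, are all hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicInteger;

class SharedFirstPackSketch {

    // Hypothetical loader; the counter records how many times it really runs.
    static List<String> loadFirstPack(AtomicInteger loadCount) {
        loadCount.incrementAndGet();
        List<String> users = new ArrayList<>();
        users.add("alice");
        users.add("bob");
        return users;
    }

    // Returns {size seen by the first consumer, size seen by the second, load count}.
    static int[] run() {
        AtomicInteger loadCount = new AtomicInteger();

        // One asynchronous load, started exactly once.
        CompletableFuture<List<String>> firstPack =
                CompletableFuture.supplyAsync(() -> loadFirstPack(loadCount));

        // First consumer: would draw the first batch.
        CompletableFuture<Integer> drawn = firstPack.thenApply(List::size);

        // Second consumer: would go on to fetch the rest; here it only copies
        // the first pack and appends one made-up entry.
        CompletableFuture<Integer> total = firstPack.thenApply(first -> {
            List<String> all = new ArrayList<>(first);
            all.add("carol");
            return all.size();
        });

        return new int[] { drawn.join(), total.join(), loadCount.get() };
    }

    public static void main(String[] args) {
        int[] result = run();
        System.out.println("drawn=" + result[0] + " total=" + result[1] + " loads=" + result[2]);
    }
}
```

Both consumers see the same first pack and the loader runs once, which is the property cache() gives the two subscriptions.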

      



+1

