RxJava + Retrofit + Realm makes endless GET requests

I am completely new to RxJava and find it really confusing. I want to make my application offline-first, so I decided to use Realm and Retrofit. First I want to get the data from Realm, then get the data from my remote web service, and then use Realm's insertOrUpdate to merge the remote objects with the local ones. I have this process working so far, but when I inspect my network requests in Stetho, this method fires requests endlessly. Where did I go wrong? Here is the function:

public Observable<RealmResults<Event>> all() {
    Realm realm = Realm.getDefaultInstance();

    return realm.where(Event.class).findAllAsync()
            .asObservable()
            .filter(new Func1<RealmResults<Event>, Boolean>() {
                @Override
                public Boolean call(RealmResults<Event> events) {
                    return events.isLoaded();
                }
            })
            .doOnNext(new Action1<RealmResults<Event>>() {
                @Override
                public void call(RealmResults<Event> events) {
                    service.getEvents()
                            .subscribeOn(Schedulers.io())
                            .subscribe(new Action1<List<Event>>() {
                                @Override
                                public void call(final List<Event> events) {
                                    try(Realm realm = Realm.getDefaultInstance()) {
                                        realm.executeTransaction(new Realm.Transaction() {
                                            @Override
                                            public void execute(Realm realm) {
                                                realm.insertOrUpdate(events);
                                            }
                                        });
                                    } // auto-close
                                }
                            });
                }
            });
}

      

and here's the function in my activity where I am using it:

private void getEvents() {
    Log.i("EVENTSELECTION", "STARTING");
    repository.all()
            .subscribe(new Subscriber<List<Event>>() {
                @Override
                public void onCompleted() {
                    Log.i("EVENTSELECTION", "Task Completed");
                    swipeRefreshLayout.setRefreshing(false);
                }

                @Override
                public void onError(Throwable e) {
                    Log.e("EVENTSELECTION", e.getMessage());
                    swipeRefreshLayout.setRefreshing(false);
                    e.printStackTrace();
                }

                @Override
                public void onNext(List<Event> events) {
                    Log.i("EVENTSELECTION", String.valueOf(events.size()));
                }
            });
}

      

Thank you very much.



2 answers


Where did I go wrong?

Let's go through it:

1.

public Observable<RealmResults<Event>> all() {
    Realm realm = Realm.getDefaultInstance(); 

      

This opens a Realm instance that is never closed. So the Realm lifecycle management is wrong; refer to the documentation for best practices.

2.

return realm.where(Event.class).findAllAsync()
        .asObservable() // <-- listens for changes in the Realm
// ...
        .doOnNext(new Action1<RealmResults<Event>>() {
            @Override
            public void call(RealmResults<Event> events) {
                service.getEvents() // <-- downloads data
                        .subscribeOn(Schedulers.io())
                        .subscribe(new Action1<List<Event>>() {

      

Basically, you are saying: "whenever the data in the Realm changes, download the data from the service and write it into the Realm."

That write triggers the RealmChangeListener again, which triggers another download, and so on.
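The feedback loop can be reproduced without Realm or RxJava. The following is only a rough plain-Java model: `TinyStore` and `FeedbackLoopDemo` are hypothetical names, and the store is assumed to notify its listeners on every write, which is the behavior the question's code runs into. The `cap` parameter exists only so the demo terminates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Realm-like store (not the Realm API):
// every write notifies all registered change listeners.
class TinyStore {
    private final List<String> items = new ArrayList<>();
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();

    void addChangeListener(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    void insertOrUpdate(List<String> remote) {
        for (String item : remote) {
            if (!items.contains(item)) {
                items.add(item);
            }
        }
        // Assumption of this model: a notification is sent after every write.
        List<String> snapshot = new ArrayList<>(items);
        for (Consumer<List<String>> listener : new ArrayList<>(listeners)) {
            listener.accept(snapshot);
        }
    }
}

class FeedbackLoopDemo {
    // Returns how many "downloads" happen when each change notification
    // starts a download whose result is written back into the store.
    static int countDownloads(int cap) {
        TinyStore store = new TinyStore();
        int[] downloads = {0};
        store.addChangeListener(snapshot -> {
            if (downloads[0] >= cap) {
                return; // safety cap; the real code has no such limit
            }
            downloads[0]++; // stands in for service.getEvents()
            // Writing the result notifies this same listener again.
            store.insertOrUpdate(Collections.singletonList("event-1"));
        });
        // Stands in for the first loaded query result being delivered.
        store.insertOrUpdate(Collections.<String>emptyList());
        return downloads[0];
    }

    public static void main(String[] args) {
        // The download count always reaches the cap, however high it is set.
        System.out.println(countDownloads(5));
    }
}
```

In this model a single initial notification is enough to exhaust any cap, because each download's write produces the next notification.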



This is a conceptual error: you are using Realm notifications incorrectly.

RealmResults<T> is not just a list of objects, it is also a subscription to changes. Therefore, you need to keep it as a field reference and "stay subscribed to changes in the database".

RealmResults<Sth> results;
RealmChangeListener<RealmResults<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};

void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id");
    results.addChangeListener(changeListener);
}

void unsth() {
    if(results != null && results.isValid()) {
        results.removeChangeListener(changeListener);
        results = null;
    }
}

      

In your case, the RealmResults<T>, which represents a subscription and also provides access to the current / new data, is wrapped as an Observable<T> to which you can attach subscribers.

Observable<RealmResults<Sth>> results;
Subscription subscription;
Action1<RealmResults<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};

void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id").asObservable();
    subscription = results.subscribe(changeListener);
}

void unsth() {
    if(subscription != null && !subscription.isUnsubscribed()) {
        subscription.unsubscribe();
        subscription = null; 
        results = null;
    }
}

      

As you can see, you subscribe when the component starts and unsubscribe when it ends.
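One way to picture this separation is the plain-Java sketch below. It is a model only, not Realm or RxJava code: `LocalStore` and `EventScreen` are hypothetical names, and `refresh()` stands in for the Retrofit call. The listener only renders, while the download is triggered explicitly once per start, so a write can no longer cause another download.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the local database: every write notifies listeners.
class LocalStore {
    private final List<String> items = new ArrayList<>();
    private final List<Consumer<List<String>>> listeners = new ArrayList<>();

    void addChangeListener(Consumer<List<String>> listener) {
        listeners.add(listener);
    }

    void removeChangeListener(Consumer<List<String>> listener) {
        listeners.remove(listener);
    }

    void insertOrUpdate(List<String> remote) {
        for (String item : remote) {
            if (!items.contains(item)) {
                items.add(item);
            }
        }
        List<String> snapshot = new ArrayList<>(items);
        for (Consumer<List<String>> listener : new ArrayList<>(listeners)) {
            listener.accept(snapshot);
        }
    }
}

// Hypothetical screen: listening and refreshing are separate responsibilities.
class EventScreen {
    final LocalStore store = new LocalStore();
    int downloads = 0;
    List<String> shown = Collections.emptyList();

    // The listener only updates what is displayed; it never downloads.
    private final Consumer<List<String>> renderer = snapshot -> shown = snapshot;

    void start() {
        store.addChangeListener(renderer); // subscribe when the component starts
        refresh();                         // one explicit download per start
    }

    void refresh() {
        downloads++; // stands in for service.getEvents()
        store.insertOrUpdate(Arrays.asList("event-1", "event-2"));
    }

    void stop() {
        store.removeChangeListener(renderer); // unsubscribe when it ends
    }
}
```

With this shape, `start()` performs exactly one download and the display is updated by the resulting write; after `stop()`, later writes no longer reach the screen.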

Calling Observable.first() is wrong; it makes no sense here. If you saw it in a tutorial (I have seen it before ...), then that tutorial was wrong.



So this is actually by design in Realm: it won't call onCompleted. I added .first() at the end of my getEvents function to get only the first result.


