RxJava + Retrofit + Realm makes endless GET requests
I am completely new to RxJava and find it really confusing. I want to make my application offline-first, so I decided to use Realm and Retrofit. First I want to get the data from Realm, then get the data from my remote web service, and then use Realm's insertOrUpdate to merge the remote objects with the local ones. I have this process working so far, but when I inspect my network requests in Stetho, this method keeps issuing requests endlessly. Where did I go wrong? Here is the function:
public Observable<RealmResults<Event>> all() {
    Realm realm = Realm.getDefaultInstance();
    return realm.where(Event.class).findAllAsync()
        .asObservable()
        .filter(new Func1<RealmResults<Event>, Boolean>() {
            @Override
            public Boolean call(RealmResults<Event> events) {
                return events.isLoaded();
            }
        })
        .doOnNext(new Action1<RealmResults<Event>>() {
            @Override
            public void call(RealmResults<Event> events) {
                service.getEvents()
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Action1<List<Event>>() {
                        @Override
                        public void call(final List<Event> events) {
                            try(Realm realm = Realm.getDefaultInstance()) {
                                realm.executeTransaction(new Realm.Transaction() {
                                    @Override
                                    public void execute(Realm realm) {
                                        realm.insertOrUpdate(events);
                                    }
                                });
                            } // auto-close
                        }
                    });
            }
        });
}
And here is the function in my activity where I am using it:
private void getEvents() {
    Log.i("EVENTSELECTION", "STARTING");
    repository.all()
        .subscribe(new Subscriber<List<Event>>() {
            @Override
            public void onCompleted() {
                Log.i("EVENTSELECTION", "Task Completed");
                swipeRefreshLayout.setRefreshing(false);
            }
            @Override
            public void onError(Throwable e) {
                Log.e("EVENTSELECTION", e.getMessage());
                swipeRefreshLayout.setRefreshing(false);
                e.printStackTrace();
            }
            @Override
            public void onNext(List<Event> events) {
                Log.i("EVENTSELECTION", String.valueOf(events.size()));
            }
        });
}
Thank you very much.
Where did I go wrong?
Let's go through it:
1.
public Observable<RealmResults<Event>> all() {
    Realm realm = Realm.getDefaultInstance();
This opens a Realm instance that will never be closed, so your Realm lifecycle management is wrong. Refer to the documentation for best practices.
2.
    return realm.where(Event.class).findAllAsync()
        .asObservable() // <-- listens for changes in the Realm
        // ...
        .doOnNext(new Action1<RealmResults<Event>>() {
            @Override
            public void call(RealmResults<Event> events) {
                service.getEvents() // <-- downloads data
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Action1<List<Event>>() {
Basically you are saying: "whenever anything changes in the Realm, download the data from the service and write it into the Realm."
That write triggers the RealmChangeListener again, which triggers another download, and so on.
This is a conceptual error: you are using Realm notifications incorrectly.
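To make the loop concrete, here is a minimal library-free sketch. FakeStore and FeedbackLoopSketch are hypothetical stand-ins invented for illustration, not Realm or RxJava classes, and the model assumes every committed write notifies the listener. The first method downloads from inside the listener, as all() does, and only stops because of an artificial safety cap; the second lets the listener do nothing but render and starts the download once, from outside.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for a Realm with one change listener; NOT Realm's API.
class FakeStore {
    private final List<String> rows = new ArrayList<>();
    private Runnable listener = () -> {};

    void setChangeListener(Runnable listener) { this.listener = listener; }

    // Models the async query finishing its initial load.
    void notifyLoaded() { listener.run(); }

    // Assumption of this model: every write notifies the listener.
    void insertOrUpdate(List<String> incoming) {
        for (String row : incoming) {
            if (!rows.contains(row)) rows.add(row);
        }
        listener.run();
    }
}

class FeedbackLoopSketch {
    // Download started inside the listener: each write re-triggers the listener.
    static int downloadsFromInsideListener(int safetyCap) {
        FakeStore store = new FakeStore();
        int[] downloads = {0};
        store.setChangeListener(() -> {
            if (downloads[0] >= safetyCap) return; // artificial cap; the real loop has none
            downloads[0]++;                        // "download"
            store.insertOrUpdate(Arrays.asList("a", "b")); // write, which notifies again
        });
        store.notifyLoaded();
        return downloads[0];
    }

    // Listener only renders; the download is started once, outside the listener.
    // Returns {downloads, uiUpdates}.
    static int[] downloadsFromOutsideListener() {
        FakeStore store = new FakeStore();
        int[] uiUpdates = {0};
        store.setChangeListener(() -> uiUpdates[0]++);
        store.notifyLoaded();                          // first render with local data
        int downloads = 1;                             // one explicit refresh
        store.insertOrUpdate(Arrays.asList("a", "b")); // second render with merged data
        return new int[] {downloads, uiUpdates[0]};
    }

    public static void main(String[] args) {
        System.out.println(downloadsFromInsideListener(5));
        System.out.println(Arrays.toString(downloadsFromOutsideListener()));
    }
}
```

In the first variant the number of downloads always equals the cap, whatever the cap is, which is the same behaviour Stetho shows as endless requests.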
RealmResults<T> is not just a list of objects, it is also a subscription to changes. Therefore, you need to keep it as a field reference and "stay up to date with changes in the database".
RealmResults<Sth> results;
RealmChangeListener<RealmResults<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};
void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id");
    results.addChangeListener(changeListener);
}
void unsth() {
    if(results != null && results.isValid()) {
        results.removeChangeListener(changeListener);
        results = null;
    }
}
In your case, RealmResults<T>, which represents a subscription and also provides access to the current / new data, is wrapped as an Observable<T> that you can create subscribers for.
Observable<RealmResults<Sth>> results;
Subscription subscription;
Action1<RealmResults<Sth>> changeListener = (element) -> {
    if(element.isLoaded()) {
        adapter.updateData(element);
    }
};
void sth() {
    results = realm.where(Sth.class).findAllSortedAsync("id").asObservable();
    subscription = results.subscribe(changeListener);
}
void unsth() {
    if(subscription != null && !subscription.isUnsubscribed()) {
        subscription.unsubscribe();
        subscription = null;
        results = null;
    }
}
As you can see, you subscribe when the component starts and unsubscribe when the component ends.
Calling Observable.first() is wrong; it makes no sense here, because first() completes after a single emission and so discards the later change notifications. If you've seen this in any tutorial (I've seen it before ...) then that tutorial was wrong.
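As a rough illustration of why first() fits badly with a change subscription, here is a small library-free model. ChangeFeed and FirstSketch are hypothetical names made up for this sketch and are not RxJava or Realm classes; subscribeFirst only imitates the "deliver one item, then unsubscribe" behaviour of first().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical minimal change feed; a stand-in for an observable of row counts.
class ChangeFeed {
    private final List<Consumer<Integer>> subscribers = new ArrayList<>();

    // Registers a subscriber and returns an action that unsubscribes it.
    Runnable subscribe(Consumer<Integer> subscriber) {
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    // Imitates first(): deliver a single value, then unsubscribe.
    void subscribeFirst(Consumer<Integer> subscriber) {
        Runnable[] unsubscribe = new Runnable[1];
        unsubscribe[0] = subscribe(value -> {
            unsubscribe[0].run();
            subscriber.accept(value);
        });
    }

    void emit(int rowCount) {
        // Iterate over a copy so subscribers may unsubscribe while being notified.
        for (Consumer<Integer> s : new ArrayList<>(subscribers)) {
            s.accept(rowCount);
        }
    }
}

class FirstSketch {
    // Only the initial result arrives; later database changes are never seen.
    static List<Integer> seenWithFirst() {
        ChangeFeed feed = new ChangeFeed();
        List<Integer> seen = new ArrayList<>();
        feed.subscribeFirst(seen::add);
        feed.emit(0);
        feed.emit(3);
        feed.emit(5);
        return seen;
    }

    // A kept subscription receives every change until it is unsubscribed.
    static List<Integer> seenWithSubscription() {
        ChangeFeed feed = new ChangeFeed();
        List<Integer> seen = new ArrayList<>();
        Runnable unsubscribe = feed.subscribe(seen::add);
        feed.emit(0);
        feed.emit(3);
        feed.emit(5);
        unsubscribe.run();
        feed.emit(8); // not delivered, the component has ended
        return seen;
    }
}
```

In this model seenWithFirst() collects only the first value, while seenWithSubscription() collects every value emitted before the unsubscribe.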