Using RxJava to Handle a Changing Object Stream
I am trying to process a stream of objects (obtained via a JSON HTTP request).
The observer returns items like this:
"2015-05-06T13:24:20Z", Foo, Foo, 1, 2, 3, Foo, Foo
The first item is the timestamp, followed by the Foo objects to store in the DB, then the IDs of the Foo objects that need to be removed from the DB, and finally the Foo objects that need to be updated (I'm going to do an upsert for those).
My current implementation looks like this:
public void updateFoos(final CallBack callBack) {
    final String lastFooUpdateTimestamp = localStorage.getLastFooUpdateTimestamp();
    fooService.getFoos(lastFooUpdateTimestamp)
        .subscribe(new Subscriber<Object>() {
            @Override
            public void onCompleted() {
                callBack.onSuccess();
            }

            @Override
            public void onError(Throwable e) {
                // Errors are currently swallowed; the callback is never told about a failure.
            }

            @Override
            public void onNext(Object o) {
                // Dispatch on the runtime type of each emitted item.
                if (o instanceof String) {
                    localStorage.setLastFooUpdateTimestamp((String) o);
                }
                if (o instanceof Foo) {
                    databaseManager.save((Foo) o);
                }
            }
        });
}
There are a number of problems:
- Checking with instanceof is not very RxJava-like; is there a better way?
- The timestamp is always the first item; is there any way to express this cleanly?
- I also want to batch the DB inserts, so a separate block that handles only the Foo objects would be good.
- Is there a better design where I emit multiple observables, one per type? If so, how would I subscribe to multiple observables?
1 answer
Here's an example of how this can be done using RxJava:
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;
import rx.functions.Func1;

public class MultikindSource {
    enum ValueType {
        TIMESTAMP,
        NUMBER,
        FOO
    }

    static final class Foo { }

    // Stand-in for the service call: a timestamp, two Foos, three IDs, one more Foo.
    static Observable<Object> source(String timestamp) {
        return Observable.from(Arrays.asList(timestamp, new Foo(), new Foo(),
                1, 2, 3, new Foo()));
    }

    public static void main(String[] args) {
        // Maps each emitted item to the kind of value it represents.
        Func1<Object, ValueType> keySelector = o -> {
            if (o instanceof String) {
                return ValueType.TIMESTAMP;
            } else if (o instanceof Foo) {
                return ValueType.FOO;
            }
            return ValueType.NUMBER;
        };

        AtomicReference<String> lastTimestamp = new AtomicReference<>(
                "2015-05-08T11:38:00.000Z");

        source(lastTimestamp.get())
            .groupBy(keySelector)
            .flatMap(g -> {
                // Each group carries only one kind of value, so it gets its own handler.
                if (g.getKey() == ValueType.TIMESTAMP) {
                    g.subscribe(v -> {
                        System.out.println("Updating timestamp to " + v);
                        lastTimestamp.set((String) v);
                    });
                } else if (g.getKey() == ValueType.FOO) {
                    // buffer(2) collects the Foos into batches of at most two.
                    g.buffer(2).subscribe(v ->
                        System.out.println("Batch inserting " + v));
                } else {
                    g.subscribe(v ->
                        System.out.println("Got some number: " + v));
                }
                // Emit one marker per group so that count() reports the number of groups.
                return Observable.just(1);
            })
            .count()
            .subscribe(v ->
                System.out.println("Got " + v + " kinds of events."));
    }
}
Essentially, you group the source data by an enum key, then chain onto each of those groups and subscribe to it to do the actual work.
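The classify-then-batch idea does not depend on RxJava itself. Here is a minimal plain-JDK sketch of it, assuming the whole response is already in memory as a list rather than arriving as a stream. The class KindGroupingSketch and the helpers kindOf, groupByKind and batches are hypothetical names made up for illustration; only ValueType and Foo mirror the example above.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: these helper names are illustrative, not part of RxJava.
final class KindGroupingSketch {

    enum ValueType { TIMESTAMP, NUMBER, FOO }

    static final class Foo { }

    // Same classification rule as the keySelector above.
    static ValueType kindOf(Object o) {
        if (o instanceof String) {
            return ValueType.TIMESTAMP;
        }
        if (o instanceof Foo) {
            return ValueType.FOO;
        }
        return ValueType.NUMBER;
    }

    // Groups items by kind, keeping their original order within each kind.
    static Map<ValueType, List<Object>> groupByKind(List<?> items) {
        Map<ValueType, List<Object>> groups = new EnumMap<>(ValueType.class);
        for (ValueType t : ValueType.values()) {
            groups.put(t, new ArrayList<>());
        }
        for (Object o : items) {
            groups.get(kindOf(o)).add(o);
        }
        return groups;
    }

    // Splits a list into consecutive batches of at most `size` elements,
    // which is the role buffer(2) plays in the RxJava version.
    static <T> List<List<T>> batches(List<T> items, int size) {
        List<List<T>> out = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            int end = Math.min(i + size, items.size());
            out.add(new ArrayList<>(items.subList(i, end)));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Object> items = Arrays.asList("2015-05-06T13:24:20Z",
                new Foo(), new Foo(), 1, 2, 3, new Foo());
        Map<ValueType, List<Object>> groups = groupByKind(items);

        System.out.println("Timestamp: " + groups.get(ValueType.TIMESTAMP).get(0));
        for (List<Object> batch : batches(groups.get(ValueType.FOO), 2)) {
            System.out.println("Batch of " + batch.size() + " Foo(s)");
        }
        System.out.println("IDs: " + groups.get(ValueType.NUMBER));
    }
}

Unlike the groupBy version, this waits for the complete list before doing anything, so it only fits responses small enough to hold in memory.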