Design API using Java RX statements
I am new to RX-Java and I am trying to create an API where the flow is below:
Make REST call A to load data
|
|
data not found? | data found
------------------------------------
| |
| |
| |
Make REST Call B Load DB Data 1
| |
| |
| _________________________
| | Parallel |
| | |
| | |
| (condition using DB data 1) (condition using DB data 1)
| Load REST Data C Load DB Data 2
| | |
| |________________________|
| |
| |
Build Response Build Response
Assuming DB methods and service calls are returning. Observable, need clarity with skeleton flow for above using rx statements?
I'll cover the blocking pseudo code below:
Response = REST_Call_1(); // on error throw Exception
if (isResponseValid(response)) { // returns Boolean
if (responseUnderReview(response)) { // validation func
throw Exception;
} else {
//db_data_1 and db_data_2 can be parallel
db_data_1 = Load_DB_Data_1();
// Load data_3 and data_2 based on db_data_1
if (is_data_3_required(db_data_1)) {
data_3 = REST_call_2();
}
if (is_data_2_required(db_data_1)) {
db_data_2 = REST_call_2();
}
buildResponse(db_data_1, db_data_2, data_3, Response);
}
} else {
Response = REST_Call_3(); // on error throw Exception
buildResponse(response);
}
I am considering a complete non-blocking asynchronous approach.
source to share
Tassos answer looks reasonable to me and I don't see any other problems, but
// dataSource.getItemDetails(activityId returns List<ItemDetail> and is a blocking call
// So, I want to run it on a separate IO thread.
return Observable.from(dataSource.getItemDetails(activityId)).observeOn(Schedulers.io());
If so, including a blocking singleton call in an out-of-thread call can be done like this:
Observable.fromCallable(() -> yourBlockingCall())
.subscribeOn(Schedulers.io())
.flatMapIterable(v -> v)
...
or
Observable.defer(() -> Observable.from(yourBlockingCall()))
.subscribeOn(Schedulers.io())
...
Edit . Based on the diagram, I have established the following flow:
serviceCallA()
.flatMap(a -> {
if (dataFound(a)) {
return dbCall1()
.flatMap(db1 -> {
Observable o1 = shouldCallServiceC(db1)
? serviceCallC() : just(placeholderC);
Observable o2 = shouldCallDB2(db1)
? dbCall2() ? just(placeHolderDb2);
return zip(o1, o2, (c, d) -> createResult(c, d));
});
}
return serviceCallB()
.map(c -> mapToResultType(c));
});
source to share
The general flow of logic can be as follows:
retrofitClient
.loadData()...
.onErrorResumeNext(Observable.empty()) // or handle specific errors only
.flatMap(foundData ->
Observable.zip(
database.call1(foundData),
database.call2(foundData),
(call1, call2) -> buildResponse(call1,call2)
)
)
.switchIfEmpty(() ->
retrofitClient
.callB()
.map(response -> buildResponse(response))
)
Please note that if there is complex logic in the stream, I always try to extract it into separate methods. In your case, a database call based on REST calls might involve some transformations - if the resulting logic is larger than a line or two, I would move it to a separate method and use the method reference in the RX stream.
The idea is to have a stream somehow that can be looked at and analyzed on the same page, and hide the implementation details in the methods.
Edit: After editing, maybe this makes sense:
REST_Call_1()
.filter(response -> isResponseValid(response))
.flatMap(response ->
isResponseUnderReview(response)
? Observable.error(new Exception())
: Observable.just(response)
)
.flatMap(foundData ->
Observable.zip(
fetchData13(foundData),
Load_DB_Data_2(foundData),
(data13, call2) -> buildResponse(data13.getLeft(),call2,data13.getRight())
)
)
.switchIfEmpty(() ->
REST_Call_3()
.flatMap(response -> buildResponse(response))
)
.subscribe(....)
private Observable<Pair<DbData1, DbData3>> fetchData13(foundData) {
return
Load_DB_Data_1()
.flatMap(data1 -> is_data_3_required(data1)
? REST_call_2().map(data3 -> Pair.of(data1, data3))
: Pair.of(data1, null));
}
source to share