Design API using Java RX statements

I am new to RxJava and am trying to design an API with the following flow:

            Make REST call A to load data
                        |
                        |
        data not found? |  data found  
    ------------------------------------    
    |                                  |           
    |                                  |            
    |                                  |
Make REST Call B                  Load DB Data 1     
    |                                  |            
    |                                  |
    |                              _________________________
    |                             |       Parallel         |
    |                             |                        |
    |                             |                        |
    |                  (condition using DB data 1)   (condition using DB data 1)
    |                      Load REST Data C                Load DB Data 2
    |                             |                        |
    |                             |________________________|
    |                                         |
    |                                         |
Build Response                            Build Response

      

Assuming the DB methods and service calls all return an Observable, what would a skeleton of the above flow look like using Rx operators?

The equivalent blocking pseudocode is below:

response = REST_Call_1(); // on error throw Exception

if (isResponseValid(response)) { // returns Boolean
    if (responseUnderReview(response)) { // validation func
        throw Exception;
    } else {
        db_data_1 = Load_DB_Data_1();

        // Load data_3 and db_data_2 based on db_data_1; these two loads can run in parallel
        if (is_data_3_required(db_data_1)) {
            data_3 = REST_call_2();
        }
        if (is_data_2_required(db_data_1)) {
            db_data_2 = Load_DB_Data_2();
        }
        buildResponse(db_data_1, db_data_2, data_3, response);
    }
} else {
    response = REST_Call_3(); // on error throw Exception
    buildResponse(response);
}

      

I am looking for a completely non-blocking, asynchronous approach.

+3




2 answers


Tassos' answer looks reasonable to me and I don't see any other problems, but consider this snippet:

// dataSource.getItemDetails(activityId) returns List<ItemDetail> and is a blocking call
// So, I want to run it on a separate IO thread.
return Observable.from(dataSource.getItemDetails(activityId)).observeOn(Schedulers.io());

      

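If that is what you are doing, note that the blocking call still runs on the calling thread: it is evaluated before the Observable is created, and observeOn only moves the downstream emissions. A single blocking call can be moved off the calling thread like this: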

Observable.fromCallable(() -> yourBlockingCall())
.subscribeOn(Schedulers.io())
.flatMapIterable(v -> v)
...

      



or

Observable.defer(() -> Observable.from(yourBlockingCall()))
.subscribeOn(Schedulers.io())
...
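The difference between passing the result of a blocking call and passing the call itself can be shown without RxJava. The sketch below is a plain-JDK analogue, not RxJava code: `CompletableFuture.completedFuture(blockingCall())` stands in for `Observable.from(blockingCall())`, and `supplyAsync(() -> blockingCall(), io)` stands in for `fromCallable(...).subscribeOn(...)`. The names `blockingCall`, `eager`, `deferred`, and the `io-worker` thread are hypothetical and exist only for illustration.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class EagerVsDeferred {

    // Hypothetical stand-in for a blocking call such as
    // dataSource.getItemDetails(activityId). It reports which thread ran it.
    static String blockingCall() {
        return Thread.currentThread().getName();
    }

    // Eager: the argument is evaluated by the caller before the future exists,
    // so the blocking work happens on the calling thread.
    static String eager() {
        return CompletableFuture.completedFuture(blockingCall()).join();
    }

    // Deferred: only the lambda is handed over, so the blocking work
    // runs on the executor's thread.
    static String deferred(ExecutorService io) {
        return CompletableFuture.supplyAsync(() -> blockingCall(), io).join();
    }

    public static void main(String[] args) {
        ExecutorService io =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "io-worker"));
        try {
            System.out.println("eager ran on:    " + eager());
            System.out.println("deferred ran on: " + deferred(io));
        } finally {
            io.shutdown();
        }
    }
}
```

Run from `main`, the eager variant reports the calling thread while the deferred variant reports `io-worker`, which is the same reason the blocking call needs to be wrapped in a callable or a deferred factory.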

      

Edit: Based on the diagram, I would set up the following flow:

serviceCallA()
.flatMap(a -> {
    if (dataFound(a)) {
        return dbCall1()
           .flatMap(db1 -> {
               Observable o1 = shouldCallServiceC(db1) 
                    ? serviceCallC() : just(placeholderC);
               Observable o2 = shouldCallDB2(db1) 
                    ? dbCall2() : just(placeHolderDb2);

               return zip(o1, o2, (c, d) -> createResult(c, d));   
           });
    }
    return serviceCallB()
        .map(c -> mapToResultType(c));
});
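For readers who want to run the branching logic end to end, here is a minimal sketch of the same shape using only the JDK. It is an analogue under stated assumptions, not the RxJava solution: `thenCompose` plays the role of `flatMap`, `thenCombine` the role of `zip`, and every service and DB method is a hypothetical stub that returns a fixed string. The boolean parameters stand in for `dataFound(a)`, `shouldCallServiceC(db1)`, and `shouldCallDB2(db1)`.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

class BranchingFlow {

    // Hypothetical stubs; real code would call REST services and a database.
    static CompletableFuture<Optional<String>> serviceCallA(boolean found) {
        return CompletableFuture.supplyAsync(
                () -> found ? Optional.of("A") : Optional.<String>empty());
    }
    static CompletableFuture<String> serviceCallB() { return CompletableFuture.supplyAsync(() -> "B"); }
    static CompletableFuture<String> serviceCallC() { return CompletableFuture.supplyAsync(() -> "C"); }
    static CompletableFuture<String> dbCall1() { return CompletableFuture.supplyAsync(() -> "db1"); }
    static CompletableFuture<String> dbCall2() { return CompletableFuture.supplyAsync(() -> "db2"); }

    static CompletableFuture<String> flow(boolean found, boolean needC, boolean needDb2) {
        return serviceCallA(found).thenCompose(a -> {
            if (!a.isPresent()) {
                // "Data not found" branch: fall back to call B.
                return serviceCallB().thenApply(b -> "response(" + b + ")");
            }
            return dbCall1().thenCompose(db1 -> {
                // Both optional loads are started before either is awaited,
                // so they can proceed concurrently; placeholders fill the gaps.
                CompletableFuture<String> c = needC
                        ? serviceCallC() : CompletableFuture.completedFuture("noC");
                CompletableFuture<String> d = needDb2
                        ? dbCall2() : CompletableFuture.completedFuture("noDb2");
                return c.thenCombine(d, (cv, dv) ->
                        "response(" + a.get() + "," + db1 + "," + cv + "," + dv + ")");
            });
        });
    }

    public static void main(String[] args) {
        System.out.println(flow(true, true, false).join());  // prints response(A,db1,C,noDb2)
        System.out.println(flow(false, true, true).join());  // prints response(B)
    }
}
```

The placeholder values matter in both versions: combining two sources only produces a result when each side yields a value, so a skipped call has to be replaced by a default rather than left empty.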

      

+1




The general flow of logic can be as follows:

retrofitClient
.loadData()...
.onErrorResumeNext(Observable.empty()) // or handle specific errors only
.flatMap(foundData -> 
    Observable.zip(
       database.call1(foundData),
       database.call2(foundData),
       (call1, call2) -> buildResponse(call1,call2)
    )
 )
 .switchIfEmpty(
    // switchIfEmpty takes an Observable, not a lambda; defer keeps callB lazy
    Observable.defer(() ->
        retrofitClient
        .callB()
        .map(response -> buildResponse(response)))
 )

      

Please note that if there is complex logic in the stream, I always try to extract it into separate methods. In your case, a database call that depends on the REST result might involve some transformations. If that logic is longer than a line or two, I would move it to a separate method and use a method reference in the Rx stream.



The idea is to keep the stream short enough to read and reason about on a single page, and to hide the implementation details in methods.
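As a small illustration of that style, the sketch below keeps the pipeline to three lines and pushes each step into a named method. It uses the JDK's `CompletableFuture` as a stand-in for an Rx stream, and `loadData`, `normalize`, and `buildResponse` are hypothetical helpers invented for the example.

```java
import java.util.concurrent.CompletableFuture;

class ReadablePipeline {

    // Hypothetical steps; each one hides details the pipeline does not need to show.
    static String loadData() {
        return "  raw  ";
    }

    static String normalize(String raw) {
        return raw.trim().toUpperCase();
    }

    static String buildResponse(String data) {
        return "response(" + data + ")";
    }

    // The whole flow is readable at a glance because every stage is a method reference.
    static CompletableFuture<String> pipeline() {
        return CompletableFuture.supplyAsync(ReadablePipeline::loadData)
                .thenApply(ReadablePipeline::normalize)
                .thenApply(ReadablePipeline::buildResponse);
    }

    public static void main(String[] args) {
        System.out.println(pipeline().join()); // prints response(RAW)
    }
}
```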

Edit: After the question was edited, maybe this makes more sense:

REST_Call_1()
.filter(response -> isResponseValid(response))
.flatMap(response ->
     isResponseUnderReview(response)
     ? Observable.error(new Exception())
     : Observable.just(response)
)
.flatMap(foundData -> 
    Observable.zip(
       fetchData13(foundData),
       Load_DB_Data_2(foundData),
       (data13, call2) -> buildResponse(data13.getLeft(),call2,data13.getRight())
    )
 )
 .switchIfEmpty(
    // switchIfEmpty expects an Observable; defer keeps REST_Call_3 lazy
    Observable.defer(() ->
        REST_Call_3()
        .map(response -> buildResponse(response)))
 )
 .subscribe(....)

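 // The parameter type is assumed to be the Response type from the pseudocode.
 private Observable<Pair<DbData1, DbData3>> fetchData13(Response foundData) {
    return
    Load_DB_Data_1()
    .flatMap(data1 -> is_data_3_required(data1)
        ? REST_call_2().map(data3 -> Pair.of(data1, data3))
        // wrap the plain Pair so both branches return an Observable
        : Observable.just(Pair.<DbData1, DbData3>of(data1, null)));
 }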

      

+4

