Creating objects in parallel with RxJava

I wrote a Spring Boot microservice (an aggregating service) using RxJava to implement the following simplified use case. The big picture: when an instructor uploads a course content document, a set of questions must be generated and saved.

  • The user uploads the document to the system.
  • The system calls the document service to convert the document to text.
  • It then calls a separate question-generation service to produce a set of questions from that text.
  • Finally, these questions are sent to the main CRUD service for saving.

When a user uploads a document, many questions are generated from it (possibly hundreds). The problem is that I post the questions to the CRUD service one at a time. Because each post is a blocking network call, the whole process takes about 20 seconds. Here is the current code, assuming all questions have already been generated.

questions.flatMapIterable(list -> list).flatMap(q -> createQuestion(q)).toList();

private Observable<QuestionDTO> createQuestion(QuestionDTO question) {
    return Observable.<QuestionDTO> create(sub -> {
        QuestionDTO questionCreated = restTemplate.postForEntity(QUESTIONSERVICE_API,
                new org.springframework.http.HttpEntity<QuestionDTO>(question), QuestionDTO.class).getBody();
        sub.onNext(questionCreated);
        sub.onCompleted();
    }).doOnNext(s -> log.debug("Question was created successfully."))
            .doOnError(e -> log.error("An ERROR occurred while creating a question: " + e.getMessage()));
}

      

What I need is to post all questions to the CRUD service in parallel and merge the results on completion. Note that the CRUD service accepts only one question object per request, and that cannot be changed. I know the Observable.zip operator can be used for this kind of thing, but I don't know how to apply it here, since the number of questions is not known in advance. How should I change the first line of the code above to improve the application's performance? Any help is appreciated.



1 answer


By default, the observables created inside flatMap run on the same scheduler that you subscribed on. To run the createQuestion observables concurrently, subscribe each of them on the computation scheduler.

questions.flatMapIterable(list -> list)
        .flatMap(q -> createQuestion(q).subscribeOn(Schedulers.computation()))
        .toList();

      



Check out this article for a full explanation.
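
The fan-out idea behind this answer (start each blocking call on its own worker thread, then gather the results once all have finished) can be illustrated without RxJava. The following is a minimal plain-JDK sketch using CompletableFuture. It is an analogue of the technique, not the RxJava code itself. The names ParallelPostSketch, saveQuestion and postAll are hypothetical, and saveQuestion only simulates the blocking restTemplate call with a short sleep.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Function;
import java.util.stream.Collectors;

public class ParallelPostSketch {

    // Hypothetical stand-in for the blocking restTemplate.postForEntity(...) call:
    // sleeps to simulate network latency, then returns the "saved" question.
    public static String saveQuestion(String question) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return "saved:" + question;
    }

    // Submits one blocking call per item to the pool, then waits for all of them.
    // All futures are created before any join, so the calls can overlap.
    // Results are returned in input order because the futures are joined in list order.
    public static <T, R> List<R> postAll(List<T> items, Function<T, R> call, ExecutorService pool) {
        List<CompletableFuture<R>> futures = items.stream()
                .map(item -> CompletableFuture.supplyAsync(() -> call.apply(item), pool))
                .collect(Collectors.toList());
        return futures.stream()
                .map(CompletableFuture::join)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> questions = Arrays.asList("q1", "q2", "q3", "q4");
        // Assumption: a small fixed pool; a real service would size it for its I/O load.
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            long start = System.nanoTime();
            List<String> saved = postAll(questions, ParallelPostSketch::saveQuestion, pool);
            long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(saved + " in about " + elapsedMs + " ms");
        } finally {
            pool.shutdown();
        }
    }
}
```

Two differences from the RxJava version are worth keeping in mind. This sketch returns results in input order, whereas flatMap does not guarantee the order in which inner results are emitted. Also, since the real work is a blocking HTTP call, Schedulers.io() is generally a better fit than Schedulers.computation() in the RxJava snippet, because the computation scheduler is sized for CPU-bound work.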
