Executing Asynchronous Tasks in Parallel

In RxJS, when you want to run HTTP requests sequentially, you chain them. But I don't understand how I can run queries in parallel? I saw in the examples http://reactive-extensions.github.io/learnrx/ that they use Observable.zip () to run 2 requests in parallel. But how will you run 5 queries in parallel? More specifically, how can I configure so that my function is called:

  • when are all 5 completed?
  • at the first filling?
+4


source to share


5 answers


This is a pretty old question, but without an accepted answer. The answer you're looking for may be surprisingly simple: concatMap .

When a promise is created, it starts executing immediately, so they run in parallel; while when values ​​are emitted from one observable, they are in serial mode.



So combine the two, for the next piece of code, observables from promises are executed in parallel and the result is emitted in sequential order because concatMap puts them on the same thread in the order in which they were created.

Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
  function(jsonObj) {
    // first result will arrive first
  },
  function(err) { },
  function() {
    // all completed
  }
)

      

+1


source


.zip () can help you with this!

const a$ = Observable.interval(200).take(6) 
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$)
  .subscribe(v=>console.log(v))


// marble 
-0-1-2-3-4-5|    (a$)
--0--1--2--3--4| (b$)
---0---1---2|    (c$)
  zip(a$, b$)
---[0,0,0]---[1,1,1]---[2,2,2]|

// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,3]

      

.zip (arg1, arg2, (self, arg1, arg2) => doSomething ())

const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
  .zip(b$,a$, (c,b,a)=>a+b+c)
  .subscribe(v=>console.log(v))

// console.log()
0
pause(400ms)
3 = (1+1+1)
pause(400ms)
9 = (3+3+3)

      



or

merge () + flatMap ()

import Rx, { Observable } from 'rxjs'
import axios from 'axios'

const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1')
    , promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2')
    , promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3')

Observable.interval(0).take(1)
  .flatMap(()=>Observable.merge(promiseA, promiseB, promiseC)) 
   // flatMap will resolve the promise for you!
  .map(res=>res.data.username)
  .reduce((arr,item)=>arr.concat(item),[])
  .subscribe(v=>console.log(v)) // [ 'Samantha', 'Antonette', 'Bret' ]

      

0


source


Use kombinirovatPosledny or ForkJoin !

// Assume you have an array of urls
const urls = [
  "twitter.com/puppies.json", 
  "google.com/puppies.json", 
  "facebook.com/puppies.json"
];

// Let map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))

// Now combine the result from each request into an observable
// Here combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...urls)
// Alternatively, here forkJoin:
const allThePuppies$ = Rx.Observable.forkJoin(urls)


// When you subscribe to 'allThePuppies$', you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
  const twitterPuppies, googlePuppies, facebookPuppies = results;
  // Do what you must with the respective responses
  // (Presumably in this example you'd show your users some adorable pics of puppies)
})

      

combineLatest

takes an arbitrary number of observables, and as soon as each of them emits at least one value, it will return an array of the last value from each observable when any of these observables fire.

It's terribly abstract. For our purposes, we know that multiple AJAX requests will only be sent once. So if we use combineLatest

ajax for multiple observables, we get an observable that produces an array of results from each of the ajax requests.

forkJoin

is similar combineLatest

, but it emits its response array only after each component of the observable completes.

0


source


-1


source


You can take a look at https://www.npmjs.org/package/async

This is a node module that can also be used in the browser.

-4


source







All Articles