Executing Asynchronous Tasks in Parallel
In RxJS, when you want to run HTTP requests sequentially, you chain them. But I don't understand how to run requests in parallel. I saw in the examples at http://reactive-extensions.github.io/learnrx/ that they use Observable.zip() to run 2 requests in parallel. But how would you run 5 requests in parallel? More specifically, how can I set things up so that my function is called:
- when all 5 have completed?
- as soon as the first one completes?
This is a pretty old question, but it has no accepted answer. The answer you're looking for may be surprisingly simple: concatMap.
A promise starts executing as soon as it is created, so promises created together run in parallel; values emitted from a single observable, on the other hand, arrive serially.
Combine the two: in the code below, the observables built from promises execute in parallel, and the results are emitted in sequential order because concatMap concatenates them into one stream in the order in which they were created. (This relies on concatMap calling the function for every URL up front, as RxJS 4 does. RxJS 5 and later call it only after the previous inner observable completes, so there the requests would run one at a time.)
Rx.Observable.from(urls_array)
.concatMap(function(url) { return Rx.Observable.fromPromise(Promise.resolve($.get(url))) })
.subscribe(
function(jsonObj) {
// first result will arrive first
},
function(err) { },
function() {
// all completed
}
)
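To see the "created eagerly, consumed in order" idea without any library, here is a minimal sketch with plain promises. fakeRequest and readInCreationOrder are hypothetical names, and timers stand in for the HTTP calls; it only illustrates the ordering and is not how concatMap is implemented.

```javascript
// Hypothetical stand-in for an HTTP call: resolves with `label` after `ms` milliseconds.
function fakeRequest(label, ms, log) {
  log.push('start ' + label);
  return new Promise(function (resolve) {
    setTimeout(function () {
      log.push('done ' + label);
      resolve(label);
    }, ms);
  });
}

// Starts every request at once, then reads the results in creation order.
async function readInCreationOrder(specs) {
  const log = [];
  // All promises are created up front, so every "request" is already in flight.
  const pending = specs.map(function (spec) {
    return fakeRequest(spec.label, spec.ms, log);
  });
  const results = [];
  for (const promise of pending) {
    results.push(await promise); // consume strictly in creation order
  }
  return { results: results, log: log };
}
```

Calling readInCreationOrder([{ label: 'a', ms: 30 }, { label: 'b', ms: 10 }]) resolves with the results in the order a, b, even though the log shows that b finished first.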
.zip() can help you with this!
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
// zip the three streams; each emitted array is ordered [c, b, a]
c$.zip(b$, a$)
.subscribe(v=>console.log(v))
// marble
-0-1-2-3-4-5| (a$)
--0--1--2--3--4| (b$)
---0---1---2| (c$)
c$.zip(b$, a$)
---[0,0,0]---[1,1,1]---[2,2,2]|
// console.log
[0,0,0]
pause(400ms)
[1,1,1]
pause(400ms)
[2,2,2]
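The timing in the marble diagram can be checked with a small plain-JavaScript model. This is only a sketch: intervalModel and zipModel are hypothetical helpers that represent each stream as an array of { time, value } events, not RxJS APIs.

```javascript
// Model of Observable.interval(period).take(count):
// value i is emitted at (i + 1) * period milliseconds.
function intervalModel(period, count) {
  const events = [];
  for (let i = 0; i < count; i++) {
    events.push({ time: (i + 1) * period, value: i });
  }
  return events;
}

// Model of zip: the n-th output waits for the n-th value of every input,
// and the output ends when the shortest input runs out.
function zipModel(...streams) {
  if (streams.length === 0) {
    return [];
  }
  const length = Math.min(...streams.map(s => s.length));
  const out = [];
  for (let i = 0; i < length; i++) {
    out.push({
      time: Math.max(...streams.map(s => s[i].time)),
      value: streams.map(s => s[i].value)
    });
  }
  return out;
}

const a = intervalModel(200, 6);
const b = intervalModel(300, 10);
const c = intervalModel(400, 3);
const zipped = zipModel(c, b, a);
console.log(zipped);
```

In this model the three outputs land at 400, 800 and 1200 ms, each one gated by the slowest stream, and nothing more is produced once the 3-element stream is exhausted.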
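zip also accepts a selector function as its last argument: .zip(arg1, arg2, (self, arg1, arg2) => doSomething())
const a$ = Observable.interval(200).take(6)
const b$ = Observable.interval(300).take(10)
const c$ = Observable.interval(400).take(3)
// the selector receives one value from each stream, in the order c, b, a
c$.zip(b$, a$, (c,b,a)=>a+b+c)
.subscribe(v=>console.log(v))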
// console.log()
0 = (0+0+0)
pause(400ms)
3 = (1+1+1)
pause(400ms)
6 = (2+2+2)
or
merge() + flatMap()
import Rx, { Observable } from 'rxjs'
import axios from 'axios'
const promiseA = axios.get('https://jsonplaceholder.typicode.com/users/1')
, promiseB = axios.get('https://jsonplaceholder.typicode.com/users/2')
, promiseC = axios.get('https://jsonplaceholder.typicode.com/users/3')
Observable.interval(0).take(1)
.flatMap(()=>Observable.merge(promiseA, promiseB, promiseC))
// flatMap will resolve the promise for you!
.map(res=>res.data.username)
.reduce((arr,item)=>arr.concat(item),[])
.subscribe(v=>console.log(v)) // e.g. [ 'Samantha', 'Antonette', 'Bret' ]; merge emits in arrival order, so the order can vary
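Because merge passes values on as they arrive, the array printed above follows completion order rather than the order of the arguments. Here is a minimal sketch of that behavior with plain promises; fakeRequest and collectInArrivalOrder are hypothetical helpers, and the delays are made up for illustration.

```javascript
// Hypothetical stand-in for an HTTP call: resolves with `label` after `ms` milliseconds.
function fakeRequest(label, ms) {
  return new Promise(function (resolve) {
    setTimeout(function () { resolve(label); }, ms);
  });
}

// Resolves with all values, ordered by when each promise finished.
function collectInArrivalOrder(promises) {
  return new Promise(function (resolve, reject) {
    const arrived = [];
    if (promises.length === 0) {
      resolve(arrived);
    }
    promises.forEach(function (promise) {
      promise.then(function (value) {
        arrived.push(value); // first to finish is pushed first
        if (arrived.length === promises.length) {
          resolve(arrived);
        }
      }, reject);
    });
  });
}

collectInArrivalOrder([
  fakeRequest('Bret', 30),
  fakeRequest('Antonette', 20),
  fakeRequest('Samantha', 10)
]).then(function (names) {
  console.log(names);
});
```

The first element of the result is whichever request finished first, which is the "as soon as the first one completes" case from the question; the promise itself resolves only when all of them are done.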
Use combineLatest or forkJoin!
// Assume you have an array of urls
const urls = [
"twitter.com/puppies.json",
"google.com/puppies.json",
"facebook.com/puppies.json"
];
// Let's map these urls to Ajax Observables
const requests = urls.map(url => Rx.DOM.Ajax.getJSON(url))
// Now combine the result from each request into an observable
// Here combineLatest:
const allThePuppies$ = Rx.Observable.combineLatest(...requests)
// Alternatively, here forkJoin (use one or the other, not both):
// const allThePuppies$ = Rx.Observable.forkJoin(requests)
// When you subscribe to 'allThePuppies$', you'll kick off all your requests in parallel, and your response will contain an array with the results from each request:
allThePuppies$.subscribe(results => {
const [twitterPuppies, googlePuppies, facebookPuppies] = results;
// Do what you must with the respective responses
// (Presumably in this example you'd show your users some adorable pics of puppies)
})
combineLatest takes an arbitrary number of observables. Once each of them has emitted at least one value, it emits an array of the latest value from each observable whenever any of them fires.
That's awfully abstract. For our purposes, we know that an AJAX request will only ever emit once. So if we use combineLatest on several AJAX observables, we get an observable that emits an array of the results from each of the AJAX requests.
forkJoin is similar to combineLatest, but it emits its array of responses only after each of its constituent observables has completed.
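The difference between the two only shows up when a source emits more than once. The following plain-JavaScript model is a rough sketch of the rules described above, not the RxJS implementation; combineLatestModel, forkJoinModel and the sample data are hypothetical, and each source is represented as an array of { time, value } emissions that completes after its last one.

```javascript
// Model of combineLatest: once every source has emitted, output the latest
// value of each source every time any source emits.
function combineLatestModel(sources) {
  const events = [];
  sources.forEach((source, index) => {
    source.forEach(e => events.push({ time: e.time, index: index, value: e.value }));
  });
  events.sort((x, y) => x.time - y.time);

  const latest = new Array(sources.length);
  const seen = new Set();
  const out = [];
  for (const e of events) {
    latest[e.index] = e.value;
    seen.add(e.index);
    if (seen.size === sources.length) {
      out.push(latest.slice());
    }
  }
  return out;
}

// Model of forkJoin: output once, after every source has completed,
// with the last value of each; nothing is output if any source was empty.
function forkJoinModel(sources) {
  if (sources.length === 0 || sources.some(s => s.length === 0)) {
    return [];
  }
  return [sources.map(s => s[s.length - 1].value)];
}

// One-shot sources, like AJAX requests: both models give the same single array.
const oneShot = [
  [{ time: 30, value: 'twitter' }],
  [{ time: 10, value: 'google' }],
  [{ time: 20, value: 'facebook' }]
];
// A source that emits twice shows the difference.
const chatty = [
  [{ time: 1, value: 'a1' }, { time: 3, value: 'a2' }],
  [{ time: 2, value: 'b1' }]
];

console.log(combineLatestModel(oneShot), forkJoinModel(oneShot));
console.log(combineLatestModel(chatty), forkJoinModel(chatty));
```

For the one-shot sources both models produce a single array in source order, regardless of which request finished first. For the source that emits twice, the combineLatest model outputs twice, while the forkJoin model outputs only the final values.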
Maybe this link will be useful to you: http://xgrommx.github.io/rx-book/content/core_objects/observable/observable_methods/forkjoin.html
You can take a look at https://www.npmjs.org/package/async
This is a node module that can also be used in the browser.