What's the best way to limit concurrency when using ES6 Promise.all()?
I have code that iterates over a list that was requested from a database and makes an HTTP request for each item in that list. This list can sometimes be quite large (in the thousands), and I would like to make sure I am not hitting a web server with thousands of concurrent HTTP requests.
An abbreviated version of this code currently looks something like this:
function getCounts() {
return users.map(user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
});
}
Promise.all(getCounts()).then(() => { /* snip */});
This code is currently running on Node 4.3.2. To reiterate: can Promise.all be managed so that only a certain number of promises are in progress at any given time?
Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.
With that in mind, one solution would be to check whenever a promise is resolved whether a new promise should be started or whether you are already at the limit.
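For illustration, here is a minimal hand-rolled sketch of that idea; the users array and remoteServer.getCount() are taken from the question, and this is a sketch rather than a drop-in solution:
function getCountsLimited(users, limit) {
  const results = new Array(users.length);
  let nextIndex = 0;

  // Each "worker" picks up the next user as soon as its current
  // request finishes, so at most `limit` requests are in flight.
  function runNext() {
    if (nextIndex >= users.length) return Promise.resolve();
    const i = nextIndex++;
    return remoteServer.getCount(users[i]).then(count => {
      results[i] = count;
      return runNext();
    });
  }

  const workers = [];
  for (let k = 0; k < Math.min(limit, users.length); k++) {
    workers.push(runNext());
  }
  return Promise.all(workers).then(() => results);
}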
However, there is no need to reinvent the wheel here. One library you could use for this purpose is es6-promise-pool. From their examples:
// On the Web, leave out this line and use the script tag above instead.
var PromisePool = require('es6-promise-pool')
var promiseProducer = function () {
// Your code goes here.
// If there is work left to be done, return the next work item as a promise.
// Otherwise, return null to indicate that all promises have been created.
// Scroll down for an example.
}
// The number of promises to process simultaneously.
var concurrency = 3
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool.
var poolPromise = pool.start()
// Wait for the pool to settle.
poolPromise.then(function () {
console.log('All promises fulfilled')
}, function (error) {
console.log('Some promise rejected: ' + error.message)
})
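To tie this back to the question, the producer could walk the users array and hand out one getCount() call at a time; a sketch reusing the question's users and remoteServer names (not an official library example):
var userIndex = 0
var promiseProducer = function () {
  if (userIndex >= users.length) {
    return null // all promises have been created
  }
  var user = users[userIndex++]
  return remoteServer.getCount(user).then(function () {
    /* snip */
  })
}

var pool = new PromisePool(promiseProducer, 3)
pool.start().then(function () {
  console.log('All counts fetched')
})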
I have compared promise concurrency limiting with a custom script, bluebird, es6-promise-pool, and p-limit. I believe p-limit has the simplest, most stripped-down implementation for this need. See their documentation.
Requirements
To be compatible with async/await in the example:
- ECMAScript 2017 (version 8)
- Node version > 8.2.1
My example
In this example, we need to run a function for each URL in the array (for example, an API request). Here it is called fetchData(). If we had an array of thousands of items to process, limiting concurrency would definitely be useful to save CPU and memory resources.
const pLimit = require('p-limit');
// Example: concurrency of 3 promises at once
const limit = pLimit(3);
let urls = [
"http://www.exampleone.com/",
"http://www.exampletwo.com/",
"http://www.examplethree.com/",
"http://www.examplefour.com/",
]
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {
// wrap the function we are calling in the limit function we defined above
return limit(() => fetchData(url));
});
(async () => {
// Only three promises are run at once (as defined above)
const result = await Promise.all(promises);
console.log(result);
})();
The console log result is an array of response data for your resolved promises.
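Note that fetchData() is not defined in the example; any promise-returning function will do. A minimal sketch, assuming the node-fetch package is installed:
const fetch = require('node-fetch');

// Hypothetical helper: fetch a URL and resolve with its body text
const fetchData = url => fetch(url).then(res => res.text());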
Bluebird's Promise.map can take a concurrency option to control how many promises should run in parallel. Sometimes it is easier than .all because you don't need to create the promise array first.
const Promise = require('bluebird')
function getCounts() {
return Promise.map(users, user => {
return new Promise(resolve => {
remoteServer.getCount(user) // makes an HTTP request
.then(() => {
/* snip */
resolve();
});
});
}, {concurrency: 10}); // <---- at most 10 http requests at a time
}
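Since remoteServer.getCount() already returns a promise, the explicit new Promise wrapper is not strictly necessary; a leaner sketch of the same idea:
const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return remoteServer.getCount(user).then(() => {
      /* snip */
    });
  }, { concurrency: 10 }); // at most 10 HTTP requests at a time
}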
Instead of using promises to limit HTTP requests, use Node's built-in http.Agent.maxSockets. This removes the need to use a library or write your own pooling code, and has the added benefit of giving you more control over what you are limiting.
agent.maxSockets
By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.
For example:
var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);
If you are making multiple requests to the same origin, it might also benefit you to set keepAlive to true (see the documentation above for more information).
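A slightly fuller sketch of the same idea; the hostname, path, and user.id fields below are placeholders, not part of the original question:
var http = require('http');

// Reuse connections and never open more than 5 sockets per origin
var agent = new http.Agent({ keepAlive: true, maxSockets: 5 });

users.forEach(function (user) {
  http.get({
    hostname: 'example.com',    // placeholder host
    path: '/counts/' + user.id, // placeholder path
    agent: agent                // all requests share the limited agent
  }, function (res) {
    /* snip */
    res.resume(); // drain the response so the socket is freed
  });
});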
If you know how iterators work and how they are consumed, you don't need any extra library, since it becomes very easy to build your own concurrency limit yourself. Let me demonstrate:
/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()
// loop over all items with for..of
for (const x of iterator) {
console.log('x:', x)
// notice how this inner loop continues on the same iterator
// and consumes the rest of it, so the outer loop
// doesn't log any more x's
for (const y of iterator) {
console.log('y:', y)
}
}
We can use the same iterator and share it among workers.
If you use .entries() instead of .values(), the iterator yields [index, value] pairs, which I will demonstrate below with a concurrency of 2.
const sleep = n => new Promise(rs => setTimeout(rs,n))
async function doWork(iterator) {
for (let [index, item] of iterator) {
await sleep(1000)
console.log(index + ': ' + item)
}
}
const arr = Array.from('abcdefghij')
const workers = new Array(2).fill(arr.entries()).map(doWork)
// ^--- starts two workers sharing the same iterator
Promise.all(workers).then(() => console.log('done'))
Note: the difference between this and the async-pool example is that this spawns two workers, so if one worker throws an error at, say, index 5, it won't stop the other worker from doing the rest; you just go from 2 concurrent operations down to 1 (it doesn't stop there). It also becomes harder to know when all the workers are done, since Promise.all bails out early if one of them fails. So my advice is to catch all errors inside the doWork function.
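A sketch of that advice, catching errors per item so that a single failure does not take the whole worker down (here the awaited work is just sleep(), which never rejects, so the catch is purely illustrative):
async function doWork(iterator) {
  for (const [index, item] of iterator) {
    try {
      await sleep(1000)
      console.log(index + ': ' + item)
    } catch (err) {
      // collect or log the error so this worker keeps processing items
      console.error('item ' + index + ' failed:', err)
    }
  }
}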
Here is a basic example using streams and p-limit. It streams an HTTP read stream into a Mongo database.
const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const JSONStream = require('JSONStream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const pipeline = util.promisify(stream.pipeline)
const outputDBConfig = {
dbURL: 'yr-db-url',
collection: 'some-collection'
};
const limit = pLimit(3);
const yrAsyncStreamingFunction = async (readStream) => {
const mongoWriteStream = streamToMongoDB(outputDBConfig);
const mapperStream = es.map((data, done) => {
let someDataPromise = limit(() => yr_async_call_to_somewhere())
someDataPromise.then(
function handleResolve(someData) {
data.someData = someData;
done(null, data);
},
function handleError(error) {
done(error)
}
);
})
await pipeline(
readStream,
JSONStream.parse('*'),
mapperStream,
mongoWriteStream
);
}
This can be solved using recursion.
The idea is that initially you send the maximum allowed number of requests, and each of these requests recursively continues to send the next one once it completes.
function batchFetch(urls, concurrentRequestsLimit) {
return new Promise(resolve => {
var documents = [];
var index = 0;
function recursiveFetch() {
if (index === urls.length) {
return;
}
fetch(urls[index++]).then(r => r.text()).then(text => {
documents.push(text);
if (documents.length === urls.length) {
resolve(documents);
} else {
recursiveFetch();
}
});
}
for (var i = 0; i < concurrentRequestsLimit; i++) {
recursiveFetch();
}
});
}
var sources = [
'http://www.example_1.com/',
'http://www.example_2.com/',
'http://www.example_3.com/',
...
'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
console.log(documents);
});
I tried to get some of the examples shown here working for my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me:
batch-promises
Easily batch promises
NOTE: Requires runtime to support Promise or to be polyfilled.
Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.
Use:
import batchPromises from 'batch-promises';
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
// The iteratee will fire after each batch resulting in the following behaviour:
// @ 100ms resolve items 1 and 2 (first batch of 2)
// @ 200ms resolve items 3 and 4 (second batch of 2)
// @ 300ms resolve remaining item 5 (last remaining batch)
setTimeout(() => {
resolve(i);
}, 100);
}))
.then(results => {
console.log(results); // [1,2,3,4,5]
});
Here is what I did using Promise.race inside my code:
const identifyTransactions = async function() {
let promises = []
let concurrency = 0
for (let tx of this.transactions) {
if (concurrency > 4)
await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
promises.push(tx.identifyTransaction())
concurrency++
}
if (promises.length > 0)
await Promise.race(promises) //resolve the rest
}
If you want to see an example: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
Recursion is the answer if you don't want to use external libraries
downloadAll(someArrayWithData){
var self = this;
var tracker = function(next){
return self.someExpensiveRequest(someArrayWithData[next])
.then(function(){
next++;//This updates the next in the tracker function parameter
if(next < someArrayWithData.length){//Did I finish processing all my data?
return tracker(next);//Go to the next promise
}
});
}
return tracker(0);
}
I suggest the library async-pool: https://github.com/rxaviers/async-pool
asyncPool runs multiple promise-returning & async functions in a limited concurrency pool. It rejects as soon as one of the promises rejects, and it resolves when all the promises have completed. It calls the iterator function as soon as possible (under the concurrency limit).
Installation:
npm install tiny-async-pool
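A usage sketch based on the asyncPool(poolLimit, array, iteratorFn) signature documented for the 1.x releases, which resolve to an array of results:
const asyncPool = require('tiny-async-pool');

const timeout = ms => new Promise(resolve => setTimeout(() => resolve(ms), ms));

// Never more than 2 timers are pending at once
asyncPool(2, [1000, 5000, 3000, 2000], timeout).then(results => {
  console.log(results); // [1000, 5000, 3000, 2000]
});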
This becomes relatively trivial with async/await. Depending on what you want, this translates well to a delayed map or forEach; here is the map implementation.
const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))
const delayMap = async (ms, arr, f) => {
const results = []
let i = 0
for (const item of arr) {
results.push(await f(item, i++, arr))
await sleep(ms)
}
return results
}
// Example use - delaying 1 second between each call
delayMap(1000, [ 1, 2, 3 ], id =>
fetch(`https://jsonplaceholder.typicode.com/posts/${id}`)
)
.then(posts => posts.map(post => post.json()))
.then(Promise.all.bind(Promise))
.then(posts => console.log('Posts', posts))