What's the best way to limit concurrency when using ES6 Promise.all()?

I have code that iterates over a list that was requested from a database and makes an HTTP request for each item in that list. This list can sometimes be quite large (in the thousands), and I would like to make sure that I am not hitting a web server with thousands of concurrent HTTP requests.

An abbreviated version of this code currently looks something like this:

function getCounts() {
  return users.map(user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
      });
    });
  });
}

Promise.all(getCounts()).then(() => { /* snip */});

      

This code works on Node 4.3.2. To recap: can Promise.all be managed so that only a certain number of Promises are in progress at any given time?

+59




12 answers


Note that Promise.all() doesn't trigger the promises to start their work; creating the promise itself does.

With that in mind, one solution would be to check, whenever a promise resolves, whether a new promise should be started or whether you're already at the limit.
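A minimal sketch of that idea could look like this (the helper name limitConcurrency and the task-array shape are my own, not from the question; tasks is an array of functions that each return a promise):

function limitConcurrency(tasks, limit) {
  return new Promise((resolve, reject) => {
    let index = 0;      // next task to start
    let active = 0;     // tasks currently in flight
    const results = [];

    function startNext() {
      if (index === tasks.length && active === 0) {
        return resolve(results);            // everything has finished
      }
      while (active < limit && index < tasks.length) {
        const i = index++;
        active++;
        tasks[i]()
          .then(value => { results[i] = value; })
          .catch(reject)
          .then(() => { active--; startNext(); });
      }
    }

    startNext();
  });
}

// e.g. limitConcurrency(users.map(user => () => remoteServer.getCount(user)), 10)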



That said, there is no need to reinvent the wheel here. One library you could use for this purpose is es6-promise-pool. From their examples:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () {
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 
}

// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () {
  console.log('All promises fulfilled')
}, function (error) {
  console.log('Some promise rejected: ' + error.message)
})
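
For the question's use case, the producer could for instance hand out one getCount() promise per user until the list is exhausted (a sketch reusing users and remoteServer.getCount from the question):

var userIndex = 0

var promiseProducer = function () {
  if (userIndex >= users.length) {
    return null                        // no more work to hand out
  }
  var user = users[userIndex++]
  return remoteServer.getCount(user)   // must return a promise
}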

      

+39



I have compared promise concurrency limiting with a custom script, bluebird, es6-promise-pool, and p-limit. I believe p-limit has the simplest, most stripped-down implementation for this need. See their documentation.

Requirements

Be async/await compliant in the example



My example

In this example, we need to run a function for each URL in the array (e.g., an API request); it's called fetchData() here. If we had an array of thousands of items to process, limiting concurrency would definitely be useful to save CPU and memory resources.

const pLimit = require('p-limit');

// Example: concurrency of 3 promises at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => {

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
});

(async () => {
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
})();

      

The console log result is an array of response data for your resolved promises.
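
fetchData() itself is not defined in the example above; a minimal placeholder (here using the node-fetch package with a v2-style require, but any promise-returning request function works) could be:

const fetch = require('node-fetch');

// Placeholder: fetch a URL and return its body as text.
async function fetchData(url) {
    const response = await fetch(url);
    return response.text();
}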

+33




Bluebird's Promise.map can take a concurrency option to control how many promises should run in parallel. Sometimes it is easier than .all because you don't need to create the promise array first.

const Promise = require('bluebird')

function getCounts() {
  return Promise.map(users, user => {
    return new Promise(resolve => {
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => {
        /* snip */
        resolve();
       });
    });
  }, {concurrency: 10}); // <---- at most 10 http requests at a time
}

      

+13




Instead of using promises to limit HTTP requests, use Node's built-in http.Agent.maxSockets. This removes the need to use a library or write your own pooling code, and has the added advantage of giving you more control over what you are limiting.

agent.maxSockets

By default set to Infinity. Determines how many concurrent sockets the agent can have open per origin. Origin is either a 'host:port' or 'host:port:localAddress' combination.

For example:

var http = require('http');
var agent = new http.Agent({maxSockets: 5}); // 5 concurrent connections per origin
var request = http.request({..., agent: agent}, ...);

      

If you are making multiple requests to the same origin, it may also benefit you to set keepAlive to true on the agent (see the documentation quoted above for more information).
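
For instance, a keep-alive agent with the same socket cap could look like this (a sketch):

var http = require('http');

// Reuse sockets across requests and still cap them at 5 per origin.
var keepAliveAgent = new http.Agent({keepAlive: true, maxSockets: 5});

http.get({hostname: 'example.com', path: '/', agent: keepAliveAgent}, function (res) {
  res.resume(); // drain the response so the socket can be reused
});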

+5




If you know how iterators work and how they are consumed, you don't need an additional library, since it is very easy to build your own concurrency. Let me demonstrate:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) {
  console.log('x:', x)
  
  // notice how this inner loop continues on the same iterator
  // and consumes the rest of it, so the outer
  // loop won't log any more x's
  for (const y of iterator) {
    console.log('y:', y)
  }
}
      



We can use the same iterator and share it among workers.

If you had used .entries() instead of .values(), you would have gotten an iterator yielding [index, value] pairs, which I will demonstrate below with a concurrency of 2.

const sleep = n => new Promise(rs => setTimeout(rs,n))

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    await sleep(1000)
    console.log(index + ': ' + item)
  }
}

const arr = Array.from('abcdefghij')
const workers = new Array(2).fill(arr.entries()).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.all(workers).then(() => console.log('done'))
      




Note: the difference between this and the async-pool example is that this spawns two workers, so if one worker throws an error at, say, index 5, it won't stop the other worker from doing the rest. You just go from 2 concurrent operations down to 1 (it doesn't stop there). The harder part is knowing when all the workers are done, since Promise.all will bail out early if one of them fails. So my advice is to catch all errors inside the doWork function; a variant is sketched below.
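
For example, a doWork variant that keeps a worker alive after a failure could look like this (a sketch; the error handling is mine):

async function doWork(iterator) {
  for (let [index, item] of iterator) {
    try {
      await sleep(1000)
      console.log(index + ': ' + item)
    } catch (err) {
      // record the failure but keep pulling items from the shared iterator
      console.error('item ' + index + ' failed: ' + err.message)
    }
  }
}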

+5




Here is a basic example using streaming and p-limit. It streams an HTTP read stream into a Mongo database.

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const JSONStream = require('JSONStream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;

const pipeline = util.promisify(stream.pipeline);

const outputDBConfig = {
    dbURL: 'yr-db-url',
    collection: 'some-collection'
};
const limit = pLimit(3);

const yrAsyncStreamingFunction = async (readStream) => {
    const mongoWriteStream = streamToMongoDB(outputDBConfig);
    const mapperStream = es.map((data, done) => {
        // limit() queues the call so that only 3 run at once
        const someDataPromise = limit(() => yr_async_call_to_somewhere());

        someDataPromise.then(
            function handleResolve(someData) {
                data.someData = someData;
                done(null, data);
            },
            function handleError(error) {
                done(error);
            }
        );
    });

    await pipeline(
        readStream,
        JSONStream.parse('*'),
        mapperStream,
        mongoWriteStream
    );
};

      

+1




This can be solved using recursion.

The idea is that initially you send the maximum allowed number of requests, and each of these requests recursively continues to send the next one once it completes.

function batchFetch(urls, concurrentRequestsLimit) {
    return new Promise(resolve => {
        var documents = [];
        var index = 0;

        function recursiveFetch() {
            if (index === urls.length) {
                return;
            }
            fetch(urls[index++])
                .then(r => r.text())
                .then(text => {
                    documents.push(text);
                    if (documents.length === urls.length) {
                        resolve(documents);
                    } else {
                        recursiveFetch();
                    }
                });
        }

        for (var i = 0; i < concurrentRequestsLimit; i++) {
            recursiveFetch();
        }
    });
}

var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => {
   console.log(documents);
});

      

+1




I tried getting some of the examples shown here to work for my code, but since this was only for an import script and not production code, using the npm package batch-promises was surely the easiest path for me.


batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => {
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => {
    resolve(i);
  }, 100);
}))
.then(results => {
  console.log(results); // [1,2,3,4,5]
});
      



0




Here is what I did using Promise.race, inside my code:

const identifyTransactions = async function() {
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) {
    if (concurrency > 4)
      await Promise.race(promises).then(r => { promises = []; concurrency = 0 })
    promises.push(tx.identifyTransaction())
    concurrency++
  }
  if (promises.length > 0)
    await Promise.all(promises) // wait for the rest to finish
}

      

If you want to see an example: https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

0




Recursion is the answer if you don't want to use external libraries

downloadAll(someArrayWithData){
  var self = this;

  var tracker = function(next){
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function(){
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length){//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      }
    });
  }

  return tracker(0); 
}
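
As written, this processes one item at a time. If you want N requests in flight without a library, you can start several of these chains over a shared index (a sketch, keeping the hypothetical someExpensiveRequest from above):

downloadAllLimited(someArrayWithData, limit){
  var self = this;
  var next = 0; // shared cursor across the parallel chains

  var tracker = function(){
    if(next >= someArrayWithData.length){
      return Promise.resolve();
    }
    return self.someExpensiveRequest(someArrayWithData[next++])
      .then(tracker); // this chain picks up the next unclaimed item
  }

  // Start `limit` chains; each keeps pulling until the data runs out.
  var chains = [];
  for(var i = 0; i < Math.min(limit, someArrayWithData.length); i++){
    chains.push(tracker());
  }
  return Promise.all(chains);
}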

      

0




I suggest the async-pool library: https://github.com/rxaviers/async-pool

Run multiple promise-returning & async functions with limited concurrency using native ES6/ES7.

asyncPool runs multiple promise-returning & async functions in a limited concurrency pool. It rejects immediately as soon as one of the promises rejects. It resolves when all the promises complete. It calls the iterator function as soon as possible (under the concurrency limit).

Installation:

npm install tiny-async-pool
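
A usage sketch, based on the library's README for its older promise-returning API (newer versions expose an async iterator instead, so check the current docs):

const asyncPool = require('tiny-async-pool');

const timeout = ms => new Promise(resolve => setTimeout(() => resolve(ms), ms));

// At most 2 of the timeout() calls are pending at any one time.
asyncPool(2, [1000, 5000, 3000, 2000], timeout)
  .then(results => console.log(results)); // [1000, 5000, 3000, 2000]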

0




This becomes relatively trivial with async/await; depending on what you want, this translates well to a delayed map or forEach. Here is the map implementation:

const sleep = ms => new Promise(resolve => setTimeout(resolve, ms))

const delayMap = async (ms, arr, f) => {
  const results = []
  let i = 0
  for (const item of arr) {
    results.push(await f(item, i++, arr))
    await sleep(ms)
  }
  return results
}

// Example use - delaying 1 second between each call
delayMap(1000, [ 1, 2, 3 ], id => 
  fetch(`https://jsonplaceholder.typicode.com/posts/${id}`)
)
  .then(posts => posts.map(post => post.json()))
  .then(Promise.all.bind(Promise))
  .then(posts => console.log('Posts', posts))

      

-1








