ES6 Promise to replace async.eachLimit / async.mapLimit

In async, if I need to apply an asynchronous function to 1000 elements, I can do it with

// Apply foo to every element of items, with a concurrency limit of 10.
async.mapLimit(items, 10, (entry, done) => {
    foo(entry, done);
});

      

to process only 10 items at a time, limiting overhead and keeping things manageable.

With ES6 promises, I can easily do:

// bar() is called for every item up front; nothing here bounds the concurrency.
Promise.all(items.map((item) => bar(item)));

      

which would process all 1000 elements at the same time, which can cause a lot of problems.

I know Bluebird has ways to handle this, but I am looking for an ES6 solution.

+3


source to share


2 answers


There's nothing built in, but you can of course group the items into promise chains yourself and use Promise.all on the resulting array of chains:

const items = /* ...1000 items... */;
const concurrencyLimit = 10;
const promise = Promise.all(items.reduce((promises, item, index) => {
    // What chain do we add it to?
    const chainNum = index % concurrencyLimit;
    let chain = promises[chainNum];
    if (!chain) {
        // New chain
        chain = promises[chainNum] = Promise.resolve();
    }
    // Add it
    promises[chainNum] = chain.then(_ => foo(item));
    return promises;
}, []));

      

Here's an example showing how many concurrent promises there are at any given time (it also logs when each "chain" completes, and only processes 200 items instead of 1000):



const items = buildItems();
const concurrencyLimit = 10;
// Distribute the items round-robin over `concurrencyLimit` promise chains,
// then tack a log line onto the end of every chain before waiting for all.
const promise = Promise.all(
    items
        .reduce((chains, item, index) => {
            const slot = index % concurrencyLimit;
            // A slot seen for the first time starts from a resolved promise.
            const tail = chains[slot] || Promise.resolve();
            chains[slot] = tail.then(() => foo(item));
            return chains;
        }, [])
        .map(chain => chain.then(() => console.log("Chain done")))
);
promise.then(() => console.log("All done"));

/**
 * Builds the demo input.
 *
 * @returns {number[]} The 200 integers 0..199, each stored at its own index.
 */
function buildItems() {
  return Array.from({ length: 200 }, (_, position) => position);
}

// Number of foo() calls that have started but not yet resolved.
var outstanding = 0;

/**
 * Simulated asynchronous task: logs its start, then resolves with `item`
 * after a random delay below 500 ms, logging again when it resolves.
 * The count printed in parentheses is the value of `outstanding` at that moment.
 *
 * @param {*} item - Value to log and to resolve with.
 * @returns {Promise<*>} Promise fulfilled with `item`.
 */
function foo(item) {
  outstanding += 1;
  console.log("Starting " + item + " (" + outstanding + ")");
  return new Promise(done => {
    const settle = () => {
      outstanding -= 1;
      console.log("Resolving " + item + " (" + outstanding + ")");
      done(item);
    };
    setTimeout(settle, Math.random() * 500);
  });
}
      

/* Allow elements with class "as-console-wrapper" a max-height of 100%.
   NOTE(review): presumably the snippet runner's console panel — confirm. */
.as-console-wrapper {
  max-height: 100% !important;
}
      

Run codeHide result


I have to point out that if you want to track the outcome of each one, you will have to change the above; it doesn't try to track the results (!). :-)

+2


source


If you don't care about the results, then it's quick to whip one up:

/**
 * Runs every function in `funcs`, keeping at most `limit` of them in flight.
 * Results are discarded.
 *
 * The first `limit` functions are started immediately; whenever one of those
 * workers finishes a function it pulls the next one off the shared queue.
 *
 * @param {Array<function(): Promise>} funcs - Zero-argument functions to call.
 * @param {number} limit - Number of workers started up front.
 * @returns {Promise<undefined>} Settles once every worker has drained the queue;
 *   rejects if any awaited function rejects.
 */
Promise.eachLimit = async (funcs, limit) => {
  // Functions not started up front; shared by all workers.
  const queue = funcs.slice(limit);

  const worker = async firstTask => {
    await firstTask();
    while (queue.length > 0) {
      const nextTask = queue.shift();
      await nextTask();
    }
  };

  await Promise.all(funcs.slice(0, limit).map(task => worker(task)));
};

// Demo:

// Promise that fulfils after `ms` milliseconds.
var wait = ms => new Promise(resolve => {
  setTimeout(resolve, ms);
});

// Sleeps for a random time below two seconds, then prints its argument.
async function foo(s) {
  const delay = Math.random() * 2000;
  await wait(delay);
  console.log(s);
}

// One deferred foo() call per letter, run through eachLimit with a limit of 5.
(async () => {
  const letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
  const funcs = letters.map(s => () => foo(s));
  await Promise.eachLimit(funcs, 5);
})();
      

Run codeHide result


A key performance property is that it runs the next available function as soon as any function finishes.



Saving results

Keeping the results in order makes it a little less elegant, perhaps, but not too bad:

/**
 * Runs every function in `funcs` with bounded concurrency and collects the
 * results in input order.
 *
 * The first `limit` functions are started immediately. Each of those workers,
 * after finishing a function, claims the next unstarted index and runs it,
 * until no indices remain.
 *
 * @param {Array<function(): Promise>} funcs - Zero-argument functions to call.
 * @param {number} limit - Number of workers started up front.
 * @returns {Promise<Array>} Array where element i is the awaited result of
 *   funcs[i](); rejects if any awaited function rejects.
 */
Promise.mapLimit = async (funcs, limit) => {
  const results = [];
  // Index of the next function nobody has started yet. Kept separate from
  // `limit` so the parameter is not mutated.
  let next = limit;

  await Promise.all(funcs.slice(0, limit).map(async (func, i) => {
    results[i] = await func();
    while (next < funcs.length) {
      // Claim the index *before* awaiting. Incrementing only after the await
      // would let another worker read the same index in the meantime and run
      // the same function a second time.
      const claimed = next++;
      results[claimed] = await funcs[claimed]();
    }
  }));

  return results;
};

// Demo:

// Promise that fulfils after `ms` milliseconds.
var wait = ms => new Promise(resolve => {
  setTimeout(resolve, ms);
});

// Sleeps for a random time below two seconds, prints its argument, and
// returns the lower-cased argument.
async function foo(s) {
  const delay = Math.random() * 2000;
  await wait(delay);
  console.log(s);
  return s.toLowerCase();
}

// One deferred foo() call per letter, run through mapLimit with a limit of 5;
// the collected results are joined and printed at the end.
(async () => {
  const letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ".split("");
  const funcs = letters.map(s => () => foo(s));
  const lowered = await Promise.mapLimit(funcs, 5);
  console.log(lowered.join(""));
})();
      

Run codeHide result


+1


source







All Articles