Given a container of futures, how can I call get() on all of them without blocking?

I'm trying to write a generic way to create a container of futures and then call future.get() on every element in a non-blocking way.

I expect the completion times for tasks to range from a few hundred milliseconds, typically up to two minutes. Some, however, may not end at all. In a typical run mode, there will be at least 10,000 tasks running.

I want the results of tasks that finish quickly to be returned without being held up by longer-running tasks in the futures container.

Here's what I have so far. It uses dummy sleep times to simulate task completion delays (the design owes a lot to good posts made here, like this and this):

#include <future>
#include <vector>
#include <iostream>
#include <random>
#include <chrono>
#include <ratio>
#include <thread>
#include <algorithm>

size_t rand_from_range(const size_t, const size_t);
int rand_sleep_range(const size_t, const size_t);
template<class CT> size_t get_async_all( CT& );

// Given a function and a collection,
//  return a vector of futures.
template<class Function, class CT>
auto async_all( Function f, CT coll )
    -> std::vector<decltype(std::async(f, *std::begin(coll)))>
{
  std::vector<decltype(std::async(f, *std::begin(coll)))> futures;
  futures.reserve(coll.size());
  for (auto& element : coll)
    futures.push_back(std::async(f, element));
  return futures;
}

// Given the beginning and end of a number
//  range, return a random number therein.
size_t rand_from_range( const size_t range_begin, 
                        const size_t range_end )
{
  std::uniform_int_distribution<size_t> 
    distr(range_begin, range_end);
  std::random_device dev;
  return distr(dev);
} 

// Given a shortest and longest duration, put the calling
//  thread to sleep for a random duration therein. 
// (in milliseconds)
int rand_sleep_range( const size_t shortest_time, 
                      const size_t longest_time )
{
  std::chrono::milliseconds 
    sleep_time(rand_from_range(shortest_time, longest_time));
  std::this_thread::sleep_for(sleep_time);
  return (int)sleep_time.count();
} 

// Given a container of futures, perform all
//  get()'s.
template<class CT>
size_t get_async_all( CT& async_coll )
{
  size_t get_ctr(0);
  const size_t future_cnt = async_coll.size();
  std::vector<size_t> completed;
  completed.reserve(future_cnt);

  while (true) {
    for (size_t ndx = 0; ndx < future_cnt; ++ndx) {
      // Check to see if this ndx' future has completed already.
      if (std::none_of(std::begin(completed), std::end(completed), 
            [=](size_t x) {
              return (x == ndx);
            }))
      { // No, this one hasn't completed 
        //  yet, attempt to process it.
        auto& f = async_coll[ndx];
        if (f.wait_for(std::chrono::milliseconds(10)) 
              == std::future_status::ready) 
        {
          f.get(); // The future work gets done here.
          ++get_ctr;
          completed.push_back(ndx);
          if (completed.size() == future_cnt) 
            break; // for()
        }
      }
    }
    if (completed.size() == future_cnt) 
      break; // while()
  }
  return get_ctr;
}

int main()
{
  // A dummy container of ints.
  std::vector<int> my_vec(100);
  for (auto& elem : my_vec)
    elem = rand_from_range(1, 100);

  // A dummy function lambda.
  auto my_func = [](int x) { 
    int x_ = x;
    int sleep_time = rand_sleep_range(100, 20000); // in ms.
    x *= 2;
    std::cout << " after sleeping " << sleep_time << "ms \t"
              << "f(" << x_ << ") = " << x << std::endl;
  };

  // Create and execute the container of futures.
  auto async_coll = async_all(my_func, my_vec);
  size_t count = get_async_all(async_coll);

  std::cout << std::endl << count << " items completed. \n";
}

      

So my questions are:

  • Are there any gotchas with the approach I am using?
  • Is there a better or more elegant approach to get_async_all() than the one I'm using? Or to anything else I'm doing, for that matter?

Thanks for taking the time to review the code and give me constructive criticism or feedback.


1 answer


There is at least one gotcha. You are calling std::async without specifying a launch policy, which means that some or all of the tasks may be deferred. But in your test to check whether a task has completed, you only check for std::future_status::ready. If the task is deferred, wait_for will always return std::future_status::deferred, which means your test will never be true for that task.
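The gotcha can be reproduced in isolation. The sketch below forces std::launch::deferred so the result does not depend on what the implementation chooses under the default policy; the function name status_of_deferred_task is made up for illustration.

```cpp
#include <cassert>
#include <chrono>
#include <future>

// Returns what a zero-timeout wait reports for a task launched with
// std::launch::deferred. The policy is forced here only to make the
// behaviour reproducible; with the default policy the implementation
// is allowed to pick deferral on its own.
std::future_status status_of_deferred_task()
{
  std::future<int> f = std::async(std::launch::deferred, [] { return 42; });
  assert(f.valid()); // a freshly created future refers to shared state

  // A deferred task only runs when get() or wait() is called on it, so a
  // timed wait reports deferred, never ready, however often it is polled.
  return f.wait_for(std::chrono::milliseconds(0));
}
```

A polling loop that only compares against std::future_status::ready would spin forever on such a future.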

The simplest solution to this problem is to specify a launch policy of std::launch::async, but then you risk oversubscribing your system. The alternative is to modify your test to check for deferred tasks, but then the question arises of what to do with them. If you call get or wait on them, you block for an arbitrary amount of time.
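As a rough sketch of the first option, here is a variant of the question's async_all that passes std::launch::async explicitly. The names async_all_eager and is_ready are hypothetical, and the sketch does nothing about oversubscription: with thousands of tasks, thread creation may be slow or may fail with std::system_error.

```cpp
#include <cassert>
#include <chrono>
#include <future>
#include <iterator>
#include <vector>

// Like async_all from the question, but every task is launched with
// std::launch::async, so no future can end up deferred.
template<class Function, class CT>
auto async_all_eager(Function f, const CT& coll)
    -> std::vector<decltype(std::async(std::launch::async, f, *std::begin(coll)))>
{
  std::vector<decltype(std::async(std::launch::async, f, *std::begin(coll)))> futures;
  futures.reserve(coll.size());
  for (const auto& element : coll)
    futures.push_back(std::async(std::launch::async, f, element));
  return futures;
}

// True once the task behind f has finished. Only meaningful for futures
// that are not deferred, which async_all_eager guarantees.
template<class T>
bool is_ready(const std::future<T>& f)
{
  assert(f.valid()); // wait_for on an invalid future is undefined behaviour
  return f.wait_for(std::chrono::seconds(0)) == std::future_status::ready;
}
```

With this variant the ready check in get_async_all eventually succeeds for every task that terminates.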



Regarding your general approach, rather than blocking for 10ms on each task as you poll them, you might consider waiting for 0ms, i.e., doing a pure poll to see whether the task is complete. This can reduce the latency between when a task finishes and when you process it, but it can also cause you to poll so aggressively that your overall system runs slower.
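A minimal sketch of zero-timeout polling might look like the following. The name collect_as_ready and the pause parameter are assumptions for illustration; the short sleep between empty sweeps is one way to keep the polling thread from spinning. The sketch assumes non-void results and futures launched with std::launch::async, and like the original it never returns if some task never finishes.

```cpp
#include <cassert>
#include <chrono>
#include <cstddef>
#include <future>
#include <thread>
#include <utility>
#include <vector>

// Sweeps over the pending futures with a zero timeout, calls get() on the
// ready ones and removes them, so finished tasks are never rescanned.
// Results are returned in the order the sweeps observed them as ready.
template<class T>
std::vector<T> collect_as_ready(
    std::vector<std::future<T>>& pending,
    std::chrono::milliseconds pause = std::chrono::milliseconds(1))
{
  std::vector<T> results;
  results.reserve(pending.size());
  while (!pending.empty()) {
    bool progressed = false;
    for (std::size_t i = 0; i < pending.size(); ) {
      assert(pending[i].valid());
      if (pending[i].wait_for(std::chrono::seconds(0))
            == std::future_status::ready) {
        results.push_back(pending[i].get()); // rethrows if the task threw
        // O(1) removal: overwrite slot i with the last future, then shrink.
        if (i + 1 != pending.size())
          pending[i] = std::move(pending.back());
        pending.pop_back();
        progressed = true;
      } else {
        ++i;
      }
    }
    // Nothing finished during this sweep: yield the CPU briefly.
    if (!progressed)
      std::this_thread::sleep_for(pause);
  }
  return results;
}
```

Removing completed futures from the container also avoids the linear search through the completed list that the question's version performs for every index.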

A completely different approach might be to give up polling each task and instead have each task write an "I'm done" flag to a shared data structure (such as a std::deque), and then periodically check that data structure to see whether there is anything in it. If so, process the completed tasks, remove them from the data structure, and then go back to sleep until it's time to poll again. If your tasks do a push_back on the data structure, you can naturally process them in the order in which they completed. The downside to this design is that the shared data structure can become a performance bottleneck.
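One possible shape for this design is sketched below. The completion_queue class and run_and_collect function are hypothetical names, int results are assumed for brevity, and one std::thread per input is used only to keep the example short; a real program with thousands of tasks would more likely use a thread pool. Tasks are assumed not to throw.

```cpp
#include <cassert>
#include <chrono>
#include <deque>
#include <mutex>
#include <thread>
#include <utility>
#include <vector>

// A mutex-protected deque that finished tasks push their results into.
template<class T>
class completion_queue {
public:
  void push(T value) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(std::move(value));
  }
  // Non-blocking: returns everything queued so far, oldest first.
  std::deque<T> take_all() {
    std::deque<T> taken;
    std::lock_guard<std::mutex> lock(mutex_);
    taken.swap(items_);
    return taken;
  }
private:
  std::mutex mutex_;
  std::deque<T> items_;
};

// Runs f on every input in its own thread and returns the results in the
// order in which the tasks pushed them, i.e. roughly completion order.
template<class Function>
std::vector<int> run_and_collect(Function f, const std::vector<int>& inputs,
                                 std::chrono::milliseconds poll_interval)
{
  completion_queue<int> done;
  std::vector<std::thread> workers;
  workers.reserve(inputs.size());
  for (int x : inputs)
    workers.emplace_back([&done, f, x] { done.push(f(x)); });

  std::vector<int> results;
  while (results.size() < inputs.size()) {
    std::deque<int> batch = done.take_all();
    if (batch.empty())
      std::this_thread::sleep_for(poll_interval); // nothing yet: sleep, retry
    results.insert(results.end(), batch.begin(), batch.end());
  }
  for (auto& w : workers)
    w.join();
  assert(results.size() == inputs.size());
  return results;
}
```

The consumer touches the lock once per poll rather than once per task, which limits contention somewhat, but every producer still serializes on the same mutex, which is the bottleneck mentioned above.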
