How to reuse the same thread pool for successive batches of work

I found a good Boost-based thread pool implementation, which is an improvement over this and this. It is very easy to understand and test. It looks like this:

#include <iostream>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/shared_ptr.hpp>

// the actual thread pool
struct ThreadPool {
   ThreadPool(std::size_t);
   template<class F>
   void enqueue(F f);
   ~ThreadPool();

   // the io_service we are wrapping
   boost::asio::io_service io_service;
   // keep the io_service from running out of work (and stopping)
   boost::shared_ptr<boost::asio::io_service::work> work;
   // the worker threads
   boost::thread_group threads;
};

// the constructor just launches the requested number of worker threads
ThreadPool::ThreadPool(size_t nThreads)
   :io_service()
   ,work(new boost::asio::io_service::work(io_service))
{
   for ( std::size_t i = 0; i < nThreads; ++i ) {
    threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
   }
}

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
   io_service.post(f);
}

// the destructor drains any remaining work and joins all threads
ThreadPool::~ThreadPool() {
   work.reset();        // allow run() to return once the queued jobs are done
   io_service.run();    // this thread helps finish any remaining jobs
   threads.join_all();  // wait for the worker threads to exit
}

//tester: 
void f(int i)
{
    std::cout << "hello " << i << std::endl;
    boost::this_thread::sleep(boost::posix_time::milliseconds(300));
    std::cout << "world " << i << std::endl;
}

//it can be tested via:

int main() {
   // create a thread pool of 4 worker threads
   ThreadPool pool(4);

   // queue a bunch of "work items"
   for( int i = 0; i < 8; ++i ) {
      std::cout << "task " << i << " created" << std::endl;
      pool.enqueue(boost::bind(&f,i));
   }
}

      

Compile with:

g++ ThreadPool-4.cpp -lboost_system -lboost_thread

Now the question is: how can I modify this implementation so that, once the first batch of work submitted to the pool has completely finished, I can submit a second batch, and so on? I tried playing with .run() and .reset() (the calls found in the destructor) between batches of jobs, but no luck:

// adding methods to the thread pool:
// recreate the asio work object and the threads
void ThreadPool::reset(size_t nThreads){
   work.reset(new boost::asio::io_service::work(io_service));
   for ( std::size_t i = 0; i < nThreads; ++i ) {
      threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
   }
   std::cout << "group size : " << threads.size() << std::endl;
}

// join, and even interrupt
void ThreadPool::joinAll(){
  threads.join_all();
  threads.interrupt_all();
}

//tester
int main() {
   // create a thread pool of 4 worker threads
   ThreadPool pool(4);

   // queue a bunch of "work items"
   for( int i = 0; i < 20; ++i ) {
      std::cout << "task " << i << " created" << std::endl;
      pool.enqueue(boost::bind(&f,i));
   }
   // here I play with the asio work, the io_service and the thread group
   pool.work.reset();
   pool.io_service.run();
   std::cout << "after run" << std::endl;
   pool.joinAll();
   std::cout << "after join all" << std::endl;
   pool.reset(4);
   std::cout << "new thread group size: " << pool.threads.size() << std::endl; // btw: the new thread group size is 8, I expected 4!
   // second batch... never completes
   for( int i = 20; i < 30; ++i ) {
      pool.enqueue(boost::bind(&f,i));
   }
}

      

The second batch never completes. I would appreciate it if you could help me fix this. Thanks.
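Two details are worth noting about the experiment above. The thread group reporting 8 threads is expected: join_all() joins the threads, but the finished thread objects stay in the group. More importantly, once an io_service has run out of work and its run() calls have returned, boost::asio::io_service::reset() must be called before run() will dispatch handlers again; without it, the newly created threads return from run() immediately and the second batch is never executed. A minimal sketch of a reset() that takes this into account, using the same member names as above:

// Sketch: recreate the work object AND reset the io_service before spawning
// new run() threads; otherwise run() returns at once because the service is
// still flagged as stopped from the previous batch.
void ThreadPool::reset(size_t nThreads) {
   io_service.reset();   // required before run() can be called again
   work.reset(new boost::asio::io_service::work(io_service));
   for ( std::size_t i = 0; i < nThreads; ++i ) {
      threads.create_thread(boost::bind(&boost::asio::io_service::run, &io_service));
   }
}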

UPDATE - Solution:

Based on Nik's solution, I developed a solution using a condition variable. Just add the following code to the original class:

// add new work item to the pool
template<class F>
void ThreadPool::enqueue(F f) {
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        nTasks ++;
    }
    // forward the job to wrapper()
    void (ThreadPool::*ff)(boost::tuple<F>) = &ThreadPool::wrapper<F>;
    io_service.post(boost::bind(ff, this, boost::make_tuple(f))); // using a tuple seems to be the only practical way; it is mentioned in the boost examples
}

// run + notify
template<class F>
void ThreadPool::wrapper(boost::tuple<F> f) {
    boost::get<0>(f)(); // this is the task (function and its argument) to be executed by a thread
    {
        boost::unique_lock<boost::mutex> lock(mutex_);
        nTasks --;
        cond.notify_one();
    }
}

void ThreadPool::wait(){
    boost::unique_lock<boost::mutex> lock(mutex_);
    while(nTasks){
        cond.wait(lock);
    }
}
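For completeness, the members and declarations used above also have to be added to the original ThreadPool struct; the names below (nTasks, mutex_, cond) are taken from the code above, the rest is a minimal sketch:

// declarations to add inside struct ThreadPool (also #include <boost/tuple/tuple.hpp>
// and <boost/thread/condition_variable.hpp> if they are not already pulled in)
template<class F>
void wrapper(boost::tuple<F> f);
void wait();

int nTasks;                      // number of tasks still pending; initialize to 0 in the constructor
boost::mutex mutex_;             // protects nTasks
boost::condition_variable cond;  // signalled each time a task finishes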

      

Now you can call wait() between batches of work. One problem remains: even after the last batch I still have to call pool.wait(), because otherwise the pool goes out of scope and the thread pool destructor is invoked. During destruction some of the tasks are still completing and call .notify(), and since ThreadPool::mutex_ has already been invalidated by the destruction, exceptions occur while locking. Any suggestions would be highly appreciated.



2 answers


A condition variable can be used to achieve the desired result.

Implement a function that is responsible for enqueueing the tasks and then waiting on the condition variable. The condition variable is notified when all tasks assigned to the pool have been completed.

Each thread checks whether the jobs are completed or not. Once all tasks are complete, the condition variable is notified.

//An example of what you could try; this is just a hint of what could be explored.

void jobScheduler()
{
   int jobs = numberOfJobs; // this could vary and could be placed in shared memory

   // queue a bunch of "work items"
   for( int i = 0; i < jobs; ++i )
   {
      std::cout << "task " << i << " created" << std::endl;
      pool.enqueue(boost::bind(&f,i));
   }

   // wait on a condition variable
   boost::mutex::scoped_lock lock(the_mutex);
   // have this variable notified from whichever thread realizes that all jobs are
   // complete (in real code, wait in a loop on a predicate to guard against
   // spurious wakeups)
   conditionVariable.wait(lock);
}
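The hint above leaves the notifying side open. A minimal sketch of what it could look like, assuming a shared counter pendingJobs (a name introduced here for illustration only) that is set to the batch size before the jobs are queued and decremented by each worker when its task finishes:

// Illustrative shared state; pendingJobs is an assumed name, while the_mutex
// and conditionVariable match the snippet above.
boost::mutex the_mutex;
boost::condition_variable conditionVariable;
int pendingJobs = 0;   // set this to the number of jobs before enqueueing

// Each worker calls this at the end of its task.
void onTaskFinished()
{
  boost::mutex::scoped_lock lock(the_mutex);
  if (--pendingJobs == 0)
    conditionVariable.notify_one();   // wakes jobScheduler()
}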

      


Solution 2



I have a new working solution, with some assumptions about the signature of the posted functions, but these could be changed as required.

Continuing along the lines above, I use a condition variable to manage my tasks, but with a difference:

  • Create a job queue.
  • A manager waits on the queue for new jobs.
  • When a job is received, the waiting manager is notified about it.
  • Each worker keeps a handle to the manager. When all of its assigned tasks are complete, it informs the manager.
  • The manager, on receiving the completion call, stops waiting for new jobs to appear in the queue and exits.

#include <iostream>
#include <queue>
#include <boost/thread/thread.hpp>
#include <boost/asio.hpp>
#include <boost/tuple/tuple.hpp> 
#include <boost/tuple/tuple_io.hpp> 
#include <boost/function.hpp> 

///JOB Queue hold all jobs required to be executed
template<typename Job>
class JobQueue
{
  private:

    std::queue<Job> _queue;
    mutable boost::mutex _mutex;
    boost::condition_variable _conditionVariable;

  public:
    void push(Job const& job)
    {
      boost::mutex::scoped_lock lock(_mutex);
      _queue.push(job);
      lock.unlock();
      _conditionVariable.notify_one();
    }

    bool empty() const
    {
      boost::mutex::scoped_lock lock(_mutex);
      return _queue.empty();
    }

    bool tryPop(Job& poppedValue)
    {
      boost::mutex::scoped_lock lock(_mutex);
      if(_queue.empty())
      {
        return false;
      }

      poppedValue = _queue.front();
      _queue.pop();
      return true;
    }

    void waitAndPop(Job& poppedValue)
    {
      boost::mutex::scoped_lock lock(_mutex);
      while(_queue.empty())
      {
        _conditionVariable.wait(lock);
      }

      poppedValue = _queue.front();
      _queue.pop();
    }

};

///Thread pool for posting jobs to io service
class ThreadPool
{
  public :
    ThreadPool( int noOfThreads = 1) ;
    ~ThreadPool() ;

    template< class func >
      void post( func f ) ;

    boost::asio::io_service &getIoService() ;

  private :
    boost::asio::io_service _ioService;
    boost::asio::io_service::work _work ;
    boost::thread_group _threads;
};

  inline ThreadPool::ThreadPool( int noOfThreads )
: _work( _ioService )
{
  for(int i = 0; i < noOfThreads ; ++i) // 4
    _threads.create_thread(boost::bind(&boost::asio::io_service::run, &_ioService));
}

inline ThreadPool::~ThreadPool()
{
  _ioService.stop() ;
  _threads.join_all() ;
}

inline boost::asio::io_service &ThreadPool::getIoService()
{
  return _ioService ;
}

  template< class func >
void ThreadPool::post( func f )
{
  _ioService.post( f ) ;
}


template<typename T>
class Manager;

///Worker doing some work.
template<typename T>
class Worker{

    T _data;
    int _taskList;
    boost::mutex _mutex;
    Manager<T>* _hndl;

  public:

    Worker(T data, int task, Manager<T>* hndle):
    _data(data),
    _taskList(task),
    _hndl(hndle)
    {
    }

    bool job()
    {
      boost::mutex::scoped_lock lock(_mutex);
      std::cout<<"...Men at work..."<<++_data<<std::endl;
      --_taskList;
      if(taskDone())
        _hndl->end();
      return true;
    }

    bool taskDone()
    {
      std::cout<<"Tasks  "<<_taskList<<std::endl<<std::endl;
      if(_taskList == 0)
      {
        std::cout<<"Tasks done "<<std::endl;
        return true;
      }
      return false;
    }

};

///Job handler waits for new jobs and
///execute them as when a new job is received using Thread Pool.
//Once all jobs are done hndler exits.
template<typename T>
class Manager{

 public:

   typedef boost::function< bool (Worker<T>*)> Func;

   Manager(int threadCount):
   _threadCount(threadCount),
   _isWorkCompleted(false)
   {
     _pool = new ThreadPool(_threadCount);

     boost::thread jobRunner(&Manager::execute, this); // runs execute() on its own thread; with the default Boost.Thread settings the thread is detached when jobRunner goes out of scope
   }

   void add(Func f, Worker<T>* instance)
   {
     Job job(instance, f);
     _jobQueue.push(job);
   }

   void end()
   {
     boost::mutex::scoped_lock lock(_mutex);
     _isWorkCompleted = true;
     //send a dummy job
     add( NULL, NULL);
   }

   void workComplete()
   {
     std::cout<<"Job well done."<<std::endl;
   }

   bool isWorkDone()
   {
     boost::mutex::scoped_lock lock(_mutex);
     if(_isWorkCompleted)
       return true;
     return false;
   }

   void execute()
   {
      Job job;

     while(!isWorkDone())
     {
       _jobQueue.waitAndPop(job);

        Func f  = boost::get<1>(job);
        Worker<T>* ptr = boost::get<0>(job);

        if(f)
        {
          _pool->post(boost::bind(f, ptr));
        }
        else
          break;
     }

     std::cout<<"Complete"<<std::endl;
   }


 private:

  ThreadPool *_pool;
  int _threadCount;
  typedef boost::tuple<Worker<T>*, Func > Job;
  JobQueue<Job> _jobQueue;
  bool _isWorkCompleted;
  boost::mutex _mutex;
};

typedef boost::function< bool (Worker<int>*)> IntFunc;
typedef boost::function< bool (Worker<char>*)> CharFunc;


int main()
{
  boost::asio::io_service ioService;

  Manager<int> jobHndl(2);
  Worker<int> wrk1(0,4, &jobHndl);

  IntFunc f= &Worker<int>::job;

  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);
  jobHndl.add(f, &wrk1);

  Manager<char> jobHndl2(2);
  Worker<char> wrk2(0,'a', &jobHndl2);

  CharFunc f2= &Worker<char>::job;

  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);
  jobHndl2.add(f2, &wrk2);

  ioService.run(); // this io_service has no handlers, so run() returns immediately
  while(1){}       // keep main alive so the managers' threads can finish their jobs
  return 0;
}

      



The third solution is the best (the simplest IMHO), from one of the fathers of Asio.

You need to understand that you will stay blocked in the "threads.join_all()" statement as long as any thread is still running. Then you can call it again with another batch of jobs, as sketched below.
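A minimal sketch of that pattern, using only the ThreadPool and f() from the question: each batch gets its own pool, and the pool's destructor blocks until its batch has been drained before the loop moves on.

// Sketch only: one ThreadPool per batch, reusing the question's ThreadPool and f().
int main() {
   for (int batch = 0; batch < 3; ++batch) {
      ThreadPool pool(4);                        // 4 workers for this batch
      for (int i = 0; i < 8; ++i) {
         pool.enqueue(boost::bind(&f, batch * 8 + i));
      }
      // pool goes out of scope here: its destructor drains the batch
      // (and joins the threads) before the next iteration starts
   }
}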

Maybe an alternative is to use the taskqueue example ("A task queue that uses a pool of threads to run parallel tasks"): you fill the queue with your jobs, and it ensures that "x" tasks run in parallel. The example is easy to understand.



Perhaps you need to add this member function to the TaskQueue class to solve the pool.wait() issue:

void WaitForEmpty(){
    while( NumPendingTasks() || threads_.size() ){
      boost::wait_for_any(futures_.begin(), futures_.end());
    }
}

      

Enjoy!







