OpenMP Single Producer Multiple Consumer

I'm trying to achieve something contrived with OpenMP.

I have a multi-core system with N available processors. I want a vector of objects, of length k*P, to be filled with packets of P objects at a time by one stream (reading a file), i.e. one thread reads this file and writes to vecObj[0 to P-1], then vecObj[P to 2P-1], etc. To keep things simple, this vector is pre-allocated (i.e. insertion using operator=, no push_backs, constant length as far as we know).
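
To make the setup concrete, here is roughly what I have in mind (Packet, readPacket, and the constant values are placeholders I made up for this post, not real names from my project):

#include <fstream>
#include <vector>

constexpr size_t P = 256;          // batch size (placeholder value)
constexpr size_t k = 64;           // number of batches (placeholder value)

std::vector<Packet> vecObj(k * P); // pre-allocated, fixed length

// the single reader thread fills the vector front to back
void readData(std::vector<Packet> &vec, std::ifstream &file) {
    for (size_t i = 0; i < vec.size(); ++i) {
        vec[i] = readPacket(file); // insertion via operator=, no push_back
    }
}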

After a batch is written to the vector, I want the remaining N-1 threads to work on the available data. Since each object can take a different amount of time to process, it would be good to have dynamic scheduling for the rest of the threads. The following snippet works great when all threads already have data to work with.

#pragma omp parallel for schedule(dynamic, per_thread)
    for(size_t i = 0; i < dataLength(); ++i) {
        threadWorkOnElement(vecObj, i);
    }


Now, the main problem I faced while developing a solution is this question: how can I dynamically schedule N-1 threads over the range of available data, while another thread just continues reading and filling the vector with data?

My guess is that publishing the newly written data and signalling it to the rest of the threads can be achieved with std::atomic.
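
For reference, this is the publish/consume handshake I have in mind (publishBatch and visibleEnd are names I invented for this sketch; Packet and readPacket as above):

#include <atomic>
#include <fstream>
#include <vector>

std::atomic<size_t> freshDataEnd{0};

// producer: write one batch, then publish the new end index; the release
// store makes the preceding vector writes visible to any thread that
// observes the new index with an acquire load
void publishBatch(std::vector<Packet> &vec, size_t begin, size_t end,
                  std::ifstream &file) {
    for (size_t i = begin; i < end; ++i)
        vec[i] = readPacket(file);
    freshDataEnd.store(end, std::memory_order_release);
}

// consumer: elements [0, visibleEnd()) are safe to read after this call
size_t visibleEnd() {
    return freshDataEnd.load(std::memory_order_acquire);
}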

I think what I am trying to achieve corresponds to the following pseudocode:

std::atomic<size_t> freshDataEnd{0};
size_t dataWorkStart = 0;
size_t dataWorkEnd = 0;
#pragma omp parallel
{
    #pragma omp single nowait
    {
        #pragma omp task
        {
            // increments freshDataEnd atomically after every P objects read;
            // returns when the end of the file is reached
            readData(vecObj, freshDataEnd);
        }
        #pragma omp task
        {
            // nested parallelism must be enabled for the inner parallel for,
            // e.g. via omp_set_max_active_levels(2)
            omp_set_num_threads(N-1);
            // run until the whole vector has been processed
            // (busy-waits while no new data is available)
            while(dataWorkStart < MAX_VEC_LEN) {
                if (dataWorkStart < freshDataEnd) {
                    dataWorkEnd = freshDataEnd;
                    #pragma omp parallel for schedule(dynamic, per_thread)
                    for(size_t i = dataWorkStart; i < dataWorkEnd; ++i) {
                        threadWorkOnElement(vecObj, i);
                    }
                    dataWorkStart = dataWorkEnd;
                }
            }
        }
    }
}


Is this the correct approach to achieve what I am trying to do? How can I handle this kind of nested parallelism? Less important: I would rather stick to OpenMP directives and not use std::atomic, is that possible? How?
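
In case it clarifies that last question: I imagine an OpenMP-only variant might look something like the sketch below, with the reader spawning tasks for the consumers instead of signalling through an atomic (endOfFile and readBatch are placeholders, and I am not sure this is idiomatic, hence the question):

#pragma omp parallel
#pragma omp single
{
    // the thread executing the single block is the producer;
    // all other threads in the team pick up the generated tasks
    size_t start = 0;
    while (!endOfFile()) {
        readBatch(vecObj, start, start + P); // fill vecObj[start, start+P)
        for (size_t i = start; i < start + P; ++i) {
            #pragma omp task firstprivate(i) // one task per element, picked up dynamically
            threadWorkOnElement(vecObj, i);
        }
        start += P;
    }
    #pragma omp taskwait // wait for the remaining consumer tasks
}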
