Classic producer-consumer threads

The classic producer-consumer problem: the producer sleeps when itemCount == BUFFER_SIZE and wakes up again when itemCount goes down. But once itemCount reaches the limit, the producer thread is put to sleep. How can it know that itemCount has gone down and that it needs to wake up?

+2




3 answers


In pseudocode, the producer looks like this:

void producer_thread()
{
    while(true)
        queue.push( produce() );
}

      

Now consider the queue's push method (I used pthreads here, but the same logic applies to other libraries):



void SynchronizedQueue::push(Item const &i)
{
    pthread_mutex_lock(&mutex);

    // queue is full, so wait for consumer
    while (queue.size() == BUFFER_SIZE)
        pthread_cond_wait(&condition, &mutex);

    // when we get here, the queue has space
    this->queue.push_back(i);

    // make sure we wake a sleeping consumer
    if (queue.size() == 1)
        pthread_cond_signal(&condition);

    pthread_mutex_unlock(&mutex);
}

      

and the pop method used by the consumer:

Item SynchronizedQueue::pop()
{
    pthread_mutex_lock(&mutex);

    // wait for something to do
    while (queue.size() == 0)
        pthread_cond_wait(&condition, &mutex);

    // if we get here, we have some work
    Item tmp = queue.front();

    // make sure we wake a sleeping producer
    if (queue.size() == BUFFER_SIZE)
        pthread_cond_signal(&condition);

    queue.pop_front();
    pthread_mutex_unlock(&mutex);
    return tmp;
}
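
For anyone who wants to compile and run this, here is a minimal sketch of how the class around push() and pop() might be declared. The member layout, the Item typedef, the BUFFER_SIZE value, and the run_demo() driver are my assumptions, not part of the answer above. It also swaps in two condition variables (notFull and notEmpty) instead of the single shared one, a common variation that keeps a signal from waking the wrong side when there is more than one producer or consumer.

```cpp
#include <pthread.h>
#include <cstddef>
#include <deque>

typedef int Item;                          // assumption: any copyable type would do
static const std::size_t BUFFER_SIZE = 4;  // assumption: small capacity for the demo

class SynchronizedQueue {
public:
    SynchronizedQueue() {
        pthread_mutex_init(&mutex, NULL);
        pthread_cond_init(&notFull, NULL);
        pthread_cond_init(&notEmpty, NULL);
    }
    ~SynchronizedQueue() {
        pthread_cond_destroy(&notEmpty);
        pthread_cond_destroy(&notFull);
        pthread_mutex_destroy(&mutex);
    }

    void push(Item const &i) {
        pthread_mutex_lock(&mutex);
        // Producer sleeps here while the queue is full; the wait releases the mutex.
        while (queue.size() == BUFFER_SIZE)
            pthread_cond_wait(&notFull, &mutex);
        queue.push_back(i);
        pthread_cond_signal(&notEmpty);    // wake one consumer, if any is waiting
        pthread_mutex_unlock(&mutex);
    }

    Item pop() {
        pthread_mutex_lock(&mutex);
        // Consumer sleeps here while the queue is empty.
        while (queue.empty())
            pthread_cond_wait(&notEmpty, &mutex);
        Item tmp = queue.front();
        queue.pop_front();
        pthread_cond_signal(&notFull);     // this is what wakes the sleeping producer
        pthread_mutex_unlock(&mutex);
        return tmp;
    }

private:
    std::deque<Item> queue;
    pthread_mutex_t mutex;
    pthread_cond_t notFull;
    pthread_cond_t notEmpty;
};

// Hypothetical driver: one producer thread pushes 1..count, the caller consumes.
struct DemoArgs { SynchronizedQueue *q; int count; };

static void *producer_main(void *p) {
    DemoArgs *a = static_cast<DemoArgs *>(p);
    for (int i = 1; i <= a->count; ++i)
        a->q->push(i);
    return NULL;
}

long run_demo(int count) {
    SynchronizedQueue q;
    DemoArgs args = { &q, count };
    pthread_t producer;
    pthread_create(&producer, NULL, producer_main, &args);
    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += q.pop();
    pthread_join(producer, NULL);
    return sum;
}
```

Calling run_demo(100) pushes far more items than the buffer holds, so the producer blocks in push() repeatedly and is woken each time by the signal in pop(). On older toolchains you may need to build with -pthread.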

      

+2




You need condition variables.

A typical use of a condition variable is:

//lock the mutex first!
scoped_lock myLock(myMutex); 

//wait till a condition is met
myConditionalVariable.wait(myLock, CheckCondition);

//Execute this code only if the condition is met

      

where CheckCondition is a function (or functor) that checks the condition (for example, whether it is time to wake up). wait() calls it internally whenever it wakes up, including spurious wakeups, and if the condition has not been met yet, wait() goes back to sleep. Before going to sleep, wait() atomically releases the mutex.
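
To make the pattern concrete, here is a small sketch using C++11's std::condition_variable. The function name wait_for_items and the itemCount counter are made up for illustration. The predicate overload of wait() behaves like `while (!pred()) cv.wait(lock);`, which is what protects against spurious wakeups.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical helper: blocks until a background thread has produced
// `needed` items, then returns the count the waiting thread observed.
int wait_for_items(int needed) {
    std::mutex m;
    std::condition_variable cv;
    int itemCount = 0;  // shared state, only touched while holding m

    std::thread producer([&] {
        for (int i = 0; i < needed; ++i) {
            {
                std::lock_guard<std::mutex> lock(m);
                ++itemCount;
            }
            cv.notify_one();  // tell the waiter the state has changed
        }
    });

    // Lock the mutex first, then wait until the condition is met.
    std::unique_lock<std::mutex> lock(m);
    // wait() releases the mutex while sleeping and re-acquires it before
    // evaluating the predicate and before returning.
    cv.wait(lock, [&] { return itemCount >= needed; });
    int seen = itemCount;  // executed only once the condition holds
    lock.unlock();

    producer.join();
    return seen;
}
```

The waiting thread never polls: it sleeps inside wait() and is only rescheduled when notify_one() is called (or on a spurious wakeup, in which case the predicate sends it back to sleep).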


If your compiler supports std::condition_variable, introduced in C++11, you can see this for details:

If your compiler doesn't support it and you are working with Win32 threads, see this:



And here is a complete example.

And if you are working with POSIX threads, see this:


You can see my conditional_variable implementation using Win32 primitives:

Scroll down and review it first, and then review its usage in the concurrent queue implementation.

+1




It doesn't need to know - the OS will wake it up when the consumer signals. In the PC (producer-consumer) queue code, the producer calls wait() on some OS synchronization primitive. This call does not return until the consumer thread has freed up space and signaled the OS synchronization object (unless your flawed OS allows "spurious wakeups"), at which point the waiting producer thread becomes ready to run and, if a core is available, runs immediately - the wait() call returns.

Traditionally, PC queues are built from a simple non-thread-safe queue, a mutex to protect its indexes/pointers, and two semaphores - one initialized to 0 to count the items in the queue ("itemCount") and one initialized to [queue size] to count the empty spaces ("emptySpace"). The producer waits on "emptySpace" and, when it gets a signal, locks the mutex, pushes the object, unlocks the mutex, and signals "itemCount". The consumer waits on "itemCount" and, when it gets a signal, locks the mutex, pops the object, unlocks the mutex, and signals "emptySpace".
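
The two-semaphore scheme can be sketched roughly as follows. The Semaphore class here is a hypothetical minimal helper built from a mutex and a condition variable so that the example stays within C++11; a real program would more likely use the OS semaphore or std::counting_semaphore from C++20. SemaphoreQueue and run_semaphore_demo are illustrative names as well.

```cpp
#include <condition_variable>
#include <cstddef>
#include <mutex>
#include <queue>
#include <thread>

// Hypothetical minimal counting semaphore (stand-in for an OS semaphore).
class Semaphore {
public:
    explicit Semaphore(std::size_t initial) : count(initial) {}
    void wait() {
        std::unique_lock<std::mutex> lock(m);
        cv.wait(lock, [this] { return count > 0; });
        --count;
    }
    void signal() {
        {
            std::lock_guard<std::mutex> lock(m);
            ++count;
        }
        cv.notify_one();
    }
private:
    std::mutex m;
    std::condition_variable cv;
    std::size_t count;
};

class SemaphoreQueue {
public:
    explicit SemaphoreQueue(std::size_t capacity)
        : itemCount(0), emptySpace(capacity) {}

    void push(int item) {
        emptySpace.wait();                 // producer sleeps here when the queue is full
        {
            std::lock_guard<std::mutex> lock(m);
            items.push(item);
        }                                  // mutex unlocked here
        itemCount.signal();                // one more item available
    }

    int pop() {
        itemCount.wait();                  // consumer sleeps here when the queue is empty
        int item;
        {
            std::lock_guard<std::mutex> lock(m);
            item = items.front();
            items.pop();
        }                                  // mutex unlocked here
        emptySpace.signal();               // one more free slot: wakes a waiting producer
        return item;
    }

private:
    std::queue<int> items;                 // plain queue, not thread-safe by itself
    std::mutex m;                          // protects items
    Semaphore itemCount;                   // starts at 0
    Semaphore emptySpace;                  // starts at capacity
};

// Hypothetical driver: one producer pushes 1..count, the caller consumes.
long run_semaphore_demo(int count) {
    SemaphoreQueue q(4);
    std::thread producer([&] {
        for (int i = 1; i <= count; ++i)
            q.push(i);
    });
    long sum = 0;
    for (int i = 0; i < count; ++i)
        sum += q.pop();
    producer.join();
    return sum;
}
```

Note that the mutex is held only around the queue operation itself, never while waiting on a semaphore, so a blocked producer cannot prevent the consumer from freeing a slot.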

0








