How to implement counter locking with std :: atomic?

In my program, multiple threads (checkers) request web pages, and if those pages contain some data, other threads (consumers) process the data. I only need a predefined number of consumers to start processing (not all). I am trying to use std :: atomic counter and fetch_add to limit the number of consumers running. But even though the counter stays within the bounds, consumers receive the same counter values, and the actual processing users exceed the limit. The behavior depends on the duration of the treatment. The simplified code contains sleep_for instead of getting pages and processing page functions.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

class cConsumer
{
public:

    cConsumer::cConsumer(
        const size_t aNumber,
        std::atomic<bool> &aFire,
        std::atomic<size_t> &aCounter) :
        mNumber(aNumber),
        mFire(aFire),
        mCounter(aCounter){}

    void cConsumer::operator ()()
    {
        while (true)
        {
            while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            size_t vCounter = mCounter.fetch_add(1);
            if (vCounter < 5)
            {
                std::cout << "      FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
                std::this_thread::sleep_for(mWorkDuration);
            }
            if (vCounter == 5)
            {
                mFire.store(false);
                mCounter.store(0);
            }
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mWorkDuration;

    const size_t mNumber;

    std::atomic<bool> &mFire;
    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds 
    cConsumer::mMillisecond(1),
    cConsumer::mWorkDuration(1300);

class cChecker
{
public:

    cChecker(
        const size_t aNumber,
        std::atomic<bool> &aFire) :
        mNumber(aNumber),
        mFire(aFire),
        mStep(1){ }

    void cChecker::operator ()()
    {
        while (true)
        {
            while (mFire.load()) std::this_thread::sleep_for(mMillisecond);

            std::cout << "checker " << mNumber << " step " << mStep << "\n";
            std::this_thread::sleep_for(mCheckDuration);
            if (mStep % 20 == 1) mFire.store(true);         
            mStep++;
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mCheckDuration;

    const size_t mNumber;

    size_t mStep;

    std::atomic<bool> &mFire;
};

const std::chrono::milliseconds 
    cChecker::mMillisecond(1),
    cChecker::mCheckDuration(500);

void main()
{
    std::atomic<bool> vFire(false);
    std::atomic<size_t> vCounter(0);

    std::thread vConsumerThreads[16];

    for (size_t i = 0; i < 16; i++)
    {
        std::thread vConsumerThread((cConsumer(i, vFire, vCounter)));
        vConsumerThreads[i] = std::move(vConsumerThread);       
    }

    std::chrono::milliseconds vNextCheckerDelay(239);

    std::thread vCheckerThreads[3];

    for (size_t i = 0; i < 3; i++)
    {
        std::thread vCheckerThread((cChecker(i, vFire)));
        vCheckerThreads[i] = std::move(vCheckerThread);
        std::this_thread::sleep_for(vNextCheckerDelay);
    }

    for (size_t i = 0; i < 16; i++) vConsumerThreads[i].join();

    for (size_t i = 0; i < 3; i++) vCheckerThreads[i].join();
}

      

Output example (partial)

...
checker 1 step 19
checker 0 step 20
checker 2 step 19
checker 1 step 20
checker 0 step 21
checker 2 step 20
checker 1 step 21
      FIRE! consumer 10, counter 0
      FIRE! consumer 13, counter 4
      FIRE! consumer 6, counter 1
      FIRE! consumer 0, counter 2
      FIRE! consumer 2, counter 3
checker 0 step 22
checker 2 step 21
      FIRE! consumer 5, counter 3
      FIRE! consumer 7, counter 4
      FIRE! consumer 4, counter 1
      FIRE! consumer 15, counter 2
      FIRE! consumer 8, counter 0
checker 1 step 22
      FIRE! consumer 9, counter 0
      FIRE! consumer 11, counter 1
      FIRE! consumer 3, counter 2
      FIRE! consumer 14, counter 3
      FIRE! consumer 1, counter 4
checker 0 step 23
checker 2 step 22
checker 1 step 23
checker 2 step 23
checker 0 step 24
checker 1 step 24

      

I found one solution that works, but not gracefully: wait for all consumers to try to work and realize the fire is off.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

class cConsumer
{
public:

    cConsumer::cConsumer(
        const size_t aNumber,
        const size_t aConsumerCount,
        std::atomic<bool> &aFire,
        std::atomic<size_t> &aCounter) :
        mNumber(aNumber),
        mConsumerCount(aConsumerCount),
        mFire(aFire),
        mCounter(aCounter){}

    void cConsumer::operator ()()
    {
        while (true)
        {
            while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            const size_t vCounter = mCounter.fetch_add(1);

            if (vCounter < 5)
            {
                std::cout << "      FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
                std::this_thread::sleep_for(mWorkDuration); //stub for process function
            }

            if (vCounter >= 5)
            {
                std::this_thread::sleep_for(mWorkDuration); //wait for other threads to increase counter
                std::this_thread::sleep_for(mWorkDuration); //double wait for long processing
                mFire.store(false);
            }

            if (vCounter == mConsumerCount)
            {               
                mCounter.store(0);
            }
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mWorkDuration;

    const size_t 
        mNumber,
        mConsumerCount;

    std::atomic<bool> &mFire;
    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds 
    cConsumer::mMillisecond(1),
    cConsumer::mWorkDuration(1300);

class cChecker
{
public:

    cChecker(
        const size_t aNumber,
        std::atomic<bool> &aFire) :
        mNumber(aNumber),
        mFire(aFire),
        mStep(1){ }

    void cChecker::operator ()()
    {
        while (true)
        {
            while (mFire.load()) std::this_thread::sleep_for(mMillisecond);

            std::cout << "checker " << mNumber << " step " << mStep << "\n";
            std::this_thread::sleep_for(mCheckDuration);
            if (mStep % 20 == 1) mFire.store(true);         
            mStep++;
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mCheckDuration;

    const size_t mNumber;

    size_t mStep;

    std::atomic<bool> &mFire;
};

const std::chrono::milliseconds 
    cChecker::mMillisecond(1),
    cChecker::mCheckDuration(500);

void main()
{
    std::atomic<bool> vFire(false);
    std::atomic<size_t> vCouter(0);

    std::thread vConsumerThreads[16];

    for (size_t i = 0; i < 16; i++)
    {
        vConsumerThreads[i] = std::move(std::thread(cConsumer(i, 16, vFire, vCouter)));
    }

    std::chrono::milliseconds vNextCheckerDelay(239);

    std::thread vCheckerThreads[3];

    for (size_t i = 0; i < 3; i++)
    {
        vCheckerThreads[i] = std::move(std::thread(cChecker(i, vFire)));
        std::this_thread::sleep_for(vNextCheckerDelay);
    }

    for (size_t i = 0; i < 16; i++) vConsumerThreads[i].join();

    for (size_t i = 0; i < 3; i++) vCheckerThreads[i].join();

      

I think there is a better solution.

+3


source to share


2 answers


What's going on here?

With a little luck, once you've set fire to it, there can be a lot more worker than 5 passing along this line:

    while(!mFire.load()) std::this_thread::sleep_for(mMillisecond);

      

Suppose 10 workers are awakened and this counter is 0. Every 10 workers will do this:

    size_t vCounter = mCouter.fetch_add(1);

      

And each of the 10 workers now has a different counter between 1 and 11.5 will first execute the if statement:

        if(vCounter < 5)

      

Any thread with a higher count will continue. Among them is the 6th thread, which will be reset by fire and reset by counter:

        if(vCounter == 5)
        {
            mFire.store(false);
            mCouter.store(0);
            cout << "RESET!!!!!! by consume "<<mNumber << endl; // useful to understand
        }

      



All these sludge streams will continue to cycle, waiting for the next fire.

But now bad things can happen because you have workers still running and you have a bunch of checkers waiting to be re-started:

while(mFire.load()) std::this_thread::sleep_for(mMillisecond);
...   // now that fire is reset, they will go on

      

and some of them can reach the following line:

        if(mStep % 20 == 1) {
            mFire.store(true); 
            cout << "SET FIRE" << endl;   // to make the problem visual
        }

      

Since the atomic counter is 0, you will immediately have 5 new workers starting a new job in addition to those who are still working.

What can you do about it?

It is not entirely clear to me what you intend to do:

  • Do you want 5 workers to be active for every new fire? In this case, it's okay how you are. Then the total number of workers can exceed 5.
  • Do you want a maximum of 5 workers to be involved at any time? In this case, you should never reset the number of workers to 0, as you do, but you should decrement the counter for all threads that incremented it. Thus, conter will contain the number of threads that are currently in the fire handling section:

    while(true)
    {
        while(!mFire.load()) std::this_thread::sleep_for(mMillisecond);
    
        size_t vCounter = mCouter.fetch_add(1);   // FIRE PROCESSING: INCREMENT COUNTER
        if(vCounter < 5)
        {
            std::cout << "      FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
            std::this_thread::sleep_for(mWorkDuration);
            std::cout << "         finished consumer "<< mNumber<<endl;
        }
        if(vCounter == 5)
        {
            mFire.store(false);
            //mCouter.store(0);
            cout << "RESET!!!!!! by consumer "<<mNumber << endl; 
        }
        mCouter.fetch_sub(1);                    // END OF PROCESSING: DECREMENT COUNTER 
    
          

0


source


A possible solution is to use an auxiliary array for consumer-made checkboxes. When end consumer processing stores true for its made array cell. The extra control flow checks the executed array for all cells that are true and resets the program state.

#include <iostream>
#include <thread>
#include <atomic>
#include <chrono>

class cConsumer
{
public:

    cConsumer::cConsumer(
        const size_t aNumber,
        const size_t aFiresLimit,
        std::atomic<bool> &aFire,
        std::atomic<bool> &aDone,
        std::atomic<size_t> &aCounter) :
        mNumber(aNumber),
        mFiresLimit(aFiresLimit),
        mFire(aFire),
        mDone(aDone),
        mCounter(aCounter){}

    void cConsumer::operator ()()
    {
        while (true)
        {
            while (!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            const size_t vCounter = mCounter.fetch_add(1);

            if (vCounter < mFiresLimit)
            {
                std::cout << "      FIRE! consumer " << mNumber << ", counter " << vCounter << "\n";
                std::this_thread::sleep_for(mWorkDuration); // instead real processing
            }   

            mDone.store(true);

            while (mDone.load()) std::this_thread::sleep_for(mMillisecond);
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mWorkDuration;

    const size_t 
        mNumber,
        mFiresLimit;

    std::atomic<bool> 
        &mFire,
        &mDone;

    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds 
    cConsumer::mMillisecond(1),
    cConsumer::mWorkDuration(1300);

class cChecker
{
public:

    cChecker(
        const size_t aNumber,
        std::atomic<bool> &aFire) :
        mNumber(aNumber),
        mFire(aFire),
        mStep(1){ }

    void cChecker::operator ()()
    {
        while (true)
        {
            while (mFire.load()) std::this_thread::sleep_for(mMillisecond);

            std::cout << "checker " << mNumber << " step " << mStep << "\n";
            std::this_thread::sleep_for(mCheckDuration);
            if (mStep % 20 == 1) // dummy condition instead real checker function
            {
                mFire.store(true);
            }
            mStep++;
        }
    }

private:

    static const std::chrono::milliseconds 
        mMillisecond,
        mCheckDuration;

    const size_t mNumber;

    size_t mStep;

    std::atomic<bool> &mFire;
};

const std::chrono::milliseconds 
    cChecker::mMillisecond(1),
    cChecker::mCheckDuration(500);

class cController
{
public:

    cController(
        const size_t aConsumerCount,
        std::atomic<bool> &aFire,
        std::atomic<bool> * const aConsumersDone,
        std::atomic<size_t> &aCounter) :
        mConsumerCount(aConsumerCount),
        mFire(aFire),
        mConsumersDone(aConsumersDone),
        mCounter(aCounter){}

    void cController::operator ()()
    {
        while (true)
        {       
            while(!mFire.load()) std::this_thread::sleep_for(mMillisecond);

            bool vAllConsumersDone = false;

            while (!vAllConsumersDone)
            {
                size_t i = 0;
                while ((i < mConsumerCount) && (mConsumersDone[i].load())) i++;
                vAllConsumersDone = (i == mConsumerCount);
                std::this_thread::sleep_for(mMillisecond);
            }

            mFire.store(false);
            for (size_t i = 0; i < mConsumerCount; i++) mConsumersDone[i].store(false);
            mCounter.store(0);
        }
    }

private:

    const size_t mConsumerCount;

    static const std::chrono::milliseconds mMillisecond;

    std::atomic<bool> 
        &mFire,
        * const mConsumersDone;

    std::atomic<size_t> &mCounter;
};

const std::chrono::milliseconds cController::mMillisecond(1);

void main()
{
    static const size_t 
        vCheckerCount = 3,
        vConsumersCount = 16,
        vFiresLimit = 5;

    std::atomic<bool> vFire(false);

    std::atomic<bool> vConsumersDone[vConsumersCount];
    for (size_t i = 0; i < vConsumersCount; i++) vConsumersDone[i].store(false);

    std::atomic<size_t> vCounter(0);    

    std::thread vControllerThread(cController(vConsumersCount, vFire, vConsumersDone, vCounter));

    std::thread vConsumerThreads[vConsumersCount];

    for (size_t i = 0; i < vConsumersCount; i++)
    {
        vConsumerThreads[i] = std::move(std::thread(cConsumer(i, vFiresLimit, vFire, vConsumersDone[i], vCounter)));
    }

    std::chrono::milliseconds vNextCheckerDelay(239);

    std::thread vCheckerThreads[vCheckerCount];

    for (size_t i = 0; i < vCheckerCount; i++)
    {
        vCheckerThreads[i] = std::move(std::thread(cChecker(i, vFire)));
        std::this_thread::sleep_for(vNextCheckerDelay);
    }

    for (size_t i = 0; i < vConsumersCount; i++) vConsumerThreads[i].join();

    for (size_t i = 0; i < vCheckerCount; i++) vCheckerThreads[i].join();

    vControllerThread.join();
}

      



Output example (partial):

...
checker 2 step 19
checker 1 step 19
checker 0 step 19
checker 2 step 20
checker 0 step 20
checker 1 step 20
checker 2 step 21
checker 0 step 21
checker 1 step 21
      FIRE! consumer 11, counter 0
      FIRE! consumer 3, counter 2
      FIRE! consumer 4, counter 3
      FIRE! consumer 10, counter 4
      FIRE! consumer 14, counter 1
checker 0 step 22
checker 2 step 22
checker 1 step 22
checker 2 step 23
checker 0 step 23
checker 1 step 23
checker 2 step 24
checker 0 step 24

      

0


source







All Articles