Determining the Safety of Deleting a Concurrent Queue

I am programming a lock-free, single-producer, single-consumer growable queue in C++ for a real-time system. The internal queue works, but it needs to be growable. The producer thread is real-time, so every operation it performs must be deterministic (no waits, locks, or memory allocations); the consumer thread has no such constraint.

So the idea is that the consumer thread occasionally grows the queue if needed. The queue is implemented such that it cannot be grown in place from the consumer end. The actual queue is therefore wrapped inside an object that dispatches the calls, and growth is implemented by swapping the pointer to the internal queue for a new one, while keeping the old one around in case the producer is still using it.

The problem, however, is that I cannot figure out how to prove that the producer thread has stopped using the old queue, so that it can be safely deleted without resorting to locks. Here is a pseudocode representation:

template<typename T>
class queue
{
public:

    queue()
        : old(nullptr)
    {
        current.store(nullptr);
        grow();
    }

    bool produce(const T & data)
    {
        qimpl * q = current.load();
        return q->produce(data);
    }

    bool consume(T & data)
    {
        // has the queue grown? if so, both a new and an old queue exist. consume from the old one first.
        if (old)
        {
            // here is the problem. we never really know when the producer thread stops using
            // the old queue and starts using the new one. it could be halfway through inserting
            // an item into the old queue right now, in which case the consume call below fails.
            // thus, it is not yet safe to delete the old queue.
            // we do know, however, that it takes at most one call to produce() after grow()
            // before the producer thread starts using the new queue.

            if (old->consume(data))
            {
                return true;
            }
            else
            {
                delete old;
                old = nullptr;
            }
        }

        if (current.load()->consume(data))
        {
            return true;
        }
        return false;
    }
    // consumer only as well
    void grow()
    {
        old = current.load();
        current.store(new qimpl());
    }
private:

    class qimpl
    {
    public:
        bool produce(const T & data);
        bool consume(T & data);
    };

    std::atomic<qimpl *> current;
    qimpl * old;
};

      

Please note that ATOMIC_POINTER_LOCK_FREE == 2 is a prerequisite for compiling the code. The only provable condition I see is that once grow() has been called, the next produce() call will use the new internal queue. Thus, if an atomic counter inside produce() is incremented on every call, it is safe to delete the old queue at count N + 1, where N is the count at the time of the grow() call. The problem, however, is that you would then need to swap in the new pointer and store the count atomically, which does not seem possible.

Any ideas are appreciated, and for reference, this is how the system would work:

queue<int> q;

void consumer()
{
    while (true)
    {
        int data;

        if (q.consume(data))
        {
            // ..
        }
    }

}

void producer()
{
    while (true)
    {
        q.produce(std::rand());
    }
}

int main()
{
    std::thread p(producer); std::thread c(consumer);
    p.detach(); c.detach();
    std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}

      

EDIT: OK, the problem is solved now. It dawned on me that the old queue is provably obsolete once an item has been pushed to the new queue. So the snippet now looks like this:

bool pop(T & data)
{
    if (old)
    {
        if (old->consume(data))
        {
            return true;
        }

    }
    // note that if the old queue is empty and the new one has an enqueued element, we can conclusively
    // prove that it is safe to delete the old queue, since (a) it is empty and (b) the producer thread
    // has observed the new pointer, so it uses the new queue and will never touch the old one again.
    // so if we successfully dequeue an element from the new queue, we can delete the old one (if it exists).
    if (current.load()->consume(data))
    {
        if (old)
        {
            delete old;
            old = nullptr;
        }
        return true;
    }
    return false;
}
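
For readers who want something compilable, here is a minimal sketch of that deferred-deletion idea. It is not the original implementation: ring (a tiny fixed-capacity SPSC buffer standing in for qimpl), growable, and empty() are hypothetical names introduced only for illustration. It also assumes one extra precaution compared to the pop() above: after seeing that the new queue is non-empty, it checks the old queue one more time, because an insert that was in flight during the first check may have completed in between, and deleting at that point would drop the item.

```cpp
#include <atomic>
#include <cassert>
#include <cstddef>

// Hypothetical stand-in for qimpl: a fixed-capacity SPSC ring buffer.
template <typename T, std::size_t N = 4>
class ring
{
public:
    bool produce(const T & data)                    // producer thread only
    {
        std::size_t t = tail.load(std::memory_order_relaxed);
        if (t - head.load(std::memory_order_acquire) == N) return false; // full
        buf[t % N] = data;
        tail.store(t + 1, std::memory_order_release);                    // publish
        return true;
    }
    bool consume(T & data)                          // consumer thread only
    {
        std::size_t h = head.load(std::memory_order_relaxed);
        if (h == tail.load(std::memory_order_acquire)) return false;     // empty
        data = buf[h % N];
        head.store(h + 1, std::memory_order_release);
        return true;
    }
    bool empty() const                              // consumer thread only
    {
        return head.load(std::memory_order_relaxed)
            == tail.load(std::memory_order_acquire);
    }
private:
    T buf[N]{};
    std::atomic<std::size_t> head{0}, tail{0};
};

// Hypothetical wrapper mirroring the queue class in the question.
template <typename T>
class growable
{
public:
    growable() : current(new ring<T>()), old(nullptr) {}
    ~growable() { delete old; delete current.load(); }

    bool produce(const T & data) { return current.load()->produce(data); }

    bool pop(T & data)                              // consumer thread only
    {
        ring<T> * cur = current.load();
        if (old)
        {
            if (old->consume(data)) return true;
            if (cur->empty()) return false;         // producer may still be inside old
            // The new queue holds an item, so the producer has switched over.
            // Re-check old once: an in-flight insert may have completed late.
            if (old->consume(data)) return true;
            delete old;
            old = nullptr;
        }
        return cur->consume(data);
    }

    void grow()                                     // consumer thread only
    {
        assert(!old && "previous queue has not been retired yet");
        old = current.load();
        current.store(new ring<T>());
    }
private:
    std::atomic<ring<T> *> current;
    ring<T> * old;
};
```

Items pushed before grow() still come out before items pushed after it, and the old queue is deleted only once the new one is known to contain an element.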

      



1 answer


I don't fully understand the usage of grow() in your algorithm, but it looks like you need some kind of Read-Copy-Update (RCU) mechanism to safely delete the queue once it is no longer needed.



This article describes the various flavors of this mechanism for Linux, but you can search for RCU flavors that are suitable for other platforms.
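
To make the idea concrete, here is a rough hand-rolled sketch in the spirit of RCU's quiescent-state detection. It is not a real RCU library, and handoff, inner, with_current, swap_in, and quiescent are all made-up names. The assumption is a single producer that marks entry and exit of each call with a sequence counter (odd while inside, even while outside); the consumer swaps the pointer first and reads the counter afterwards, so the swap and the count never have to be updated together atomically. All atomics use the default sequentially consistent ordering to keep the reasoning simple.

```cpp
#include <atomic>

// Hypothetical placeholder for the real inner queue.
struct inner { int id; };

class handoff
{
public:
    struct retired { inner * q; unsigned snapshot; };

    explicit handoff(inner * first) : current(first) {}

    // Producer thread only. Wait-free: two loads and two stores around the call.
    template <typename F>
    void with_current(F && use)
    {
        unsigned s = seq.load(std::memory_order_relaxed); // single writer
        seq.store(s + 1);            // odd: producer is inside
        use(current.load());
        seq.store(s + 2);            // even: producer is outside again
    }

    // Consumer thread only. Publishes next and returns the retired queue
    // together with the counter value observed after the swap.
    retired swap_in(inner * next)
    {
        inner * prev = current.exchange(next);
        return { prev, seq.load() };
    }

    // Consumer thread only. True once the producer can no longer hold the
    // retired pointer. Even snapshot: the producer was outside after the
    // swap, so its next call loads the new pointer. Odd snapshot: it was
    // inside a call, so wait until the counter has moved on.
    bool quiescent(unsigned snapshot) const
    {
        return snapshot % 2 == 0 || seq.load() != snapshot;
    }

private:
    std::atomic<inner *> current;
    std::atomic<unsigned> seq{0};
};
```

Once quiescent() returns true, the consumer can drain whatever is left in the retired queue and then delete it; the producer side never waits.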
