Moving from thread-based pipelining to task-based parallelism? (C++)

I am considering how to migrate some existing C++ code from thread-based parallelism to task-based parallelism, and whether that migration is desirable at all. Here's my scenario:

Suppose I have some kind of function to execute on an event. Let's say I have a camera and every time a frame comes in I want to do heavy processing and save the results. Some of the processing is sequential, so if I just process each frame sequentially in one thread, I don't get full CPU usage. Let's say frames arrive every 33ms and the processing latency for a frame is close to 100ms.

So, in my current implementation, I create, say, 3 threads that process frames, and I assign each new frame to one of these worker threads in round-robin fashion. Thus, thread T0 handles frames F0, F3, F6, etc. Now I get full CPU utilization and I don't need to drop frames to maintain realtime rates.

Since the processing requires various large temporary resources, I can allocate them once per worker thread, so they do not need to be reallocated for every frame. This resource-per-thread strategy strikes a good balance in granularity: allocating the resources per frame would take too long, while with many more worker threads we would run out of resources.
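For reference, the thread-based scheme described above might look roughly like the following in standard C++11. This is only a minimal sketch: `Scratch`, `Worker`, and `run_round_robin` are hypothetical names, and a `push_back` into a vector stands in for the heavy per-frame processing.

```cpp
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <deque>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for the large per-thread temporary resources.
// Here it only records which frames were processed with it.
struct Scratch {
    std::vector<int> handled;
};

// One worker thread with its own frame queue and its own Scratch.
class Worker {
public:
    Worker() : thread_([this] { run(); }) {}
    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;
    ~Worker() { finish(); }

    // Queue a frame for this worker.
    void post(int frame) {
        {
            std::lock_guard<std::mutex> lock(mutex_);
            queue_.push_back(frame);
        }
        ready_.notify_one();
    }

    // Drain the queue, stop the thread, and return the frames it handled.
    std::vector<int> finish() {
        if (thread_.joinable()) {
            {
                std::lock_guard<std::mutex> lock(mutex_);
                done_ = true;
            }
            ready_.notify_one();
            thread_.join();
        }
        return scratch_.handled;
    }

private:
    void run() {
        std::unique_lock<std::mutex> lock(mutex_);
        for (;;) {
            ready_.wait(lock, [this] { return done_ || !queue_.empty(); });
            if (queue_.empty()) return;  // done_ is set and nothing is left
            int frame = queue_.front();
            queue_.pop_front();
            lock.unlock();
            scratch_.handled.push_back(frame);  // stands in for heavy processing
            lock.lock();
        }
    }

    std::mutex mutex_;
    std::condition_variable ready_;
    std::deque<int> queue_;
    bool done_ = false;
    Scratch scratch_;     // allocated once, reused for every frame
    std::thread thread_;  // declared last so the members above exist before run()
};

// Dispatch frames 0..frame_count-1 round robin and report the frames each
// worker handled, in processing order.
std::vector<std::vector<int>> run_round_robin(int frame_count,
                                              std::size_t worker_count) {
    assert(worker_count > 0);
    std::vector<std::unique_ptr<Worker>> workers;
    for (std::size_t i = 0; i < worker_count; ++i)
        workers.push_back(std::unique_ptr<Worker>(new Worker));
    for (int frame = 0; frame < frame_count; ++frame)
        workers[static_cast<std::size_t>(frame) % worker_count]->post(frame);
    std::vector<std::vector<int>> handled;
    for (auto& worker : workers) handled.push_back(worker->finish());
    return handled;
}
```

Calling `run_round_robin(9, 3)` hands frames 0, 3, 6 to the first worker, 1, 4, 7 to the second, and 2, 5, 8 to the third, each reusing its own `Scratch`.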

I see no way to replace this thread-based parallelism with task-based parallelism using the C++11 Standard Library or Microsoft PPL. If there is a pattern for doing this that could be sketched out in an answer, I'd be very happy to learn it.

The question is where to store the state: the allocated temporary resources (such as GPU memory) that can be reused for subsequent frames but must not conflict with the resources for the frame currently being processed.

Is it even desirable to port to task-based parallelism in a case like this?

1 answer


I figured it out. Here's an example solution:



#include <chrono>
#include <iostream>
#include <memory>
#include <ppltasks.h>
#include <thread>
#include <vector>

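// Placeholder types: PipelineState stands in for the reusable resources,
// FrameState for the per-frame data.
using PipelineState = int;
using PipelineStateArg = std::shared_ptr<PipelineState>;
using FrameState = int;

// A pipeline owns one reusable state plus the tail of its task chain.
struct Pipeline
{
    PipelineStateArg state;
    concurrency::task<void> task;
};
std::vector<Pipeline> pipelines;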

void proc(const FrameState& fs, PipelineState& ps)
{ 
    std::cout << "Process frame " << fs << " in pipeline " << ps << std::endl; 
}

void on_frame(int index)
{
    FrameState frame = index;
    if (index < 2)
    {
        // Start a new pipeline
        auto state = std::make_shared<PipelineState>(index);
        pipelines.push_back({state, concurrency::create_task([=]() 
        { 
            proc(frame, *state); 
        })});
    }
    else
    {
        // Use an existing pipeline
        auto& pipeline = pipelines[index & 1];
        auto state = pipeline.state;
        pipeline.task = pipeline.task.then([=]() 
        { 
            proc(frame, *state); 
        });
    }
}

int main()
{
    for (int i = 0; i < 100; ++i)
    {
        on_frame(i);
        std::this_thread::sleep_for(std::chrono::milliseconds(33));
    }
    for (auto& pipeline : pipelines)
        pipeline.task.wait();
}
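For readers without PPL, the same idea (one reusable state per pipeline, with each new frame chained behind the previous frame on that pipeline) can be approximated with only the C++11 standard library. The sketch below is an assumption-laden analogue, not a translation of the PPL code: `LaneState`, `Lane`, and `LanePipeline` are hypothetical names, and because C++11 futures have no `.then`, each task simply waits on its predecessor's `std::shared_future`. Note that `std::launch::async` starts a new thread per frame, whereas PPL schedules tasks on a thread pool.

```cpp
#include <cassert>
#include <cstddef>
#include <future>
#include <memory>
#include <vector>

// Hypothetical per-lane state standing in for the reusable resources.
struct LaneState {
    std::vector<int> frames;  // frames processed with this state, in order
};

struct Lane {
    std::shared_ptr<LaneState> state;
    std::shared_future<void> tail;  // ready once the lane's latest frame is done
};

class LanePipeline {
public:
    explicit LanePipeline(std::size_t lane_count) {
        assert(lane_count > 0);
        for (std::size_t i = 0; i < lane_count; ++i) {
            // Start each lane with an already-completed "previous task".
            std::promise<void> ready;
            ready.set_value();
            lanes_.push_back({std::make_shared<LaneState>(),
                              ready.get_future().share()});
        }
    }

    // Chain the frame behind the previous frame on the same lane.
    void on_frame(int frame) {
        Lane& lane = lanes_[static_cast<std::size_t>(frame) % lanes_.size()];
        auto state = lane.state;
        auto previous = lane.tail;
        lane.tail = std::async(std::launch::async,
                               [state, previous, frame]() mutable {
            previous.wait();  // serialize frames that share this state
            // Drop the reference so finished tasks are not kept alive in a chain.
            previous = std::shared_future<void>();
            state->frames.push_back(frame);  // stands in for proc(frame, *state)
        }).share();
    }

    // Wait for every lane to finish and return the frames each one processed.
    std::vector<std::vector<int>> wait_all() {
        std::vector<std::vector<int>> result;
        for (auto& lane : lanes_) {
            lane.tail.wait();
            result.push_back(lane.state->frames);
        }
        return result;
    }

private:
    std::vector<Lane> lanes_;
};
```

With two lanes and frames 0 through 5, `wait_all()` reports frames 0, 2, 4 on the first lane and 1, 3, 5 on the second. No two frames ever touch the same `LaneState` at once, which is the property the question asks for.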

      
