Parallel.For: getting the next free thread / worker

I have the following parallelized code. I'm not sure how to set the workerIndex variable:

// Initializing Worker takes time & must be done before the actual work
Worker[] w = new Worker[3]; // I would like to limit the parallelism to 3
for (int i = 0; i < 3; i++)
  w[i] = new Worker();
...
element[] elements = GetArrayOfElements(); // elements.Length > 3
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 3;
Parallel.For(0, elements.Length, options, i =>
{
  element e = elements[i];
  w[workerIndex].Work(e); // how to set "workerIndex"?
});

      

Is there some mechanism that tells me which worker (thread) is currently free?



2 answers


How about you just call new Worker() inside the Parallel.For loop and add each instance to w (you will need to change w to a thread-safe collection)?

Perhaps (if it's not too complex) move the content of the .Work(e) method into the body of the loop, eliminating the need for the Worker class.



Edit:

If you change the Worker array to an IEnumerable (e.g. a List<Worker>), you can call .AsParallel() on it to make it parallel. Then you can use .ForAll(worker => worker.Work()) to do the work in parallel. This will require you to pass the element to the worker via the constructor.
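
A minimal sketch of what that edit describes, under some assumptions: the Worker class here is a hypothetical stand-in that takes an int element in its constructor and exposes a Result property, and WithDegreeOfParallelism(3) is added to keep the limit of 3 from the question. Note that this creates one Worker per element, so any expensive initialization is paid once per element rather than three times.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's Worker: the element is
// passed in through the constructor, as the answer suggests.
class Worker
{
    private readonly int _element;
    public int Result { get; private set; }

    public Worker(int element) { _element = element; }

    // Placeholder work: doubles the element.
    public void Work() { Result = _element * 2; }
}

static class PlinqDemo
{
    public static int Run(int[] elements)
    {
        // One worker per element, created before the parallel section.
        List<Worker> workers = elements.Select(e => new Worker(e)).ToList();

        workers.AsParallel()
               .WithDegreeOfParallelism(3) // at most 3 workers run at once
               .ForAll(worker => worker.Work());

        return workers.Sum(w => w.Result);
    }

    static void Main()
    {
        Console.WriteLine(Run(new[] { 1, 2, 3, 4, 5, 6 })); // prints 42
    }
}
```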



It sounds like the object pool pattern works best for you. You can write your code like this:

const int Limit = 3;
using (var pool = new QueueObjectPool<Worker>(a => new Worker(), Limit)) {
    element[] elements = GetArrayOfElements();
    var options = new ParallelOptions { MaxDegreeOfParallelism = Limit };

    Parallel.For(0, elements.Length, options, i => {
        element e = elements[i];
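        // Acquire outside the try block so that a failed Acquire
        // never leads to releasing a null worker into the pool.
        Worker worker = pool.Acquire(); // blocks until a worker is free
        try {
            worker.Work(e);
        } finally {
            pool.Release(worker); // always hand the worker back
        }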
    });
}

      

At run time, each element waits for an available worker, and only three workers are ever initialized, all of them up front when the pool is created. Here's a simplified implementation of a basic queue-backed object pool:



using System;
using System.Collections.Generic;
using System.Threading;

public sealed class QueueObjectPool<TObject> : IDisposable {
    private readonly Queue<TObject> _poolQueue;
    private readonly Func<QueueObjectPool<TObject>, TObject> _factory;
    private readonly int _capacity;
    private readonly SemaphoreSlim _throttler;

    public QueueObjectPool(Func<QueueObjectPool<TObject>, TObject> factory, int capacity) {
        _factory = factory;
        _capacity = capacity;
        // One permit per pooled object.
        _throttler = new SemaphoreSlim(initialCount: capacity, maxCount: capacity);
        _poolQueue = CreatePoolQueue();
    }

    // Blocks until an object is available, then removes it from the pool.
    public TObject Acquire() {
        _throttler.Wait();

        lock (_poolQueue) {
            return _poolQueue.Dequeue();
        }
    }

    // Returns an object to the pool and wakes up one waiting caller.
    public void Release(TObject poolObject) {
        lock (_poolQueue) {
            _poolQueue.Enqueue(poolObject);
        }

        _throttler.Release();
    }

    // Eagerly creates all pooled objects.
    private Queue<TObject> CreatePoolQueue() {
        var queue = new Queue<TObject>(_capacity);
        int itemsLeft = _capacity;

        while (itemsLeft > 0) {
            TObject queueObject = _factory(this);
            queue.Enqueue(queueObject);
            itemsLeft -= 1;
        }

        return queue;
    }

    public void Dispose() {
        // Must not throw: the using block in the calling code invokes this.
        _throttler.Dispose();
    }
}

      

This code is for demonstration purposes. In real code it is better to use async/await, which is easy to do with SemaphoreSlim.WaitAsync; you can then replace Parallel.For with a simple loop.
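
A rough sketch of that async variant, not a definitive implementation. The assumptions: Worker has a hypothetical WorkAsync(int) method (simulated here with Task.Delay), elements are plain ints, and a ConcurrentQueue stands in for the pool above. SemaphoreSlim.WaitAsync does the throttling, and a plain for loop starts the tasks.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical worker with an asynchronous Work method.
class Worker
{
    public async Task<int> WorkAsync(int element)
    {
        await Task.Delay(10); // placeholder for real asynchronous work
        return element * 2;
    }
}

static class AsyncPoolDemo
{
    public static async Task<int> RunAsync(int[] elements, int limit)
    {
        // Create the expensive workers once, up front.
        var pool = new ConcurrentQueue<Worker>(
            Enumerable.Range(0, limit).Select(_ => new Worker()));
        using var throttler = new SemaphoreSlim(limit, limit);

        async Task<int> ProcessAsync(int element)
        {
            await throttler.WaitAsync(); // waits without blocking a thread
            // Holding a permit guarantees that a worker is in the queue.
            pool.TryDequeue(out Worker worker);
            try
            {
                return await worker.WorkAsync(element);
            }
            finally
            {
                pool.Enqueue(worker); // return the worker first,
                throttler.Release();  // then free the permit
            }
        }

        // A simple loop replaces Parallel.For; the semaphore caps concurrency.
        var tasks = new Task<int>[elements.Length];
        for (int i = 0; i < elements.Length; i++)
            tasks[i] = ProcessAsync(elements[i]);

        int[] results = await Task.WhenAll(tasks);
        return results.Sum();
    }

    static async Task Main()
    {
        Console.WriteLine(await RunAsync(new[] { 1, 2, 3, 4, 5, 6 }, 3)); // prints 42
    }
}
```

The worker is enqueued before the permit is released, so a task that gets a permit always finds a worker in the queue.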
