Parallel.For: getting the next free thread / worker
I have the following parallelized code. I'm not sure how to set the workerIndex variable:
// Initializing Worker takes time & must be done before the actual work
Worker[] w = new Worker[3]; // I would like to limit the parallelism to 3
for (int i = 0; i < 3; i++)
    w[i] = new Worker();
...
element[] elements = GetArrayOfElements(); // elements.Length > 3
ParallelOptions options = new ParallelOptions();
options.MaxDegreeOfParallelism = 3;
Parallel.For(0, elements.Length, options, i =>
{
    element e = elements[i];
    w[workerIndex].Work(e); // how to set "workerIndex"?
});
Is there some mechanism that tells me which worker (or thread id) is free next?
How about you just call new Worker() inside the Parallel.For loop and add each instance to w (you will need to change w to a thread-safe collection).
Perhaps (if it's not too complex) move the content of the .Work(e) method into the body of the loop, eliminating the need for the Worker class.
Edit:
If you change the Worker array to an IEnumerable (i.e. a List<Worker>), you can use .AsParallel() to make it parallel. Then you can use .ForAll(worker => worker.Work()) to do the work in parallel. This will require you to pass the element to the worker via the constructor.
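A rough sketch of what that edit could look like. Element and Worker here are hypothetical stand-ins for the types in the question (the real Worker does something more expensive than doubling a number), and WithDegreeOfParallelism(3) is my addition to keep the limit of 3 from the question:

```csharp
using System.Collections.Generic;
using System.Linq;

// Hypothetical stand-in for the question's element type.
public sealed class Element
{
    public int Value;
    public int Result;
}

// Hypothetical stand-in for the question's Worker; it receives its element via the constructor.
public sealed class Worker
{
    private readonly Element _element;
    public Worker(Element element) { _element = element; }
    public void Work() { _element.Result = _element.Value * 2; }
}

public static class PlinqSketch
{
    public static int[] Run(int[] values)
    {
        List<Element> elements = values.Select(v => new Element { Value = v }).ToList();

        // One worker per element, each bound to its element up front.
        List<Worker> workers = elements.Select(e => new Worker(e)).ToList();

        // ForAll returns only after every worker has finished.
        workers.AsParallel()
               .WithDegreeOfParallelism(3)
               .ForAll(worker => worker.Work());

        return elements.Select(e => e.Result).ToArray();
    }
}
```

Note that this builds one Worker per element, so it only makes sense if constructing a Worker is cheap, which the question says it is not.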
It sounds like the object pool pattern works best for you. You can write your code like this:
const int Limit = 3;
using (var pool = new QueueObjectPool<Worker>(a => new Worker(), Limit)) {
    element[] elements = GetArrayOfElements();
    var options = new ParallelOptions { MaxDegreeOfParallelism = Limit };
    Parallel.For(0, elements.Length, options, i => {
        element e = elements[i];
        // Acquire before the try so a failed Acquire never releases a null worker.
        Worker worker = pool.Acquire();
        try {
            worker.Work(e);
        } finally {
            pool.Release(worker);
        }
    });
}
At run time, each element waits for an available worker, and only three workers are ever created, all of them up front. Here's a simplified implementation of a basic queue-based object pool:
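using System;
using System.Collections.Generic;
using System.Threading;

public sealed class QueueObjectPool<TObject> : IDisposable {
    private readonly Queue<TObject> _poolQueue;
    private readonly Func<QueueObjectPool<TObject>, TObject> _factory;
    private readonly int _capacity;
    private readonly SemaphoreSlim _throttler;

    public QueueObjectPool(Func<QueueObjectPool<TObject>, TObject> factory, int capacity) {
        _factory = factory;
        _capacity = capacity;
        _throttler = new SemaphoreSlim(initialCount: capacity, maxCount: capacity);
        _poolQueue = CreatePoolQueue();
    }

    // Blocks until a pooled object is free, then hands it out.
    public TObject Acquire() {
        _throttler.Wait();
        lock (_poolQueue) {
            return _poolQueue.Dequeue();
        }
    }

    // Puts the object back and lets one waiting caller proceed.
    public void Release(TObject poolObject) {
        lock (_poolQueue) {
            _poolQueue.Enqueue(poolObject);
        }
        _throttler.Release();
    }

    // Creates all pooled objects up front.
    private Queue<TObject> CreatePoolQueue() {
        var queue = new Queue<TObject>(_capacity);
        int itemsLeft = _capacity;
        while (itemsLeft > 0) {
            TObject queueObject = _factory(this);
            queue.Enqueue(queueObject);
            itemsLeft -= 1;
        }
        return queue;
    }

    // Minimal cleanup: disposes idle pooled objects that are IDisposable, then the semaphore.
    // Objects still checked out when Dispose runs are not tracked here.
    public void Dispose() {
        lock (_poolQueue) {
            foreach (TObject poolObject in _poolQueue) {
                (poolObject as IDisposable)?.Dispose();
            }
            _poolQueue.Clear();
        }
        _throttler.Dispose();
    }
}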
This code is for demonstration purposes. In real work it is better to use async / await logic, which is easily achieved with SemaphoreSlim.WaitAsync, and you can then replace Parallel.For with a simple loop.
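Something along these lines is what I mean by the async variant. Treat it as a sketch under assumptions, not a drop-in: the Worker class is a hypothetical stand-in, elements are plain ints for brevity, and I've used a ConcurrentQueue plus Task.Run in place of the pool class above.

```csharp
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Worker.
public sealed class Worker
{
    public int Work(int element) { return element * 2; }
}

public static class AsyncPoolSketch
{
    public static async Task<int[]> RunAsync(int[] elements, int limit)
    {
        // Create all workers up front, as in the question.
        var idle = new ConcurrentQueue<Worker>(
            Enumerable.Range(0, limit).Select(_ => new Worker()));

        using (var throttler = new SemaphoreSlim(limit, limit))
        {
            var tasks = new List<Task<int>>();
            foreach (int e in elements)
            {
                // Waits without blocking a thread until a worker is free.
                await throttler.WaitAsync();

                // A permit is only released after a worker has been enqueued,
                // so holding a permit means the queue is not empty.
                idle.TryDequeue(out Worker worker);

                tasks.Add(Task.Run(() =>
                {
                    try { return worker.Work(e); }
                    finally
                    {
                        idle.Enqueue(worker);
                        throttler.Release();
                    }
                }));
            }

            // Results come back in the same order as the input elements.
            return await Task.WhenAll(tasks);
        }
    }
}
```

The plain foreach loop replaces Parallel.For, and the semaphore's permit count (limit) caps how many elements are being processed at once.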