Calling IEnumerable Elements in Parallel
I have a method called Batch that returns IEnumerable<IEnumerable<T>> and works like this:
var list = new List<int>() { 1, 2, 4, 8, 10, -4, 3 };
var batches = list.Batch(2);
foreach(var batch in batches)
    Console.WriteLine(string.Join(",", batch));
Output:
1,2
4,8
10,-4
3
My problem is that I need to optimize something like
foreach(var batch in batches)
    ExecuteBatch(batch);
by running the batches in parallel, either with
Task[] tasks = batches.Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch))).ToArray();
Task.WaitAll(tasks);
or
Action[] executions = batches.Select(batch => new Action(() => ExecuteBatch(batch))).ToArray();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.Invoke(options, executions);
(ExecuteBatch is a long-running, IO-bound operation.) When I do this, I notice that every batch gets messed up: each one contains only a single element, which is default(int). Any idea what's going on or how to fix it?
Here is the Batch implementation:
public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    for(var mover = source.GetEnumerator(); ;)
    {
        if(!mover.MoveNext())
            yield break;
        yield return LimitMoves(mover, size);
    }
}
private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
{
    do yield return mover.Current;
    while(--limit > 0 && mover.MoveNext());
}
As noted in the comments, your actual problem is your implementation of Batch.
This code:
for(var mover = source.GetEnumerator(); ;)
{
    if(!mover.MoveNext())
        yield break;
    yield return LimitMoves(mover, size);
}
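When the result of Batch is materialized (both of your parallel versions call ToArray()), this code keeps calling MoveNext() until the enumerator is exhausted. LimitMoves() uses the same enumerator and is evaluated lazily, so its body does not run until a batch is actually enumerated. Since Batch has already exhausted the enumerator by then, LimitMoves() never emits the real elements. (In fact, each batch emits exactly one default(T): the do/while loop returns mover.Current once, which for a List<T> enumerator is default(T) after the enumeration has completed, and then stops because MoveNext() returns false.)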
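To make the failure concrete, here is a small reproduction sketch. It reuses the question's Batch and LimitMoves unchanged in behavior; the class name LazyBatchDemo and the Main method are only illustrative scaffolding, not part of the original code.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

public static class LazyBatchDemo
{
    // The lazy Batch from the question: every batch shares one enumerator.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        for (var mover = source.GetEnumerator(); ;)
        {
            if (!mover.MoveNext())
                yield break;
            yield return LimitMoves(mover, size);
        }
    }

    private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
    {
        do yield return mover.Current;
        while (--limit > 0 && mover.MoveNext());
    }

    public static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // ToArray() drains the outer sequence before any inner batch is read,
        // so the shared enumerator is already past the end of the list.
        var materialized = list.Batch(2).ToArray();

        Console.WriteLine(materialized.Length);
        foreach (var batch in materialized)
            Console.WriteLine(string.Join(",", batch));
    }
}
```

With a List<int> source this should print 7 (one "batch" per element, because the outer loop advanced one element at a time) followed by seven lines containing 0, which matches the symptom described in the question.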
Here's an implementation of Batch that will work when materialized (and therefore when the batches are run in parallel).
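public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    // Dispose the enumerator once the sequence has been consumed.
    using (var mover = source.GetEnumerator())
    {
        var currentSet = new List<T>();
        while (mover.MoveNext())
        {
            // Copy each element into the current batch immediately,
            // so no batch depends on the shared enumerator later.
            currentSet.Add(mover.Current);
            if (currentSet.Count >= size)
            {
                yield return currentSet;
                currentSet = new List<T>();
            }
        }
        // Emit the final, partially filled batch, if any.
        if (currentSet.Count > 0)
            yield return currentSet;
    }
}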
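As a rough check that the eager approach holds up under the Task-based pattern from the question, the sketch below runs each batch on its own task. The class EagerBatchDemo, the helper RunAll, and this ExecuteBatch (which just joins the elements into a string) are hypothetical stand-ins; the real ExecuteBatch is not shown in the question. The Batch here uses foreach instead of a manual enumerator but follows the same copy-into-a-list idea.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class EagerBatchDemo
{
    // Eager Batch: each batch is its own List<T>, independent of the source enumerator.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        var currentSet = new List<T>();
        foreach (var item in source)
        {
            currentSet.Add(item);
            if (currentSet.Count >= size)
            {
                yield return currentSet;
                currentSet = new List<T>();
            }
        }
        if (currentSet.Count > 0)
            yield return currentSet;
    }

    // Hypothetical stand-in for the question's ExecuteBatch.
    private static string ExecuteBatch(IEnumerable<int> batch) => string.Join(",", batch);

    // Starts one task per batch, waits for all of them, and returns results in batch order.
    public static string[] RunAll(IEnumerable<int> source, int size)
    {
        Task<string>[] tasks = source.Batch(size)
            .Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch)))
            .ToArray();
        Task.WaitAll(tasks);
        return tasks.Select(t => t.Result).ToArray();
    }

    public static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };
        foreach (var line in RunAll(list, 2))
            Console.WriteLine(line);
    }
}
```

Because every batch is a fully populated list before any task starts, materializing with ToArray() no longer changes what each task sees. The results are read back in the order the tasks were created, so the output order matches the sequential version even though the tasks may finish in any order.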
Alternatively, you can use MoreLINQ, which comes with its own Batch implementation. You can see their implementation in the MoreLINQ source.