Calling IEnumerable Elements in Parallel

I have an extension method Batch that returns IEnumerable<IEnumerable<T>> and works like this:

var list = new List<int>() { 1, 2, 4, 8, 10, -4, 3 }; 
var batches = list.Batch(2); 
foreach(var batch in batches)
    Console.WriteLine(string.Join(",", batch));

which outputs:

1,2
4,8
10,-4
3

I need to optimize the following loop:

foreach(var batch in batches)
    ExecuteBatch(batch);

by running the batches in parallel with one of:

Task[] tasks = batches.Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch))).ToArray();
Task.WaitAll(tasks);

or

Action[] executions = batches.Select(batch => new Action(() => ExecuteBatch(batch))).ToArray();
var options = new ParallelOptions { MaxDegreeOfParallelism = 4 };
Parallel.Invoke(options, executions);

(since ExecuteBatch is a long-running, IO-bound operation). Then I noticed that each batch gets corrupted: it contains only a single element, which is default(int). Any idea what's going on or how to fix it?

The Batch implementation:

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    for(var mover = source.GetEnumerator(); ;)
    {
        if(!mover.MoveNext())
            yield break;
        yield return LimitMoves(mover, size);
    }
}
private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
{
    do yield return mover.Current;
    while(--limit > 0 && mover.MoveNext());
}

+3

1 answer


As noted in the comments, your actual problem is your implementation of Batch.

This code:

for(var mover = source.GetEnumerator(); ;)
{
    if(!mover.MoveNext())
        yield break;
    yield return LimitMoves(mover, size);
}

When Batch is materialized (for example by ToArray()), this code keeps calling MoveNext() until the enumerator is exhausted. LimitMoves() uses the same enumerator and is evaluated lazily. Since Batch exhausts the enumerator first, LimitMoves() never emits the original elements. (In fact, each batch emits a single default(T), since LimitMoves() always yields mover.Current at least once, and Current is default(T) once enumeration has completed.)
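The failure needs no threading at all to reproduce. A minimal sketch (copying the question's Batch and LimitMoves so it is self-contained): materializing the outer sequence with ToArray() — which Select(...).ToArray() does before any task runs — leaves every batch with a single default(int).

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

static class BatchBugDemo
{
    // The question's implementation, copied verbatim.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        for (var mover = source.GetEnumerator(); ;)
        {
            if (!mover.MoveNext())
                yield break;
            yield return LimitMoves(mover, size);
        }
    }

    private static IEnumerable<T> LimitMoves<T>(IEnumerator<T> mover, int limit)
    {
        do yield return mover.Current;
        while (--limit > 0 && mover.MoveNext());
    }

    static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };

        // Fully lazy: each inner sequence is consumed before the outer
        // sequence advances, so the shared enumerator is read in step.
        foreach (var batch in list.Batch(2))
            Console.WriteLine(string.Join(",", batch)); // 1,2 / 4,8 / 10,-4 / 3

        // Materialized: ToArray() drives the outer loop to completion first,
        // exhausting the shared enumerator. Each inner sequence then yields
        // a single mover.Current, which is default(int) after exhaustion.
        var batches = list.Batch(2).ToArray();
        foreach (var batch in batches)
            Console.WriteLine(string.Join(",", batch)); // 0 (seven times)
        // Seven batches are produced because the outer loop's MoveNext()
        // consumed one element per iteration while materializing.
    }
}
```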



Here's an implementation of Batch that works when materialized (and therefore when run in parallel):

public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
{
    var mover = source.GetEnumerator();
    var currentSet = new List<T>();
    while (mover.MoveNext())
    {
        currentSet.Add(mover.Current);
        if (currentSet.Count >= size)
        {   
            yield return currentSet;
            currentSet = new List<T>();
        }
    }
    if (currentSet.Count > 0)
        yield return currentSet;
}
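With batches that own their elements, the asker's parallel code works as intended. A minimal end-to-end sketch using the corrected Batch above (ExecuteBatch here is a stand-in for the real IO-bound operation):

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

static class ParallelBatches
{
    // The corrected Batch from the answer: each batch is its own List<T>,
    // so materializing the outer sequence cannot corrupt the batches.
    public static IEnumerable<IEnumerable<T>> Batch<T>(this IEnumerable<T> source, int size)
    {
        var mover = source.GetEnumerator();
        var currentSet = new List<T>();
        while (mover.MoveNext())
        {
            currentSet.Add(mover.Current);
            if (currentSet.Count >= size)
            {
                yield return currentSet;
                currentSet = new List<T>();
            }
        }
        if (currentSet.Count > 0)
            yield return currentSet;
    }

    static void ExecuteBatch(IEnumerable<int> batch)
    {
        // Stand-in for the question's long-running IO operation.
        Console.WriteLine(string.Join(",", batch));
    }

    static void Main()
    {
        var list = new List<int> { 1, 2, 4, 8, 10, -4, 3 };
        var batches = list.Batch(2);

        // Materializing via Select(...).ToArray() is now safe; the tasks
        // run concurrently, so output order is not deterministic.
        Task[] tasks = batches
            .Select(batch => Task.Factory.StartNew(() => ExecuteBatch(batch)))
            .ToArray();
        Task.WaitAll(tasks);
    }
}
```

Because each batch is an independent List<T>, no enumerator state is shared between batches, and it no longer matters in which order (or on which thread) the batches are consumed.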

Alternatively, you can use MoreLINQ, which ships with a Batch implementation. You can see their implementation here.

+2