An enumerator wrapper that pre-fetches multiple items from the underlying enumerator

Suppose I have some IEnumerator<T> that does a fair amount of work inside its MoveNext() method.

The code consuming this enumerator does not consume as fast as the data is available, but sometimes waits (the specifics of which are not relevant to my question) in order to synchronize the point at which it resumes consumption. But when it does make its next call to MoveNext(), it needs the data as quickly as possible.

One option would be to pre-consume the entire stream into some list or array structure for instant enumeration. However, that would waste memory, since only one element is in use at any one time, and it would be prohibitive when all the data does not fit in memory.

So, is there anything common in .NET that wraps an enumerator/enumerable in such a way that it asynchronously pre-iterates the underlying enumerator several elements ahead and buffers the results, so that it always has a number of elements available in its buffer and calling MoveNext is never forced to wait? Obviously, elements consumed, i.e. iterated over by subsequent MoveNext calls from the caller, would be removed from the buffer.

NB: Part of what I'm trying to do here is also called backpressure and, in the Rx world, has already been implemented in RxJava and is being discussed in Rx.NET. Rx (observables, which push data) can be regarded as the opposite approach to enumerators (enumerables, which let data be pulled). Backpressure is comparatively easy in the pull approach, as my answer shows: just pause consumption. It is much harder in the push approach, which requires an additional feedback mechanism.

+3


source to share


2 answers


A more concise alternative to your custom enumerable class is this:

public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
{
    var queue = new BlockingCollection<T>(bufferSize);

    Task.Run(() => {
        foreach (var i in source) queue.Add(i); // blocks while the buffer is full
        queue.CompleteAdding();                 // signals the consumer that the sequence has ended
    });

    return queue.GetConsumingEnumerable();
}

This can be used like:

var slowEnumerable = GetMySlowEnumerable();
var buffered = slowEnumerable.Buffer(10); // Populates up to 10 items on a background thread

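To see the behavior end to end, here is a self-contained sketch that combines the Buffer extension above with a deliberately slow source (SlowSource and Demo are illustrative names, not part of the original answer). The background task keeps the bounded BlockingCollection topped up while the consumer enumerates:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

static class BufferExtensions
{
    public static IEnumerable<T> Buffer<T>(this IEnumerable<T> source, int bufferSize)
    {
        var queue = new BlockingCollection<T>(bufferSize);

        Task.Run(() => {
            foreach (var i in source) queue.Add(i); // blocks while the buffer is full
            queue.CompleteAdding();                 // signals the consumer that the sequence has ended
        });

        return queue.GetConsumingEnumerable();
    }
}

static class Demo
{
    // A deliberately slow source: each item takes ~50 ms to produce.
    public static IEnumerable<int> SlowSource()
    {
        for (int i = 0; i < 5; i++)
        {
            Thread.Sleep(50);
            yield return i;
        }
    }

    static void Main()
    {
        // The background task produces ahead while we consume; order is preserved.
        var items = Demo.SlowSource().Buffer(3).ToList();
        Console.WriteLine(string.Join(",", items));
    }
}
```

Note that an exception thrown by the source inside Task.Run is not observed by the consumer in this minimal version; a production variant would capture it and rethrow on the consuming side.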

+2


source


There are various ways to implement this yourself, and I decided to go with:

  • a separate, dedicated thread per enumerator that does the asynchronous pre-buffering
  • a fixed number of elements to pre-buffer

This is perfect for my case (only a few, very long-running enumerators), but, for example, spinning up a thread may be too heavyweight if you use many, many enumerators, and a fixed number of elements may be too inflexible if you need something more dynamic, possibly based on the actual content of the elements.

I have only tested its main functionality so far, and some rough edges may remain. It can be used like this:



int bufferSize = 5;
IEnumerable<int> en = ...;
foreach (var item in new PreBufferingEnumerable<int>(en, bufferSize))
{
    ...
}

Here's the gist of the enumerator:

class PreBufferingEnumerator<TItem> : IEnumerator<TItem>
{
{
    private readonly IEnumerator<TItem> _underlying;
    private readonly int _bufferSize;
    private readonly Queue<TItem> _buffer;
    private bool _done;
    private bool _disposed;

    public PreBufferingEnumerator(IEnumerator<TItem> underlying, int bufferSize)
    {
        _underlying = underlying;
        _bufferSize = bufferSize;
        _buffer = new Queue<TItem>();
        Thread preBufferingThread = new Thread(PreBufferer) { Name = "PreBufferingEnumerator.PreBufferer", IsBackground = true };
        preBufferingThread.Start();
    }

    private void PreBufferer()
    {
        while (true)
        {
            lock (_buffer)
            {
                while (_buffer.Count == _bufferSize && !_disposed)
                    Monitor.Wait(_buffer);
                if (_disposed)
                    return;
            }
            if (!_underlying.MoveNext())
            {
                lock (_buffer)
                    _done = true;
                return;
            }
            var current = _underlying.Current; // do outside lock, in case underlying enumerator does something inside get_Current()
            lock (_buffer)
            {
                _buffer.Enqueue(current);
                Monitor.Pulse(_buffer);
            }
        }
    }

    public bool MoveNext()
    {
        lock (_buffer)
        {
            while (_buffer.Count == 0 && !_done && !_disposed)
                Monitor.Wait(_buffer);
            if (_buffer.Count > 0)
            {
                Current = _buffer.Dequeue();
                Monitor.Pulse(_buffer); // so PreBufferer thread can fetch more
                return true;
            }
            return false; // _done || _disposed
        }
    }

    public TItem Current { get; private set; }

    public void Dispose()
    {
        lock (_buffer)
        {
            if (_disposed)
                return;
            _disposed = true;
            _buffer.Clear();
            Current = default(TItem);
            Monitor.PulseAll(_buffer);
        }
    }

    // (the non-generic IEnumerator.Current property and Reset() are omitted here)
}
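The usage example constructs a PreBufferingEnumerable<int>, but only the enumerator is shown above. A minimal sketch of what that wrapper might look like (an assumption on my part — the original class is not included in the answer) is:

```csharp
using System.Collections;
using System.Collections.Generic;

// Hypothetical wrapper class: each call to GetEnumerator() starts a fresh
// PreBufferingEnumerator (and therefore a fresh pre-buffering thread).
class PreBufferingEnumerable<TItem> : IEnumerable<TItem>
{
    private readonly IEnumerable<TItem> _source;
    private readonly int _bufferSize;

    public PreBufferingEnumerable(IEnumerable<TItem> source, int bufferSize)
    {
        _source = source;
        _bufferSize = bufferSize;
    }

    public IEnumerator<TItem> GetEnumerator() =>
        new PreBufferingEnumerator<TItem>(_source.GetEnumerator(), _bufferSize);

    IEnumerator IEnumerable.GetEnumerator() => GetEnumerator();
}
```

Note that foreach disposes the enumerator when the loop ends, which is what triggers the _disposed path that shuts down the pre-buffering thread.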

0


source

