What is a good way to implement an object pool?

I have a third party class, let's call it Analyser

. This class is really good at parsing, but costs (in seconds) to build and does not support multithreading.

My application needs to serve call related requests Analyser

. These requests will run concurrently.

I think I need to create a generic class, something like

public class Pool<T>
{
    public Pool(Func<T> instantiator, int size)
    {
        ...
    }

    public async Task<TResult> Invoke<TResult>(
            Func<T, TResult> target,
            CancellationToken cancellationToken)
    {
        // await the first available T,
        // lock the T,
        // invoke the target, return the result
        // release the lock
    }
}

      

This class generally encapsulates union functionality.

My question is what is the correct way to implement this class. Does it already exist with a different name? Should I use TPL.DataFlow

? Should I manually flip it over?

The good is defined as reliable thread safe, the easier it is to maintain it the better.


If general Pool

is the wrong approach to the problem, suggest a correct alternative.


The class Pool

will be used something like this.

private readonly Pool<Analyser> pool = new Pool<Analyser>(
        () => new Analyser(a, b, c),
        100);

public async Task<string> ProcessRequest(
        string raw,
        CancellationToken cancellationToken)
{
    return await this.pool.Invoke(
        analyser => analyser.Analyse(raw),
        cancellationToken);
}

      

+3


source to share


3 answers


I think creating a shared pool is going to be quite a challenge and therefore I will have a lot of fun with it :-)

NOTE. The most important thing that differs from my vision to yours is that I don't want the pool to handle thread-bound problems with the objects it manages. The pool has some code associated with a thread, but only to manage its own state (a list of instances). Starting a thread, stopping / and / or canceling is a problem for the pool client and the constructed objects, not for the pool itself.

I would start with:

  • One-time wrapper for objects to be maintained by the pool, which will return the object to the pool when placed
  • A pool that creates or reuses available instances and transfers them before returning instances to the client.

Super simplified implementation:

class PoolItem<T> : IDisposable
{
    public event EventHandler<EventArgs> Disposed;


    public PoolItem(T wrapped)
    {
        WrappedObject = wrapped;
    }


    public T WrappedObject { get; private set; }


    public void Dispose()
    {
        Disposed(this, EventArgs.Empty);
    }
}

      



Now the pool:

class Pool<T> where T : class
{
    private static readonly object m_SyncRoot = new object();

    private readonly Func<T> m_FactoryMethod;
    private List<T> m_PoolItems = new List<T>();


    public Pool(Func<T> factoryMethod)
    {
        m_FactoryMethod = factoryMethod;
    }


    public PoolItem<T> Get()
    {
        T target = null;

        lock (m_SyncRoot)
        {
            if (m_PoolItems.Count > 0)
            {
                target = m_PoolItems[0];
                m_PoolItems.RemoveAt(0);
            }
        }

        if (target == null)
            target = m_FactoryMethod();

        var wrapper = new PoolItem<T>(target);
        wrapper.Disposed += wrapper_Disposed;

        return wrapper;
    }


    void wrapper_Disposed(object sender, EventArgs e)
    {
        var wrapper = sender as PoolItem<T>;

        lock (m_SyncRoot)
        {
            m_PoolItems.Add(wrapper.WrappedObject);
        }
    }
}

      

Using:

class ExpensiveConstructionObject
{
    public ExpensiveConstructionObject()
    {
        Console.WriteLine("Executing the expensive constructor...");
    }

    public void Do(string stuff)
    {
        Console.WriteLine("Doing: " + stuff);
    }
}

    class Program
{
    static void Main(string[] args)
    {
        var pool = new Pool<ExpensiveConstructionObject>(() => new ExpensiveConstructionObject());

        var t1 = pool.Get();
        t1.WrappedObject.Do("task 1");

        using (var t2 = pool.Get())
            t2.WrappedObject.Do("task 2");

        using (var t3 = pool.Get())
            t3.WrappedObject.Do("task 3");

        t1.Dispose();

        Console.ReadLine();
    }
}

      

Next steps:

  • classic pool functions such as: initial size, maximum size
  • dynamic proxying allowing Pool :: Get to return T rather than PoolItem
  • maintain a list of wrappers, dispose of it if the caller doesn't, when the pool gets the location
+3


source


The IIUC you are trying to achieve is a shared object pool where when you have no resource to use, you wait asynchronously until you do so.

The simplest solution would be to use TPL Dataflow

BufferBlock

to hold the elements and wait when they are empty. In your API you will get a delegate and run it, but I would recommend returning the actual item from the pool and letting it decide what to do with it:

public class ObjectPool<TItem>
{
    private readonly BufferBlock<TItem> _bufferBlock;
    private readonly int _maxSize;
    private readonly Func<TItem> _creator;
    private readonly CancellationToken _cancellationToken;
    private readonly object _lock;
    private int _currentSize;

    public ObjectPool(int maxSize, Func<TItem> creator, CancellationToken cancellationToken)
    {
        _lock = new object();
        _maxSize = maxSize;
        _currentSize = 1;
        _creator = creator;
        _cancellationToken = cancellationToken;
        _bufferBlock = new BufferBlock<TItem>(new DataflowBlockOptions{CancellationToken = cancellationToken});
    }

    public void Push(TItem item)
    {
        if (!_bufferBlock.Post(item) || _bufferBlock.Count > _maxSize)
        {
            throw new Exception();
        }
    }

    public Task<TItem> PopAsync()
    {
        TItem item;
        if (_bufferBlock.TryReceive(out item))
        {
            return Task.FromResult(item);
        }
        if (_currentSize < _maxSize)
        {
            lock (_lock)
            {
                if (_currentSize < _maxSize)
                {
                    _currentSize++;
                    _bufferBlock.Post(_creator());
                }
            }
        }

        return _bufferBlock.ReceiveAsync();
    }
}

      

Explanations:



  • I use a lock to make sure you only create a new item at a time, this can be easily replaced with AsyncLock

    if it takes a long time.
  • I am using Double Check Locking to optimize for the normal case where all elements are already created.
  • PopAsync

    returns Task

    , but is not an asynchronous method, so it completes synchronously while the item is returned. It only waits for the pool to be empty and the limit is reached.

You can add a method that returns IDisposable

so you can just put it into use scope

without issue:

public async Task<Disposable> GetDisposableAsync()
{
    return new Disposable(this, await PopAsync());
}

public class Disposable : IDisposable
{
    private readonly ObjectPool<TItem> _pool;
    public TItem Item { get; set; }

    public Disposable(ObjectPool<TItem> pool, TItem item)
    {
        Item = item;
        _pool = pool;
    }
    public void Dispose()
    {
        _pool.Push(Item);
    }
}

      

+3


source


Pool is a good solution. After all, a pool is used for this purpose (maintain a set of objects that are too expensive to instantiate every time: database connection, threads, etc.).

If you want to create a shared pool, however, you have to be very careful: users of your code can do "unexpected" things and end up shooting themselves in the foot.

Blocking, for example: you should really check that this does not result in a deadlock. Expanding the pool on the fly if needed, or throwing if the delegate asks for more objects ... Exceptions should also be treated with caution.

Therefore, the "wait for the first available T" and "block T" steps must be completely handled by the pool, and it must do all the necessary checks to avoid awkward situations. You might consider giving your "client code" (target) a pool reference to require additional locking capabilities if you want (like nested locking or something).

More practical: can you start with a solution that works specifically for your class Analyser

and then work from there to a shared pool when you need it?

-1


source







All Articles