File: Internals\System\Runtime\SynchronizedPool.cs
Web Access
Project: src\src\System.ServiceModel.Primitives\src\System.ServiceModel.Primitives.csproj (System.ServiceModel.Primitives)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// See the LICENSE file in the project root for more information.
 
 
using System.Collections.Generic;
using System.Security;
 
namespace System.Runtime
{
    [Fx.Tag.SynchronizationObject(Blocking = false)]
    // A simple synchronized pool would simply lock a stack and push/pop on return/take.
    //
    // This implementation tries to reduce locking by exploiting the case where an item
    // is taken and returned by the same thread, which turns out to be common in our 
    // scenarios.  
    //
    // Initially, all the quota is allocated to a global (non-thread-specific) pool, 
    // which takes locks.  As different threads take and return values, we record their IDs, 
    // and if we detect that a thread is taking and returning "enough" on the same thread, 
    // then we decide to "promote" the thread.  When a thread is promoted, we decrease the 
    // quota of the global pool by one, and allocate a thread-specific entry for the thread 
    // to store it's value.  Once this entry is allocated, the thread can take and return 
    // it's value from that entry without taking any locks.  Not only does this avoid 
    // locks, but it affinitizes pooled items to a particular thread.
    //
    // There are a couple of additional things worth noting:
    // 
    // It is possible for a thread that we have reserved an entry for to exit.  This means
    // we will still have a entry allocated for it, but the pooled item stored there 
    // will never be used.  After a while, we could end up with a number of these, and 
    // as a result we would begin to exhaust the quota of the overall pool.  To mitigate this
    // case, we throw away the entire per-thread pool, and return all the quota back to 
    // the global pool if we are unable to promote a thread (due to lack of space).  Then 
    // the set of active threads will be re-promoted as they take and return items.
    // 
    // You may notice that the code does not immediately promote a thread, and does not
    // immediately throw away the entire per-thread pool when it is unable to promote a 
    // thread.  Instead, it uses counters (based on the number of calls to the pool) 
    // and a threshold to figure out when to do these operations.  In the case where the
    // pool to misconfigured to have too few items for the workload, this avoids constant 
    // promoting and rebuilding of the per thread entries.
    //
    // You may also notice that we do not use interlocked methods when adjusting statistics.
    // Since the statistics are a heuristic as to how often something is happening, they 
    // do not need to be perfect.
    // 
    internal class SynchronizedPool<T> where T : class
    {
        private const int MaxPendingEntries = 128;
        private const int MaxPromotionFailures = 64;
        private const int MaxReturnsBeforePromotion = 64;
        private const int MaxThreadItemsPerProcessor = 16;
        private Entry[] _entries;
        private GlobalPool _globalPool;
        private int _maxCount;
        private PendingEntry[] _pending;
        private int _promotionFailures;
 
        public SynchronizedPool(int maxCount)
        {
            int threadCount = maxCount;
            int maxThreadCount = MaxThreadItemsPerProcessor + SynchronizedPoolHelper.ProcessorCount;
            if (threadCount > maxThreadCount)
            {
                threadCount = maxThreadCount;
            }
            _maxCount = maxCount;
            _entries = new Entry[threadCount];
            _pending = new PendingEntry[4];
            _globalPool = new GlobalPool(maxCount);
        }
 
        private object ThisLock
        {
            get
            {
                return this;
            }
        }
 
        public void Clear()
        {
            Entry[] entries = _entries;
 
            for (int i = 0; i < entries.Length; i++)
            {
                entries[i].value = null;
            }
 
            _globalPool.Clear();
        }
 
        private void HandlePromotionFailure(int thisThreadID)
        {
            int newPromotionFailures = _promotionFailures + 1;
 
            if (newPromotionFailures >= MaxPromotionFailures)
            {
                lock (ThisLock)
                {
                    _entries = new Entry[_entries.Length];
 
                    _globalPool.MaxCount = _maxCount;
                }
 
                PromoteThread(thisThreadID);
            }
            else
            {
                _promotionFailures = newPromotionFailures;
            }
        }
 
        private bool PromoteThread(int thisThreadID)
        {
            lock (ThisLock)
            {
                for (int i = 0; i < _entries.Length; i++)
                {
                    int threadID = _entries[i].threadID;
 
                    if (threadID == thisThreadID)
                    {
                        return true;
                    }
                    else if (threadID == 0)
                    {
                        _globalPool.DecrementMaxCount();
                        _entries[i].threadID = thisThreadID;
                        return true;
                    }
                }
            }
 
            return false;
        }
 
        private void RecordReturnToGlobalPool(int thisThreadID)
        {
            PendingEntry[] localPending = _pending;
 
            for (int i = 0; i < localPending.Length; i++)
            {
                int threadID = localPending[i].threadID;
 
                if (threadID == thisThreadID)
                {
                    int newReturnCount = localPending[i].returnCount + 1;
 
                    if (newReturnCount >= MaxReturnsBeforePromotion)
                    {
                        localPending[i].returnCount = 0;
 
                        if (!PromoteThread(thisThreadID))
                        {
                            HandlePromotionFailure(thisThreadID);
                        }
                    }
                    else
                    {
                        localPending[i].returnCount = newReturnCount;
                    }
                    break;
                }
                else if (threadID == 0)
                {
                    break;
                }
            }
        }
 
        private void RecordTakeFromGlobalPool(int thisThreadID)
        {
            PendingEntry[] localPending = _pending;
 
            for (int i = 0; i < localPending.Length; i++)
            {
                int threadID = localPending[i].threadID;
 
                if (threadID == thisThreadID)
                {
                    return;
                }
                else if (threadID == 0)
                {
                    lock (localPending)
                    {
                        if (localPending[i].threadID == 0)
                        {
                            localPending[i].threadID = thisThreadID;
                            return;
                        }
                    }
                }
            }
 
            if (localPending.Length >= MaxPendingEntries)
            {
                _pending = new PendingEntry[localPending.Length];
            }
            else
            {
                PendingEntry[] newPending = new PendingEntry[localPending.Length * 2];
                Array.Copy(localPending, newPending, localPending.Length);
                _pending = newPending;
            }
        }
 
        public bool Return(T value)
        {
            int thisThreadID = Environment.CurrentManagedThreadId;
 
            if (thisThreadID == 0)
            {
                return false;
            }
 
            if (ReturnToPerThreadPool(thisThreadID, value))
            {
                return true;
            }
 
            return ReturnToGlobalPool(thisThreadID, value);
        }
 
        private bool ReturnToPerThreadPool(int thisThreadID, T value)
        {
            Entry[] entries = _entries;
 
            for (int i = 0; i < entries.Length; i++)
            {
                int threadID = entries[i].threadID;
 
                if (threadID == thisThreadID)
                {
                    if (entries[i].value == null)
                    {
                        entries[i].value = value;
                        return true;
                    }
                    else
                    {
                        return false;
                    }
                }
                else if (threadID == 0)
                {
                    break;
                }
            }
 
            return false;
        }
 
        private bool ReturnToGlobalPool(int thisThreadID, T value)
        {
            RecordReturnToGlobalPool(thisThreadID);
 
            return _globalPool.Return(value);
        }
 
        public T Take()
        {
            int thisThreadID = Environment.CurrentManagedThreadId;
 
            if (thisThreadID == 0)
            {
                return null;
            }
 
            T value = TakeFromPerThreadPool(thisThreadID);
 
            if (value != null)
            {
                return value;
            }
 
            return TakeFromGlobalPool(thisThreadID);
        }
 
        private T TakeFromPerThreadPool(int thisThreadID)
        {
            Entry[] entries = _entries;
 
            for (int i = 0; i < entries.Length; i++)
            {
                int threadID = entries[i].threadID;
 
                if (threadID == thisThreadID)
                {
                    T value = entries[i].value;
 
                    if (value != null)
                    {
                        entries[i].value = null;
                        return value;
                    }
                    else
                    {
                        return null;
                    }
                }
                else if (threadID == 0)
                {
                    break;
                }
            }
 
            return null;
        }
 
        private T TakeFromGlobalPool(int thisThreadID)
        {
            RecordTakeFromGlobalPool(thisThreadID);
 
            return _globalPool.Take();
        }
 
        private struct Entry
        {
            public int threadID;
            public T value;
        }
 
        private struct PendingEntry
        {
            public int returnCount;
            public int threadID;
        }
 
        internal static class SynchronizedPoolHelper
        {
            public static readonly int ProcessorCount = GetProcessorCount();
 
            [Fx.Tag.SecurityNote(Critical = "Asserts in order to get the processor count from the environment", Safe = "This data isn't actually protected so it's ok to leak")]
            [SecuritySafeCritical]
            private static int GetProcessorCount()
            {
                return Environment.ProcessorCount;
            }
        }
        [Fx.Tag.SynchronizationObject(Blocking = false)]
 
        internal class GlobalPool
        {
            private Stack<T> _items;
 
            private int _maxCount;
 
            public GlobalPool(int maxCount)
            {
                _items = new Stack<T>();
                _maxCount = maxCount;
            }
 
            public int MaxCount
            {
                get
                {
                    return _maxCount;
                }
                set
                {
                    lock (ThisLock)
                    {
                        while (_items.Count > value)
                        {
                            _items.Pop();
                        }
                        _maxCount = value;
                    }
                }
            }
 
            private object ThisLock
            {
                get
                {
                    return this;
                }
            }
 
            public void DecrementMaxCount()
            {
                lock (ThisLock)
                {
                    if (_items.Count == _maxCount)
                    {
                        _items.Pop();
                    }
                    _maxCount--;
                }
            }
 
            public T Take()
            {
                if (_items.Count > 0)
                {
                    lock (ThisLock)
                    {
                        if (_items.Count > 0)
                        {
                            return _items.Pop();
                        }
                    }
                }
                return null;
            }
 
            public bool Return(T value)
            {
                if (_items.Count < MaxCount)
                {
                    lock (ThisLock)
                    {
                        if (_items.Count < MaxCount)
                        {
                            _items.Push(value);
                            return true;
                        }
                    }
                }
                return false;
            }
 
            public void Clear()
            {
                lock (ThisLock)
                {
                    _items.Clear();
                }
            }
        }
    }
}