File: src\libraries\System.Private.CoreLib\src\System\Threading\PortableThreadPool.Blocking.cs
Project: src\coreclr\System.Private.CoreLib\System.Private.CoreLib.csproj (System.Private.CoreLib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
 
namespace System.Threading
{
    internal sealed partial class PortableThreadPool
    {
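        // This part of PortableThreadPool implements cooperative blocking compensation: when a thread pool worker
        // thread reports that it is about to block, the thread count goal is temporarily raised so that queued work can
        // keep making progress, and it is lowered again as the blocked threads unblock.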
        public short MinThreadsGoal
        {
            get
            {
                _threadAdjustmentLock.VerifyIsLocked();
                return Math.Min(_separated.counts.NumThreadsGoal, TargetThreadsGoalForBlockingAdjustment);
            }
        }
 
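        // The thread count goal that blocking adjustments work towards: the configured minimum plus one thread per
        // cooperatively blocked thread, capped at the configured maximum. For example, with a minimum of 8 worker
        // threads and 3 blocked threads, the target is 11.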
        private short TargetThreadsGoalForBlockingAdjustment
        {
            get
            {
                _threadAdjustmentLock.VerifyIsLocked();
 
                return
                    _numBlockedThreads <= 0
                        ? _minThreads
                        : (short)Math.Min((ushort)(_minThreads + _numBlockedThreads), (ushort)_maxThreads);
            }
        }
 
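        // Called when a thread pool worker thread is about to block cooperatively. Records the blocked thread and, if
        // the current thread count goal is below the blocking-adjusted target, schedules an adjustment on the gate
        // thread. Returns false (and performs no compensation) when cooperative blocking is disabled or the caller is
        // not a thread pool thread; callers are expected to call NotifyThreadUnblocked only when this returns true.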
        public bool NotifyThreadBlocked()
        {
            if (!BlockingConfig.IsCooperativeBlockingEnabled || !Thread.CurrentThread.IsThreadPoolThread)
            {
                return false;
            }
 
            bool wakeGateThread = false;
            _threadAdjustmentLock.Acquire();
            try
            {
                _numBlockedThreads++;
                Debug.Assert(_numBlockedThreads > 0);
 
                if (_pendingBlockingAdjustment != PendingBlockingAdjustment.WithDelayIfNecessary &&
                    _separated.counts.NumThreadsGoal < TargetThreadsGoalForBlockingAdjustment)
                {
                    if (_pendingBlockingAdjustment == PendingBlockingAdjustment.None)
                    {
                        wakeGateThread = true;
                    }
                    _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
                }
            }
            finally
            {
                _threadAdjustmentLock.Release();
            }
 
            if (wakeGateThread)
            {
                GateThread.Wake(this);
            }
            return true;
        }
 
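        // Called when a cooperatively blocking operation completes, pairing with a successful NotifyThreadBlocked call.
        // Decrements the blocked thread count and, if threads were added due to blocking and the goal is now above the
        // target, requests an immediate adjustment so the gate thread can bring the goal back down.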
        public void NotifyThreadUnblocked()
        {
            Debug.Assert(BlockingConfig.IsCooperativeBlockingEnabled);
            Debug.Assert(Thread.CurrentThread.IsThreadPoolThread);
 
            bool wakeGateThread = false;
            _threadAdjustmentLock.Acquire();
            try
            {
                Debug.Assert(_numBlockedThreads > 0);
                _numBlockedThreads--;
 
                if (_pendingBlockingAdjustment != PendingBlockingAdjustment.Immediately &&
                    _numThreadsAddedDueToBlocking > 0 &&
                    _separated.counts.NumThreadsGoal > TargetThreadsGoalForBlockingAdjustment)
                {
                    wakeGateThread = true;
                    _pendingBlockingAdjustment = PendingBlockingAdjustment.Immediately;
                }
            }
            finally
            {
                _threadAdjustmentLock.Release();
            }
 
            if (wakeGateThread)
            {
                GateThread.Wake(this);
            }
        }
 
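        // Called by the gate thread to process a pending blocking adjustment. Returns the delay in milliseconds to wait
        // before the next adjustment, or 0 when no further delayed adjustment is needed.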
        private uint PerformBlockingAdjustment(bool previousDelayElapsed)
        {
            uint nextDelayMs;
            bool addWorker;
            _threadAdjustmentLock.Acquire();
            try
            {
                nextDelayMs = PerformBlockingAdjustment(previousDelayElapsed, out addWorker);
            }
            finally
            {
                _threadAdjustmentLock.Release();
            }
 
            if (addWorker)
            {
                WorkerThread.MaybeAddWorkingWorker(this);
            }
            return nextDelayMs;
        }
 
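        // Performs the adjustment while holding the thread adjustment lock. Sets addWorker when a working worker should
        // be requested after the lock is released, and returns the delay in milliseconds before the next adjustment.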
        private uint PerformBlockingAdjustment(bool previousDelayElapsed, out bool addWorker)
        {
            _threadAdjustmentLock.VerifyIsLocked();
            Debug.Assert(_pendingBlockingAdjustment != PendingBlockingAdjustment.None);
 
            _pendingBlockingAdjustment = PendingBlockingAdjustment.None;
            addWorker = false;
 
            short targetThreadsGoal = TargetThreadsGoalForBlockingAdjustment;
            ThreadCounts counts = _separated.counts;
            short numThreadsGoal = counts.NumThreadsGoal;
            if (numThreadsGoal == targetThreadsGoal)
            {
                return 0;
            }
 
            if (numThreadsGoal > targetThreadsGoal)
            {
                // The goal is only decreased by how much it was increased in total due to blocking adjustments. This is to
                // allow blocking adjustments to play well with starvation and hill climbing, either of which may increase the
                // goal independently for other reasons, and blocking adjustments should not undo those changes.
                if (_numThreadsAddedDueToBlocking <= 0)
                {
                    return 0;
                }
 
                short toSubtract = Math.Min((short)(numThreadsGoal - targetThreadsGoal), _numThreadsAddedDueToBlocking);
                _numThreadsAddedDueToBlocking -= toSubtract;
                numThreadsGoal -= toSubtract;
                _separated.counts.InterlockedSetNumThreadsGoal(numThreadsGoal);
                HillClimbing.ThreadPoolHillClimber.ForceChange(
                    numThreadsGoal,
                    HillClimbing.StateOrTransition.CooperativeBlocking);
                return 0;
            }
 
            short configuredMaxThreadsWithoutDelay =
                (short)Math.Min((ushort)(_minThreads + BlockingConfig.ThreadsToAddWithoutDelay), (ushort)_maxThreads);
 
            do
            {
                // Calculate how many threads can be added without a delay. Threads that were already created but may be just
                // waiting for work can be released for work without a delay, but creating a new thread may need a delay.
                short maxThreadsGoalWithoutDelay =
                    Math.Max(configuredMaxThreadsWithoutDelay, Math.Min(counts.NumExistingThreads, _maxThreads));
                short targetThreadsGoalWithoutDelay = Math.Min(targetThreadsGoal, maxThreadsGoalWithoutDelay);
                short newNumThreadsGoal;
                if (numThreadsGoal < targetThreadsGoalWithoutDelay)
                {
                    newNumThreadsGoal = targetThreadsGoalWithoutDelay;
                }
                else if (previousDelayElapsed)
                {
                    newNumThreadsGoal = (short)(numThreadsGoal + 1);
                }
                else
                {
                    // Need to induce a delay before adding a thread
                    break;
                }
 
                do
                {
                    if (newNumThreadsGoal <= counts.NumExistingThreads || BlockingConfig.IgnoreMemoryUsage)
                    {
                        break;
                    }
 
                    //
                    // Threads would likely need to be created to compensate for blocking, so check memory usage and limits
                    //
 
                    long memoryLimitBytes = _memoryLimitBytes;
                    if (memoryLimitBytes <= 0)
                    {
                        break;
                    }
 
                    // Memory usage is updated after gen 2 GCs, and roughly represents how much physical memory was in use at
                    // the time of the last gen 2 GC. When new threads are also blocking, they may not have used their typical
                    // amount of stack space, and gen 2 GCs may not be happening to update the memory usage. Account for a bit
                    // of extra stack space usage in the future for each thread.
                    long memoryUsageBytes =
                        _memoryUsageBytes +
                        counts.NumExistingThreads * (long)WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
 
                    // The memory limit may already be less than the total amount of physical memory. We are only accounting for
                    // thread pool worker threads above, and after falling back, starvation may have to continue creating threads
                    // slowly to prevent a deadlock, so calculate a threshold before falling back by giving the memory limit
                    // some additional buffer.
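                    // Illustrative example (an assumed figure): with a 10 GiB memory limit, the fallback threshold
                    // below is 80% of the limit, or 8 GiB; at or beyond that, no more threads are added here and thread
                    // creation is left to starvation detection.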
                    long memoryThresholdForFallbackBytes = memoryLimitBytes * 8 / 10;
                    if (memoryUsageBytes >= memoryThresholdForFallbackBytes)
                    {
                        return 0;
                    }
 
                    // Determine how many threads can be added without exceeding the memory threshold
                    long achievableNumThreadsGoal =
                        counts.NumExistingThreads +
                        (memoryThresholdForFallbackBytes - memoryUsageBytes) /
                            WorkerThread.EstimatedAdditionalStackUsagePerThreadBytes;
                    newNumThreadsGoal = (short)Math.Min(newNumThreadsGoal, achievableNumThreadsGoal);
                    if (newNumThreadsGoal <= numThreadsGoal)
                    {
                        return 0;
                    }
                } while (false);
 
                _numThreadsAddedDueToBlocking += (short)(newNumThreadsGoal - numThreadsGoal);
                counts = _separated.counts.InterlockedSetNumThreadsGoal(newNumThreadsGoal);
                HillClimbing.ThreadPoolHillClimber.ForceChange(
                    newNumThreadsGoal,
                    HillClimbing.StateOrTransition.CooperativeBlocking);
                if (counts.NumProcessingWork >= numThreadsGoal && _separated.numRequestedWorkers > 0)
                {
                    addWorker = true;
                }
 
                numThreadsGoal = newNumThreadsGoal;
                if (numThreadsGoal >= targetThreadsGoal)
                {
                    return 0;
                }
            } while (false);
 
            // Calculate how much delay to induce before another thread is created. These operations don't overflow because of
            // limits on max thread count and max delays.
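            // Illustrative example (assuming the default configuration, with configuredMaxThreadsWithoutDelay == 16,
            // ThreadsPerDelayStep == 8, and DelayStepMs == 25): a goal of 20 threads gives 1 + (20 - 16) / 8 = 1 step,
            // i.e. a 25 ms delay; a goal of 24 gives 2 steps, i.e. a 50 ms delay, capped at MaxDelayMs.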
            _pendingBlockingAdjustment = PendingBlockingAdjustment.WithDelayIfNecessary;
            int delayStepCount = 1 + (numThreadsGoal - configuredMaxThreadsWithoutDelay) / BlockingConfig.ThreadsPerDelayStep;
            return Math.Min((uint)delayStepCount * BlockingConfig.DelayStepMs, BlockingConfig.MaxDelayMs);
        }
 
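        // Tracks whether the gate thread has a blocking adjustment to perform, and whether that adjustment may need to
        // delay before raising the thread count goal further.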
        private enum PendingBlockingAdjustment : byte
        {
            None,
            Immediately,
            WithDelayIfNecessary
        }
 
        private static class BlockingConfig
        {
            public static readonly bool IsCooperativeBlockingEnabled =
                AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.CooperativeBlocking", true);
            public static readonly bool IgnoreMemoryUsage =
                AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.Blocking.IgnoreMemoryUsage", false);
 
            public static readonly short ThreadsToAddWithoutDelay;
            public static readonly short ThreadsPerDelayStep;
            public static readonly uint DelayStepMs;
            public static readonly uint MaxDelayMs;
 
#pragma warning disable CA1810 // remove the explicit static constructor
            static BlockingConfig()
            {
                // Summary description of how blocking compensation works and how the config settings below are used:
                // - After the thread count based on MinThreads is reached, up to ThreadsToAddWithoutDelay additional threads
                //   may be created without a delay
                // - After that, before each additional thread is created, a delay is induced, starting with DelayStepMs
                // - For every ThreadsPerDelayStep threads that are added with a delay, an additional DelayStepMs is added to
                //   the delay
                // - The delay may not exceed MaxDelayMs
                // - Delays are only induced before creating threads. If threads are already available, they would be released
                //   without delay to compensate for cooperative blocking.
                // - Physical memory usage and limits are also taken into account; beyond a threshold, the system
                //   switches to a fallback mode where threads are created only if starvation is detected, typically
                //   with higher delays
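                // Illustrative progression (an assumption based on the defaults below, with 8 processors and a minimum
                // of 8 worker threads): the goal can grow to roughly 16 threads with no delay; beyond that, each further
                // thread is preceded by a delay that starts at 25 ms and grows by 25 ms for every 8 additional threads,
                // up to a 250 ms cap.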
 
                // After the thread count based on MinThreads is reached, this value (after it is multiplied by the processor
                // count) specifies how many additional threads may be created without a delay
                int blocking_threadsToAddWithoutDelay_procCountFactor =
                    AppContextConfigHelper.GetInt32Config(
                        "System.Threading.ThreadPool.Blocking.ThreadsToAddWithoutDelay_ProcCountFactor",
                        1,
                        false);
 
                // After the thread count based on ThreadsToAddWithoutDelay is reached, this value (after it is multiplied by
                // the processor count) specifies after how many threads an additional DelayStepMs would be added to the delay
                // before each new thread is created
                int blocking_threadsPerDelayStep_procCountFactor =
                    AppContextConfigHelper.GetInt32Config(
                        "System.Threading.ThreadPool.Blocking.ThreadsPerDelayStep_ProcCountFactor",
                        1,
                        false);
 
                // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies how much additional
                // delay to add per ThreadsPerDelayStep threads, which would be applied before each new thread is created
                DelayStepMs =
                    (uint)AppContextConfigHelper.GetInt32Config(
                        "System.Threading.ThreadPool.Blocking.DelayStepMs",
                        25,
                        false);
 
                // After the thread count based on ThreadsToAddWithoutDelay is reached, this value specifies the max delay to
                // use before each new thread is created
                MaxDelayMs =
                    (uint)AppContextConfigHelper.GetInt32Config(
                        "System.Threading.ThreadPool.Blocking.MaxDelayMs",
                        250,
                        false);
 
                int processorCount = Environment.ProcessorCount;
                ThreadsToAddWithoutDelay = (short)(processorCount * blocking_threadsToAddWithoutDelay_procCountFactor);
                if (ThreadsToAddWithoutDelay > MaxPossibleThreadCount ||
                    ThreadsToAddWithoutDelay / processorCount != blocking_threadsToAddWithoutDelay_procCountFactor)
                {
                    ThreadsToAddWithoutDelay = MaxPossibleThreadCount;
                }
 
                blocking_threadsPerDelayStep_procCountFactor = Math.Max(1, blocking_threadsPerDelayStep_procCountFactor);
                short maxThreadsPerDelayStep = (short)(MaxPossibleThreadCount - ThreadsToAddWithoutDelay);
                ThreadsPerDelayStep =
                    (short)(processorCount * blocking_threadsPerDelayStep_procCountFactor);
                if (ThreadsPerDelayStep > maxThreadsPerDelayStep ||
                    ThreadsPerDelayStep / processorCount != blocking_threadsPerDelayStep_procCountFactor)
                {
                    ThreadsPerDelayStep = maxThreadsPerDelayStep;
                }
 
                MaxDelayMs = Math.Max(1, Math.Min(MaxDelayMs, GateThread.GateActivitiesPeriodMs));
                DelayStepMs = Math.Max(1, Math.Min(DelayStepMs, MaxDelayMs));
            }
#pragma warning restore CA1810
        }
    }
}