File: src\libraries\System.Private.CoreLib\src\System\Threading\PortableThreadPool.HillClimbing.cs
Web Access
Project: src\src\coreclr\System.Private.CoreLib\System.Private.CoreLib.csproj (System.Private.CoreLib)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System.Diagnostics;
using System.Diagnostics.Tracing;
 
namespace System.Threading
{
    internal sealed partial class PortableThreadPool
    {
        /// <summary>
        /// Hill climbing algorithm used for determining the number of threads needed for the thread pool.
        /// </summary>
        private sealed partial class HillClimbing
        {
            // Number of entries in the circular transition log (_log).
            private const int LogCapacity = 200;
            // Bounds, in milliseconds, of the randomized sample interval used when the configured
            // SampleIntervalLow/SampleIntervalHigh pair is inverted (low > high).
            private const int DefaultSampleIntervalMsLow = 10;
            private const int DefaultSampleIntervalMsHigh = 200;

            // Read from the "System.Threading.ThreadPool.HillClimbing.Disable" configuration value, with false as the
            // default passed to the config helper. This file only exposes the flag; it is not consulted here.
            public static readonly bool IsDisabled = AppContextConfigHelper.GetBooleanConfig("System.Threading.ThreadPool.HillClimbing.Disable", false);

            // SOS's ThreadPool command depends on this name
            public static readonly HillClimbing ThreadPoolHillClimber = new HillClimbing();
 
            // SOS's ThreadPool command depends on the enum values
            //
            // Identifies the state of the algorithm, or the reason for a thread count change, that is recorded in the
            // transition log. LogTransition also casts the value directly to
            // NativeRuntimeEventSource.ThreadAdjustmentReasonMap, so the numeric values presumably have to stay aligned
            // with that enum as well (verify against its declaration before reordering or inserting members).
            public enum StateOrTransition
            {
                // Default state in Update while there is not yet enough history to analyze.
                Warmup,
                // Used by Update when the thread count was changed without the hill climber being told.
                Initializing,
                // Not assigned anywhere in this file.
                RandomMove,
                // Used by Update when the thread count history contains a measurable wave to compare against.
                ClimbingMove,
                // Not assigned anywhere in this file.
                ChangePoint,
                // Used by Update when the thread count history contains no wave (its wave component is zero).
                Stabilizing,
                // Not assigned anywhere in this file; presumably passed to ForceChange by callers.
                Starvation,
                // Not assigned anywhere in this file; presumably passed to ForceChange by callers.
                ThreadTimedOut,
                // Changes with this reason do not re-randomize the sample interval (see ChangeThreadCount).
                CooperativeBlocking,
            }
 
            // SOS's ThreadPool command depends on the names of all fields
            //
            // One record in the circular transition log; filled in by LogTransition.
            private struct LogEntry
            {
                // Environment.TickCount at the time the entry was written.
                public int tickCount;
                // State or reason associated with the thread count change.
                public StateOrTransition stateOrTransition;
                // The thread count that was set by the change.
                public int newControlSetting;
                // Number of history samples available at the time, rounded down to a whole number of wave periods.
                public int lastHistoryCount;
                // Throughput value passed to LogTransition, narrowed to float.
                public float lastHistoryMean;
            }
 
            //
            // Tuning parameters, read once from configuration by the constructor.
            //

            // Period, in samples, of the square wave that Update superimposes on the thread count.
            private readonly int _wavePeriod;
            // Capacity of the sample history (_samples/_threadCounts): wave period times the configured history size.
            private readonly int _samplesToMeasure;
            // Configured "Bias" / 100; scales the thread wave that is subtracted from the throughput wave in Update.
            private readonly double _targetThroughputRatio;
            // Divides the confidence value and scales the thread wave magnitude in Update.
            private readonly double _targetSignalToNoiseRatio;
            // Multiplied by the sample duration to obtain the gain applied to a move.
            private readonly double _maxChangePerSecond;
            // Upper bound applied to a single move of the control setting.
            private readonly double _maxChangePerSample;
            // Upper bound of the thread wave magnitude.
            private readonly int _maxThreadWaveMagnitude;
            // Inclusive lower bound, in milliseconds, of the randomized sample interval.
            private readonly int _sampleIntervalMsLow;
            // Multiplier applied when computing the thread wave magnitude.
            private readonly double _threadMagnitudeMultiplier;
            // Inclusive upper bound, in milliseconds, of the randomized sample interval.
            private readonly int _sampleIntervalMsHigh;
            // Weight of the newest error estimate in the moving average kept in _averageThroughputNoise.
            private readonly double _throughputErrorSmoothingFactor;
            // Exponent of the non-linear gain applied to a move.
            private readonly double _gainExponent;
            // Threshold for (threadCount - 1) / numCompletions at or above which Update keeps accumulating a sample.
            private readonly double _maxSampleError;

            //
            // Mutable state of the algorithm.
            //

            // Base thread count the algorithm is steering; the square wave is added on top of it.
            private double _currentControlSetting;
            // Number of samples recorded into the history so far.
            private long _totalSamples;
            // Thread count most recently recorded by ChangeThreadCount.
            private int _lastThreadCount;
            // Moving average of the throughput error estimate; zero until the first estimate is taken.
            private double _averageThroughputNoise;
            // Time and completions accumulated since Update last changed the thread count.
            private double _secondsElapsedSinceLastChange;
            private double _completionsSinceLastChange;
            // Data carried over from Update calls whose sample was not accurate enough to use yet.
            private int _accumulatedCompletionCount;
            private double _accumulatedSampleDurationSeconds;
            // History buffers of length _samplesToMeasure, indexed by sample number modulo that length.
            private readonly double[] _samples;
            private readonly double[] _threadCounts;
            // Current sample interval in milliseconds; re-randomized by ChangeThreadCount.
            private int _currentSampleMs;

            // Source of the randomized sample intervals.
            private readonly Random.XoshiroImpl _randomIntervalGenerator = new Random.XoshiroImpl();

            // Circular log of thread count transitions: _logStart is the index of the oldest entry and _logSize the
            // number of valid entries.
            private readonly LogEntry[] _log = new LogEntry[LogCapacity]; // SOS's ThreadPool command depends on this name
            private int _logStart; // SOS's ThreadPool command depends on this name
            private int _logSize; // SOS's ThreadPool command depends on this name
 
            /// <summary>
            /// Reads the <c>System.Threading.ThreadPool.HillClimbing.*</c> configuration values, allocates the sample
            /// history buffers, and picks the initial randomized sample interval.
            /// </summary>
            /// <remarks>
            /// A wave period below 2 or a wave history size below 1 would make <c>Update</c> divide by zero
            /// (it computes <c>_totalSamples % _samplesToMeasure</c> and <c>_totalSamples / (_wavePeriod / 2)</c>),
            /// so such values are replaced by the defaults. This mirrors the existing handling of an inverted
            /// sample interval range, which also falls back to the defaults.
            /// </remarks>
            public HillClimbing()
            {
                const int DefaultWavePeriod = 4;
                const int DefaultWaveHistorySize = 8;

                // The wave must span at least two samples: GetWaveComponent asserts period >= 2, and the square wave
                // in Update toggles every (_wavePeriod / 2) samples, which must not be zero.
                int wavePeriod = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WavePeriod", DefaultWavePeriod, false);
                _wavePeriod = wavePeriod >= 2 ? wavePeriod : DefaultWavePeriod;

                _maxThreadWaveMagnitude = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxWaveMagnitude", 20, false);
                _threadMagnitudeMultiplier = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WaveMagnitudeMultiplier", 100, false) / 100.0;

                // The history must hold at least one wave period; an empty history would make the ring buffer
                // index computation (modulo _samplesToMeasure) divide by zero.
                int waveHistorySize = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.WaveHistorySize", DefaultWaveHistorySize, false);
                _samplesToMeasure = _wavePeriod * (waveHistorySize >= 1 ? waveHistorySize : DefaultWaveHistorySize);

                // Values configured as whole percentages are converted to fractions.
                _targetThroughputRatio = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.Bias", 15, false) / 100.0;
                _targetSignalToNoiseRatio = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.TargetSignalToNoiseRatio", 300, false) / 100.0;
                _maxChangePerSecond = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxChangePerSecond", 4, false);
                _maxChangePerSample = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxChangePerSample", 20, false);

                // The sample interval range is only honored when it is well-formed (low <= high).
                int sampleIntervalMsLow = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.SampleIntervalLow", DefaultSampleIntervalMsLow, false);
                int sampleIntervalMsHigh = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.SampleIntervalHigh", DefaultSampleIntervalMsHigh, false);
                if (sampleIntervalMsLow <= sampleIntervalMsHigh)
                {
                    _sampleIntervalMsLow = sampleIntervalMsLow;
                    _sampleIntervalMsHigh = sampleIntervalMsHigh;
                }
                else
                {
                    _sampleIntervalMsLow = DefaultSampleIntervalMsLow;
                    _sampleIntervalMsHigh = DefaultSampleIntervalMsHigh;
                }

                _throughputErrorSmoothingFactor = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.ErrorSmoothingFactor", 1, false) / 100.0;
                _gainExponent = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.GainExponent", 200, false) / 100.0;
                _maxSampleError = AppContextConfigHelper.GetInt32Config("System.Threading.ThreadPool.HillClimbing.MaxSampleErrorPercent", 15, false) / 100.0;

                _samples = new double[_samplesToMeasure];
                _threadCounts = new double[_samplesToMeasure];

                // The upper bound passed to Next is exclusive, hence the + 1 to make the configured high value reachable.
                _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
            }
 
            /// <summary>
            /// Feeds one throughput sample into the hill climbing algorithm and computes the thread count and sample
            /// interval to use next.
            /// </summary>
            /// <param name="currentThreadCount">The thread count that was in effect for this sample.</param>
            /// <param name="sampleDurationSeconds">The duration of the sample, in seconds.</param>
            /// <param name="numCompletions">The number of work item completions counted during the sample.</param>
            /// <returns>
            /// The new thread count and the next sample interval in milliseconds. When the sample is not yet accurate
            /// enough, the data is accumulated and <paramref name="currentThreadCount"/> is returned unchanged together
            /// with a 10 ms interval.
            /// </returns>
            public (int newThreadCount, int newSampleMs) Update(int currentThreadCount, double sampleDurationSeconds, int numCompletions)
            {

                //
                // If someone changed the thread count without telling us, update our records accordingly.
                //
                if (currentThreadCount != _lastThreadCount)
                    ForceChange(currentThreadCount, StateOrTransition.Initializing);

                //
                // Update the cumulative stats for this thread count
                //
                _secondsElapsedSinceLastChange += sampleDurationSeconds;
                _completionsSinceLastChange += numCompletions;

                //
                // Add in any data we've already collected about this sample
                //
                sampleDurationSeconds += _accumulatedSampleDurationSeconds;
                numCompletions += _accumulatedCompletionCount;

                //
                // We need to make sure we're collecting reasonably accurate data.  Since we're just counting the end
                // of each work item, we are going to be missing some data about what really happened during the
                // sample interval.  The count produced by each thread includes an initial work item that may have
                // started well before the start of the interval, and each thread may have been running some new
                // work item for some time before the end of the interval, which did not yet get counted.  So
                // our count is going to be off by +/- threadCount workitems.
                //
                // The exception is that the thread that reported to us last time definitely wasn't running any work
                // at that time, and the thread that's reporting now definitely isn't running a work item now.  So
                // we really only need to consider threadCount-1 threads.
                //
                // Thus the percent error in our count is +/- (threadCount-1)/numCompletions.
                //
                // We cannot rely on the frequency-domain analysis we'll be doing later to filter out this error, because
                // of the way it accumulates over time.  If this sample is off by, say, 33% in the negative direction,
                // then the next one likely will be too.  The one after that will include the sum of the completions
                // we missed in the previous samples, and so will be 33% positive.  So every three samples we'll have
                // two "low" samples and one "high" sample.  This will appear as periodic variation right in the frequency
                // range we're targeting, which will not be filtered by the frequency-domain translation.
                //
                // The division is done in floating point, so numCompletions == 0 yields infinity (or NaN when
                // currentThreadCount is 1) rather than throwing. The very first sample is always accepted.
                //
                if (_totalSamples > 0 && ((currentThreadCount - 1.0) / numCompletions) >= _maxSampleError)
                {
                    // not accurate enough yet.  Let's accumulate the data so far, and tell the ThreadPool
                    // to collect a little more.
                    _accumulatedSampleDurationSeconds = sampleDurationSeconds;
                    _accumulatedCompletionCount = numCompletions;
                    return (currentThreadCount, 10);
                }

                //
                // We've got enough data for our sample; reset our accumulators for next time.
                //
                _accumulatedSampleDurationSeconds = 0;
                _accumulatedCompletionCount = 0;

                //
                // Add the current thread count and throughput sample to our history
                //
                double throughput = numCompletions / sampleDurationSeconds;
                if (NativeRuntimeEventSource.Log.IsEnabled())
                {
                    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentSample(throughput);
                }

                // The history buffers are used as ring buffers indexed by the running sample number.
                int sampleIndex = (int)(_totalSamples % _samplesToMeasure);
                _samples[sampleIndex] = throughput;
                _threadCounts[sampleIndex] = currentThreadCount;
                _totalSamples++;

                //
                // Set up defaults for our metrics
                //
                Complex threadWaveComponent = default;
                Complex throughputWaveComponent = default;
                double throughputErrorEstimate = 0;
                Complex ratio = default;
                double confidence = 0;

                StateOrTransition state = StateOrTransition.Warmup;

                //
                // How many samples will we use?  It must be at least the three wave periods we're looking for, and it must also be a whole
                // multiple of the primary wave's period; otherwise the frequency we're looking for will fall between two frequency bands
                // in the Fourier analysis, and we won't be able to measure it accurately.
                //
                // NOTE(review): the check below only requires more than one wave period (i.e. at least two), not three
                // as the comment above says - confirm which is intended.
                //
                int sampleCount = ((int)Math.Min(_totalSamples - 1, _samplesToMeasure)) / _wavePeriod * _wavePeriod;

                if (sampleCount > _wavePeriod)
                {
                    //
                    // Average the throughput and thread count samples, so we can scale the wave magnitudes later.
                    //
                    double sampleSum = 0;
                    double threadSum = 0;
                    for (int i = 0; i < sampleCount; i++)
                    {
                        sampleSum += _samples[(_totalSamples - sampleCount + i) % _samplesToMeasure];
                        threadSum += _threadCounts[(_totalSamples - sampleCount + i) % _samplesToMeasure];
                    }
                    double averageThroughput = sampleSum / sampleCount;
                    double averageThreadCount = threadSum / sampleCount;

                    if (averageThroughput > 0 && averageThreadCount > 0)
                    {
                        //
                        // Calculate the periods of the adjacent frequency bands we'll be using to measure noise levels.
                        // We want the two adjacent Fourier frequency bands.
                        //
                        double adjacentPeriod1 = sampleCount / (((double)sampleCount / _wavePeriod) + 1);
                        double adjacentPeriod2 = sampleCount / (((double)sampleCount / _wavePeriod) - 1);

                        //
                        // Get the three different frequency components of the throughput (scaled by average
                        // throughput).  Our "error" estimate (the amount of noise that might be present in the
                        // frequency band we're really interested in) is the average of the adjacent bands.
                        //
                        // NOTE(review): the code takes the maximum of the two adjacent bands, not their average.
                        //
                        throughputWaveComponent = GetWaveComponent(_samples, sampleCount, _wavePeriod) / averageThroughput;
                        throughputErrorEstimate = (GetWaveComponent(_samples, sampleCount, adjacentPeriod1) / averageThroughput).Abs();
                        if (adjacentPeriod2 <= sampleCount)
                        {
                            throughputErrorEstimate = Math.Max(throughputErrorEstimate, (GetWaveComponent(_samples, sampleCount, adjacentPeriod2) / averageThroughput).Abs());
                        }

                        //
                        // Do the same for the thread counts, so we have something to compare to.  We don't measure thread count
                        // noise, because there is none; these are exact measurements.
                        //
                        threadWaveComponent = GetWaveComponent(_threadCounts, sampleCount, _wavePeriod) / averageThreadCount;

                        //
                        // Update our moving average of the throughput noise.  We'll use this later as feedback to
                        // determine the new size of the thread wave.
                        //
                        if (_averageThroughputNoise == 0)
                            _averageThroughputNoise = throughputErrorEstimate;
                        else
                            _averageThroughputNoise = (_throughputErrorSmoothingFactor * throughputErrorEstimate) + ((1.0 - _throughputErrorSmoothingFactor) * _averageThroughputNoise);

                        if (threadWaveComponent.Abs() > 0)
                        {
                            //
                            // Adjust the throughput wave so it's centered around the target wave, and then calculate the adjusted throughput/thread ratio.
                            //
                            ratio = (throughputWaveComponent - (_targetThroughputRatio * threadWaveComponent)) / threadWaveComponent;
                            state = StateOrTransition.ClimbingMove;
                        }
                        else
                        {
                            ratio = new Complex(0, 0);
                            state = StateOrTransition.Stabilizing;
                        }

                        //
                        // Calculate how confident we are in the ratio.  More noise == less confident.  This has
                        // the effect of slowing down movements that might be affected by random noise.
                        //
                        double noiseForConfidence = Math.Max(_averageThroughputNoise, throughputErrorEstimate);
                        if (noiseForConfidence > 0)
                            confidence = (threadWaveComponent.Abs() / noiseForConfidence) / _targetSignalToNoiseRatio;
                        else
                            confidence = 1.0; //there is no noise!

                    }
                }

                //
                // We use just the real part of the complex ratio we just calculated.  If the throughput signal
                // is exactly in phase with the thread signal, this will be the same as taking the magnitude of
                // the complex move and moving that far up.  If they're 180 degrees out of phase, we'll move
                // backward (because this indicates that our changes are having the opposite of the intended effect).
                // If they're 90 degrees out of phase, we won't move at all, because we can't tell whether we're
                // having a negative or positive effect on throughput.
                //
                double move = Math.Min(1.0, Math.Max(-1.0, ratio.Real));

                //
                // Apply our confidence multiplier.
                //
                move *= Math.Min(1.0, Math.Max(0.0, confidence));

                //
                // Now apply non-linear gain, such that values around zero are attenuated, while higher values
                // are enhanced.  This allows us to move quickly if we're far away from the target, but more slowly
                // if we're getting close, giving us rapid ramp-up without wild oscillations around the target.
                //
                // NOTE(review): only the upward move is capped by _maxChangePerSample; a downward move is limited
                // solely by the gain - confirm that the asymmetry is intended.
                //
                double gain = _maxChangePerSecond * sampleDurationSeconds;
                move = Math.Pow(Math.Abs(move), _gainExponent) * (move >= 0.0 ? 1 : -1) * gain;
                move = Math.Min(move, _maxChangePerSample);

                //
                // If the result was positive, and CPU utilization is above CpuUtilizationHigh, refuse the move.
                //
                PortableThreadPool threadPoolInstance = ThreadPoolInstance;
                if (move > 0.0 && threadPoolInstance._cpuUtilization > CpuUtilizationHigh)
                    move = 0.0;

                //
                // Apply the move to our control setting
                //
                _currentControlSetting += move;

                //
                // Calculate the new thread wave magnitude, which is based on the moving average we've been keeping of
                // the throughput error.  This average starts at zero, so we'll start with a nice safe little wave at first.
                //
                int newThreadWaveMagnitude = (int)(0.5 + (_currentControlSetting * _averageThroughputNoise * _targetSignalToNoiseRatio * _threadMagnitudeMultiplier * 2.0));
                newThreadWaveMagnitude = Math.Min(newThreadWaveMagnitude, _maxThreadWaveMagnitude);
                newThreadWaveMagnitude = Math.Max(newThreadWaveMagnitude, 1);

                //
                // Make sure our control setting is within the ThreadPool's limits. When some threads are blocked due to
                // cooperative blocking, ensure that hill climbing does not decrease the thread count below the expected
                // minimum.
                //
                int maxThreads = threadPoolInstance._maxThreads;
                int minThreads = threadPoolInstance.MinThreadsGoal;

                // The lower bound is applied last, so it wins if the two limits conflict.
                _currentControlSetting = Math.Min(maxThreads - newThreadWaveMagnitude, _currentControlSetting);
                _currentControlSetting = Math.Max(minThreads, _currentControlSetting);

                //
                // Calculate the new thread count (control setting + square wave)
                //
                // The wave term alternates between 0 and the wave magnitude every (_wavePeriod / 2) samples.
                //
                int newThreadCount = (int)(_currentControlSetting + newThreadWaveMagnitude * ((_totalSamples / (_wavePeriod / 2)) % 2));

                //
                // Make sure the new thread count doesn't exceed the ThreadPool's limits
                //
                newThreadCount = Math.Min(maxThreads, newThreadCount);
                newThreadCount = Math.Max(minThreads, newThreadCount);

                //
                // Record these numbers for posterity
                //

                if (NativeRuntimeEventSource.Log.IsEnabled())
                {
                    NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentStats(sampleDurationSeconds, throughput, threadWaveComponent.Real, throughputWaveComponent.Real,
                        throughputErrorEstimate, _averageThroughputNoise, ratio.Real, confidence, _currentControlSetting, (ushort)newThreadWaveMagnitude);
                }


                //
                // If all of this caused an actual change in thread count, log that as well.
                //
                if (newThreadCount != currentThreadCount)
                {
                    // ChangeThreadCount reads the per-change statistics for logging, so they are reset only afterwards.
                    ChangeThreadCount(newThreadCount, state);
                    _secondsElapsedSinceLastChange = 0;
                    _completionsSinceLastChange = 0;
                }

                //
                // Return the new thread count and sample interval.  This is randomized to prevent correlations with other periodic
                // changes in throughput.  Among other things, this prevents us from getting confused by Hill Climbing instances
                // running in other processes.
                //
                // If we're at minThreads, and we seem to be hurting performance by going higher, we can't go any lower to fix this.  So
                // we'll simply stay at minThreads much longer, and only occasionally try a higher value.
                //
                // NOTE(review): the multiplier is 10 * min(-ratio.Real, 1), which is below 1 whenever -ratio.Real < 0.1,
                // so for slightly negative ratios the interval becomes shorter than _currentSampleMs (possibly 0)
                // rather than longer - confirm that this is intended.
                //
                int newSampleInterval;
                if (ratio.Real < 0.0 && newThreadCount == minThreads)
                    newSampleInterval = (int)(0.5 + _currentSampleMs * (10.0 * Math.Min(-ratio.Real, 1.0)));
                else
                    newSampleInterval = _currentSampleMs;

                return (newThreadCount, newSampleInterval);
            }
 
            /// <summary>
            /// Records <paramref name="newThreadCount"/> as the most recent thread count, picks a new randomized
            /// sample interval unless the change is due to cooperative blocking, and logs the transition together
            /// with the throughput observed since the previous change.
            /// </summary>
            /// <param name="newThreadCount">The thread count that is now in effect.</param>
            /// <param name="state">The state or reason associated with the change.</param>
            private void ChangeThreadCount(int newThreadCount, StateOrTransition state)
            {
                // Throughput since the previous change; reported as zero when no time has been accumulated yet.
                double throughputSinceLastChange = 0;
                if (_secondsElapsedSinceLastChange > 0)
                {
                    throughputSinceLastChange = _completionsSinceLastChange / _secondsElapsedSinceLastChange;
                }

                _lastThreadCount = newThreadCount;

                // Changes caused by cooperative blocking can be noisy, so they keep the current sample interval.
                bool keepCurrentSampleInterval = state == StateOrTransition.CooperativeBlocking;
                if (!keepCurrentSampleInterval)
                {
                    _currentSampleMs = _randomIntervalGenerator.Next(_sampleIntervalMsLow, _sampleIntervalMsHigh + 1);
                }

                LogTransition(newThreadCount, throughputSinceLastChange, state);
            }
 
            /// <summary>
            /// Appends a transition to the circular log, overwriting the oldest entry once the log is full, and
            /// raises the corresponding thread adjustment event when event logging is enabled.
            /// </summary>
            /// <param name="newThreadCount">The thread count that was set.</param>
            /// <param name="throughput">The throughput value to record with the transition.</param>
            /// <param name="stateOrTransition">The state or reason associated with the change.</param>
            private void LogTransition(int newThreadCount, double throughput, StateOrTransition stateOrTransition)
            {
                // The write position is one past the newest entry; when the log is full that is the oldest entry.
                int writeIndex = (_logStart + _logSize) % LogCapacity;

                bool overwritesOldestEntry = _logSize == LogCapacity;
                if (overwritesOldestEntry)
                {
                    // Advance the start past the entry being replaced and shrink the size, which hides the
                    // slot while it is being rewritten.
                    _logStart = (_logStart + 1) % LogCapacity;
                    _logSize--;
                }

                // Number of history samples available, rounded down to a whole number of wave periods.
                long availableSamples = Math.Min(_totalSamples, _samplesToMeasure);
                int wholeWavePeriods = (int)(availableSamples / _wavePeriod);

                ref LogEntry slot = ref _log[writeIndex];
                slot.tickCount = Environment.TickCount;
                slot.stateOrTransition = stateOrTransition;
                slot.newControlSetting = newThreadCount;
                slot.lastHistoryCount = wholeWavePeriods * _wavePeriod;
                slot.lastHistoryMean = (float)throughput;

                // Make the freshly written slot visible again.
                _logSize++;

                if (!NativeRuntimeEventSource.Log.IsEnabled())
                {
                    return;
                }

                NativeRuntimeEventSource.Log.ThreadPoolWorkerThreadAdjustmentAdjustment(
                    throughput,
                    (uint)newThreadCount,
                    (NativeRuntimeEventSource.ThreadAdjustmentReasonMap)stateOrTransition);
            }
 
            /// <summary>
            /// Informs the hill climber of a thread count change that it did not initiate. The control setting
            /// is shifted by the same amount as the thread count, and the change is recorded and logged.
            /// Does nothing when <paramref name="newThreadCount"/> equals the last recorded thread count.
            /// </summary>
            /// <param name="newThreadCount">The thread count that is now in effect.</param>
            /// <param name="state">The state or reason associated with the change.</param>
            public void ForceChange(int newThreadCount, StateOrTransition state)
            {
                if (_lastThreadCount == newThreadCount)
                {
                    return;
                }

                // Move the control setting along with the externally imposed change.
                int threadCountDelta = newThreadCount - _lastThreadCount;
                _currentControlSetting += threadCountDelta;

                ChangeThreadCount(newThreadCount, state);
            }
 
            /// <summary>
            /// Computes the complex component, at the given period, of the most recent
            /// <paramref name="numSamples"/> values in a history ring buffer, using the Goertzel algorithm
            /// (see http://en.wikipedia.org/wiki/Goertzel_algorithm).
            /// </summary>
            /// <param name="samples">The history ring buffer to analyze.</param>
            /// <param name="numSamples">The number of most recent samples to include.</param>
            /// <param name="period">The period, in samples, of the sinusoid to measure.</param>
            /// <returns>The measured component, divided by <paramref name="numSamples"/>.</returns>
            private Complex GetWaveComponent(double[] samples, int numSamples, double period)
            {
                Debug.Assert(numSamples >= period); // can't measure a wave that doesn't fit
                Debug.Assert(period >= 2); // can't measure above the Nyquist frequency
                Debug.Assert(numSamples <= samples.Length); // can't measure more samples than we have

                double angularFrequency = 2 * Math.PI / period;
                double cosine = Math.Cos(angularFrequency);
                double recurrenceCoefficient = 2 * cosine;

                // Running sample number of the oldest sample in the window; reduced modulo the buffer length below.
                long windowStart = _totalSamples - numSamples;

                // Second-order recurrence over the window, oldest sample first.
                double previous = 0;
                double beforePrevious = 0;
                for (int offset = 0; offset < numSamples; offset++)
                {
                    double current = recurrenceCoefficient * previous - beforePrevious + samples[(windowStart + offset) % _samplesToMeasure];
                    beforePrevious = previous;
                    previous = current;
                }

                Complex component = new Complex(previous - beforePrevious * cosine, beforePrevious * Math.Sin(angularFrequency));
                return component / numSamples;
            }
        }
    }
}