// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
// =+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+=+
//
// Implements the algorithm for distributing loop indices to parallel loop workers
//
// =-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-=-
using System.Diagnostics;
using System.Numerics;
using System.Runtime.CompilerServices;
using System.Runtime.InteropServices;
namespace System.Threading.Tasks
{
/// <summary>
/// Represents an index range
/// </summary>
[StructLayout(LayoutKind.Auto)]
internal struct IndexRange
{
// the From and To values for this range. These do not change.
internal long _nFromInclusive;
internal long _nToExclusive;
// The shared index, stored as the offset from nFromInclusive. Using an offset rather than the actual
// value saves us from overflows that can happen due to multiple workers racing to increment this.
// All updates to this field need to be interlocked. To avoid split interlocked operations across
// cache lines in 32-bit processes, when the range fits in a 32-bit value we prefer to use a
// 32-bit field, and just use the first 32 bits of the long. And to minimize false sharing, each
// value is stored in its own heap-allocated object, which is lazily allocated by the thread using
// that range, minimizing the chances it'll be near the objects from other threads.
internal volatile StrongBox<long>? _nSharedCurrentIndexOffset;
// Set to true by the worker that finishes this range. It's OK to do a non-interlocked write here.
internal bool _bRangeFinished;
}
/// <summary>
/// The RangeWorker struct wraps the state needed by a task that services the parallel loop
/// </summary>
[StructLayout(LayoutKind.Auto)]
internal struct RangeWorker
{
// reference to the IndexRange array allocated by the range manager
internal readonly IndexRange[] _indexRanges;
// index of the current index range that this worker is grabbing chunks from
internal int _nCurrentIndexRange;
// the step for this loop. Duplicated here for quick access (rather than jumping to the RangeManager)
internal long _nStep;
// increment value is the current amount that this worker will use
// to increment the shared index of the range it's working on
internal long _nIncrementValue;
// the increment value is doubled each time this worker finds work, and is capped at this value
internal readonly long _nMaxIncrementValue;
// whether to use 32-bits or 64-bits of current index in each range
internal readonly bool _use32BitCurrentIndex;
internal bool IsInitialized { get { return _indexRanges != null; } }
/// <summary>
/// Initializes a RangeWorker struct
/// </summary>
internal RangeWorker(IndexRange[] ranges, int nInitialRange, long nStep, bool use32BitCurrentIndex)
{
_indexRanges = ranges;
_use32BitCurrentIndex = use32BitCurrentIndex;
_nCurrentIndexRange = nInitialRange;
_nStep = nStep;
_nIncrementValue = nStep;
_nMaxIncrementValue = Parallel.DEFAULT_LOOP_STRIDE * nStep;
}
/// <summary>
/// Implements the core work search algorithm that will be used for this range worker.
/// </summary>
///
/// Usage pattern is:
/// 1) the thread associated with this rangeworker calls FindNewWork
/// 2) if we return true, the worker uses the nFromInclusiveLocal and nToExclusiveLocal values
/// to execute the sequential loop
/// 3) if we return false it means there is no more work left. It's time to quit.
///
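/// For illustration, a hedged sketch of how a caller might drive this method (hypothetical
/// local names; the real caller lives in Parallel's loop workers):
///
///     while (rangeWorker.FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal))
///     {
///         for (long i = nFromInclusiveLocal; i < nToExclusiveLocal; i += rangeWorker._nStep)
///         {
///             // execute one iteration of the user's loop body for index i
///         }
///     }
///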
private bool FindNewWork(out long nFromInclusiveLocal, out long nToExclusiveLocal)
{
// since we iterate over index ranges circularly, we will use the
// count of visited ranges as our exit condition
int numIndexRangesToVisit = _indexRanges.Length;
do
{
// local snap to save array access bounds checks in places where we only read fields
IndexRange currentRange = _indexRanges[_nCurrentIndexRange];
if (!currentRange._bRangeFinished)
{
StrongBox<long>? sharedCurrentIndexOffset = _indexRanges[_nCurrentIndexRange]._nSharedCurrentIndexOffset;
if (sharedCurrentIndexOffset == null)
{
Interlocked.CompareExchange(ref _indexRanges[_nCurrentIndexRange]._nSharedCurrentIndexOffset, new StrongBox<long>(0), null);
sharedCurrentIndexOffset = _indexRanges[_nCurrentIndexRange]._nSharedCurrentIndexOffset!;
}
long nMyOffset;
if (IntPtr.Size == 4 && _use32BitCurrentIndex)
{
// In 32-bit processes, we prefer to use 32-bit interlocked operations, to avoid the possibility of doing
// a 64-bit interlocked when the target value crosses a cache line, as that can be super expensive.
// We use the first 32 bits of the Int64 index in such cases.
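// (This is only done when _use32BitCurrentIndex is set, i.e. the range size fits in 32 bits; on
// little-endian platforms those first 32 bits are the low-order half of the Int64, which is what
// the 32-bit interlocked add below updates.)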
unsafe
{
fixed (long* indexPtr = &sharedCurrentIndexOffset.Value)
{
nMyOffset = Interlocked.Add(ref *(int*)indexPtr, (int)_nIncrementValue) - _nIncrementValue;
}
}
}
else
{
nMyOffset = Interlocked.Add(ref sharedCurrentIndexOffset.Value, _nIncrementValue) - _nIncrementValue;
}
if (currentRange._nToExclusive - currentRange._nFromInclusive > nMyOffset)
{
// we found work
nFromInclusiveLocal = currentRange._nFromInclusive + nMyOffset;
nToExclusiveLocal = unchecked(nFromInclusiveLocal + _nIncrementValue);
// Check for going past end of range, or wrapping
if ((nToExclusiveLocal > currentRange._nToExclusive) || (nToExclusiveLocal < currentRange._nFromInclusive))
{
nToExclusiveLocal = currentRange._nToExclusive;
}
// We will double our unit of increment until it reaches the maximum.
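// For example, with nStep == 1 this worker's chunks grow as 1, 2, 4, 8, ... indices, which
// amortizes the interlocked add once a range has proven to hold plenty of work.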
if (_nIncrementValue < _nMaxIncrementValue)
{
_nIncrementValue *= 2;
if (_nIncrementValue > _nMaxIncrementValue)
{
_nIncrementValue = _nMaxIncrementValue;
}
}
return true;
}
else
{
// this index range is completed, mark it so that others can skip it quickly
_indexRanges[_nCurrentIndexRange]._bRangeFinished = true;
}
}
// move on to the next index range, in circular order.
_nCurrentIndexRange = (_nCurrentIndexRange + 1) % _indexRanges.Length;
numIndexRangesToVisit--;
} while (numIndexRangesToVisit > 0);
// we've visited all index ranges possible => there's no work remaining
nFromInclusiveLocal = 0;
nToExclusiveLocal = 0;
return false;
}
internal bool FindNewWork<TInt>(out TInt fromInclusive, out TInt toExclusive) where TInt : struct, IBinaryInteger<TInt>, IMinMaxValue<TInt>
{
Debug.Assert(typeof(TInt) == typeof(int) || typeof(TInt) == typeof(long));
bool success = FindNewWork(out long fromInclusiveInt64, out long toExclusiveInt64);
Debug.Assert(
fromInclusiveInt64 <= long.CreateTruncating(TInt.MaxValue) && fromInclusiveInt64 >= long.CreateTruncating(TInt.MinValue) &&
toExclusiveInt64 <= long.CreateTruncating(TInt.MaxValue) && toExclusiveInt64 >= long.CreateTruncating(TInt.MinValue));
fromInclusive = TInt.CreateTruncating(fromInclusiveInt64);
toExclusive = TInt.CreateTruncating(toExclusiveInt64);
return success;
}
}
/// <summary>
/// Represents the entire loop operation, keeping track of workers and ranges.
/// </summary>
///
/// The usage pattern is:
/// 1) The Parallel loop entry function (ForWorker) creates an instance of this class
/// 2) Every thread joining to service the parallel loop calls RegisterWorker to grab a
/// RangeWorker struct to wrap the state it will need to find and execute work,
/// and they keep interacting with that struct until the end of the loop
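///
/// For illustration, a hedged sketch of that pattern (hypothetical caller code, not the actual
/// Parallel.ForWorker implementation):
///
///     var rangeManager = new RangeManager(0, count, 1, expectedWorkerCount);
///     // executed by each thread that joins the loop:
///     RangeWorker worker = rangeManager.RegisterNewWorker();
///     while (worker.FindNewWork(out long from, out long to))
///     {
///         // run the loop body for indices [from, to)
///     }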
internal sealed class RangeManager
{
internal readonly IndexRange[] _indexRanges;
internal readonly bool _use32BitCurrentIndex;
internal int _nCurrentIndexRangeToAssign;
internal long _nStep;
/// <summary>
/// Initializes a RangeManager with the given loop parameters, and the desired number of outer ranges
/// </summary>
internal RangeManager(long nFromInclusive, long nToExclusive, long nStep, int nNumExpectedWorkers)
{
_nCurrentIndexRangeToAssign = 0;
_nStep = nStep;
// Our signed math breaks down w/ nNumExpectedWorkers == 1. So change it to 2.
if (nNumExpectedWorkers == 1)
nNumExpectedWorkers = 2;
//
// calculate the size of each index range
//
ulong uSpan = (ulong)(nToExclusive - nFromInclusive);
ulong uRangeSize = uSpan / (ulong)nNumExpectedWorkers; // rough estimate first
uRangeSize -= uRangeSize % (ulong)nStep; // snap to multiples of nStep
// otherwise index range transitions will derail us from nStep
if (uRangeSize == 0)
{
uRangeSize = (ulong)nStep;
}
//
// find the actual number of index ranges we will need
//
Debug.Assert((uSpan / uRangeSize) < int.MaxValue);
int nNumRanges = (int)(uSpan / uRangeSize);
if (uSpan % uRangeSize != 0)
{
nNumRanges++;
}
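// For example, [0, 100) with nStep == 1 and 4 expected workers gives uRangeSize == 25 and
// nNumRanges == 4, which the loop below turns into the ranges [0,25), [25,50), [50,75), [75,100).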
// Convert to signed so the rest of the logic works.
// Should be fine so long as uRangeSize < Int64.MaxValue, which we guaranteed by setting #workers >= 2.
long nRangeSize = (long)uRangeSize;
_use32BitCurrentIndex = IntPtr.Size == 4 && nRangeSize <= int.MaxValue;
// allocate the array of index ranges
_indexRanges = new IndexRange[nNumRanges];
long nCurrentIndex = nFromInclusive;
for (int i = 0; i < nNumRanges; i++)
{
// the fromInclusive of the new index range is always on nCurrentIndex
_indexRanges[i]._nFromInclusive = nCurrentIndex;
_indexRanges[i]._nSharedCurrentIndexOffset = null;
_indexRanges[i]._bRangeFinished = false;
// now increment it to find the toExclusive value for our range
nCurrentIndex = unchecked(nCurrentIndex + nRangeSize);
// detect integer overflow or range overage and snap to nToExclusive
if (nCurrentIndex < unchecked(nCurrentIndex - nRangeSize) ||
nCurrentIndex > nToExclusive)
{
// this should only happen at the last index
Debug.Assert(i == nNumRanges - 1);
nCurrentIndex = nToExclusive;
}
// now that the end point of the new range is calculated, assign it.
_indexRanges[i]._nToExclusive = nCurrentIndex;
}
}
/// <summary>
/// The function that needs to be called by each new worker thread servicing the parallel loop
/// in order to get a RangeWorker struct that wraps the state for finding and executing indices
/// </summary>
internal RangeWorker RegisterNewWorker()
{
Debug.Assert(_indexRanges != null && _indexRanges.Length != 0);
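// Stripe workers across the outer ranges: successive registrations start at range 0, 1, 2, ...,
// wrapping modulo the number of ranges, so that threads begin their search on different ranges.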
int nInitialRange = (Interlocked.Increment(ref _nCurrentIndexRangeToAssign) - 1) % _indexRanges.Length;
return new RangeWorker(_indexRanges, nInitialRange, _nStep, _use32BitCurrentIndex);
}
}
}