File: Concurrent\ConcurrentQueue.cs
Web Access
Project: ..\..\..\src\MSBuildTaskHost\MSBuildTaskHost.csproj (MSBuildTaskHost)
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.
 
using System;
using System.Diagnostics;
using System.Runtime.InteropServices;
using System.Threading;
 
#nullable disable
 
namespace Microsoft.Build.Shared.Concurrent
{
    // The following class is back-ported from .NET 4.X CoreFX library because
    // MSBuildTaskHost requires 3.5 .NET Framework. Only important methods (Enqueue, TryDequeue) are kept.
    internal class ConcurrentQueue<T>
    {
        // This implementation provides an unbounded, multi-producer multi-consumer queue
        // that supports the standard Enqueue/TryDequeue operations, as well as support for
        // snapshot enumeration (GetEnumerator, ToArray, CopyTo), peeking, and Count/IsEmpty.
        // It is composed of a linked list of bounded ring buffers, each of which has a head
        // and a tail index, isolated from each other to minimize false sharing.  As long as
        // the number of elements in the queue remains less than the size of the current
        // buffer (Segment), no additional allocations are required for enqueued items.  When
        // the number of items exceeds the size of the current segment, the current segment is
        // "frozen" to prevent further enqueues, and a new segment is linked from it and set
        // as the new tail segment for subsequent enqueues.  As old segments are consumed by
        // dequeues, the head reference is updated to point to the segment that dequeuers should
        // try next.  To support snapshot enumeration, segments also support the notion of
        // preserving for observation, whereby they avoid overwriting state as part of dequeues.
        // Any operation that requires a snapshot results in all current segments being
        // both frozen for enqueues and preserved for observation: any new enqueues will go
        // to new segments, and dequeuers will consume from the existing segments but without
        // overwriting the existing data.
 
        /// <summary>Initial length of the segments used in the queue.</summary>
        private const int InitialSegmentLength = 32;
        /// <summary>
        /// Maximum length of the segments used in the queue.  This is a somewhat arbitrary limit:
        /// larger means that as long as we don't exceed the size, we avoid allocating more segments,
        /// but if we do exceed it, then the segment becomes garbage.
        /// </summary>
        private const int MaxSegmentLength = 1024 * 1024;
 
        /// <summary>
        /// Lock used to protect cross-segment operations, including any updates to <see cref="_tail"/> or <see cref="_head"/>
        /// and any operations that need to get a consistent view of them.
        /// </summary>
        private object _crossSegmentLock;
        /// <summary>The current tail segment.</summary>
        private volatile Segment _tail;
        /// <summary>The current head segment.</summary>
        private volatile Segment _head;
 
        internal static object VolatileReader(ref object o) => Thread.VolatileRead(ref o);
        /// <summary>
        /// Initializes a new instance of the <see cref="ConcurrentQueue{T}"/> class.
        /// </summary>
        public ConcurrentQueue()
        {
            _crossSegmentLock = new object();
            _tail = _head = new Segment(InitialSegmentLength);
        }
 
        /// <summary>Adds an object to the end of the <see cref="ConcurrentQueue{T}"/>.</summary>
        /// <param name="item">
        /// The object to add to the end of the <see cref="ConcurrentQueue{T}"/>.
        /// The value can be a null reference (Nothing in Visual Basic) for reference types.
        /// </param>
        public void Enqueue(T item)
        {
            // Try to enqueue to the current tail.
            if (!_tail.TryEnqueue(item))
            {
                // If we're unable to, we need to take a slow path that will
                // try to add a new tail segment.
                EnqueueSlow(item);
            }
        }
 
        /// <summary>Adds to the end of the queue, adding a new segment if necessary.</summary>
        private void EnqueueSlow(T item)
        {
            while (true)
            {
                Segment tail = _tail;
 
                // Try to append to the existing tail.
                if (tail.TryEnqueue(item))
                {
                    return;
                }
 
                // If we were unsuccessful, take the lock so that we can compare and manipulate
                // the tail.  Assuming another enqueuer hasn't already added a new segment,
                // do so, then loop around to try enqueueing again.
                lock (_crossSegmentLock)
                {
                    if (tail == _tail)
                    {
                        // Make sure no one else can enqueue to this segment.
                        tail.EnsureFrozenForEnqueues();
 
                        // We determine the new segment's length based on the old length.
                        // In general, we double the size of the segment, to make it less likely
                        // that we'll need to grow again.  However, if the tail segment is marked
                        // as preserved for observation, something caused us to avoid reusing this
                        // segment, and if that happens a lot and we grow, we'll end up allocating
                        // lots of wasted space.  As such, in such situations we reset back to the
                        // initial segment length; if these observations are happening frequently,
                        // this will help to avoid wasted memory, and if they're not, we'll
                        // relatively quickly grow again to a larger size.
                        int nextSize = tail._preservedForObservation != 0 ? InitialSegmentLength : Math.Min(tail.Capacity * 2, MaxSegmentLength);
                        var newTail = new Segment(nextSize);
 
                        // Hook up the new tail.
                        tail._nextSegment = newTail;
                        _tail = newTail;
                    }
                }
            }
        }
 
        /// <summary>
        /// Attempts to remove and return the object at the beginning of the <see
        /// cref="ConcurrentQueue{T}"/>.
        /// </summary>
        /// <param name="result">
        /// When this method returns, if the operation was successful, <paramref name="result"/> contains the
        /// object removed. If no object was available to be removed, the value is unspecified.
        /// </param>
        /// <returns>
        /// true if an element was removed and returned from the beginning of the
        /// <see cref="ConcurrentQueue{T}"/> successfully; otherwise, false.
        /// </returns>
        public bool TryDequeue(out T result) =>
            _head.TryDequeue(out result) || // fast-path that operates just on the head segment
            TryDequeueSlow(out result); // slow path that needs to fix up segments
 
        /// <summary>Tries to dequeue an item, removing empty segments as needed.</summary>
        private bool TryDequeueSlow(out T item)
        {
            while (true)
            {
                // Get the current head
                Segment head = _head;
 
                // Try to take.  If we're successful, we're done.
                if (head.TryDequeue(out item))
                {
                    return true;
                }
 
                // Check to see whether this segment is the last. If it is, we can consider
                // this to be a moment-in-time empty condition (even though between the TryDequeue
                // check and this check, another item could have arrived).
                if (head._nextSegment == null)
                {
                    item = default(T);
                    return false;
                }
 
                // At this point we know that head.Next != null, which means
                // this segment has been frozen for additional enqueues. But between
                // the time that we ran TryDequeue and checked for a next segment,
                // another item could have been added.  Try to dequeue one more time
                // to confirm that the segment is indeed empty.
                Debug.Assert(head._frozenForEnqueues);
                if (head.TryDequeue(out item))
                {
                    return true;
                }
 
                // This segment is frozen (nothing more can be added) and empty (nothing is in it).
                // Update head to point to the next segment in the list, assuming no one's beat us to it.
                lock (_crossSegmentLock)
                {
                    if (head == _head)
                    {
                        _head = head._nextSegment;
                    }
                }
            }
        }
 
        /// <summary>
        /// Attempts to return an object from the beginning of the <see cref="ConcurrentQueue{T}"/>
        /// without removing it.
        /// </summary>
        /// <param name="result">
        /// When this method returns, <paramref name="result"/> contains an object from
        /// the beginning of the <see cref="Concurrent.ConcurrentQueue{T}"/> or default(T)
        /// if the operation failed.
        /// </param>
        /// <returns>true if and object was returned successfully; otherwise, false.</returns>
        /// <remarks>
        /// For determining whether the collection contains any items, use of the <see cref="IsEmpty"/>
        /// property is recommended rather than peeking.
        /// </remarks>
        public bool TryPeek(out T result) => TryPeek(out result, resultUsed: true);
 
        /// <summary>Attempts to retrieve the value for the first element in the queue.</summary>
        /// <param name="result">The value of the first element, if found.</param>
        /// <param name="resultUsed">true if the result is neede; otherwise false if only the true/false outcome is needed.</param>
        /// <returns>true if an element was found; otherwise, false.</returns>
        private bool TryPeek(out T result, bool resultUsed)
        {
            // Starting with the head segment, look through all of the segments
            // for the first one we can find that's not empty.
            Segment s = _head;
            while (true)
            {
                // Grab the next segment from this one, before we peek.
                // This is to be able to see whether the value has changed
                // during the peek operation.
                Thread.MemoryBarrier();
                Segment next = s._nextSegment;
 
                // Peek at the segment.  If we find an element, we're done.
                if (s.TryPeek(out result, resultUsed))
                {
                    return true;
                }
 
                // The current segment was empty at the moment we checked.
 
                if (next != null)
                {
                    // If prior to the peek there was already a next segment, then
                    // during the peek no additional items could have been enqueued
                    // to it and we can just move on to check the next segment.
                    Debug.Assert(next == s._nextSegment);
                    s = next;
                }
                else
                {
                    Thread.MemoryBarrier();
                    if (s._nextSegment == null)
                    {
                        // The next segment is null.  Nothing more to peek at.
                        break;
                    }
                }
 
                // The next segment was null before we peeked but non-null after.
                // That means either when we peeked the first segment had
                // already been frozen but the new segment not yet added,
                // or that the first segment was empty and between the time
                // that we peeked and then checked _nextSegment, so many items
                // were enqueued that we filled the first segment and went
                // into the next.  Since we need to peek in order, we simply
                // loop around again to peek on the same segment.  The next
                // time around on this segment we'll then either successfully
                // peek or we'll find that next was non-null before peeking,
                // and we'll traverse to that segment.
            }
 
            result = default(T);
            return false;
        }
 
        /// <summary>
        /// Provides a multi-producer, multi-consumer thread-safe bounded segment.  When the queue is full,
        /// enqueues fail and return false.  When the queue is empty, dequeues fail and return null.
        /// These segments are linked together to form the unbounded <see cref="ConcurrentQueue{T}"/>.
        /// </summary>
        [DebuggerDisplay("Capacity = {Capacity}")]
        private sealed class Segment
        {
            // Segment design is inspired by the algorithm outlined at:
            // http://www.1024cores.net/home/lock-free-algorithms/queues/bounded-mpmc-queue
 
            /// <summary>The array of items in this queue.  Each slot contains the item in that slot and its "sequence number".</summary>
            internal readonly Slot[] _slots;
            /// <summary>Mask for quickly accessing a position within the queue's array.</summary>
            internal readonly int _slotsMask;
            /// <summary>The head and tail positions, with padding to help avoid false sharing contention.</summary>
            /// <remarks>Dequeuing happens from the head, enqueuing happens at the tail.</remarks>
            internal PaddedHeadAndTail _headAndTail; // mutable struct: do not make this readonly
 
            /// <summary>Indicates whether the segment has been marked such that dequeues don't overwrite the removed data.</summary>
            internal byte _preservedForObservation;
            /// <summary>Indicates whether the segment has been marked such that no additional items may be enqueued.</summary>
            internal bool _frozenForEnqueues;
            /// <summary>The segment following this one in the queue, or null if this segment is the last in the queue.</summary>
            internal Segment _nextSegment;
 
            /// <summary>Creates the segment.</summary>
            /// <param name="boundedLength">
            /// The maximum number of elements the segment can contain.  Must be a power of 2.
            /// </param>
            public Segment(int boundedLength)
            {
                // Validate the length
                Debug.Assert(boundedLength >= 2, $"Must be >= 2, got {boundedLength}");
                Debug.Assert((boundedLength & (boundedLength - 1)) == 0, $"Must be a power of 2, got {boundedLength}");
 
                // Initialize the slots and the mask.  The mask is used as a way of quickly doing "% _slots.Length",
                // instead letting us do "& _slotsMask".
                _slots = new Slot[boundedLength];
                _slotsMask = boundedLength - 1;
 
                // Initialize the sequence number for each slot.  The sequence number provides a ticket that
                // allows dequeuers to know whether they can dequeue and enqueuers to know whether they can
                // enqueue.  An enqueuer at position N can enqueue when the sequence number is N, and a dequeuer
                // for position N can dequeue when the sequence number is N + 1.  When an enqueuer is done writing
                // at position N, it sets the sequence number to N + 1 so that a dequeuer will be able to dequeue,
                // and when a dequeuer is done dequeueing at position N, it sets the sequence number to N + _slots.Length,
                // so that when an enqueuer loops around the slots, it'll find that the sequence number at
                // position N is N.  This also means that when an enqueuer finds that at position N the sequence
                // number is < N, there is still a value in that slot, i.e. the segment is full, and when a
                // dequeuer finds that the value in a slot is < N + 1, there is nothing currently available to
                // dequeue. (It is possible for multiple enqueuers to enqueue concurrently, writing into
                // subsequent slots, and to have the first enqueuer take longer, so that the slots for 1, 2, 3, etc.
                // may have values, but the 0th slot may still be being filled... in that case, TryDequeue will
                // return false.)
                for (int i = 0; i < _slots.Length; i++)
                {
                    _slots[i].SequenceNumber = i;
                }
            }
 
            /// <summary>Gets the number of elements this segment can store.</summary>
            internal int Capacity => _slots.Length;
 
            /// <summary>Gets the "freeze offset" for this segment.</summary>
            internal int FreezeOffset => _slots.Length * 2;
 
            /// <summary>
            /// Ensures that the segment will not accept any subsequent enqueues that aren't already underway.
            /// </summary>
            /// <remarks>
            /// When we mark a segment as being frozen for additional enqueues,
            /// we set the <see cref="_frozenForEnqueues"/> bool, but that's mostly
            /// as a small helper to avoid marking it twice.  The real marking comes
            /// by modifying the Tail for the segment, increasing it by this
            /// <see cref="FreezeOffset"/>.  This effectively knocks it off the
            /// sequence expected by future enqueuers, such that any additional enqueuer
            /// will be unable to enqueue due to it not lining up with the expected
            /// sequence numbers.  This value is chosen specially so that Tail will grow
            /// to a value that maps to the same slot but that won't be confused with
            /// any other enqueue/dequeue sequence number.
            /// </remarks>
            internal void EnsureFrozenForEnqueues() // must only be called while queue's segment lock is held
            {
                if (!_frozenForEnqueues) // flag used to ensure we don't increase the Tail more than once if frozen more than once
                {
                    _frozenForEnqueues = true;
 
                    // Increase the tail by FreezeOffset, spinning until we're successful in doing so.
                    while (true)
                    {
                        int tail = Thread.VolatileRead(ref _headAndTail.Tail);
                        if (Interlocked.CompareExchange(ref _headAndTail.Tail, tail + FreezeOffset, tail) == tail)
                        {
                            break;
                        }
                        Thread.SpinWait(1);
                    }
                }
            }
 
            /// <summary>Tries to dequeue an element from the queue.</summary>
            public bool TryDequeue(out T item)
            {
                // Loop in case of contention...
                while (true)
                {
                    // Get the head at which to try to dequeue.
                    int currentHead = Thread.VolatileRead(ref _headAndTail.Head);
                    int slotsIndex = currentHead & _slotsMask;
 
                    // Read the sequence number for the head position.
                    int sequenceNumber = Thread.VolatileRead(ref _slots[slotsIndex].SequenceNumber);
 
                    // We can dequeue from this slot if it's been filled by an enqueuer, which
                    // would have left the sequence number at pos+1.
                    int diff = sequenceNumber - (currentHead + 1);
                    if (diff == 0)
                    {
                        // We may be racing with other dequeuers.  Try to reserve the slot by incrementing
                        // the head.  Once we've done that, no one else will be able to read from this slot,
                        // and no enqueuer will be able to read from this slot until we've written the new
                        // sequence number. WARNING: The next few lines are not reliable on a runtime that
                        // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                        // but before the Volatile.Write, enqueuers trying to enqueue into this slot would
                        // spin indefinitely.  If this implementation is ever used on such a platform, this
                        // if block should be wrapped in a finally / prepared region.
                        if (Interlocked.CompareExchange(ref _headAndTail.Head, currentHead + 1, currentHead) == currentHead)
                        {
                            // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
                            // trying to dequeue from this slot will end up spinning until we do the subsequent Write.
                            item = _slots[slotsIndex].Item;
                            if (Thread.VolatileRead(ref _preservedForObservation) == 0)
                            {
                                // If we're preserving, though, we don't zero out the slot, as we need it for
                                // enumerations, peeking, ToArray, etc.  And we don't update the sequence number,
                                // so that an enqueuer will see it as full and be forced to move to a new segment.
                                _slots[slotsIndex].Item = default(T);
                                Thread.VolatileWrite(ref _slots[slotsIndex].SequenceNumber, currentHead + _slots.Length);
                            }
                            return true;
                        }
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot doesn't
                        // yet contain a value we can dequeue, i.e. the segment is empty.  Technically it's
                        // possible that multiple enqueuers could have written concurrently, with those
                        // getting later slots actually finishing first, so there could be elements after
                        // this one that are available, but we need to dequeue in order.  So before declaring
                        // failure and that the segment is empty, we check the tail to see if we're actually
                        // empty or if we're just waiting for items in flight or after this one to become available.
                        bool frozen = _frozenForEnqueues;
                        int currentTail = Thread.VolatileRead(ref _headAndTail.Tail);
                        if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                        {
                            item = default(T);
                            return false;
                        }
 
                        // It's possible it could have become frozen after we checked _frozenForEnqueues
                        // and before reading the tail.  That's ok: in that rare race condition, we just
                        // loop around again.
                    }
 
                    // Lost a race. Spin a bit, then try again.
                    Thread.SpinWait(1);
                }
            }
 
            /// <summary>Tries to peek at an element from the queue, without removing it.</summary>
            public bool TryPeek(out T result, bool resultUsed)
            {
                if (resultUsed)
                {
                    // In order to ensure we don't get a torn read on the value, we mark the segment
                    // as preserving for observation.  Additional items can still be enqueued to this
                    // segment, but no space will be freed during dequeues, such that the segment will
                    // no longer be reusable.
                    _preservedForObservation = 1;
                    Thread.MemoryBarrier();
                }
 
                // Loop in case of contention...
                while (true)
                {
                    // Get the head at which to try to peek.
                    int currentHead = Thread.VolatileRead(ref _headAndTail.Head);
                    int slotsIndex = currentHead & _slotsMask;
 
                    // Read the sequence number for the head position.
                    int sequenceNumber = Thread.VolatileRead(ref _slots[slotsIndex].SequenceNumber);
 
                    // We can peek from this slot if it's been filled by an enqueuer, which
                    // would have left the sequence number at pos+1.
                    int diff = sequenceNumber - (currentHead + 1);
                    if (diff == 0)
                    {
                        result = resultUsed ? _slots[slotsIndex].Item : default(T);
                        return true;
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot doesn't
                        // yet contain a value we can peek, i.e. the segment is empty.  Technically it's
                        // possible that multiple enqueuers could have written concurrently, with those
                        // getting later slots actually finishing first, so there could be elements after
                        // this one that are available, but we need to peek in order.  So before declaring
                        // failure and that the segment is empty, we check the tail to see if we're actually
                        // empty or if we're just waiting for items in flight or after this one to become available.
                        bool frozen = _frozenForEnqueues;
                        int currentTail = Thread.VolatileRead(ref _headAndTail.Tail);
                        if (currentTail - currentHead <= 0 || (frozen && (currentTail - FreezeOffset - currentHead <= 0)))
                        {
                            result = default(T);
                            return false;
                        }
 
                        // It's possible it could have become frozen after we checked _frozenForEnqueues
                        // and before reading the tail.  That's ok: in that rare race condition, we just
                        // loop around again.
                    }
 
                    // Lost a race. Spin a bit, then try again.
                    Thread.SpinWait(1);
                }
            }
 
            /// <summary>
            /// Attempts to enqueue the item.  If successful, the item will be stored
            /// in the queue and true will be returned; otherwise, the item won't be stored, and false
            /// will be returned.
            /// </summary>
            public bool TryEnqueue(T item)
            {
                // Loop in case of contention...
                while (true)
                {
                    // Get the tail at which to try to return.
                    int currentTail = Thread.VolatileRead(ref _headAndTail.Tail);
                    int slotsIndex = currentTail & _slotsMask;
 
                    // Read the sequence number for the tail position.
                    int sequenceNumber = Thread.VolatileRead(ref _slots[slotsIndex].SequenceNumber);
 
                    // The slot is empty and ready for us to enqueue into it if its sequence
                    // number matches the slot.
                    int diff = sequenceNumber - currentTail;
                    if (diff == 0)
                    {
                        // We may be racing with other enqueuers.  Try to reserve the slot by incrementing
                        // the tail.  Once we've done that, no one else will be able to write to this slot,
                        // and no dequeuer will be able to read from this slot until we've written the new
                        // sequence number. WARNING: The next few lines are not reliable on a runtime that
                        // supports thread aborts. If a thread abort were to sneak in after the CompareExchange
                        // but before the Volatile.Write, other threads will spin trying to access this slot.
                        // If this implementation is ever used on such a platform, this if block should be
                        // wrapped in a finally / prepared region.
                        if (Interlocked.CompareExchange(ref _headAndTail.Tail, currentTail + 1, currentTail) == currentTail)
                        {
                            // Successfully reserved the slot.  Note that after the above CompareExchange, other threads
                            // trying to return will end up spinning until we do the subsequent Write.
                            _slots[slotsIndex].Item = item;
                            Thread.VolatileWrite(ref _slots[slotsIndex].SequenceNumber, currentTail + 1);
                            return true;
                        }
                    }
                    else if (diff < 0)
                    {
                        // The sequence number was less than what we needed, which means this slot still
                        // contains a value, i.e. the segment is full.  Technically it's possible that multiple
                        // dequeuers could have read concurrently, with those getting later slots actually
                        // finishing first, so there could be spaces after this one that are available, but
                        // we need to enqueue in order.
                        return false;
                    }
 
                    // Lost a race. Spin a bit, then try again.
                    Thread.SpinWait(1);
                }
            }
 
            /// <summary>Represents a slot in the queue.</summary>
            [StructLayout(LayoutKind.Auto)]
            [DebuggerDisplay("Item = {Item}, SequenceNumber = {SequenceNumber}")]
            internal struct Slot
            {
                /// <summary>The item.</summary>
                public T Item;
                /// <summary>The sequence number for this slot, used to synchronize between enqueuers and dequeuers.</summary>
                public int SequenceNumber;
            }
        }
    }
 
    /// <summary>Padded head and tail indices, to avoid false sharing between producers and consumers.</summary>
    [DebuggerDisplay("Head = {Head}, Tail = {Tail}")]
    [StructLayout(LayoutKind.Explicit, Size = 192)] // padding before/between/after fields based on typical cache line size of 64
    internal struct PaddedHeadAndTail
    {
        [FieldOffset(64)]
        public int Head;
 
        [FieldOffset(128)]
        public int Tail;
    }
}